Merge pull request #805 from lavalamp/serverWatch

Improve watch
pull/6/head
Clayton Coleman 2014-08-08 17:30:28 -04:00
commit 71c6e082d4
21 changed files with 474 additions and 142 deletions

View File

@ -247,7 +247,7 @@ func executeAPIRequest(method string, s *kube_client.Client) bool {
r := s.Verb(verb).
Path(path).
ParseSelector(*selector)
ParseSelectorParam("labels", *selector)
if setBody {
if version != 0 {
data := readConfig(storage)

View File

@ -64,11 +64,11 @@ type SimpleRESTStorage struct {
updated *Simple
created *Simple
// Valid if WatchAll or WatchSingle is called
fakeWatch *watch.FakeWatcher
// Set if WatchSingle is called
requestedID string
// These are set when Watch is called
fakeWatch *watch.FakeWatcher
requestedLabelSelector labels.Selector
requestedFieldSelector labels.Selector
requestedResourceVersion uint64
// If non-nil, called inside the WorkFunc when answering update, delete, create.
// obj receives the original input to the update, delete, or create call.
@ -95,7 +95,7 @@ func (storage *SimpleRESTStorage) Delete(id string) (<-chan interface{}, error)
if storage.injectedFunction != nil {
return storage.injectedFunction(id)
}
return api.Status{Status: api.StatusSuccess}, nil
return &api.Status{Status: api.StatusSuccess}, nil
}), nil
}
@ -130,18 +130,11 @@ func (storage *SimpleRESTStorage) Update(obj interface{}) (<-chan interface{}, e
}
// Implement ResourceWatcher.
func (storage *SimpleRESTStorage) WatchAll() (watch.Interface, error) {
if err := storage.errors["watchAll"]; err != nil {
return nil, err
}
storage.fakeWatch = watch.NewFake()
return storage.fakeWatch, nil
}
// Implement ResourceWatcher.
func (storage *SimpleRESTStorage) WatchSingle(id string) (watch.Interface, error) {
storage.requestedID = id
if err := storage.errors["watchSingle"]; err != nil {
func (storage *SimpleRESTStorage) Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
storage.requestedLabelSelector = label
storage.requestedFieldSelector = field
storage.requestedResourceVersion = resourceVersion
if err := storage.errors["watch"]; err != nil {
return nil, err
}
storage.fakeWatch = watch.NewFake()
@ -164,17 +157,17 @@ func TestNotFound(t *testing.T) {
Path string
}
cases := map[string]T{
"PATCH method": T{"PATCH", "/prefix/version/foo"},
"GET long prefix": T{"GET", "/prefix/"},
"GET missing storage": T{"GET", "/prefix/version/blah"},
"GET with extra segment": T{"GET", "/prefix/version/foo/bar/baz"},
"POST with extra segment": T{"POST", "/prefix/version/foo/bar"},
"DELETE without extra segment": T{"DELETE", "/prefix/version/foo"},
"DELETE with extra segment": T{"DELETE", "/prefix/version/foo/bar/baz"},
"PUT without extra segment": T{"PUT", "/prefix/version/foo"},
"PUT with extra segment": T{"PUT", "/prefix/version/foo/bar/baz"},
"watch missing storage": T{"GET", "/prefix/version/watch/"},
"watch with bad method": T{"POST", "/prefix/version/watch/foo/bar"},
"PATCH method": {"PATCH", "/prefix/version/foo"},
"GET long prefix": {"GET", "/prefix/"},
"GET missing storage": {"GET", "/prefix/version/blah"},
"GET with extra segment": {"GET", "/prefix/version/foo/bar/baz"},
"POST with extra segment": {"POST", "/prefix/version/foo/bar"},
"DELETE without extra segment": {"DELETE", "/prefix/version/foo"},
"DELETE with extra segment": {"DELETE", "/prefix/version/foo/bar/baz"},
"PUT without extra segment": {"PUT", "/prefix/version/foo"},
"PUT with extra segment": {"PUT", "/prefix/version/foo/bar/baz"},
"watch missing storage": {"GET", "/prefix/version/watch/"},
"watch with bad method": {"POST", "/prefix/version/watch/foo/bar"},
}
handler := New(map[string]RESTStorage{
"foo": &SimpleRESTStorage{},

View File

@ -29,14 +29,17 @@ type RESTStorage interface {
New() interface{}
// List selects resources in the storage which match to the selector.
// TODO: add field selector in addition to label selector.
List(labels.Selector) (interface{}, error)
// Get finds a resource in the storage by id and returns it.
// Although it can return an arbitrary error value, IsNotFound(err) is true for the returned error value err when the specified resource is not found.
// Although it can return an arbitrary error value, IsNotFound(err) is true for the
// returned error value err when the specified resource is not found.
Get(id string) (interface{}, error)
// Delete finds a resource in the storage and deletes it.
// Although it can return an arbitrary error value, IsNotFound(err) is true for the returned error value err when the specified resource is not found.
// Although it can return an arbitrary error value, IsNotFound(err) is true for the
// returned error value err when the specified resource is not found.
Delete(id string) (<-chan interface{}, error)
Create(interface{}) (<-chan interface{}, error)
@ -46,7 +49,9 @@ type RESTStorage interface {
// ResourceWatcher should be implemented by all RESTStorage objects that
// want to offer the ability to watch for changes through the watch api.
type ResourceWatcher interface {
// TODO: take a query, like List, to filter out unwanted events.
WatchAll() (watch.Interface, error)
WatchSingle(id string) (watch.Interface, error)
// 'label' selects on labels; 'field' selects on the object's fields. Not all fields
// are supported; an error should be returned if 'field' tries to select on a field that
// isn't supported. 'resourceVersion' allows for continuing/starting a watch at a
// particular version.
Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error)
}

View File

@ -19,10 +19,13 @@ package apiserver
import (
"encoding/json"
"net/http"
"net/url"
"strconv"
"code.google.com/p/go.net/websocket"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/httplog"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
@ -30,6 +33,23 @@ type WatchHandler struct {
storage map[string]RESTStorage
}
func getWatchParams(query url.Values) (label, field labels.Selector, resourceVersion uint64) {
if s, err := labels.ParseSelector(query.Get("labels")); err != nil {
label = labels.Everything()
} else {
label = s
}
if s, err := labels.ParseSelector(query.Get("fields")); err != nil {
field = labels.Everything()
} else {
field = s
}
if rv, err := strconv.ParseUint(query.Get("resourceVersion"), 10, 64); err == nil {
resourceVersion = rv
}
return label, field, resourceVersion
}
// handleWatch processes a watch request
func (h *WatchHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
parts := splitPath(req.URL.Path)
@ -41,13 +61,8 @@ func (h *WatchHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
notFound(w, req)
}
if watcher, ok := storage.(ResourceWatcher); ok {
var watching watch.Interface
var err error
if id := req.URL.Query().Get("id"); id != "" {
watching, err = watcher.WatchSingle(id)
} else {
watching, err = watcher.WatchAll()
}
label, field, resourceVersion := getWatchParams(req.URL.Query())
watching, err := watcher.Watch(label, field, resourceVersion)
if err != nil {
internalError(err, w)
return

View File

@ -40,6 +40,7 @@ var watchTestTable = []struct {
func TestWatchWebsocket(t *testing.T) {
simpleStorage := &SimpleRESTStorage{}
_ = ResourceWatcher(simpleStorage) // Give compile error if this doesn't work.
handler := New(map[string]RESTStorage{
"foo": simpleStorage,
}, codec, "/prefix/version")
@ -48,17 +49,13 @@ func TestWatchWebsocket(t *testing.T) {
dest, _ := url.Parse(server.URL)
dest.Scheme = "ws" // Required by websocket, though the server never sees it.
dest.Path = "/prefix/version/watch/foo"
dest.RawQuery = "id=myID"
dest.RawQuery = ""
ws, err := websocket.Dial(dest.String(), "", "http://localhost")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if a, e := simpleStorage.requestedID, "myID"; a != e {
t.Fatalf("Expected %v, got %v", e, a)
}
try := func(action watch.EventType, object interface{}) {
// Send
simpleStorage.fakeWatch.Action(action, object)
@ -98,7 +95,7 @@ func TestWatchHTTP(t *testing.T) {
dest, _ := url.Parse(server.URL)
dest.Path = "/prefix/version/watch/foo"
dest.RawQuery = "id=myID"
dest.RawQuery = ""
request, err := http.NewRequest("GET", dest.String(), nil)
if err != nil {
@ -114,10 +111,6 @@ func TestWatchHTTP(t *testing.T) {
t.Errorf("Unexpected response %#v", response)
}
if a, e := simpleStorage.requestedID, "myID"; a != e {
t.Fatalf("Expected %v, got %v", e, a)
}
decoder := json.NewDecoder(response.Body)
try := func(action watch.EventType, object interface{}) {
@ -148,3 +141,65 @@ func TestWatchHTTP(t *testing.T) {
t.Errorf("Unexpected non-error")
}
}
func TestWatchParamParsing(t *testing.T) {
simpleStorage := &SimpleRESTStorage{}
handler := New(map[string]RESTStorage{
"foo": simpleStorage,
}, codec, "/prefix/version")
server := httptest.NewServer(handler)
dest, _ := url.Parse(server.URL)
dest.Path = "/prefix/version/watch/foo"
table := []struct {
rawQuery string
resourceVersion uint64
labelSelector string
fieldSelector string
}{
{
rawQuery: "resourceVersion=1234",
resourceVersion: 1234,
labelSelector: "",
fieldSelector: "",
}, {
rawQuery: "resourceVersion=314159&fields=Host%3D&labels=name%3Dfoo",
resourceVersion: 314159,
labelSelector: "name=foo",
fieldSelector: "Host=",
}, {
rawQuery: "fields=ID%3dfoo&resourceVersion=1492",
resourceVersion: 1492,
labelSelector: "",
fieldSelector: "ID=foo",
}, {
rawQuery: "",
resourceVersion: 0,
labelSelector: "",
fieldSelector: "",
},
}
for _, item := range table {
simpleStorage.requestedLabelSelector = nil
simpleStorage.requestedFieldSelector = nil
simpleStorage.requestedResourceVersion = 5 // Prove this is set in all cases
dest.RawQuery = item.rawQuery
resp, err := http.Get(dest.String())
if err != nil {
t.Errorf("%v: unexpected error: %v", item.rawQuery, err)
continue
}
resp.Body.Close()
if e, a := item.resourceVersion, simpleStorage.requestedResourceVersion; e != a {
t.Errorf("%v: expected %v, got %v", item.rawQuery, e, a)
}
if e, a := item.labelSelector, simpleStorage.requestedLabelSelector.String(); e != a {
t.Errorf("%v: expected %v, got %v", item.rawQuery, e, a)
}
if e, a := item.fieldSelector, simpleStorage.requestedFieldSelector.String(); e != a {
t.Errorf("%v: expected %v, got %v", item.rawQuery, e, a)
}
}
}

View File

@ -28,11 +28,14 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/version"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/golang/glog"
)
// Interface holds the methods for clients of Kubenetes,
// an interface to allow mock testing.
// TODO: split this up by resource?
// TODO: these should return/take pointers.
type Interface interface {
ListPods(selector labels.Selector) (api.PodList, error)
GetPod(name string) (api.Pod, error)
@ -45,6 +48,7 @@ type Interface interface {
CreateReplicationController(api.ReplicationController) (api.ReplicationController, error)
UpdateReplicationController(api.ReplicationController) (api.ReplicationController, error)
DeleteReplicationController(string) error
WatchReplicationControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error)
GetService(name string) (api.Service, error)
CreateService(api.Service) (api.Service, error)
@ -169,7 +173,7 @@ func (c *Client) makeURL(path string) string {
// ListPods takes a selector, and returns the list of pods that match that selector
func (c *Client) ListPods(selector labels.Selector) (result api.PodList, err error) {
err = c.Get().Path("pods").Selector(selector).Do().Into(&result)
err = c.Get().Path("pods").SelectorParam("labels", selector).Do().Into(&result)
return
}
@ -202,7 +206,7 @@ func (c *Client) UpdatePod(pod api.Pod) (result api.Pod, err error) {
// ListReplicationControllers takes a selector, and returns the list of replication controllers that match that selector
func (c *Client) ListReplicationControllers(selector labels.Selector) (result api.ReplicationControllerList, err error) {
err = c.Get().Path("replicationControllers").Selector(selector).Do().Into(&result)
err = c.Get().Path("replicationControllers").SelectorParam("labels", selector).Do().Into(&result)
return
}
@ -233,6 +237,17 @@ func (c *Client) DeleteReplicationController(name string) error {
return c.Delete().Path("replicationControllers").Path(name).Do().Error()
}
// WatchReplicationControllers returns a watch.Interface that watches the requested controllers.
func (c *Client) WatchReplicationControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
return c.Get().
Path("watch").
Path("replicationControllers").
UintParam("resourceVersion", resourceVersion).
SelectorParam("labels", label).
SelectorParam("fields", field).
Watch()
}
// GetService returns information about a particular service.
func (c *Client) GetService(name string) (result api.Service, err error) {
err = c.Get().Path("services").Path(name).Do().Into(&result)

View File

@ -42,7 +42,7 @@ func TestListEmptyPods(t *testing.T) {
Request: testRequest{Method: "GET", Path: "/pods"},
Response: Response{StatusCode: 200, Body: api.PodList{}},
}
podList, err := c.Setup().ListPods(nil)
podList, err := c.Setup().ListPods(labels.Everything())
c.Validate(t, podList, err)
}
@ -65,7 +65,7 @@ func TestListPods(t *testing.T) {
},
},
}
receivedPodList, err := c.Setup().ListPods(nil)
receivedPodList, err := c.Setup().ListPods(labels.Everything())
c.Validate(t, receivedPodList, err)
}
@ -191,7 +191,7 @@ func TestListControllers(t *testing.T) {
},
},
}
receivedControllerList, err := c.Setup().ListReplicationControllers(nil)
receivedControllerList, err := c.Setup().ListReplicationControllers(labels.Everything())
c.Validate(t, receivedControllerList, err)
}

105
pkg/client/fake.go Normal file
View File

@ -0,0 +1,105 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package client
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
// FakeClient implements Interface. Meant to be embedded into a struct to get a default
// implementation. This makes faking out just the method you want to test easier.
type FakeClient struct {
// FakeClient by default keeps a simple list of the methods that have been called.
Actions []string
}
func (client *FakeClient) ListPods(selector labels.Selector) (api.PodList, error) {
client.Actions = append(client.Actions, "list-pods")
return api.PodList{}, nil
}
func (client *FakeClient) GetPod(name string) (api.Pod, error) {
client.Actions = append(client.Actions, "get-pod")
return api.Pod{}, nil
}
func (client *FakeClient) DeletePod(name string) error {
client.Actions = append(client.Actions, "delete-pod")
return nil
}
func (client *FakeClient) CreatePod(pod api.Pod) (api.Pod, error) {
client.Actions = append(client.Actions, "create-pod")
return api.Pod{}, nil
}
func (client *FakeClient) UpdatePod(pod api.Pod) (api.Pod, error) {
client.Actions = append(client.Actions, "update-pod")
return api.Pod{}, nil
}
func (client *FakeClient) ListReplicationControllers(selector labels.Selector) (api.ReplicationControllerList, error) {
client.Actions = append(client.Actions, "list-controllers")
return api.ReplicationControllerList{}, nil
}
func (client *FakeClient) GetReplicationController(name string) (api.ReplicationController, error) {
client.Actions = append(client.Actions, "get-controller")
return api.ReplicationController{}, nil
}
func (client *FakeClient) CreateReplicationController(controller api.ReplicationController) (api.ReplicationController, error) {
client.Actions = append(client.Actions, "create-controller")
return api.ReplicationController{}, nil
}
func (client *FakeClient) UpdateReplicationController(controller api.ReplicationController) (api.ReplicationController, error) {
client.Actions = append(client.Actions, "update-controller")
return api.ReplicationController{}, nil
}
func (client *FakeClient) DeleteReplicationController(controller string) error {
client.Actions = append(client.Actions, "delete-controller")
return nil
}
func (client *FakeClient) WatchReplicationControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
client.Actions = append(client.Actions, "watch-controllers")
return watch.NewFake(), nil
}
func (client *FakeClient) GetService(name string) (api.Service, error) {
client.Actions = append(client.Actions, "get-controller")
return api.Service{}, nil
}
func (client *FakeClient) CreateService(controller api.Service) (api.Service, error) {
client.Actions = append(client.Actions, "create-service")
return api.Service{}, nil
}
func (client *FakeClient) UpdateService(controller api.Service) (api.Service, error) {
client.Actions = append(client.Actions, "update-service")
return api.Service{}, nil
}
func (client *FakeClient) DeleteService(controller string) error {
client.Actions = append(client.Actions, "delete-service")
return nil
}

37
pkg/client/fake_test.go Normal file
View File

@ -0,0 +1,37 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package client
import (
"testing"
)
// This test file just ensures that FakeClient and structs it is embedded in
// implement Interface.
func TestFakeImplementsInterface(t *testing.T) {
_ = Interface(&FakeClient{})
}
type MyFake struct {
*FakeClient
}
func TestEmbeddedFakeImplementsInterface(t *testing.T) {
_ = Interface(MyFake{&FakeClient{}})
_ = Interface(&MyFake{&FakeClient{}})
}

View File

@ -24,27 +24,34 @@ import (
"net/http"
"net/url"
"path"
"strconv"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/golang/glog"
)
// Server contains info locating a kubernetes api server.
// Example usage:
// specialParams lists parameters that are handled specially and which users of Request
// are therefore not allowed to set manually.
var specialParams = util.NewStringSet("sync", "timeout")
// Verb begins a request with a verb (GET, POST, PUT, DELETE)
//
// Example usage of Client's request building interface:
// auth, err := LoadAuth(filename)
// c := New(url, auth)
// resp, err := c.Verb("GET").
// Path("pods").
// Selector("area=staging").
// SelectorParam("labels", "area=staging").
// Timeout(10*time.Second).
// Do()
// list, ok := resp.(api.PodList)
// Verb begins a request with a verb (GET, POST, PUT, DELETE)
// if err != nil { ... }
// list, ok := resp.(*api.PodList)
//
func (c *Client) Verb(verb string) *Request {
return &Request{
verb: verb,
@ -52,26 +59,27 @@ func (c *Client) Verb(verb string) *Request {
path: "/api/v1beta1",
sync: c.Sync,
timeout: c.Timeout,
params: map[string]string{},
pollPeriod: c.PollPeriod,
}
}
// Post begins a POST request.
// Post begins a POST request. Short for c.Verb("POST").
func (c *Client) Post() *Request {
return c.Verb("POST")
}
// Put begins a PUT request.
// Put begins a PUT request. Short for c.Verb("PUT").
func (c *Client) Put() *Request {
return c.Verb("PUT")
}
// Get begins a GET request.
// Get begins a GET request. Short for c.Verb("GET").
func (c *Client) Get() *Request {
return c.Verb("GET")
}
// Delete begins a DELETE request.
// Delete begins a DELETE request. Short for c.Verb("DELETE").
func (c *Client) Delete() *Request {
return c.Verb("DELETE")
}
@ -90,6 +98,7 @@ type Request struct {
verb string
path string
body io.Reader
params map[string]string
selector labels.Selector
timeout time.Duration
sync bool
@ -105,7 +114,7 @@ func (r *Request) Path(item string) *Request {
return r
}
// Sync sets sync/async call status.
// Sync sets sync/async call status by setting the "sync" parameter to "true"/"false"
func (r *Request) Sync(sync bool) *Request {
if r.err != nil {
return r
@ -123,25 +132,48 @@ func (r *Request) AbsPath(path string) *Request {
return r
}
// ParseSelector parses the given string as a resource label selector. Optional.
func (r *Request) ParseSelector(item string) *Request {
// ParseSelectorParam parses the given string as a resource label selector.
// This is a convenience function so you don't have to first check that it's a
// validly formatted selector.
func (r *Request) ParseSelectorParam(paramName, item string) *Request {
if r.err != nil {
return r
}
r.selector, r.err = labels.ParseSelector(item)
return r
sel, err := labels.ParseSelector(item)
if err != nil {
r.err = err
return r
}
return r.setParam(paramName, sel.String())
}
// Selector makes the request use the given selector.
func (r *Request) Selector(s labels.Selector) *Request {
// SelectorParam adds the given selector as a query parameter with the name paramName.
func (r *Request) SelectorParam(paramName string, s labels.Selector) *Request {
if r.err != nil {
return r
}
r.selector = s
return r.setParam(paramName, s.String())
}
// UintParam creates a query parameter with the given value.
func (r *Request) UintParam(paramName string, u uint64) *Request {
if r.err != nil {
return r
}
return r.setParam(paramName, strconv.FormatUint(u, 10))
}
func (r *Request) setParam(paramName, value string) *Request {
if specialParams.Has(paramName) {
r.err = fmt.Errorf("must set %v through the corresponding function, not directly.", paramName)
return r
}
r.params[paramName] = value
return r
}
// Timeout makes the request use the given duration as a timeout. Optional.
// Timeout makes the request use the given duration as a timeout. Sets the "timeout"
// parameter. Ignored if sync=false.
func (r *Request) Timeout(d time.Duration) *Request {
if r.err != nil {
return r
@ -153,6 +185,7 @@ func (r *Request) Timeout(d time.Duration) *Request {
// Body makes the request use obj as the body. Optional.
// If obj is a string, try to read a file of that name.
// If obj is a []byte, send it directly.
// If obj is an io.Reader, use it directly.
// Otherwise, assume obj is an api type and marshall it correctly.
func (r *Request) Body(obj interface{}) *Request {
if r.err != nil {
@ -169,7 +202,7 @@ func (r *Request) Body(obj interface{}) *Request {
case []byte:
r.body = bytes.NewBuffer(t)
case io.Reader:
r.body = obj.(io.Reader)
r.body = t
default:
data, err := api.Encode(obj)
if err != nil {
@ -197,9 +230,11 @@ func (r *Request) PollPeriod(d time.Duration) *Request {
func (r *Request) finalURL() string {
finalURL := r.c.host + r.path
query := url.Values{}
if r.selector != nil {
query.Add("labels", r.selector.String())
for key, value := range r.params {
query.Add(key, value)
}
// sync and timeout are handled specially here, to allow setting them
// in any order.
if r.sync {
query.Add("sync", "true")
if r.timeout != 0 {

View File

@ -49,7 +49,7 @@ func TestDoRequestNewWay(t *testing.T) {
obj, err := s.Verb("POST").
Path("foo/bar").
Path("baz").
ParseSelector("name=foo").
ParseSelectorParam("labels", "name=foo").
Timeout(time.Second).
Body([]byte(reqBody)).
Do().Get()
@ -87,7 +87,7 @@ func TestDoRequestNewWayReader(t *testing.T) {
obj, err := s.Verb("POST").
Path("foo/bar").
Path("baz").
Selector(labels.Set{"name": "foo"}.AsSelector()).
SelectorParam("labels", labels.Set{"name": "foo"}.AsSelector()).
Sync(false).
Timeout(time.Second).
Body(bytes.NewBuffer(reqBodyExpected)).
@ -127,7 +127,7 @@ func TestDoRequestNewWayObj(t *testing.T) {
obj, err := s.Verb("POST").
Path("foo/bar").
Path("baz").
Selector(labels.Set{"name": "foo"}.AsSelector()).
SelectorParam("labels", labels.Set{"name": "foo"}.AsSelector()).
Timeout(time.Second).
Body(reqObj).
Do().Get()
@ -180,7 +180,7 @@ func TestDoRequestNewWayFile(t *testing.T) {
obj, err := s.Verb("POST").
Path("foo/bar").
Path("baz").
ParseSelector("name=foo").
ParseSelectorParam("labels", "name=foo").
Timeout(time.Second).
Body(file.Name()).
Do().Get()
@ -244,6 +244,45 @@ func TestSync(t *testing.T) {
}
}
func TestUintParam(t *testing.T) {
table := []struct {
name string
testVal uint64
expectStr string
}{
{"foo", 31415, "?foo=31415"},
{"bar", 42, "?bar=42"},
{"baz", 0, "?baz=0"},
}
for _, item := range table {
c := New("", nil)
r := c.Get().AbsPath("").UintParam(item.name, item.testVal)
if e, a := item.expectStr, r.finalURL(); e != a {
t.Errorf("expected %v, got %v", e, a)
}
}
}
func TestUnacceptableParamNames(t *testing.T) {
table := []struct {
name string
testVal string
expectSuccess bool
}{
{"sync", "foo", false},
{"timeout", "42", false},
}
for _, item := range table {
c := New("", nil)
r := c.Get().setParam(item.name, item.testVal)
if e, a := item.expectSuccess, r.err == nil; e != a {
t.Errorf("expected %v, got %v (%v)", e, a, r.err)
}
}
}
func TestSetPollPeriod(t *testing.T) {
c := New("", nil)
r := c.Get()

View File

@ -24,7 +24,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/golang/glog"
)
@ -37,9 +36,6 @@ type ReplicationManager struct {
// To allow injection of syncReplicationController for testing.
syncHandler func(controllerSpec api.ReplicationController) error
// To allow injection of watch creation.
watchMaker func() (watch.Interface, error)
}
// PodControlInterface is an interface that knows how to add or delete pods
@ -85,28 +81,23 @@ func MakeReplicationManager(kubeClient client.Interface) *ReplicationManager {
},
}
rm.syncHandler = rm.syncReplicationController
rm.watchMaker = rm.makeAPIWatch
return rm
}
// Run begins watching and syncing.
func (rm *ReplicationManager) Run(period time.Duration) {
rm.syncTime = time.Tick(period)
go util.Forever(func() { rm.watchControllers() }, period)
resourceVersion := uint64(0)
go util.Forever(func() { rm.watchControllers(&resourceVersion) }, period)
}
// makeAPIWatch starts watching via the apiserver.
func (rm *ReplicationManager) makeAPIWatch() (watch.Interface, error) {
// TODO: Fix this ugly type assertion.
return rm.kubeClient.(*client.Client).
Get().
Path("watch").
Path("replicationControllers").
Watch()
}
func (rm *ReplicationManager) watchControllers() {
watching, err := rm.watchMaker()
// resourceVersion is a pointer to the resource version to use/update.
func (rm *ReplicationManager) watchControllers(resourceVersion *uint64) {
watching, err := rm.kubeClient.WatchReplicationControllers(
labels.Everything(),
labels.Everything(),
*resourceVersion,
)
if err != nil {
glog.Errorf("Unexpected failure to watch: %v", err)
time.Sleep(5 * time.Second)
@ -128,6 +119,8 @@ func (rm *ReplicationManager) watchControllers() {
if rc, ok := event.Object.(*api.ReplicationController); !ok {
glog.Errorf("unexpected object: %#v", event.Object)
} else {
// If we get disconnected, start where we left off.
*resourceVersion = rc.ResourceVersion + 1
rm.syncHandler(*rc)
}
}

View File

@ -28,6 +28,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/tools"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
@ -305,7 +306,7 @@ func TestSyncronize(t *testing.T) {
w.WriteHeader(http.StatusNotFound)
t.Errorf("Unexpected request for %v", req.RequestURI)
})
testServer := httptest.NewTLSServer(mux)
testServer := httptest.NewServer(mux)
client := client.New(testServer.URL, nil)
manager := MakeReplicationManager(client)
fakePodControl := FakePodControl{}
@ -316,13 +317,18 @@ func TestSyncronize(t *testing.T) {
validateSyncReplication(t, &fakePodControl, 7, 0)
}
func TestWatchControllers(t *testing.T) {
fakeWatcher := watch.NewFake()
manager := MakeReplicationManager(nil)
manager.watchMaker = func() (watch.Interface, error) {
return fakeWatcher, nil
}
type FakeWatcher struct {
w *watch.FakeWatcher
*client.FakeClient
}
func (fw FakeWatcher) WatchReplicationControllers(l, f labels.Selector, rv uint64) (watch.Interface, error) {
return fw.w, nil
}
func TestWatchControllers(t *testing.T) {
client := FakeWatcher{watch.NewFake(), &client.FakeClient{}}
manager := MakeReplicationManager(client)
var testControllerSpec api.ReplicationController
received := make(chan struct{})
manager.syncHandler = func(controllerSpec api.ReplicationController) error {
@ -333,11 +339,12 @@ func TestWatchControllers(t *testing.T) {
return nil
}
go manager.watchControllers()
resourceVersion := uint64(0)
go manager.watchControllers(&resourceVersion)
// Test normal case
testControllerSpec.ID = "foo"
fakeWatcher.Add(&testControllerSpec)
client.w.Add(&testControllerSpec)
select {
case <-received:

View File

@ -27,6 +27,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
type Action struct {
@ -90,6 +91,11 @@ func (client *FakeKubeClient) DeleteReplicationController(controller string) err
return nil
}
func (client *FakeKubeClient) WatchReplicationControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
client.actions = append(client.actions, Action{action: "watch-controllers"})
return watch.NewFake(), nil
}
func (client *FakeKubeClient) GetService(name string) (api.Service, error) {
client.actions = append(client.actions, Action{action: "get-service", value: name})
return api.Service{}, nil

View File

@ -17,7 +17,6 @@ limitations under the License.
package registry
import (
"errors"
"fmt"
"time"
@ -138,12 +137,6 @@ func (storage *ControllerRegistryStorage) waitForController(ctrl api.Replication
// WatchAll returns ReplicationController events via a watch.Interface, implementing
// apiserver.ResourceWatcher.
func (storage *ControllerRegistryStorage) WatchAll() (watch.Interface, error) {
return storage.registry.WatchControllers()
}
// WatchSingle returns events for a single ReplicationController via a watch.Interface,
// implementing apiserver.ResourceWatcher.
func (storage *ControllerRegistryStorage) WatchSingle(id string) (watch.Interface, error) {
return nil, errors.New("unimplemented")
func (storage *ControllerRegistryStorage) Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
return storage.registry.WatchControllers(label, field, resourceVersion)
}

View File

@ -29,6 +29,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
// TODO: Why do we have this AND MemoryRegistry?
type MockControllerRegistry struct {
err error
controllers []api.ReplicationController
@ -49,10 +50,12 @@ func (registry *MockControllerRegistry) CreateController(controller api.Replicat
func (registry *MockControllerRegistry) UpdateController(controller api.ReplicationController) error {
return registry.err
}
func (registry *MockControllerRegistry) DeleteController(ID string) error {
return registry.err
}
func (registry *MockControllerRegistry) WatchControllers() (watch.Interface, error) {
func (registry *MockControllerRegistry) WatchControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
return nil, registry.err
}

View File

@ -205,9 +205,13 @@ func (registry *EtcdRegistry) ListControllers() ([]api.ReplicationController, er
}
// WatchControllers begins watching for new, changed, or deleted controllers.
// TODO: Add id/selector parameters?
func (registry *EtcdRegistry) WatchControllers() (watch.Interface, error) {
return registry.helper.WatchList("/registry/controllers", tools.Everything)
func (registry *EtcdRegistry) WatchControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
if !field.Empty() {
return nil, fmt.Errorf("no field selector implemented for controllers")
}
return registry.helper.WatchList("/registry/controllers", resourceVersion, func(obj interface{}) bool {
return label.Matches(labels.Set(obj.(*api.ReplicationController).Labels))
})
}
func makeControllerKey(id string) string {

View File

@ -39,7 +39,7 @@ type PodRegistry interface {
// ControllerRegistry is an interface for things that know how to store ReplicationControllers.
type ControllerRegistry interface {
ListControllers() ([]api.ReplicationController, error)
WatchControllers() (watch.Interface, error)
WatchControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error)
GetController(controllerID string) (*api.ReplicationController, error)
CreateController(controller api.ReplicationController) error
UpdateController(controller api.ReplicationController) error

View File

@ -89,7 +89,7 @@ func (registry *MemoryRegistry) ListControllers() ([]api.ReplicationController,
return result, nil
}
func (registry *MemoryRegistry) WatchControllers() (watch.Interface, error) {
func (registry *MemoryRegistry) WatchControllers(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
return nil, errors.New("unimplemented")
}

View File

@ -296,18 +296,19 @@ func Everything(interface{}) bool {
// WatchList begins watching the specified key's items. Items are decoded into
// API objects, and any items passing 'filter' are sent down the returned
// watch.Interface.
func (h *EtcdHelper) WatchList(key string, filter FilterFunc) (watch.Interface, error) {
// watch.Interface. resourceVersion may be used to specify what version to begin
// watching (e.g., for reconnecting without missing any updateds).
func (h *EtcdHelper) WatchList(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) {
w := newEtcdWatcher(true, filter, h.Codec)
go w.etcdWatch(h.Client, key)
go w.etcdWatch(h.Client, key, resourceVersion)
return w, nil
}
// Watch begins watching the specified key. Events are decoded into
// API objects and sent down the returned watch.Interface.
func (h *EtcdHelper) Watch(key string) (watch.Interface, error) {
func (h *EtcdHelper) Watch(key string, resourceVersion uint64) (watch.Interface, error) {
w := newEtcdWatcher(false, nil, h.Codec)
go w.etcdWatch(h.Client, key)
go w.etcdWatch(h.Client, key, resourceVersion)
return w, nil
}
@ -350,10 +351,10 @@ func newEtcdWatcher(list bool, filter FilterFunc, encoding Codec) *etcdWatcher {
// etcdWatch calls etcd's Watch function, and handles any errors. Meant to be called
// as a goroutine.
func (w *etcdWatcher) etcdWatch(client EtcdGetSet, key string) {
func (w *etcdWatcher) etcdWatch(client EtcdGetSet, key string, resourceVersion uint64) {
defer util.HandleCrash()
defer close(w.etcdCallEnded)
_, err := client.Watch(key, 0, w.list, w.etcdIncoming, w.etcdStop)
_, err := client.Watch(key, resourceVersion, w.list, w.etcdIncoming, w.etcdStop)
if err != etcd.ErrWatchStoppedByUser {
glog.Errorf("etcd.Watch stopped unexpectedly: %v (%#v)", err, err)
}
@ -385,18 +386,20 @@ func (w *etcdWatcher) sendResult(res *etcd.Response) {
var action watch.EventType
var data []byte
switch res.Action {
case "create", "set":
case "create":
if res.Node == nil {
glog.Errorf("unexpected nil node: %#v", res)
return
}
data = []byte(res.Node.Value)
// TODO: Is this conditional correct?
if res.EtcdIndex > 0 {
action = watch.Modified
} else {
action = watch.Added
action = watch.Added
case "set":
if res.Node == nil {
glog.Errorf("unexpected nil node: %#v", res)
return
}
data = []byte(res.Node.Value)
action = watch.Modified
case "delete":
if res.PrevNode == nil {
glog.Errorf("unexpected nil prev node: %#v", res)

View File

@ -325,6 +325,30 @@ func TestAtomicUpdate_CreateCollision(t *testing.T) {
}
}
func TestWatchInterpretation_ListCreate(t *testing.T) {
w := newEtcdWatcher(true, func(interface{}) bool {
t.Errorf("unexpected filter call")
return true
}, codec)
pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}
podBytes, _ := codec.Encode(pod)
go w.sendResult(&etcd.Response{
Action: "create",
Node: &etcd.Node{
Value: string(podBytes),
},
})
got := <-w.outgoing
if e, a := watch.Added, got.Type; e != a {
t.Errorf("Expected %v, got %v", e, a)
}
if e, a := pod, got.Object; !reflect.DeepEqual(e, a) {
t.Errorf("Expected %v, got %v", e, a)
}
}
func TestWatchInterpretation_ListAdd(t *testing.T) {
w := newEtcdWatcher(true, func(interface{}) bool {
t.Errorf("unexpected filter call")
@ -341,7 +365,7 @@ func TestWatchInterpretation_ListAdd(t *testing.T) {
})
got := <-w.outgoing
if e, a := watch.Added, got.Type; e != a {
if e, a := watch.Modified, got.Type; e != a {
t.Errorf("Expected %v, got %v", e, a)
}
if e, a := pod, got.Object; !reflect.DeepEqual(e, a) {
@ -420,7 +444,7 @@ func TestWatch(t *testing.T) {
fakeClient := MakeFakeEtcdClient(t)
h := EtcdHelper{fakeClient, codec, versioner}
watching, err := h.Watch("/some/key")
watching, err := h.Watch("/some/key", 0)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
@ -438,7 +462,7 @@ func TestWatch(t *testing.T) {
}
event := <-watching.ResultChan()
if e, a := watch.Added, event.Type; e != a {
if e, a := watch.Modified, event.Type; e != a {
t.Errorf("Expected %v, got %v", e, a)
}
if e, a := pod, event.Object; !reflect.DeepEqual(e, a) {
@ -462,7 +486,7 @@ func TestWatchPurposefulShutdown(t *testing.T) {
h := EtcdHelper{fakeClient, codec, versioner}
// Test purposeful shutdown
watching, err := h.Watch("/some/key")
watching, err := h.Watch("/some/key", 0)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}