From 4c3f509d94d63db655da8f339c363bbc3300ee53 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Thu, 14 Aug 2014 15:42:05 -0700 Subject: [PATCH] Make cache.Reflector more injectable. Add test for resource version state keeping. --- pkg/client/cache/reflector.go | 44 +++++++++------ pkg/client/cache/reflector_test.go | 90 ++++++++++++++++++++---------- 2 files changed, 86 insertions(+), 48 deletions(-) diff --git a/pkg/client/cache/reflector.go b/pkg/client/cache/reflector.go index bb7be90660..14b97903b4 100644 --- a/pkg/client/cache/reflector.go +++ b/pkg/client/cache/reflector.go @@ -21,7 +21,6 @@ import ( "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/golang/glog" @@ -41,42 +40,49 @@ type Store interface { // Reflector watches a specified resource and causes all changes to be reflected in the given store. type Reflector struct { - kubeClient *client.Client - resource string + // The type of object we expect to place in the store. expectedType reflect.Type - store Store + // The destination to sync up with the watch source + store Store + // watchCreater is called to initiate watches. + watchFactory WatchFactory + // loopDelay controls timing between one watch ending and + // the beginning of the next one. + loopDelay time.Duration } +// WatchFactory should begin a watch at the specified version. +type WatchFactory func(resourceVersion uint64) (watch.Interface, error) + // NewReflector makes a new Reflector object which will keep the given store up to // date with the server's contents for the given resource. Reflector promises to // only put things in the store that have the type of expectedType. -// TODO: define a query so you only locally cache a subset of items. -func NewReflector(resource string, kubeClient *client.Client, expectedType interface{}, store Store) *Reflector { +func NewReflector(watchFactory WatchFactory, expectedType interface{}, store Store) *Reflector { gc := &Reflector{ - resource: resource, - kubeClient: kubeClient, + watchFactory: watchFactory, store: store, expectedType: reflect.TypeOf(expectedType), + loopDelay: time.Second, } return gc } +// Run starts a watch and handles watch events. Will restart the watch if it is closed. +// Run starts a goroutine and returns immediately. func (gc *Reflector) Run() { + var resourceVersion uint64 go util.Forever(func() { - w, err := gc.startWatch() + w, err := gc.watchFactory(resourceVersion) if err != nil { - glog.Errorf("failed to watch %v: %v", gc.resource, err) + glog.Errorf("failed to watch %v: %v", gc.expectedType, err) return } - gc.watchHandler(w) - }, 5*time.Second) + gc.watchHandler(w, &resourceVersion) + }, gc.loopDelay) } -func (gc *Reflector) startWatch() (watch.Interface, error) { - return gc.kubeClient.Get().Path(gc.resource).Path("watch").Watch() -} - -func (gc *Reflector) watchHandler(w watch.Interface) { +// watchHandler watches w and keeps *resourceVersion up to date. +func (gc *Reflector) watchHandler(w watch.Interface, resourceVersion *uint64) { for { event, ok := <-w.ResultChan() if !ok { @@ -102,5 +108,9 @@ func (gc *Reflector) watchHandler(w watch.Interface) { default: glog.Errorf("unable to understand watch event %#v", event) } + next := jsonBase.ResourceVersion() + 1 + if next > *resourceVersion { + *resourceVersion = next + } } } diff --git a/pkg/client/cache/reflector_test.go b/pkg/client/cache/reflector_test.go index ec32a7727a..fa2b702a64 100644 --- a/pkg/client/cache/reflector_test.go +++ b/pkg/client/cache/reflector_test.go @@ -17,29 +17,27 @@ limitations under the License. package cache import ( - "net/http" - "net/http/httptest" "testing" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) func TestReflector_watchHandler(t *testing.T) { s := NewStore() - g := NewReflector("foo", nil, &api.Pod{}, s) + g := NewReflector(nil, &api.Pod{}, s) fw := watch.NewFake() s.Add("foo", &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}) s.Add("bar", &api.Pod{JSONBase: api.JSONBase{ID: "bar"}}) go func() { fw.Modify(&api.Pod{JSONBase: api.JSONBase{ID: "bar", ResourceVersion: 55}}) - fw.Add(&api.Pod{JSONBase: api.JSONBase{ID: "baz"}}) + fw.Add(&api.Pod{JSONBase: api.JSONBase{ID: "baz", ResourceVersion: 32}}) fw.Add(&api.Service{JSONBase: api.JSONBase{ID: "rejected"}}) fw.Delete(&api.Pod{JSONBase: api.JSONBase{ID: "foo"}}) fw.Stop() }() - g.watchHandler(fw) + var resumeRV uint64 + g.watchHandler(fw, &resumeRV) table := []struct { ID string @@ -49,7 +47,7 @@ func TestReflector_watchHandler(t *testing.T) { {"foo", 0, false}, {"rejected", 0, false}, {"bar", 55, true}, - {"baz", 0, true}, + {"baz", 32, true}, } for _, item := range table { obj, exists := s.Get(item.ID) @@ -63,32 +61,62 @@ func TestReflector_watchHandler(t *testing.T) { t.Errorf("%v: expected %v, got %v", item.ID, e, a) } } + + // RV should stay 1 higher than the highest id we see. + if e, a := uint64(56), resumeRV; e != a { + t.Errorf("expected %v, got %v", e, a) + } } -func TestReflector_startWatch(t *testing.T) { - table := []struct{ resource, path string }{ - {"pods", "/api/v1beta1/pods/watch"}, - {"services", "/api/v1beta1/services/watch"}, - } - for _, testItem := range table { - got := make(chan struct{}) - srv := httptest.NewServer(http.HandlerFunc( - func(w http.ResponseWriter, req *http.Request) { - w.WriteHeader(http.StatusNotFound) - if req.URL.Path == testItem.path { - close(got) - return - } - t.Errorf("unexpected path %v", req.URL.Path) - })) - s := NewStore() - c := client.New(srv.URL, nil) - g := NewReflector(testItem.resource, c, &api.Pod{}, s) - _, err := g.startWatch() - // We're just checking that it watches the right path. - if err == nil { - t.Errorf("unexpected non-error") +func TestReflector_Run(t *testing.T) { + createdFakes := make(chan *watch.FakeWatcher) + + // Expect our starter to get called at the beginning of the watch with 0, and again with 3 when we + // inject an error at 2. + expectedRVs := []uint64{0, 3} + watchStarter := func(rv uint64) (watch.Interface, error) { + fw := watch.NewFake() + if e, a := expectedRVs[0], rv; e != a { + t.Errorf("Expected rv %v, but got %v", e, a) } - <-got + expectedRVs = expectedRVs[1:] + // channel is not buffered because the for loop below needs to block. But + // we don't want to block here, so report the new fake via a go routine. + go func() { createdFakes <- fw }() + return fw, nil + } + s := NewFIFO() + r := NewReflector(watchStarter, &api.Pod{}, s) + r.loopDelay = 0 + r.Run() + + ids := []string{"foo", "bar", "baz", "qux", "zoo"} + var fw *watch.FakeWatcher + for i, id := range ids { + if fw == nil { + fw = <-createdFakes + } + sendingRV := uint64(i + 1) + fw.Add(&api.Pod{JSONBase: api.JSONBase{ID: id, ResourceVersion: sendingRV}}) + if sendingRV == 2 { + // Inject a failure. + fw.Stop() + fw = nil + } + } + + // Verify we received the right ids with the right resource versions. + for i, id := range ids { + pod := s.Pop().(*api.Pod) + if e, a := id, pod.ID; e != a { + t.Errorf("%v: Expected %v, got %v", i, e, a) + } + if e, a := uint64(i+1), pod.ResourceVersion; e != a { + t.Errorf("%v: Expected %v, got %v", i, e, a) + } + } + + if len(expectedRVs) != 0 { + t.Error("called watchStarter an unexpected number of times") } }