diff --git a/pkg/client/cache/reflector.go b/pkg/client/cache/reflector.go index b7960417a4..98b2533971 100644 --- a/pkg/client/cache/reflector.go +++ b/pkg/client/cache/reflector.go @@ -51,6 +51,10 @@ type Reflector struct { // the beginning of the next one. period time.Duration resyncPeriod time.Duration + // lastSyncResourceVersion is the resource version token last + // observed when doing a sync with the underlying store + // it is not thread safe as it is not synchronized with access to the store + lastSyncResourceVersion string } // NewNamespaceKeyedIndexerAndReflector creates an Indexer and a Reflector @@ -130,6 +134,7 @@ func (r *Reflector) listAndWatch() { glog.Errorf("Unable to sync list result: %v", err) return } + r.lastSyncResourceVersion = resourceVersion for { w, err := r.listerWatcher.Watch(resourceVersion) @@ -203,6 +208,7 @@ loop: glog.Errorf("unable to understand watch event %#v", event) } *resourceVersion = meta.ResourceVersion() + r.lastSyncResourceVersion = *resourceVersion eventCount++ } } @@ -215,3 +221,9 @@ loop: glog.V(4).Infof("Watch close - %v total %v items received", r.expectedType, eventCount) return nil } + +// LastSyncResourceVersion is the resource version observed when last sync with the underlying store +// The value returned is not synchronized with access to the underlying store and is not thread-safe +func (r *Reflector) LastSyncResourceVersion() string { + return r.lastSyncResourceVersion +} diff --git a/pkg/client/cache/reflector_test.go b/pkg/client/cache/reflector_test.go index 5996329bd3..7337a7b1f5 100644 --- a/pkg/client/cache/reflector_test.go +++ b/pkg/client/cache/reflector_test.go @@ -112,6 +112,11 @@ func TestReflector_watchHandler(t *testing.T) { if e, a := "32", resumeRV; e != a { t.Errorf("expected %v, got %v", e, a) } + + // last sync resource version should be the last version synced with store + if e, a := "32", g.LastSyncResourceVersion(); e != a { + t.Errorf("expected %v, got %v", e, a) + } } func TestReflector_watchHandlerTimeout(t *testing.T) {