mirror of https://github.com/k3s-io/k3s
commit
6aa3a74cf9
|
@ -293,7 +293,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, f
|
|||
return err
|
||||
}
|
||||
|
||||
// To avoid situation when List is proceesed before the underlying
|
||||
// To avoid situation when List is processed before the underlying
|
||||
// watchCache is propagated for the first time, we acquire and immediately
|
||||
// release the 'usable' lock.
|
||||
// We don't need to hold it all the time, because watchCache is thread-safe
|
||||
|
@ -399,6 +399,11 @@ func filterFunction(key string, keyFunc func(runtime.Object) (string, error), fi
|
|||
|
||||
// Returns resource version to which the underlying cache is synced.
|
||||
func (c *Cacher) LastSyncResourceVersion() (uint64, error) {
|
||||
// To avoid situation when LastSyncResourceVersion is processed before the
|
||||
// underlying watchCache is propagated, we acquire 'usable' lock.
|
||||
c.usable.RLock()
|
||||
defer c.usable.RUnlock()
|
||||
|
||||
c.RLock()
|
||||
defer c.RUnlock()
|
||||
|
||||
|
|
|
@ -178,8 +178,16 @@ func TestWatch(t *testing.T) {
|
|||
podFooBis := makeTestPod("foo")
|
||||
podFooBis.Spec.NodeName = "anotherFakeNode"
|
||||
|
||||
// initialVersion is used to initate the watcher at the beginning of the world,
|
||||
// which is not defined precisely in etcd.
|
||||
initialVersion, err := cacher.LastSyncResourceVersion()
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
startVersion := strconv.Itoa(int(initialVersion))
|
||||
|
||||
// Set up Watch for object "podFoo".
|
||||
watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", "1", storage.Everything)
|
||||
watcher, err := cacher.Watch(context.TODO(), "pods/ns/foo", startVersion, storage.Everything)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
@ -227,15 +235,23 @@ func TestWatcherTimeout(t *testing.T) {
|
|||
cacher := newTestCacher(etcdStorage)
|
||||
defer cacher.Stop()
|
||||
|
||||
// initialVersion is used to initate the watcher at the beginning of the world,
|
||||
// which is not defined precisely in etcd.
|
||||
initialVersion, err := cacher.LastSyncResourceVersion()
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
startVersion := strconv.Itoa(int(initialVersion))
|
||||
|
||||
// Create a watcher that will not be reading any result.
|
||||
watcher, err := cacher.WatchList(context.TODO(), "pods/ns", "1", storage.Everything)
|
||||
watcher, err := cacher.WatchList(context.TODO(), "pods/ns", startVersion, storage.Everything)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
defer watcher.Stop()
|
||||
|
||||
// Create a second watcher that will be reading result.
|
||||
readingWatcher, err := cacher.WatchList(context.TODO(), "pods/ns", "1", storage.Everything)
|
||||
readingWatcher, err := cacher.WatchList(context.TODO(), "pods/ns", startVersion, storage.Everything)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected error: %v", err)
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue