From 8b571bb63bd8a9a6a37db6046a6ab35d3b047bf4 Mon Sep 17 00:00:00 2001 From: Clayton Coleman Date: Sun, 3 Sep 2017 15:04:54 -0400 Subject: [PATCH] Disable default paging in list watches For 1.8 this will be off by default. In 1.9 it will be on by default. Add tests and rename some fields to use the `chunking` terminology. Note that the pager may be used for other things besides chunking. --- pkg/client/tests/listwatch_test.go | 2 +- .../garbagecollector/garbagecollector_test.go | 2 +- .../k8s.io/client-go/tools/cache/listwatch.go | 12 +- .../src/k8s.io/client-go/tools/pager/BUILD | 15 ++ .../src/k8s.io/client-go/tools/pager/pager.go | 8 +- .../client-go/tools/pager/pager_test.go | 206 ++++++++++++++++++ 6 files changed, 237 insertions(+), 8 deletions(-) create mode 100644 staging/src/k8s.io/client-go/tools/pager/pager_test.go diff --git a/pkg/client/tests/listwatch_test.go b/pkg/client/tests/listwatch_test.go index fe6c3351f2..b3fd42d619 100644 --- a/pkg/client/tests/listwatch_test.go +++ b/pkg/client/tests/listwatch_test.go @@ -104,7 +104,7 @@ func TestListWatchesCanList(t *testing.T) { defer server.Close() client := clientset.NewForConfigOrDie(&restclient.Config{Host: server.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}}) lw := NewListWatchFromClient(client.Core().RESTClient(), item.resource, item.namespace, item.fieldSelector) - lw.DisablePaging = true + lw.DisableChunking = true // This test merely tests that the correct request is made. lw.List(metav1.ListOptions{}) handler.ValidateRequest(t, item.location, "GET", nil) diff --git a/pkg/controller/garbagecollector/garbagecollector_test.go b/pkg/controller/garbagecollector/garbagecollector_test.go index 0d25e4f2a5..b6b2dc4788 100644 --- a/pkg/controller/garbagecollector/garbagecollector_test.go +++ b/pkg/controller/garbagecollector/garbagecollector_test.go @@ -420,7 +420,7 @@ func TestGCListWatcher(t *testing.T) { t.Fatal(err) } lw := listWatcher(client, podResource) - lw.DisablePaging = true + lw.DisableChunking = true if _, err := lw.Watch(metav1.ListOptions{ResourceVersion: "1"}); err != nil { t.Fatal(err) } diff --git a/staging/src/k8s.io/client-go/tools/cache/listwatch.go b/staging/src/k8s.io/client-go/tools/cache/listwatch.go index cab48ae00a..55a90b631d 100644 --- a/staging/src/k8s.io/client-go/tools/cache/listwatch.go +++ b/staging/src/k8s.io/client-go/tools/cache/listwatch.go @@ -49,9 +49,11 @@ type WatchFunc func(options metav1.ListOptions) (watch.Interface, error) // It is a convenience function for users of NewReflector, etc. // ListFunc and WatchFunc must not be nil type ListWatch struct { - ListFunc ListFunc - WatchFunc WatchFunc - DisablePaging bool + ListFunc ListFunc + WatchFunc WatchFunc + // DisableChunking requests no chunking for this list watcher. It has no effect in Kubernetes 1.8, but in + // 1.9 will allow a controller to opt out of chunking. + DisableChunking bool } // Getter interface knows how to access Get method from RESTClient. @@ -91,7 +93,9 @@ func timeoutFromListOptions(options metav1.ListOptions) time.Duration { // List a set of apiserver resources func (lw *ListWatch) List(options metav1.ListOptions) (runtime.Object, error) { - if !lw.DisablePaging { + // chunking will become the default for list watchers starting in Kubernetes 1.9, unless + // otherwise disabled. + if false && !lw.DisableChunking { return pager.New(pager.SimplePageFunc(lw.ListFunc)).List(context.TODO(), options) } return lw.ListFunc(options) diff --git a/staging/src/k8s.io/client-go/tools/pager/BUILD b/staging/src/k8s.io/client-go/tools/pager/BUILD index fa07ba8c94..e7b95c40da 100644 --- a/staging/src/k8s.io/client-go/tools/pager/BUILD +++ b/staging/src/k8s.io/client-go/tools/pager/BUILD @@ -5,6 +5,7 @@ licenses(["notice"]) load( "@io_bazel_rules_go//go:def.bzl", "go_library", + "go_test", ) go_library( @@ -34,3 +35,17 @@ filegroup( tags = ["automanaged"], visibility = ["//visibility:public"], ) + +go_test( + name = "go_default_test", + srcs = ["pager_test.go"], + library = ":go_default_library", + deps = [ + "//vendor/golang.org/x/net/context:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/internalversion:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1alpha1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", + ], +) diff --git a/staging/src/k8s.io/client-go/tools/pager/pager.go b/staging/src/k8s.io/client-go/tools/pager/pager.go index a4a04cdc32..2e0874e0e5 100644 --- a/staging/src/k8s.io/client-go/tools/pager/pager.go +++ b/staging/src/k8s.io/client-go/tools/pager/pager.go @@ -52,7 +52,8 @@ type ListPager struct { } // New creates a new pager from the provided pager function using the default -// options. +// options. It will fall back to a full list if an expiration error is encountered +// as a last resort. func New(fn ListPageFunc) *ListPager { return &ListPager{ PageSize: defaultPageSize, @@ -61,9 +62,12 @@ func New(fn ListPageFunc) *ListPager { } } +// TODO: introduce other types of paging functions - such as those that retrieve from a list +// of namespaces. + // List returns a single list object, but attempts to retrieve smaller chunks from the // server to reduce the impact on the server. If the chunk attempt fails, it will load -// the full list instead. +// the full list instead. The Limit field on options, if unset, will default to the page size. func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) { if options.Limit == 0 { options.Limit = p.PageSize diff --git a/staging/src/k8s.io/client-go/tools/pager/pager_test.go b/staging/src/k8s.io/client-go/tools/pager/pager_test.go new file mode 100644 index 0000000000..6e3e9444ab --- /dev/null +++ b/staging/src/k8s.io/client-go/tools/pager/pager_test.go @@ -0,0 +1,206 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package pager + +import ( + "fmt" + "reflect" + "testing" + + "golang.org/x/net/context" + "k8s.io/apimachinery/pkg/api/errors" + metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + metav1alpha1 "k8s.io/apimachinery/pkg/apis/meta/v1alpha1" + "k8s.io/apimachinery/pkg/runtime" +) + +func list(count int, rv string) *metainternalversion.List { + var list metainternalversion.List + for i := 0; i < count; i++ { + list.Items = append(list.Items, &metav1alpha1.PartialObjectMetadata{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%d", i), + }, + }) + } + list.ResourceVersion = rv + return &list +} + +type testPager struct { + t *testing.T + rv string + index int + remaining int + last int + continuing bool + done bool + expectPage int64 +} + +func (p *testPager) reset() { + p.continuing = false + p.remaining += p.index + p.index = 0 + p.last = 0 + p.done = false +} + +func (p *testPager) PagedList(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) { + if p.done { + p.t.Errorf("did not expect additional call to paged list") + return nil, fmt.Errorf("unexpected list call") + } + expectedContinue := fmt.Sprintf("%s:%d", p.rv, p.last) + if options.Limit != p.expectPage || (p.continuing && options.Continue != expectedContinue) { + p.t.Errorf("invariant violated, expected limit %d and continue %s, got %#v", p.expectPage, expectedContinue, options) + return nil, fmt.Errorf("invariant violated") + } + var list metainternalversion.List + total := options.Limit + if total == 0 { + total = int64(p.remaining) + } + for i := int64(0); i < total; i++ { + if p.remaining <= 0 { + break + } + list.Items = append(list.Items, &metav1alpha1.PartialObjectMetadata{ + ObjectMeta: metav1.ObjectMeta{ + Name: fmt.Sprintf("%d", p.index), + }, + }) + p.remaining-- + p.index++ + } + p.last = p.index + if p.remaining > 0 { + list.Continue = fmt.Sprintf("%s:%d", p.rv, p.last) + p.continuing = true + } else { + p.done = true + } + list.ResourceVersion = p.rv + return &list, nil +} + +func (p *testPager) ExpiresOnSecondPage(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) { + if p.continuing { + p.done = true + return nil, errors.NewResourceExpired("this list has expired") + } + return p.PagedList(ctx, options) +} + +func (p *testPager) ExpiresOnSecondPageThenFullList(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) { + if p.continuing { + p.reset() + p.expectPage = 0 + return nil, errors.NewResourceExpired("this list has expired") + } + return p.PagedList(ctx, options) +} + +func TestListPager_List(t *testing.T) { + type fields struct { + PageSize int64 + PageFn ListPageFunc + FullListIfExpired bool + } + type args struct { + ctx context.Context + options metav1.ListOptions + } + tests := []struct { + name string + fields fields + args args + want runtime.Object + wantErr bool + isExpired bool + }{ + { + name: "empty page", + fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 0, rv: "rv:20"}).PagedList}, + args: args{}, + want: list(0, "rv:20"), + }, + { + name: "one page", + fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 9, rv: "rv:20"}).PagedList}, + args: args{}, + want: list(9, "rv:20"), + }, + { + name: "one full page", + fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 10, rv: "rv:20"}).PagedList}, + args: args{}, + want: list(10, "rv:20"), + }, + { + name: "two pages", + fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 11, rv: "rv:20"}).PagedList}, + args: args{}, + want: list(11, "rv:20"), + }, + { + name: "three pages", + fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).PagedList}, + args: args{}, + want: list(21, "rv:20"), + }, + { + name: "expires on second page", + fields: fields{PageSize: 10, PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).ExpiresOnSecondPage}, + args: args{}, + wantErr: true, + isExpired: true, + }, + { + name: "expires on second page and then lists", + fields: fields{ + FullListIfExpired: true, + PageSize: 10, + PageFn: (&testPager{t: t, expectPage: 10, remaining: 21, rv: "rv:20"}).ExpiresOnSecondPageThenFullList, + }, + args: args{}, + want: list(21, "rv:20"), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + p := &ListPager{ + PageSize: tt.fields.PageSize, + PageFn: tt.fields.PageFn, + FullListIfExpired: tt.fields.FullListIfExpired, + } + got, err := p.List(tt.args.ctx, tt.args.options) + if (err != nil) != tt.wantErr { + t.Errorf("ListPager.List() error = %v, wantErr %v", err, tt.wantErr) + return + } + if tt.isExpired != errors.IsResourceExpired(err) { + t.Errorf("ListPager.List() error = %v, isExpired %v", err, tt.isExpired) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("ListPager.List() = %v, want %v", got, tt.want) + } + }) + } +}