mirror of https://github.com/k3s-io/k3s
support continueToken for inconsistent list
parent
2d7b92ee74
commit
0a7286c6b2
|
@ -76,9 +76,10 @@ type ListMeta struct {
|
||||||
// continue may be set if the user set a limit on the number of items returned, and indicates that
|
// continue may be set if the user set a limit on the number of items returned, and indicates that
|
||||||
// the server has more data available. The value is opaque and may be used to issue another request
|
// the server has more data available. The value is opaque and may be used to issue another request
|
||||||
// to the endpoint that served this list to retrieve the next set of available objects. Continuing a
|
// to the endpoint that served this list to retrieve the next set of available objects. Continuing a
|
||||||
// list may not be possible if the server configuration has changed or more than a few minutes have
|
// consistent list may not be possible if the server configuration has changed or more than a few
|
||||||
// passed. The resourceVersion field returned when using this continue value will be identical to
|
// minutes have passed. The resourceVersion field returned when using this continue value will be
|
||||||
// the value in the first response.
|
// identical to the value in the first response, unless you have received this token from an error
|
||||||
|
// message.
|
||||||
Continue string `json:"continue,omitempty" protobuf:"bytes,3,opt,name=continue"`
|
Continue string `json:"continue,omitempty" protobuf:"bytes,3,opt,name=continue"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -363,14 +364,20 @@ type ListOptions struct {
|
||||||
// updated during a chunked list the version of the object that was present at the time the first list
|
// updated during a chunked list the version of the object that was present at the time the first list
|
||||||
// result was calculated is returned.
|
// result was calculated is returned.
|
||||||
Limit int64 `json:"limit,omitempty" protobuf:"varint,7,opt,name=limit"`
|
Limit int64 `json:"limit,omitempty" protobuf:"varint,7,opt,name=limit"`
|
||||||
// The continue option should be set when retrieving more results from the server. Since this value
|
// The continue option should be set when retrieving more results from the server. Since this value is
|
||||||
// is server defined, clients may only use the continue value from a previous query result with
|
// server defined, clients may only use the continue value from a previous query result with identical
|
||||||
// identical query parameters (except for the value of continue) and the server may reject a continue
|
// query parameters (except for the value of continue) and the server may reject a continue value it
|
||||||
// value it does not recognize. If the specified continue value is no longer valid whether due to
|
// does not recognize. If the specified continue value is no longer valid whether due to expiration
|
||||||
// expiration (generally five to fifteen minutes) or a configuration change on the server the server
|
// (generally five to fifteen minutes) or a configuration change on the server, the server will
|
||||||
// will respond with a 410 ResourceExpired error indicating the client must restart their list without
|
// respond with a 410 ResourceExpired error together with a continue token. If the client needs a
|
||||||
// the continue field. This field is not supported when watch is true. Clients may start a watch from
|
// consistent list, it must restart their list without the continue field. Otherwise, the client may
|
||||||
// the last resourceVersion value returned by the server and not miss any modifications.
|
// send another list request with the token received with the 410 error, the server will respond with
|
||||||
|
// a list starting from the next key, but from the latest snapshot, which is inconsistent from the
|
||||||
|
// previous list results - objects that are created, modified, or deleted after the first list request
|
||||||
|
// will be included in the response, as long as their keys are after the "next key".
|
||||||
|
//
|
||||||
|
// This field is not supported when watch is true. Clients may start a watch from the last
|
||||||
|
// resourceVersion value returned by the server and not miss any modifications.
|
||||||
Continue string `json:"continue,omitempty" protobuf:"bytes,8,opt,name=continue"`
|
Continue string `json:"continue,omitempty" protobuf:"bytes,8,opt,name=continue"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,6 +20,7 @@ import (
|
||||||
"k8s.io/apimachinery/pkg/api/errors"
|
"k8s.io/apimachinery/pkg/api/errors"
|
||||||
|
|
||||||
etcdrpc "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
etcdrpc "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
|
||||||
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||||
)
|
)
|
||||||
|
|
||||||
func interpretWatchError(err error) error {
|
func interpretWatchError(err error) error {
|
||||||
|
@ -30,13 +31,41 @@ func interpretWatchError(err error) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func interpretListError(err error, paging bool) error {
|
const (
|
||||||
|
expired string = "The resourceVersion for the provided list is too old."
|
||||||
|
continueExpired string = "The provided continue parameter is too old " +
|
||||||
|
"to display a consistent list result. You can start a new list without " +
|
||||||
|
"the continue parameter."
|
||||||
|
inconsistentContinue string = "The provided continue parameter is too old " +
|
||||||
|
"to display a consistent list result. You can start a new list without " +
|
||||||
|
"the continue parameter, or use the continue token in this response to " +
|
||||||
|
"retrieve the remainder of the results. Continuing with the provided " +
|
||||||
|
"token results in an inconsistent list - objects that were created, " +
|
||||||
|
"modified, or deleted between the time the first chunk was returned " +
|
||||||
|
"and now may show up in the list."
|
||||||
|
)
|
||||||
|
|
||||||
|
func interpretListError(err error, paging bool, continueKey, keyPrefix string) error {
|
||||||
switch {
|
switch {
|
||||||
case err == etcdrpc.ErrCompacted:
|
case err == etcdrpc.ErrCompacted:
|
||||||
if paging {
|
if paging {
|
||||||
return errors.NewResourceExpired("The provided from parameter is too old to display a consistent list result. You must start a new list without the from.")
|
return handleCompactedErrorForPaging(continueKey, keyPrefix)
|
||||||
}
|
}
|
||||||
return errors.NewResourceExpired("The resourceVersion for the provided list is too old.")
|
return errors.NewResourceExpired(expired)
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func handleCompactedErrorForPaging(continueKey, keyPrefix string) error {
|
||||||
|
// continueToken.ResoureVersion=-1 means that the apiserver can
|
||||||
|
// continue the list at the latest resource version. We don't use rv=0
|
||||||
|
// for this purpose to distinguish from a bad token that has empty rv.
|
||||||
|
newToken, err := encodeContinue(continueKey, keyPrefix, -1)
|
||||||
|
if err != nil {
|
||||||
|
utilruntime.HandleError(err)
|
||||||
|
return errors.NewResourceExpired(continueExpired)
|
||||||
|
}
|
||||||
|
statusError := errors.NewResourceExpired(inconsistentContinue)
|
||||||
|
statusError.ErrStatus.ListMeta.Continue = newToken
|
||||||
|
return statusError
|
||||||
|
}
|
||||||
|
|
|
@ -508,10 +508,11 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor
|
||||||
options = append(options, clientv3.WithLimit(pred.Limit))
|
options = append(options, clientv3.WithLimit(pred.Limit))
|
||||||
}
|
}
|
||||||
|
|
||||||
var returnedRV int64
|
var returnedRV, continueRV int64
|
||||||
|
var continueKey string
|
||||||
switch {
|
switch {
|
||||||
case s.pagingEnabled && len(pred.Continue) > 0:
|
case s.pagingEnabled && len(pred.Continue) > 0:
|
||||||
continueKey, continueRV, err := decodeContinue(pred.Continue, keyPrefix)
|
continueKey, continueRV, err = decodeContinue(pred.Continue, keyPrefix)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return apierrors.NewBadRequest(fmt.Sprintf("invalid continue token: %v", err))
|
return apierrors.NewBadRequest(fmt.Sprintf("invalid continue token: %v", err))
|
||||||
}
|
}
|
||||||
|
@ -524,9 +525,13 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor
|
||||||
options = append(options, clientv3.WithRange(rangeEnd))
|
options = append(options, clientv3.WithRange(rangeEnd))
|
||||||
key = continueKey
|
key = continueKey
|
||||||
|
|
||||||
options = append(options, clientv3.WithRev(continueRV))
|
// If continueRV > 0, the LIST request needs a specific resource version.
|
||||||
returnedRV = continueRV
|
// continueRV==0 is invalid.
|
||||||
|
// If continueRV < 0, the request is for the latest resource version.
|
||||||
|
if continueRV > 0 {
|
||||||
|
options = append(options, clientv3.WithRev(continueRV))
|
||||||
|
returnedRV = continueRV
|
||||||
|
}
|
||||||
case s.pagingEnabled && pred.Limit > 0:
|
case s.pagingEnabled && pred.Limit > 0:
|
||||||
if len(resourceVersion) > 0 {
|
if len(resourceVersion) > 0 {
|
||||||
fromRV, err := s.versioner.ParseResourceVersion(resourceVersion)
|
fromRV, err := s.versioner.ParseResourceVersion(resourceVersion)
|
||||||
|
@ -563,7 +568,7 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor
|
||||||
for {
|
for {
|
||||||
getResp, err := s.client.KV.Get(ctx, key, options...)
|
getResp, err := s.client.KV.Get(ctx, key, options...)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return interpretListError(err, len(pred.Continue) > 0)
|
return interpretListError(err, len(pred.Continue) > 0, continueKey, keyPrefix)
|
||||||
}
|
}
|
||||||
hasMore = getResp.More
|
hasMore = getResp.More
|
||||||
|
|
||||||
|
|
|
@ -25,6 +25,7 @@ import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"reflect"
|
"reflect"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
|
@ -44,6 +45,7 @@ import (
|
||||||
"k8s.io/apiserver/pkg/apis/example"
|
"k8s.io/apiserver/pkg/apis/example"
|
||||||
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
|
examplev1 "k8s.io/apiserver/pkg/apis/example/v1"
|
||||||
"k8s.io/apiserver/pkg/storage"
|
"k8s.io/apiserver/pkg/storage"
|
||||||
|
"k8s.io/apiserver/pkg/storage/etcd"
|
||||||
storagetests "k8s.io/apiserver/pkg/storage/tests"
|
storagetests "k8s.io/apiserver/pkg/storage/tests"
|
||||||
"k8s.io/apiserver/pkg/storage/value"
|
"k8s.io/apiserver/pkg/storage/value"
|
||||||
)
|
)
|
||||||
|
@ -1180,6 +1182,153 @@ func TestListContinuation(t *testing.T) {
|
||||||
if len(out.Items) != 1 || !reflect.DeepEqual(&out.Items[0], preset[2].storedObj) {
|
if len(out.Items) != 1 || !reflect.DeepEqual(&out.Items[0], preset[2].storedObj) {
|
||||||
t.Fatalf("Unexpected third page: %#v", out.Items)
|
t.Fatalf("Unexpected third page: %#v", out.Items)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestListInconsistentContinuation(t *testing.T) {
|
||||||
|
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
|
||||||
|
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
|
||||||
|
defer cluster.Terminate(t)
|
||||||
|
store := newStore(cluster.RandClient(), false, true, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)})
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
// Setup storage with the following structure:
|
||||||
|
// /
|
||||||
|
// - one-level/
|
||||||
|
// | - test
|
||||||
|
// |
|
||||||
|
// - two-level/
|
||||||
|
// - 1/
|
||||||
|
// | - test
|
||||||
|
// |
|
||||||
|
// - 2/
|
||||||
|
// - test
|
||||||
|
//
|
||||||
|
preset := []struct {
|
||||||
|
key string
|
||||||
|
obj *example.Pod
|
||||||
|
storedObj *example.Pod
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
key: "/one-level/test",
|
||||||
|
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
key: "/two-level/1/test",
|
||||||
|
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
key: "/two-level/2/test",
|
||||||
|
obj: &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "bar"}},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for i, ps := range preset {
|
||||||
|
preset[i].storedObj = &example.Pod{}
|
||||||
|
err := store.Create(ctx, ps.key, ps.obj, preset[i].storedObj, 0)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("Set failed: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pred := func(limit int64, continueValue string) storage.SelectionPredicate {
|
||||||
|
return storage.SelectionPredicate{
|
||||||
|
Limit: limit,
|
||||||
|
Continue: continueValue,
|
||||||
|
Label: labels.Everything(),
|
||||||
|
Field: fields.Everything(),
|
||||||
|
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
|
||||||
|
pod := obj.(*example.Pod)
|
||||||
|
return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
out := &example.PodList{}
|
||||||
|
if err := store.List(ctx, "/", "0", pred(1, ""), out); err != nil {
|
||||||
|
t.Fatalf("Unable to get initial list: %v", err)
|
||||||
|
}
|
||||||
|
if len(out.Continue) == 0 {
|
||||||
|
t.Fatalf("No continuation token set")
|
||||||
|
}
|
||||||
|
if len(out.Items) != 1 || !reflect.DeepEqual(&out.Items[0], preset[0].storedObj) {
|
||||||
|
t.Fatalf("Unexpected first page: %#v", out.Items)
|
||||||
|
}
|
||||||
|
|
||||||
|
continueFromSecondItem := out.Continue
|
||||||
|
|
||||||
|
// update /two-level/2/test/bar
|
||||||
|
oldName := preset[2].obj.Name
|
||||||
|
newPod := &example.Pod{
|
||||||
|
ObjectMeta: metav1.ObjectMeta{
|
||||||
|
Name: oldName,
|
||||||
|
Labels: map[string]string{
|
||||||
|
"state": "new",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if err := store.GuaranteedUpdate(ctx, preset[2].key, preset[2].storedObj, false, nil,
|
||||||
|
func(_ runtime.Object, _ storage.ResponseMeta) (runtime.Object, *uint64, error) {
|
||||||
|
return newPod, nil, nil
|
||||||
|
}, newPod); err != nil {
|
||||||
|
t.Fatalf("update failed: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// compact to latest revision.
|
||||||
|
versioner := etcd.APIObjectVersioner{}
|
||||||
|
lastRVString := preset[2].storedObj.ResourceVersion
|
||||||
|
lastRV, err := versioner.ParseResourceVersion(lastRVString)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if _, err := cluster.Client(0).KV.Compact(ctx, int64(lastRV), clientv3.WithCompactPhysical()); err != nil {
|
||||||
|
t.Fatalf("Unable to compact, %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// The old continue token should have expired
|
||||||
|
err = store.List(ctx, "/", "0", pred(0, continueFromSecondItem), out)
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("unexpected no error")
|
||||||
|
}
|
||||||
|
if !strings.Contains(err.Error(), inconsistentContinue) {
|
||||||
|
t.Fatalf("unexpected error message %v", err)
|
||||||
|
}
|
||||||
|
status, ok := err.(apierrors.APIStatus)
|
||||||
|
if !ok {
|
||||||
|
t.Fatalf("expect error of implements the APIStatus interface, got %v", reflect.TypeOf(err))
|
||||||
|
}
|
||||||
|
inconsistentContinueFromSecondItem := status.Status().ListMeta.Continue
|
||||||
|
if len(inconsistentContinueFromSecondItem) == 0 {
|
||||||
|
t.Fatalf("expect non-empty continue token")
|
||||||
|
}
|
||||||
|
|
||||||
|
out = &example.PodList{}
|
||||||
|
if err := store.List(ctx, "/", "0", pred(1, inconsistentContinueFromSecondItem), out); err != nil {
|
||||||
|
t.Fatalf("Unable to get second page: %v", err)
|
||||||
|
}
|
||||||
|
if len(out.Continue) == 0 {
|
||||||
|
t.Fatalf("No continuation token set")
|
||||||
|
}
|
||||||
|
if len(out.Items) != 1 || !reflect.DeepEqual(&out.Items[0], preset[1].storedObj) {
|
||||||
|
t.Fatalf("Unexpected second page: %#v", out.Items)
|
||||||
|
}
|
||||||
|
if out.ResourceVersion != lastRVString {
|
||||||
|
t.Fatalf("Expected list resource version to be %s, got %s", lastRVString, out.ResourceVersion)
|
||||||
|
}
|
||||||
|
continueFromThirdItem := out.Continue
|
||||||
|
out = &example.PodList{}
|
||||||
|
if err := store.List(ctx, "/", "0", pred(1, continueFromThirdItem), out); err != nil {
|
||||||
|
t.Fatalf("Unable to get second page: %v", err)
|
||||||
|
}
|
||||||
|
if len(out.Continue) != 0 {
|
||||||
|
t.Fatalf("Unexpected continuation token set")
|
||||||
|
}
|
||||||
|
if len(out.Items) != 1 || !reflect.DeepEqual(&out.Items[0], preset[2].storedObj) {
|
||||||
|
t.Fatalf("Unexpected third page: %#v", out.Items)
|
||||||
|
}
|
||||||
|
if out.ResourceVersion != lastRVString {
|
||||||
|
t.Fatalf("Expected list resource version to be %s, got %s", lastRVString, out.ResourceVersion)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func testSetup(t *testing.T) (context.Context, *store, *integration.ClusterV3) {
|
func testSetup(t *testing.T) (context.Context, *store, *integration.ClusterV3) {
|
||||||
|
|
|
@ -19,12 +19,17 @@ package apimachinery
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
|
"reflect"
|
||||||
|
"time"
|
||||||
|
|
||||||
. "github.com/onsi/ginkgo"
|
. "github.com/onsi/ginkgo"
|
||||||
. "github.com/onsi/gomega"
|
. "github.com/onsi/gomega"
|
||||||
|
|
||||||
"k8s.io/api/core/v1"
|
"k8s.io/api/core/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
|
"k8s.io/apimachinery/pkg/util/wait"
|
||||||
|
"k8s.io/apiserver/pkg/storage/storagebackend"
|
||||||
"k8s.io/client-go/util/workqueue"
|
"k8s.io/client-go/util/workqueue"
|
||||||
"k8s.io/kubernetes/test/e2e/framework"
|
"k8s.io/kubernetes/test/e2e/framework"
|
||||||
)
|
)
|
||||||
|
@ -34,11 +39,10 @@ const numberOfTotalResources = 400
|
||||||
var _ = SIGDescribe("Servers with support for API chunking", func() {
|
var _ = SIGDescribe("Servers with support for API chunking", func() {
|
||||||
f := framework.NewDefaultFramework("chunking")
|
f := framework.NewDefaultFramework("chunking")
|
||||||
|
|
||||||
It("should return chunks of results for list calls", func() {
|
BeforeEach(func() {
|
||||||
ns := f.Namespace.Name
|
ns := f.Namespace.Name
|
||||||
c := f.ClientSet
|
c := f.ClientSet
|
||||||
client := c.CoreV1().PodTemplates(ns)
|
client := c.CoreV1().PodTemplates(ns)
|
||||||
|
|
||||||
By("creating a large number of resources")
|
By("creating a large number of resources")
|
||||||
workqueue.Parallelize(20, numberOfTotalResources, func(i int) {
|
workqueue.Parallelize(20, numberOfTotalResources, func(i int) {
|
||||||
for tries := 3; tries >= 0; tries-- {
|
for tries := 3; tries >= 0; tries-- {
|
||||||
|
@ -61,7 +65,12 @@ var _ = SIGDescribe("Servers with support for API chunking", func() {
|
||||||
}
|
}
|
||||||
Fail("Unable to create template %d, exiting", i)
|
Fail("Unable to create template %d, exiting", i)
|
||||||
})
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
It("should return chunks of results for list calls", func() {
|
||||||
|
ns := f.Namespace.Name
|
||||||
|
c := f.ClientSet
|
||||||
|
client := c.CoreV1().PodTemplates(ns)
|
||||||
By("retrieving those results in paged fashion several times")
|
By("retrieving those results in paged fashion several times")
|
||||||
for i := 0; i < 3; i++ {
|
for i := 0; i < 3; i++ {
|
||||||
opts := metav1.ListOptions{}
|
opts := metav1.ListOptions{}
|
||||||
|
@ -81,9 +90,7 @@ var _ = SIGDescribe("Servers with support for API chunking", func() {
|
||||||
if len(lastRV) == 0 {
|
if len(lastRV) == 0 {
|
||||||
lastRV = list.ResourceVersion
|
lastRV = list.ResourceVersion
|
||||||
}
|
}
|
||||||
if lastRV != list.ResourceVersion {
|
Expect(list.ResourceVersion).To(Equal(lastRV))
|
||||||
Expect(list.ResourceVersion).To(Equal(lastRV))
|
|
||||||
}
|
|
||||||
for _, item := range list.Items {
|
for _, item := range list.Items {
|
||||||
Expect(item.Name).To(Equal(fmt.Sprintf("template-%04d", found)))
|
Expect(item.Name).To(Equal(fmt.Sprintf("template-%04d", found)))
|
||||||
found++
|
found++
|
||||||
|
@ -101,4 +108,81 @@ var _ = SIGDescribe("Servers with support for API chunking", func() {
|
||||||
Expect(err).ToNot(HaveOccurred())
|
Expect(err).ToNot(HaveOccurred())
|
||||||
Expect(list.Items).To(HaveLen(numberOfTotalResources))
|
Expect(list.Items).To(HaveLen(numberOfTotalResources))
|
||||||
})
|
})
|
||||||
|
|
||||||
|
It("should support continue listing from the last key if the original version has been compacted away, though the list is inconsistent", func() {
|
||||||
|
ns := f.Namespace.Name
|
||||||
|
c := f.ClientSet
|
||||||
|
client := c.CoreV1().PodTemplates(ns)
|
||||||
|
|
||||||
|
By("retrieving the first page")
|
||||||
|
oneTenth := int64(numberOfTotalResources / 10)
|
||||||
|
opts := metav1.ListOptions{}
|
||||||
|
opts.Limit = oneTenth
|
||||||
|
list, err := client.List(opts)
|
||||||
|
// TODO: kops PR job is still using etcd2, which prevents this feature from working. Remove this check when kops is upgraded to etcd3
|
||||||
|
if len(list.Items) > int(opts.Limit) {
|
||||||
|
framework.Skipf("ERROR: This cluster does not support chunking, which means it is running etcd2 and not supported.")
|
||||||
|
}
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
firstToken := list.Continue
|
||||||
|
firstRV := list.ResourceVersion
|
||||||
|
framework.Logf("Retrieved %d/%d results with rv %s and continue %s", len(list.Items), opts.Limit, list.ResourceVersion, firstToken)
|
||||||
|
|
||||||
|
By("retrieving the second page until the token expires")
|
||||||
|
opts.Continue = firstToken
|
||||||
|
var inconsistentToken string
|
||||||
|
wait.Poll(20*time.Second, 2*storagebackend.DefaultCompactInterval, func() (bool, error) {
|
||||||
|
_, err := client.List(opts)
|
||||||
|
if err == nil {
|
||||||
|
framework.Logf("Token %s has not expired yet", firstToken)
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
if err != nil && !errors.IsResourceExpired(err) {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
framework.Logf("got error %s", err)
|
||||||
|
status, ok := err.(errors.APIStatus)
|
||||||
|
if !ok {
|
||||||
|
return false, fmt.Errorf("expect error to implement the APIStatus interface, got %v", reflect.TypeOf(err))
|
||||||
|
}
|
||||||
|
inconsistentToken = status.Status().ListMeta.Continue
|
||||||
|
if len(inconsistentToken) == 0 {
|
||||||
|
return false, fmt.Errorf("expect non empty continue token")
|
||||||
|
}
|
||||||
|
framework.Logf("Retrieved inconsistent continue %s", inconsistentToken)
|
||||||
|
return true, nil
|
||||||
|
})
|
||||||
|
|
||||||
|
By("retrieving the second page again with the token received with the error message")
|
||||||
|
opts.Continue = inconsistentToken
|
||||||
|
list, err = client.List(opts)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
Expect(list.ResourceVersion).ToNot(Equal(firstRV))
|
||||||
|
Expect(len(list.Items)).To(BeNumerically("==", opts.Limit))
|
||||||
|
found := oneTenth
|
||||||
|
for _, item := range list.Items {
|
||||||
|
Expect(item.Name).To(Equal(fmt.Sprintf("template-%04d", found)))
|
||||||
|
found++
|
||||||
|
}
|
||||||
|
|
||||||
|
By("retrieving all remaining pages")
|
||||||
|
opts.Continue = list.Continue
|
||||||
|
lastRV := list.ResourceVersion
|
||||||
|
for {
|
||||||
|
list, err := client.List(opts)
|
||||||
|
Expect(err).ToNot(HaveOccurred())
|
||||||
|
framework.Logf("Retrieved %d/%d results with rv %s and continue %s", len(list.Items), opts.Limit, list.ResourceVersion, list.Continue)
|
||||||
|
Expect(len(list.Items)).To(BeNumerically("<=", opts.Limit))
|
||||||
|
Expect(list.ResourceVersion).To(Equal(lastRV))
|
||||||
|
for _, item := range list.Items {
|
||||||
|
Expect(item.Name).To(Equal(fmt.Sprintf("template-%04d", found)))
|
||||||
|
found++
|
||||||
|
}
|
||||||
|
if len(list.Continue) == 0 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
opts.Continue = list.Continue
|
||||||
|
}
|
||||||
|
Expect(found).To(BeNumerically("==", numberOfTotalResources))
|
||||||
|
})
|
||||||
})
|
})
|
||||||
|
|
Loading…
Reference in New Issue