Server side implementation of paging for etcd3

Add a feature gate in the apiserver to control whether paging can be
used. Add controls to the storage factory that allow it to be disabled
per resource. Use a JSON encoded continuation token that can be
versioned. Create a 410 error if the continuation token is expired.

Adds GetContinue() to ListMeta.
pull/6/head
Clayton Coleman 2017-08-10 22:31:51 -04:00
parent 500b130ff0
commit 8952a0cb72
No known key found for this signature in database
GPG Key ID: 3D16906B4F1C5CB3
29 changed files with 542 additions and 99 deletions

View File

@ -420,6 +420,7 @@ func TestGCListWatcher(t *testing.T) {
t.Fatal(err)
}
lw := listWatcher(client, podResource)
lw.DisablePaging = true
if _, err := lw.Watch(metav1.ListOptions{ResourceVersion: "1"}); err != nil {
t.Fatal(err)
}

View File

@ -178,6 +178,7 @@ var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureS
genericfeatures.AdvancedAuditing: {Default: false, PreRelease: utilfeature.Alpha},
genericfeatures.APIResponseCompression: {Default: false, PreRelease: utilfeature.Alpha},
genericfeatures.Initializers: {Default: false, PreRelease: utilfeature.Alpha},
genericfeatures.APIListChunking: {Default: false, PreRelease: utilfeature.Alpha},
// inherited features from apiextensions-apiserver, relisted here to get a conflict if it is changed
// unintentionally on either side:

View File

@ -174,6 +174,17 @@ func NewGone(message string) *StatusError {
}}
}
// NewResourceExpired creates an error that indicates that the requested resource content has expired from
// the server (usually due to a resourceVersion that is too old).
func NewResourceExpired(message string) *StatusError {
return &StatusError{metav1.Status{
Status: metav1.StatusFailure,
Code: http.StatusGone,
Reason: metav1.StatusReasonExpired,
Message: message,
}}
}
// NewInvalid returns an error indicating the item is invalid and cannot be processed.
func NewInvalid(qualifiedKind schema.GroupKind, name string, errs field.ErrorList) *StatusError {
causes := make([]metav1.StatusCause, 0, len(errs))
@ -394,12 +405,28 @@ func IsInvalid(err error) bool {
return reasonForError(err) == metav1.StatusReasonInvalid
}
// IsGone is true if the error indicates the requested resource is no longer available.
func IsGone(err error) bool {
return reasonForError(err) == metav1.StatusReasonGone
}
// IsResourceExpired is true if the error indicates the resource has expired and the current action is
// no longer possible.
func IsResourceExpired(err error) bool {
return reasonForError(err) == metav1.StatusReasonExpired
}
// IsMethodNotSupported determines if the err is an error which indicates the provided action could not
// be performed because it is not supported by the server.
func IsMethodNotSupported(err error) bool {
return reasonForError(err) == metav1.StatusReasonMethodNotAllowed
}
// IsServiceUnavailable is true if the error indicates the underlying service is no longer available.
func IsServiceUnavailable(err error) bool {
return reasonForError(err) == metav1.StatusReasonServiceUnavailable
}
// IsBadRequest determines if err is an error which indicates that the request is invalid.
func IsBadRequest(err error) bool {
return reasonForError(err) == metav1.StatusReasonBadRequest

View File

@ -90,6 +90,8 @@ type ListInterface interface {
SetResourceVersion(version string)
GetSelfLink() string
SetSelfLink(selfLink string)
GetContinue() string
SetContinue(c string)
}
// Type exposes the type and APIVersion of versioned or internal API objects.
@ -105,6 +107,8 @@ func (meta *ListMeta) GetResourceVersion() string { return meta.ResourceV
func (meta *ListMeta) SetResourceVersion(version string) { meta.ResourceVersion = version }
func (meta *ListMeta) GetSelfLink() string { return meta.SelfLink }
func (meta *ListMeta) SetSelfLink(selfLink string) { meta.SelfLink = selfLink }
func (meta *ListMeta) GetContinue() string { return meta.Continue }
func (meta *ListMeta) SetContinue(c string) { meta.Continue = c }
func (obj *TypeMeta) GetObjectKind() schema.ObjectKind { return obj }

View File

@ -468,6 +468,14 @@ func (u *Unstructured) SetSelfLink(selfLink string) {
u.setNestedField(selfLink, "metadata", "selfLink")
}
func (u *Unstructured) GetContinue() string {
return getNestedString(u.Object, "metadata", "continue")
}
func (u *Unstructured) SetContinue(c string) {
u.setNestedField(c, "metadata", "continue")
}
func (u *Unstructured) GetCreationTimestamp() metav1.Time {
var timestamp metav1.Time
timestamp.UnmarshalQueryParameter(getNestedString(u.Object, "metadata", "creationTimestamp"))
@ -652,6 +660,14 @@ func (u *UnstructuredList) SetSelfLink(selfLink string) {
u.setNestedField(selfLink, "metadata", "selfLink")
}
func (u *UnstructuredList) GetContinue() string {
return getNestedString(u.Object, "metadata", "continue")
}
func (u *UnstructuredList) SetContinue(c string) {
u.setNestedField(c, "metadata", "continue")
}
func (u *UnstructuredList) SetGroupVersionKind(gvk schema.GroupVersionKind) {
u.SetAPIVersion(gvk.GroupVersion().String())
u.SetKind(gvk.Kind)

View File

@ -199,6 +199,7 @@ type InternalTypeMeta struct {
CreationTimestamp metav1.Time `json:"creationTimestamp,omitempty"`
SelfLink string `json:"selfLink,omitempty"`
ResourceVersion string `json:"resourceVersion,omitempty"`
Continue string `json:"next,omitempty"`
APIVersion string `json:"apiVersion,omitempty"`
Labels map[string]string `json:"labels,omitempty"`
Annotations map[string]string `json:"annotations,omitempty"`
@ -210,6 +211,8 @@ func (m *InternalTypeMeta) GetResourceVersion() string { return m.ResourceVers
func (m *InternalTypeMeta) SetResourceVersion(rv string) { m.ResourceVersion = rv }
func (m *InternalTypeMeta) GetSelfLink() string { return m.SelfLink }
func (m *InternalTypeMeta) SetSelfLink(link string) { m.SelfLink = link }
func (m *InternalTypeMeta) GetContinue() string { return m.Continue }
func (m *InternalTypeMeta) SetContinue(c string) { m.Continue = c }
type MyAPIObject struct {
TypeMeta InternalTypeMeta `json:",inline"`

View File

@ -54,6 +54,13 @@ const (
// Allow asynchronous coordination of object creation.
// Auto-enabled by the Initializers admission plugin.
Initializers utilfeature.Feature = "Initializers"
// owner: @smarterclayton
// alpha: v1.8
//
// Allow API clients to retrieve resource lists in chunks rather than
// all at once.
APIListChunking utilfeature.Feature = "APIListChunking"
)
func init() {
@ -68,4 +75,5 @@ var defaultKubernetesFeatureGates = map[utilfeature.Feature]utilfeature.FeatureS
AdvancedAuditing: {Default: false, PreRelease: utilfeature.Alpha},
APIResponseCompression: {Default: false, PreRelease: utilfeature.Alpha},
Initializers: {Default: false, PreRelease: utilfeature.Alpha},
APIListChunking: {Default: false, PreRelease: utilfeature.Alpha},
}

View File

@ -293,6 +293,8 @@ func (e *Store) ListPredicate(ctx genericapirequest.Context, p storage.Selection
options = &metainternalversion.ListOptions{ResourceVersion: ""}
}
p.IncludeUninitialized = options.IncludeUninitialized
p.Limit = options.Limit
p.Continue = options.Continue
list := e.NewListFunc()
qualifiedResource := e.qualifiedResourceFromContext(ctx)
if name, ok := p.MatchesSingle(); ok {

View File

@ -34,6 +34,8 @@ import (
)
type EtcdOptions struct {
// The value of Paging on StorageConfig will be overriden by the
// calculated feature gate value.
StorageConfig storagebackend.Config
EncryptionProviderConfigFilepath string

View File

@ -43,8 +43,10 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/serializer/recognizer:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/apiserver/pkg/features:go_default_library",
"//vendor/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library",
"//vendor/k8s.io/apiserver/pkg/storage/value:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
],
)

View File

@ -27,8 +27,10 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/storage/storagebackend"
"k8s.io/apiserver/pkg/storage/value"
utilfeature "k8s.io/apiserver/pkg/util/feature"
)
// Backend describes the storage servers, the information here should be enough
@ -112,6 +114,8 @@ type groupResourceOverrides struct {
decoderDecoratorFn func([]runtime.Decoder) []runtime.Decoder
// transformer is optional and shall encrypt that resource at rest.
transformer value.Transformer
// disablePaging will prevent paging on the provided resource.
disablePaging bool
}
// Apply overrides the provided config and options if the override has a value in that position
@ -138,6 +142,9 @@ func (o groupResourceOverrides) Apply(config *storagebackend.Config, options *St
if o.transformer != nil {
config.Transformer = o.transformer
}
if o.disablePaging {
config.Paging = false
}
}
var _ StorageFactory = &DefaultStorageFactory{}
@ -157,6 +164,7 @@ var specialDefaultResourcePrefixes = map[schema.GroupResource]string{
}
func NewDefaultStorageFactory(config storagebackend.Config, defaultMediaType string, defaultSerializer runtime.StorageSerializer, resourceEncodingConfig ResourceEncodingConfig, resourceConfig APIResourceConfigSource) *DefaultStorageFactory {
config.Paging = utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
if len(defaultMediaType) == 0 {
defaultMediaType = runtime.ContentTypeJSON
}
@ -185,6 +193,14 @@ func (s *DefaultStorageFactory) SetEtcdPrefix(groupResource schema.GroupResource
s.Overrides[groupResource] = overrides
}
// SetDisableAPIListChunking allows a specific resource to disable paging at the storage layer, to prevent
// exposure of key names in continuations. This may be overriden by feature gates.
func (s *DefaultStorageFactory) SetDisableAPIListChunking(groupResource schema.GroupResource) {
overrides := s.Overrides[groupResource]
overrides.disablePaging = true
s.Overrides[groupResource] = overrides
}
// SetResourceEtcdPrefix sets the prefix for a resource, but not the base-dir. You'll end up in `etcdPrefix/resourceEtcdPrefix`.
func (s *DefaultStorageFactory) SetResourceEtcdPrefix(groupResource schema.GroupResource, prefix string) {
overrides := s.Overrides[groupResource]

View File

@ -64,6 +64,8 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/util/validation/field:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/watch:go_default_library",
"//vendor/k8s.io/apiserver/pkg/features:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/trace:go_default_library",
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
],

View File

@ -37,6 +37,8 @@ import (
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/features"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utiltrace "k8s.io/apiserver/pkg/util/trace"
"k8s.io/client-go/tools/cache"
)
@ -406,9 +408,11 @@ func (c *Cacher) Get(ctx context.Context, key string, resourceVersion string, ob
// Implements storage.Interface.
func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate, listObj runtime.Object) error {
if resourceVersion == "" {
pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
if resourceVersion == "" || (pagingEnabled && (len(pred.Continue) > 0 || pred.Limit > 0)) {
// If resourceVersion is not specified, serve it from underlying
// storage (for backward compatibility).
// storage (for backward compatibility). If a continuation or limit is
// requested, serve it from the underlying storage as well.
return c.storage.GetToList(ctx, key, resourceVersion, pred, listObj)
}
@ -459,7 +463,7 @@ func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion stri
}
}
if c.versioner != nil {
if err := c.versioner.UpdateList(listObj, readResourceVersion); err != nil {
if err := c.versioner.UpdateList(listObj, readResourceVersion, ""); err != nil {
return err
}
}
@ -468,9 +472,11 @@ func (c *Cacher) GetToList(ctx context.Context, key string, resourceVersion stri
// Implements storage.Interface.
func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, pred SelectionPredicate, listObj runtime.Object) error {
if resourceVersion == "" {
pagingEnabled := utilfeature.DefaultFeatureGate.Enabled(features.APIListChunking)
if resourceVersion == "" || (pagingEnabled && (len(pred.Continue) > 0 || pred.Limit > 0)) {
// If resourceVersion is not specified, serve it from underlying
// storage (for backward compatibility).
// storage (for backward compatibility). If a continuation or limit is
// requested, serve it from the underlying storage as well.
return c.storage.List(ctx, key, resourceVersion, pred, listObj)
}
@ -527,7 +533,7 @@ func (c *Cacher) List(ctx context.Context, key string, resourceVersion string, p
}
trace.Step(fmt.Sprintf("Filtered %d items", listVal.Len()))
if c.versioner != nil {
if err := c.versioner.UpdateList(listObj, readResourceVersion); err != nil {
if err := c.versioner.UpdateList(listObj, readResourceVersion, ""); err != nil {
return err
}
}

View File

@ -43,7 +43,7 @@ func (a APIObjectVersioner) UpdateObject(obj runtime.Object, resourceVersion uin
}
// UpdateList implements Versioner
func (a APIObjectVersioner) UpdateList(obj runtime.Object, resourceVersion uint64) error {
func (a APIObjectVersioner) UpdateList(obj runtime.Object, resourceVersion uint64, nextKey string) error {
listAccessor, err := meta.ListAccessor(obj)
if err != nil || listAccessor == nil {
return err
@ -53,6 +53,7 @@ func (a APIObjectVersioner) UpdateList(obj runtime.Object, resourceVersion uint6
versionString = strconv.FormatUint(resourceVersion, 10)
}
listAccessor.SetResourceVersion(versionString)
listAccessor.SetContinue(nextKey)
return nil
}

View File

@ -365,7 +365,7 @@ func (h *etcdHelper) GetToList(ctx context.Context, key string, resourceVersion
return err
}
trace.Step("Object decoded")
if err := h.versioner.UpdateList(listObj, response.Index); err != nil {
if err := h.versioner.UpdateList(listObj, response.Index, ""); err != nil {
return err
}
return nil
@ -445,7 +445,7 @@ func (h *etcdHelper) List(ctx context.Context, key string, resourceVersion strin
return err
}
trace.Step("Node list decoded")
if err := h.versioner.UpdateList(listObj, index); err != nil {
if err := h.versioner.UpdateList(listObj, index, ""); err != nil {
return err
}
return nil

View File

@ -322,6 +322,7 @@ func NewUnsecuredEtcd3TestClientServer(t *testing.T, scheme *runtime.Scheme) (*E
ServerList: server.V3Client.Endpoints(),
DeserializationCacheSize: etcdtest.DeserializationCacheSize,
Copier: scheme,
Paging: true,
}
return server, config
}

View File

@ -41,6 +41,7 @@ go_library(
name = "go_default_library",
srcs = [
"compact.go",
"errors.go",
"event.go",
"store.go",
"watcher.go",
@ -51,8 +52,8 @@ go_library(
"//vendor/github.com/coreos/etcd/mvcc/mvccpb:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/golang.org/x/net/context:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/conversion:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",

View File

@ -0,0 +1,42 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package etcd3
import (
"k8s.io/apimachinery/pkg/api/errors"
etcdrpc "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
)
func interpretWatchError(err error) error {
switch {
case err == etcdrpc.ErrCompacted:
return errors.NewResourceExpired("The resourceVersion for the provided watch is too old.")
}
return err
}
func interpretListError(err error, paging bool) error {
switch {
case err == etcdrpc.ErrCompacted:
if paging {
return errors.NewResourceExpired("The provided from parameter is too old to display a consistent list result. You must start a new list without the from.")
}
return errors.NewResourceExpired("The resourceVersion for the provided list is too old.")
}
return err
}

View File

@ -18,10 +18,13 @@ package etcd3
import (
"bytes"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"path"
"reflect"
"strconv"
"strings"
"time"
@ -60,12 +63,13 @@ type store struct {
client *clientv3.Client
// getOpts contains additional options that should be passed
// to all Get() calls.
getOps []clientv3.OpOption
codec runtime.Codec
versioner storage.Versioner
transformer value.Transformer
pathPrefix string
watcher *watcher
getOps []clientv3.OpOption
codec runtime.Codec
versioner storage.Versioner
transformer value.Transformer
pathPrefix string
watcher *watcher
pagingEnabled bool
}
type elemForDecode struct {
@ -82,23 +86,24 @@ type objState struct {
}
// New returns an etcd3 implementation of storage.Interface.
func New(c *clientv3.Client, codec runtime.Codec, prefix string, transformer value.Transformer) storage.Interface {
return newStore(c, true, codec, prefix, transformer)
func New(c *clientv3.Client, codec runtime.Codec, prefix string, transformer value.Transformer, pagingEnabled bool) storage.Interface {
return newStore(c, true, pagingEnabled, codec, prefix, transformer)
}
// NewWithNoQuorumRead returns etcd3 implementation of storage.Interface
// where Get operations don't require quorum read.
func NewWithNoQuorumRead(c *clientv3.Client, codec runtime.Codec, prefix string, transformer value.Transformer) storage.Interface {
return newStore(c, false, codec, prefix, transformer)
func NewWithNoQuorumRead(c *clientv3.Client, codec runtime.Codec, prefix string, transformer value.Transformer, pagingEnabled bool) storage.Interface {
return newStore(c, false, pagingEnabled, codec, prefix, transformer)
}
func newStore(c *clientv3.Client, quorumRead bool, codec runtime.Codec, prefix string, transformer value.Transformer) *store {
func newStore(c *clientv3.Client, quorumRead, pagingEnabled bool, codec runtime.Codec, prefix string, transformer value.Transformer) *store {
versioner := etcd.APIObjectVersioner{}
result := &store{
client: c,
codec: codec,
versioner: versioner,
transformer: transformer,
client: c,
codec: codec,
versioner: versioner,
transformer: transformer,
pagingEnabled: pagingEnabled,
// for compatibility with etcd2 impl.
// no-op for default prefix of '/registry'.
// keeps compatibility with etcd2 impl for custom prefixes that don't start with '/'
@ -386,7 +391,66 @@ func (s *store) GetToList(ctx context.Context, key string, resourceVersion strin
return err
}
// update version with cluster level revision
return s.versioner.UpdateList(listObj, uint64(getResp.Header.Revision))
return s.versioner.UpdateList(listObj, uint64(getResp.Header.Revision), "")
}
// continueToken is a simple structured object for encoding the state of a continue token.
// TODO: if we change the version of the encoded from, we can't start encoding the new version
// until all other servers are upgraded (i.e. we need to support rolling schema)
// This is a public API struct and cannot change.
type continueToken struct {
APIVersion string `json:"v"`
ResourceVersion int64 `json:"rv"`
StartKey string `json:"start"`
}
// parseFrom transforms an encoded predicate from into a versioned struct.
// TODO: return a typed error that instructs clients that they must relist
func decodeContinue(continueValue, keyPrefix string) (fromKey string, rv int64, err error) {
data, err := base64.RawURLEncoding.DecodeString(continueValue)
if err != nil {
return "", 0, fmt.Errorf("continue key is not valid: %v", err)
}
var c continueToken
if err := json.Unmarshal(data, &c); err != nil {
return "", 0, fmt.Errorf("continue key is not valid: %v", err)
}
switch c.APIVersion {
case "v1alpha1":
if c.ResourceVersion == 0 {
return "", 0, fmt.Errorf("continue key is not valid: incorrect encoded start resourceVersion (version v1alpha1)")
}
if len(c.StartKey) == 0 {
return "", 0, fmt.Errorf("continue key is not valid: encoded start key empty (version v1alpha1)")
}
// defend against path traversal attacks by clients - path.Clean will ensure that startKey cannot
// be at a higher level of the hierarchy, and so when we append the key prefix we will end up with
// continue start key that is fully qualified and cannot range over anything less specific than
// keyPrefix.
cleaned := path.Clean(c.StartKey)
if cleaned != c.StartKey || cleaned == "." || cleaned == "/" {
return "", 0, fmt.Errorf("continue key is not valid: %s", cleaned)
}
if len(cleaned) == 0 {
return "", 0, fmt.Errorf("continue key is not valid: encoded start key empty (version 0)")
}
return keyPrefix + cleaned, c.ResourceVersion, nil
default:
return "", 0, fmt.Errorf("continue key is not valid: server does not recognize this encoded version %q", c.APIVersion)
}
}
// encodeContinue returns a string representing the encoded continuation of the current query.
func encodeContinue(key, keyPrefix string, resourceVersion int64) (string, error) {
nextKey := strings.TrimPrefix(key, keyPrefix)
if nextKey == key {
return "", fmt.Errorf("unable to encode next field: the key and key prefix do not match")
}
out, err := json.Marshal(&continueToken{APIVersion: "v1alpha1", ResourceVersion: resourceVersion, StartKey: nextKey})
if err != nil {
return "", err
}
return base64.RawURLEncoding.EncodeToString(out), nil
}
// List implements storage.Interface.List.
@ -402,16 +466,50 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor
if !strings.HasSuffix(key, "/") {
key += "/"
}
getResp, err := s.client.KV.Get(ctx, key, clientv3.WithPrefix())
keyPrefix := key
// set the appropriate clientv3 options to filter the returned data set
options := make([]clientv3.OpOption, 0, 4)
if s.pagingEnabled && pred.Limit > 0 {
options = append(options, clientv3.WithLimit(pred.Limit))
}
var returnedRV int64
switch {
case s.pagingEnabled && len(pred.Continue) > 0:
continueKey, continueRV, err := decodeContinue(pred.Continue, keyPrefix)
if err != nil {
return err
}
options = append(options, clientv3.WithRange(clientv3.GetPrefixRangeEnd(key)))
key = continueKey
options = append(options, clientv3.WithRev(continueRV))
returnedRV = continueRV
case len(resourceVersion) > 0:
fromRV, err := strconv.ParseInt(resourceVersion, 10, 64)
if err != nil {
return fmt.Errorf("invalid resource version: %v", err)
}
options = append(options, clientv3.WithPrefix(), clientv3.WithRev(fromRV))
returnedRV = fromRV
default:
options = append(options, clientv3.WithPrefix())
}
getResp, err := s.client.KV.Get(ctx, key, options...)
if err != nil {
return err
return interpretListError(err, len(pred.Continue) > 0)
}
elems := make([]*elemForDecode, 0, len(getResp.Kvs))
for _, kv := range getResp.Kvs {
data, _, err := s.transformer.TransformFromStorage(kv.Value, authenticatedDataString(kv.Key))
if err != nil {
utilruntime.HandleError(fmt.Errorf("unable to transform key %q: %v", key, err))
utilruntime.HandleError(fmt.Errorf("unable to transform key %q: %v", kv.Key, err))
continue
}
@ -420,11 +518,31 @@ func (s *store) List(ctx context.Context, key, resourceVersion string, pred stor
rev: uint64(kv.ModRevision),
})
}
if err := decodeList(elems, storage.SimpleFilter(pred), listPtr, s.codec, s.versioner); err != nil {
return err
}
// update version with cluster level revision
return s.versioner.UpdateList(listObj, uint64(getResp.Header.Revision))
// indicate to the client which resource version was returned
if returnedRV == 0 {
returnedRV = getResp.Header.Revision
}
switch {
case !getResp.More:
// no continuation
return s.versioner.UpdateList(listObj, uint64(returnedRV), "")
case len(getResp.Kvs) == 0:
return fmt.Errorf("no results were found, but etcd indicated there were more values")
default:
// we want to start immediately after the last key
// TODO: this reveals info about certain keys
key := string(getResp.Kvs[len(getResp.Kvs)-1].Key)
next, err := encodeContinue(key+"\x00", keyPrefix, returnedRV)
if err != nil {
return err
}
return s.versioner.UpdateList(listObj, uint64(returnedRV), next)
}
}
// Watch implements storage.Interface.Watch.
@ -548,8 +666,8 @@ func decode(codec runtime.Codec, versioner storage.Versioner, value []byte, objP
// decodeList decodes a list of values into a list of objects, with resource version set to corresponding rev.
// On success, ListPtr would be set to the list of objects.
func decodeList(elems []*elemForDecode, filter storage.FilterFunc, ListPtr interface{}, codec runtime.Codec, versioner storage.Versioner) error {
v, err := conversion.EnforcePtr(ListPtr)
func decodeList(elems []*elemForDecode, filter storage.FilterFunc, listPtr interface{}, codec runtime.Codec, versioner storage.Versioner) error {
v, err := conversion.EnforcePtr(listPtr)
if err != nil || v.Kind() != reflect.Slice {
panic("need ptr to slice")
}

View File

@ -18,12 +18,17 @@ package etcd3
import (
"bytes"
"encoding/base64"
"encoding/json"
"fmt"
"reflect"
"strconv"
"sync"
"testing"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/integration"
"golang.org/x/net/context"
apitesting "k8s.io/apimachinery/pkg/api/testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
@ -37,10 +42,6 @@ import (
"k8s.io/apiserver/pkg/storage"
storagetests "k8s.io/apiserver/pkg/storage/tests"
"k8s.io/apiserver/pkg/storage/value"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/integration"
"golang.org/x/net/context"
)
var scheme = runtime.NewScheme()
@ -587,7 +588,7 @@ func TestTransformationFailure(t *testing.T) {
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
store := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)})
store := newStore(cluster.RandClient(), false, false, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)})
ctx := context.Background()
preset := []struct {
@ -667,7 +668,8 @@ func TestList(t *testing.T) {
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
store := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)})
store := newStore(cluster.RandClient(), false, true, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)})
disablePagingStore := newStore(cluster.RandClient(), false, false, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)})
ctx := context.Background()
// Setup storage with the following structure:
@ -704,40 +706,106 @@ func TestList(t *testing.T) {
}
}
list := &example.PodList{}
store.List(ctx, "/two-level", "0", storage.Everything, list)
continueRV, _ := strconv.Atoi(list.ResourceVersion)
secondContinuation, err := encodeContinue("/two-level/2", "/two-level/", int64(continueRV))
if err != nil {
t.Fatal(err)
}
tests := []struct {
prefix string
pred storage.SelectionPredicate
expectedOut []*example.Pod
}{{ // test List on existing key
prefix: "/one-level/",
pred: storage.Everything,
expectedOut: []*example.Pod{preset[0].storedObj},
}, { // test List on non-existing key
prefix: "/non-existing/",
pred: storage.Everything,
expectedOut: nil,
}, { // test List with pod name matching
prefix: "/one-level/",
pred: storage.SelectionPredicate{
Label: labels.Everything(),
Field: fields.ParseSelectorOrDie("metadata.name!=" + preset[0].storedObj.Name),
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
pod := obj.(*example.Pod)
return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil
},
disablePaging bool
prefix string
pred storage.SelectionPredicate
expectedOut []*example.Pod
expectContinue bool
}{
{ // test List on existing key
prefix: "/one-level/",
pred: storage.Everything,
expectedOut: []*example.Pod{preset[0].storedObj},
},
expectedOut: nil,
}, { // test List with multiple levels of directories and expect flattened result
prefix: "/two-level/",
pred: storage.Everything,
expectedOut: []*example.Pod{preset[1].storedObj, preset[2].storedObj},
}}
{ // test List on non-existing key
prefix: "/non-existing/",
pred: storage.Everything,
expectedOut: nil,
},
{ // test List with pod name matching
prefix: "/one-level/",
pred: storage.SelectionPredicate{
Label: labels.Everything(),
Field: fields.ParseSelectorOrDie("metadata.name!=" + preset[0].storedObj.Name),
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
pod := obj.(*example.Pod)
return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil
},
},
expectedOut: nil,
},
{ // test List with limit
prefix: "/two-level/",
pred: storage.SelectionPredicate{
Label: labels.Everything(),
Field: fields.Everything(),
Limit: 1,
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
pod := obj.(*example.Pod)
return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil
},
},
expectedOut: []*example.Pod{preset[1].storedObj},
expectContinue: true,
},
{ // test List with limit when paging disabled
disablePaging: true,
prefix: "/two-level/",
pred: storage.SelectionPredicate{
Label: labels.Everything(),
Field: fields.Everything(),
Limit: 1,
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
pod := obj.(*example.Pod)
return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil
},
},
expectedOut: []*example.Pod{preset[1].storedObj, preset[2].storedObj},
expectContinue: false,
},
{ // test List with pregenerated continue token
prefix: "/two-level/",
pred: storage.SelectionPredicate{
Label: labels.Everything(),
Field: fields.Everything(),
Limit: 1,
Continue: secondContinuation,
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
pod := obj.(*example.Pod)
return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil
},
},
expectedOut: []*example.Pod{preset[2].storedObj},
},
{ // test List with multiple levels of directories and expect flattened result
prefix: "/two-level/",
pred: storage.Everything,
expectedOut: []*example.Pod{preset[1].storedObj, preset[2].storedObj},
},
}
for i, tt := range tests {
out := &example.PodList{}
err := store.List(ctx, tt.prefix, "0", tt.pred, out)
var err error
if tt.disablePaging {
err = disablePagingStore.List(ctx, tt.prefix, "0", tt.pred, out)
} else {
err = store.List(ctx, tt.prefix, "0", tt.pred, out)
}
if err != nil {
t.Fatalf("List failed: %v", err)
t.Fatalf("#%d: List failed: %v", i, err)
}
if (len(out.Continue) > 0) != tt.expectContinue {
t.Errorf("#%d: unexpected continue token: %v", i, out.Continue)
}
if len(tt.expectedOut) != len(out.Items) {
t.Errorf("#%d: length of list want=%d, get=%d", i, len(tt.expectedOut), len(out.Items))
@ -750,12 +818,75 @@ func TestList(t *testing.T) {
}
}
}
// test continuations
out := &example.PodList{}
pred := func(limit int64, continueValue string) storage.SelectionPredicate {
return storage.SelectionPredicate{
Limit: limit,
Continue: continueValue,
Label: labels.Everything(),
Field: fields.Everything(),
GetAttrs: func(obj runtime.Object) (labels.Set, fields.Set, bool, error) {
pod := obj.(*example.Pod)
return nil, fields.Set{"metadata.name": pod.Name}, pod.Initializers != nil, nil
},
}
}
if err := store.List(ctx, "/", "0", pred(1, ""), out); err != nil {
t.Fatalf("Unable to get initial list: %v", err)
}
if len(out.Continue) == 0 {
t.Fatalf("No continuation token set")
}
if len(out.Items) != 1 || !reflect.DeepEqual(&out.Items[0], preset[0].storedObj) {
t.Fatalf("Unexpected first page: %#v", out.Items)
}
continueFromSecondItem := out.Continue
// no limit, should get two items
out = &example.PodList{}
if err := store.List(ctx, "/", "0", pred(0, continueFromSecondItem), out); err != nil {
t.Fatalf("Unable to get second page: %v", err)
}
if len(out.Continue) != 0 {
t.Fatalf("Unexpected continuation token set")
}
if !reflect.DeepEqual(out.Items, []example.Pod{*preset[1].storedObj, *preset[2].storedObj}) {
key, rv, err := decodeContinue(continueFromSecondItem, "/")
t.Logf("continue token was %d %s %v", rv, key, err)
t.Fatalf("Unexpected second page: %#v", out.Items)
}
// limit, should get two more pages
out = &example.PodList{}
if err := store.List(ctx, "/", "0", pred(1, continueFromSecondItem), out); err != nil {
t.Fatalf("Unable to get second page: %v", err)
}
if len(out.Continue) == 0 {
t.Fatalf("No continuation token set")
}
if len(out.Items) != 1 || !reflect.DeepEqual(&out.Items[0], preset[1].storedObj) {
t.Fatalf("Unexpected second page: %#v", out.Items)
}
continueFromThirdItem := out.Continue
out = &example.PodList{}
if err := store.List(ctx, "/", "0", pred(1, continueFromThirdItem), out); err != nil {
t.Fatalf("Unable to get second page: %v", err)
}
if len(out.Continue) != 0 {
t.Fatalf("Unexpected continuation token set")
}
if len(out.Items) != 1 || !reflect.DeepEqual(&out.Items[0], preset[2].storedObj) {
t.Fatalf("Unexpected third page: %#v", out.Items)
}
}
func testSetup(t *testing.T) (context.Context, *store, *integration.ClusterV3) {
codec := apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
store := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)})
store := newStore(cluster.RandClient(), false, true, codec, "", prefixTransformer{prefix: []byte(defaultTestPrefix)})
ctx := context.Background()
return ctx, store, cluster
}
@ -787,9 +918,57 @@ func TestPrefix(t *testing.T) {
"/registry": "/registry",
}
for configuredPrefix, effectivePrefix := range testcases {
store := newStore(cluster.RandClient(), false, codec, configuredPrefix, transformer)
store := newStore(cluster.RandClient(), false, true, codec, configuredPrefix, transformer)
if store.pathPrefix != effectivePrefix {
t.Errorf("configured prefix of %s, expected effective prefix of %s, got %s", configuredPrefix, effectivePrefix, store.pathPrefix)
}
}
}
func encodeContinueOrDie(apiVersion string, resourceVersion int64, nextKey string) string {
out, err := json.Marshal(&continueToken{APIVersion: apiVersion, ResourceVersion: resourceVersion, StartKey: nextKey})
if err != nil {
panic(err)
}
return base64.RawURLEncoding.EncodeToString(out)
}
func Test_decodeContinue(t *testing.T) {
type args struct {
continueValue string
keyPrefix string
}
tests := []struct {
name string
args args
wantFromKey string
wantRv int64
wantErr bool
}{
{name: "valid", args: args{continueValue: encodeContinueOrDie("v1alpha1", 1, "key"), keyPrefix: "/test/"}, wantRv: 1, wantFromKey: "/test/key"},
{name: "empty version", args: args{continueValue: encodeContinueOrDie("", 1, "key"), keyPrefix: "/test/"}, wantErr: true},
{name: "invalid version", args: args{continueValue: encodeContinueOrDie("v1", 1, "key"), keyPrefix: "/test/"}, wantErr: true},
{name: "path traversal - parent", args: args{continueValue: encodeContinueOrDie("v1alpha", 1, "../key"), keyPrefix: "/test/"}, wantErr: true},
{name: "path traversal - local", args: args{continueValue: encodeContinueOrDie("v1alpha", 1, "./key"), keyPrefix: "/test/"}, wantErr: true},
{name: "path traversal - double parent", args: args{continueValue: encodeContinueOrDie("v1alpha", 1, "./../key"), keyPrefix: "/test/"}, wantErr: true},
{name: "path traversal - after parent", args: args{continueValue: encodeContinueOrDie("v1alpha", 1, "key/../.."), keyPrefix: "/test/"}, wantErr: true},
{name: "path traversal - separator", args: args{continueValue: encodeContinueOrDie("v1alpha", 1, "/"), keyPrefix: "/test/"}, wantErr: true},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
gotFromKey, gotRv, err := decodeContinue(tt.args.continueValue, tt.args.keyPrefix)
if (err != nil) != tt.wantErr {
t.Errorf("decodeContinue() error = %v, wantErr %v", err, tt.wantErr)
return
}
if gotFromKey != tt.wantFromKey {
t.Errorf("decodeContinue() gotFromKey = %v, want %v", gotFromKey, tt.wantFromKey)
}
if gotRv != tt.wantRv {
t.Errorf("decodeContinue() gotRv = %v, want %v", gotRv, tt.wantRv)
}
})
}
}

View File

@ -19,20 +19,18 @@ package etcd3
import (
"errors"
"fmt"
"net/http"
"os"
"strconv"
"strings"
"sync"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
apierrs "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/apiserver/pkg/storage"
"k8s.io/apiserver/pkg/storage/value"
"github.com/coreos/etcd/clientv3"
etcdrpc "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
"github.com/golang/glog"
"golang.org/x/net/context"
)
@ -139,7 +137,7 @@ func (wc *watchChan) run() {
if err == context.Canceled {
break
}
errResult := parseError(err)
errResult := transformErrorToEvent(err)
if errResult != nil {
// error result is guaranteed to be received by user before closing ResultChan.
select {
@ -319,28 +317,15 @@ func (wc *watchChan) transform(e *event) (res *watch.Event) {
return res
}
func parseError(err error) *watch.Event {
var status *metav1.Status
switch {
case err == etcdrpc.ErrCompacted:
status = &metav1.Status{
Status: metav1.StatusFailure,
Message: err.Error(),
Code: http.StatusGone,
Reason: metav1.StatusReasonExpired,
}
default:
status = &metav1.Status{
Status: metav1.StatusFailure,
Message: err.Error(),
Code: http.StatusInternalServerError,
Reason: metav1.StatusReasonInternalError,
}
func transformErrorToEvent(err error) *watch.Event {
err = interpretWatchError(err)
if _, ok := err.(apierrs.APIStatus); !ok {
err = apierrs.NewInternalError(err)
}
status := err.(apierrs.APIStatus).Status()
return &watch.Event{
Type: watch.Error,
Object: status,
Object: &status,
}
}

View File

@ -226,13 +226,13 @@ func TestWatchError(t *testing.T) {
codec := &testCodec{apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion)}
cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1})
defer cluster.Terminate(t)
invalidStore := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte("test!")})
invalidStore := newStore(cluster.RandClient(), false, true, codec, "", prefixTransformer{prefix: []byte("test!")})
ctx := context.Background()
w, err := invalidStore.Watch(ctx, "/abc", "0", storage.Everything)
if err != nil {
t.Fatalf("Watch failed: %v", err)
}
validStore := newStore(cluster.RandClient(), false, codec, "", prefixTransformer{prefix: []byte("test!")})
validStore := newStore(cluster.RandClient(), false, true, codec, "", prefixTransformer{prefix: []byte("test!")})
validStore.GuaranteedUpdate(ctx, "/abc", &example.Pod{}, true, nil, storage.SimpleUpdate(
func(runtime.Object) (runtime.Object, error) {
return &example.Pod{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}, nil

View File

@ -36,8 +36,9 @@ type Versioner interface {
UpdateObject(obj runtime.Object, resourceVersion uint64) error
// UpdateList sets the resource version into an API list object. Returns an error if the object
// cannot be updated correctly. May return nil if the requested object does not need metadata
// from database.
UpdateList(obj runtime.Object, resourceVersion uint64) error
// from database. continueValue is optional and indicates that more results are available if
// the client passes that value to the server in a subsequent call.
UpdateList(obj runtime.Object, resourceVersion uint64, continueValue string) error
// PrepareObjectForStorage should set SelfLink and ResourceVersion to the empty value. Should
// return an error if the specified object cannot be updated.
PrepareObjectForStorage(obj runtime.Object) error

View File

@ -76,6 +76,8 @@ type SelectionPredicate struct {
IncludeUninitialized bool
GetAttrs AttrFunc
IndexFields []string
Limit int64
Continue string
}
// Matches returns true if the given object's labels and fields (as
@ -118,6 +120,9 @@ func (s *SelectionPredicate) MatchesObjectAttributes(l labels.Set, f fields.Set,
// MatchesSingle will return (name, true) if and only if s.Field matches on the object's
// name.
func (s *SelectionPredicate) MatchesSingle() (string, bool) {
if len(s.Continue) > 0 {
return "", false
}
// TODO: should be namespace.name
if name, ok := s.Field.RequiresExactMatch("metadata.name"); ok {
return name, true

View File

@ -41,6 +41,11 @@ type Config struct {
CAFile string
// Quorum indicates that whether read operations should be quorum-level consistent.
Quorum bool
// Paging indicates whether the server implementation should allow paging (if it is
// supported). This is generally configured by feature gating, or by a specific
// resource type not wishing to allow paging, and is not intended for end users to
// set.
Paging bool
// DeserializationCacheSize is the size of cache of deserialized objects.
// Currently this is only supported in etcd2.
// We will drop the cache once using protobuf.

View File

@ -61,7 +61,7 @@ func newETCD3Storage(c storagebackend.Config) (storage.Interface, DestroyFunc, e
transformer = value.IdentityTransformer
}
if c.Quorum {
return etcd3.New(client, c.Codec, c.Prefix, transformer), destroyFunc, nil
return etcd3.New(client, c.Codec, c.Prefix, transformer, c.Paging), destroyFunc, nil
}
return etcd3.NewWithNoQuorumRead(client, c.Codec, c.Prefix, transformer), destroyFunc, nil
return etcd3.NewWithNoQuorumRead(client, c.Codec, c.Prefix, transformer, c.Paging), destroyFunc, nil
}

View File

@ -93,7 +93,7 @@ func AddObjectMetaFieldsSet(source fields.Set, objectMeta *metav1.ObjectMeta, ha
func newEtcdTestStorage(t *testing.T, prefix string) (*etcdtesting.EtcdTestServer, storage.Interface) {
server, _ := etcdtesting.NewUnsecuredEtcd3TestClientServer(t, scheme)
storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), prefix, value.IdentityTransformer)
storage := etcd3.New(server.V3Client, apitesting.TestCodec(codecs, examplev1.SchemeGroupVersion), prefix, value.IdentityTransformer, true)
return server, storage
}

View File

@ -30,14 +30,20 @@ import (
const defaultPageSize = 500
// ListPageFunc returns a list object for the given list options.
type ListPageFunc func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error)
// SimplePageFunc adapts a context-less list function into one that accepts a context.
func SimplePageFunc(fn func(opts metav1.ListOptions) (runtime.Object, error)) ListPageFunc {
return func(ctx context.Context, opts metav1.ListOptions) (runtime.Object, error) {
return fn(opts)
}
}
// ListPager assists client code in breaking large list queries into multiple
// smaller chunks of PageSize or smaller. PageFn is expected to accept a
// metav1.ListOptions that supports paging and return a list. The pager does
// not alter the field or label selectors on the initial options list.
type ListPager struct {
PageSize int64
PageFn ListPageFunc
@ -45,6 +51,8 @@ type ListPager struct {
FullListIfExpired bool
}
// New creates a new pager from the provided pager function using the default
// options.
func New(fn ListPageFunc) *ListPager {
return &ListPager{
PageSize: defaultPageSize,
@ -53,6 +61,9 @@ func New(fn ListPageFunc) *ListPager {
}
}
// List returns a single list object, but attempts to retrieve smaller chunks from the
// server to reduce the impact on the server. If the chunk attempt fails, it will load
// the full list instead.
func (p *ListPager) List(ctx context.Context, options metav1.ListOptions) (runtime.Object, error) {
if options.Limit == 0 {
options.Limit = p.PageSize

View File

@ -238,6 +238,10 @@
"ImportPath": "github.com/spf13/pflag",
"Rev": "9ff6c6923cfffbcd502984b8e0c80539a94968b7"
},
{
"ImportPath": "golang.org/x/net/context",
"Rev": "1c05540f6879653db88113bc4a2b70aec4bd491f"
},
{
"ImportPath": "golang.org/x/net/http2",
"Rev": "1c05540f6879653db88113bc4a2b70aec4bd491f"