mirror of https://github.com/k3s-io/k3s
795 lines
25 KiB
Go
795 lines
25 KiB
Go
/*
|
|
Copyright 2016 The Kubernetes Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package etcd3
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/base64"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"k8s.io/klog"
|
|
"path"
|
|
"reflect"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/ibuildthecloud/kvsql/clientv3"
|
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
|
"k8s.io/apimachinery/pkg/api/meta"
|
|
"k8s.io/apimachinery/pkg/conversion"
|
|
"k8s.io/apimachinery/pkg/runtime"
|
|
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
|
"k8s.io/apimachinery/pkg/watch"
|
|
"k8s.io/apiserver/pkg/storage"
|
|
"k8s.io/apiserver/pkg/storage/etcd"
|
|
"k8s.io/apiserver/pkg/storage/value"
|
|
utiltrace "k8s.io/apiserver/pkg/util/trace"
|
|
)
|
|
|
|
// authenticatedDataString satisfies the value.Context interface. It uses the key to
|
|
// authenticate the stored data. This does not defend against reuse of previously
|
|
// encrypted values under the same key, but will prevent an attacker from using an
|
|
// encrypted value from a different key. A stronger authenticated data segment would
|
|
// include the etcd3 Version field (which is incremented on each write to a key and
|
|
// reset when the key is deleted), but an attacker with write access to etcd can
|
|
// force deletion and recreation of keys to weaken that angle.
|
|
type authenticatedDataString string
|
|
|
|
// AuthenticatedData implements the value.Context interface.
|
|
func (d authenticatedDataString) AuthenticatedData() []byte {
|
|
return []byte(string(d))
|
|
}
|
|
|
|
var _ value.Context = authenticatedDataString("")
|
|
|
|
type store struct {
|
|
client *clientv3.Client
|
|
// getOpts contains additional options that should be passed
|
|
// to all Get() calls.
|
|
getOps []clientv3.OpOption
|
|
codec runtime.Codec
|
|
versioner storage.Versioner
|
|
transformer value.Transformer
|
|
pathPrefix string
|
|
watcher *watcher
|
|
pagingEnabled bool
|
|
leaseManager *leaseManager
|
|
}
|
|
|
|
type objState struct {
|
|
obj runtime.Object
|
|
meta *storage.ResponseMeta
|
|
rev int64
|
|
data []byte
|
|
stale bool
|
|
}
|
|
|
|
// New returns an etcd3 implementation of storage.Interface.
|
|
func New(c *clientv3.Client, codec runtime.Codec, prefix string, transformer value.Transformer, pagingEnabled bool) storage.Interface {
|
|
return newStore(c, true, pagingEnabled, codec, prefix, transformer)
|
|
}
|
|
|
|
func newStore(c *clientv3.Client, quorumRead, pagingEnabled bool, codec runtime.Codec, prefix string, transformer value.Transformer) *store {
|
|
versioner := etcd.APIObjectVersioner{}
|
|
result := &store{
|
|
client: c,
|
|
codec: codec,
|
|
versioner: versioner,
|
|
transformer: transformer,
|
|
pagingEnabled: pagingEnabled,
|
|
// for compatibility with etcd2 impl.
|
|
// no-op for default prefix of '/registry'.
|
|
// keeps compatibility with etcd2 impl for custom prefixes that don't start with '/'
|
|
pathPrefix: path.Join("/", prefix),
|
|
watcher: newWatcher(c, codec, versioner, transformer),
|
|
leaseManager: newDefaultLeaseManager(c),
|
|
}
|
|
if !quorumRead {
|
|
// In case of non-quorum reads, we can set WithSerializable()
|
|
// options for all Get operations.
|
|
result.getOps = append(result.getOps, clientv3.WithSerializable())
|
|
}
|
|
return result
|
|
}
|
|
|
|
// Versioner implements storage.Interface.Versioner.
|
|
func (s *store) Versioner() storage.Versioner {
|
|
return s.versioner
|
|
}
|
|
|
|
// Get implements storage.Interface.Get.
|
|
func (s *store) Get(ctx context.Context, key string, resourceVersion string, out runtime.Object, ignoreNotFound bool) error {
|
|
key = path.Join(s.pathPrefix, key)
|
|
getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if len(getResp.Kvs) == 0 {
|
|
if ignoreNotFound {
|
|
return runtime.SetZeroValue(out)
|
|
}
|
|
return storage.NewKeyNotFoundError(key, 0)
|
|
}
|
|
kv := getResp.Kvs[0]
|
|
|
|
data, _, err := s.transformer.TransformFromStorage(kv.Value, authenticatedDataString(key))
|
|
if err != nil {
|
|
return storage.NewInternalError(err.Error())
|
|
}
|
|
|
|
return decode(s.codec, s.versioner, data, out, kv.ModRevision)
|
|
}
|
|
|
|
// Create implements storage.Interface.Create.
|
|
func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
|
|
if version, err := s.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
|
|
return errors.New("resourceVersion should not be set on objects to be created")
|
|
}
|
|
if err := s.versioner.PrepareObjectForStorage(obj); err != nil {
|
|
return fmt.Errorf("PrepareObjectForStorage failed: %v", err)
|
|
}
|
|
data, err := runtime.Encode(s.codec, obj)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
key = path.Join(s.pathPrefix, key)
|
|
|
|
opts, err := s.ttlOpts(ctx, int64(ttl))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
newData, err := s.transformer.TransformToStorage(data, authenticatedDataString(key))
|
|
if err != nil {
|
|
return storage.NewInternalError(err.Error())
|
|
}
|
|
|
|
txnResp, err := s.client.KV.Txn(ctx).If(
|
|
notFound(key),
|
|
).Then(
|
|
clientv3.OpPut(key, string(newData), opts...),
|
|
).Commit()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !txnResp.Succeeded {
|
|
return storage.NewKeyExistsError(key, 0)
|
|
}
|
|
|
|
if out != nil {
|
|
putResp := txnResp.Responses[0].GetResponsePut()
|
|
return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Delete implements storage.Interface.Delete.
|
|
func (s *store) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions) error {
|
|
v, err := conversion.EnforcePtr(out)
|
|
if err != nil {
|
|
panic("unable to convert output object to pointer")
|
|
}
|
|
key = path.Join(s.pathPrefix, key)
|
|
if preconditions == nil {
|
|
return s.unconditionalDelete(ctx, key, out)
|
|
}
|
|
return s.conditionalDelete(ctx, key, out, v, preconditions)
|
|
}
|
|
|
|
func (s *store) unconditionalDelete(ctx context.Context, key string, out runtime.Object) error {
|
|
// We need to do get and delete in single transaction in order to
|
|
// know the value and revision before deleting it.
|
|
txnResp, err := s.client.KV.Txn(ctx).If().Then(
|
|
clientv3.OpGet(key),
|
|
clientv3.OpDelete(key),
|
|
).Commit()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
getResp := txnResp.Responses[0].GetResponseRange()
|
|
if len(getResp.Kvs) == 0 {
|
|
return storage.NewKeyNotFoundError(key, 0)
|
|
}
|
|
|
|
kv := getResp.Kvs[0]
|
|
data, _, err := s.transformer.TransformFromStorage(kv.Value, authenticatedDataString(key))
|
|
if err != nil {
|
|
return storage.NewInternalError(err.Error())
|
|
}
|
|
return decode(s.codec, s.versioner, data, out, kv.ModRevision)
|
|
}
|
|
|
|
func (s *store) conditionalDelete(ctx context.Context, key string, out runtime.Object, v reflect.Value, preconditions *storage.Preconditions) error {
|
|
getResp, err := s.client.KV.Get(ctx, key)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for {
|
|
origState, err := s.getState(getResp, key, v, false)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err := preconditions.Check(key, origState.obj); err != nil {
|
|
return err
|
|
}
|
|
txnResp, err := s.client.KV.Txn(ctx).If(
|
|
clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev),
|
|
).Then(
|
|
clientv3.OpDelete(key),
|
|
).Else(
|
|
clientv3.OpGet(key),
|
|
).Commit()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !txnResp.Succeeded {
|
|
getResp = (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
|
|
klog.V(4).Infof("deletion of %s failed because of a conflict, going to retry", key)
|
|
continue
|
|
}
|
|
return decode(s.codec, s.versioner, origState.data, out, origState.rev)
|
|
}
|
|
}
|
|
|
|
// GuaranteedUpdate implements storage.Interface.GuaranteedUpdate.
|
|
func (s *store) GuaranteedUpdate(
|
|
ctx context.Context, key string, out runtime.Object, ignoreNotFound bool,
|
|
preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, suggestion ...runtime.Object) error {
|
|
trace := utiltrace.New(fmt.Sprintf("GuaranteedUpdate etcd3: %s", reflect.TypeOf(out).String()))
|
|
defer trace.LogIfLong(500 * time.Millisecond)
|
|
|
|
v, err := conversion.EnforcePtr(out)
|
|
if err != nil {
|
|
panic("unable to convert output object to pointer")
|
|
}
|
|
key = path.Join(s.pathPrefix, key)
|
|
|
|
getCurrentState := func() (*objState, error) {
|
|
getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return s.getState(getResp, key, v, ignoreNotFound)
|
|
}
|
|
|
|
var origState *objState
|
|
var mustCheckData bool
|
|
if len(suggestion) == 1 && suggestion[0] != nil {
|
|
origState, err = s.getStateFromObject(suggestion[0])
|
|
if err != nil {
|
|
return err
|
|
}
|
|
mustCheckData = true
|
|
} else {
|
|
origState, err = getCurrentState()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
trace.Step("initial value restored")
|
|
|
|
transformContext := authenticatedDataString(key)
|
|
for {
|
|
if err := preconditions.Check(key, origState.obj); err != nil {
|
|
return err
|
|
}
|
|
|
|
ret, ttl, err := s.updateState(origState, tryUpdate)
|
|
if err != nil {
|
|
// It's possible we were working with stale data
|
|
if mustCheckData && apierrors.IsConflict(err) {
|
|
// Actually fetch
|
|
origState, err = getCurrentState()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
mustCheckData = false
|
|
// Retry
|
|
continue
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
data, err := runtime.Encode(s.codec, ret)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !origState.stale && bytes.Equal(data, origState.data) {
|
|
// if we skipped the original Get in this loop, we must refresh from
|
|
// etcd in order to be sure the data in the store is equivalent to
|
|
// our desired serialization
|
|
if mustCheckData {
|
|
origState, err = getCurrentState()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
mustCheckData = false
|
|
if !bytes.Equal(data, origState.data) {
|
|
// original data changed, restart loop
|
|
continue
|
|
}
|
|
}
|
|
// recheck that the data from etcd is not stale before short-circuiting a write
|
|
if !origState.stale {
|
|
return decode(s.codec, s.versioner, origState.data, out, origState.rev)
|
|
}
|
|
}
|
|
|
|
newData, err := s.transformer.TransformToStorage(data, transformContext)
|
|
if err != nil {
|
|
return storage.NewInternalError(err.Error())
|
|
}
|
|
|
|
opts, err := s.ttlOpts(ctx, int64(ttl))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
trace.Step("Transaction prepared")
|
|
|
|
txnResp, err := s.client.KV.Txn(ctx).If(
|
|
clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev),
|
|
).Then(
|
|
clientv3.OpPut(key, string(newData), opts...),
|
|
).Else(
|
|
clientv3.OpGet(key),
|
|
).Commit()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
trace.Step("Transaction committed")
|
|
if !txnResp.Succeeded {
|
|
getResp := (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
|
|
klog.V(4).Infof("GuaranteedUpdate of %s failed because of a conflict, going to retry", key)
|
|
origState, err = s.getState(getResp, key, v, ignoreNotFound)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
trace.Step("Retry value restored")
|
|
mustCheckData = false
|
|
continue
|
|
}
|
|
putResp := txnResp.Responses[0].GetResponsePut()
|
|
|
|
return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
|
|
}
|
|
}
|
|
|
|
// GetToList implements storage.Interface.GetToList.
|
|
func (s *store) GetToList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
|
|
listPtr, err := meta.GetItemsPtr(listObj)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
v, err := conversion.EnforcePtr(listPtr)
|
|
if err != nil || v.Kind() != reflect.Slice {
|
|
panic("need ptr to slice")
|
|
}
|
|
|
|
key = path.Join(s.pathPrefix, key)
|
|
getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if len(getResp.Kvs) > 0 {
|
|
data, _, err := s.transformer.TransformFromStorage(getResp.Kvs[0].Value, authenticatedDataString(key))
|
|
if err != nil {
|
|
return storage.NewInternalError(err.Error())
|
|
}
|
|
if err := appendListItem(v, data, uint64(getResp.Kvs[0].ModRevision), pred, s.codec, s.versioner); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
// update version with cluster level revision
|
|
return s.versioner.UpdateList(listObj, uint64(getResp.Header.Revision), "")
|
|
}
|
|
|
|
func (s *store) Count(key string) (int64, error) {
|
|
key = path.Join(s.pathPrefix, key)
|
|
getResp, err := s.client.KV.Get(context.Background(), key, clientv3.WithRange(clientv3.GetPrefixRangeEnd(key)), clientv3.WithCountOnly())
|
|
if err != nil {
|
|
return 0, err
|
|
}
|
|
return getResp.Count, nil
|
|
}
|
|
|
|
// continueToken is a simple structured object for encoding the state of a continue token.
|
|
// TODO: if we change the version of the encoded from, we can't start encoding the new version
|
|
// until all other servers are upgraded (i.e. we need to support rolling schema)
|
|
// This is a public API struct and cannot change.
|
|
type continueToken struct {
|
|
APIVersion string `json:"v"`
|
|
ResourceVersion int64 `json:"rv"`
|
|
StartKey string `json:"start"`
|
|
}
|
|
|
|
// parseFrom transforms an encoded predicate from into a versioned struct.
|
|
// TODO: return a typed error that instructs clients that they must relist
|
|
func decodeContinue(continueValue, keyPrefix string) (fromKey string, rv int64, err error) {
|
|
data, err := base64.RawURLEncoding.DecodeString(continueValue)
|
|
if err != nil {
|
|
return "", 0, fmt.Errorf("continue key is not valid: %v", err)
|
|
}
|
|
var c continueToken
|
|
if err := json.Unmarshal(data, &c); err != nil {
|
|
return "", 0, fmt.Errorf("continue key is not valid: %v", err)
|
|
}
|
|
switch c.APIVersion {
|
|
case "meta.k8s.io/v1":
|
|
if c.ResourceVersion == 0 {
|
|
return "", 0, fmt.Errorf("continue key is not valid: incorrect encoded start resourceVersion (version meta.k8s.io/v1)")
|
|
}
|
|
if len(c.StartKey) == 0 {
|
|
return "", 0, fmt.Errorf("continue key is not valid: encoded start key empty (version meta.k8s.io/v1)")
|
|
}
|
|
// defend against path traversal attacks by clients - path.Clean will ensure that startKey cannot
|
|
// be at a higher level of the hierarchy, and so when we append the key prefix we will end up with
|
|
// continue start key that is fully qualified and cannot range over anything less specific than
|
|
// keyPrefix.
|
|
key := c.StartKey
|
|
if !strings.HasPrefix(key, "/") {
|
|
key = "/" + key
|
|
}
|
|
cleaned := path.Clean(key)
|
|
if cleaned != key {
|
|
return "", 0, fmt.Errorf("continue key is not valid: %s", c.StartKey)
|
|
}
|
|
return keyPrefix + cleaned[1:], c.ResourceVersion, nil
|
|
default:
|
|
return "", 0, fmt.Errorf("continue key is not valid: server does not recognize this encoded version %q", c.APIVersion)
|
|
}
|
|
}
|
|
|
|
// encodeContinue returns a string representing the encoded continuation of the current query.
|
|
func encodeContinue(key, keyPrefix string, resourceVersion int64) (string, error) {
|
|
nextKey := strings.TrimPrefix(key, keyPrefix)
|
|
if nextKey == key {
|
|
return "", fmt.Errorf("unable to encode next field: the key and key prefix do not match")
|
|
}
|
|
out, err := json.Marshal(&continueToken{APIVersion: "meta.k8s.io/v1", ResourceVersion: resourceVersion, StartKey: nextKey})
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
return base64.RawURLEncoding.EncodeToString(out), nil
|
|
}
|
|
|
|
// List implements storage.Interface.List.
|
|
func (s *store) List(ctx context.Context, key, resourceVersion string, pred storage.SelectionPredicate, listObj runtime.Object) error {
|
|
listPtr, err := meta.GetItemsPtr(listObj)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
v, err := conversion.EnforcePtr(listPtr)
|
|
if err != nil || v.Kind() != reflect.Slice {
|
|
panic("need ptr to slice")
|
|
}
|
|
|
|
if s.pathPrefix != "" {
|
|
key = path.Join(s.pathPrefix, key)
|
|
}
|
|
// We need to make sure the key ended with "/" so that we only get children "directories".
|
|
// e.g. if we have key "/a", "/a/b", "/ab", getting keys with prefix "/a" will return all three,
|
|
// while with prefix "/a/" will return only "/a/b" which is the correct answer.
|
|
if !strings.HasSuffix(key, "/") {
|
|
key += "/"
|
|
}
|
|
keyPrefix := key
|
|
|
|
// set the appropriate clientv3 options to filter the returned data set
|
|
var paging bool
|
|
options := make([]clientv3.OpOption, 0, 4)
|
|
if s.pagingEnabled && pred.Limit > 0 {
|
|
paging = true
|
|
options = append(options, clientv3.WithLimit(pred.Limit))
|
|
}
|
|
|
|
var returnedRV, continueRV int64
|
|
var continueKey string
|
|
switch {
|
|
case s.pagingEnabled && len(pred.Continue) > 0:
|
|
continueKey, continueRV, err = decodeContinue(pred.Continue, keyPrefix)
|
|
if err != nil {
|
|
return apierrors.NewBadRequest(fmt.Sprintf("invalid continue token: %v", err))
|
|
}
|
|
|
|
if len(resourceVersion) > 0 && resourceVersion != "0" {
|
|
return apierrors.NewBadRequest("specifying resource version is not allowed when using continue")
|
|
}
|
|
|
|
rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix)
|
|
options = append(options, clientv3.WithRange(rangeEnd))
|
|
key = continueKey
|
|
|
|
// If continueRV > 0, the LIST request needs a specific resource version.
|
|
// continueRV==0 is invalid.
|
|
// If continueRV < 0, the request is for the latest resource version.
|
|
if continueRV > 0 {
|
|
options = append(options, clientv3.WithRev(continueRV))
|
|
returnedRV = continueRV
|
|
}
|
|
case s.pagingEnabled && pred.Limit > 0:
|
|
if len(resourceVersion) > 0 {
|
|
fromRV, err := s.versioner.ParseResourceVersion(resourceVersion)
|
|
if err != nil {
|
|
return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
|
|
}
|
|
if fromRV > 0 {
|
|
options = append(options, clientv3.WithRev(int64(fromRV)))
|
|
}
|
|
returnedRV = int64(fromRV)
|
|
}
|
|
|
|
rangeEnd := clientv3.GetPrefixRangeEnd(keyPrefix)
|
|
options = append(options, clientv3.WithRange(rangeEnd))
|
|
|
|
default:
|
|
if len(resourceVersion) > 0 {
|
|
fromRV, err := s.versioner.ParseResourceVersion(resourceVersion)
|
|
if err != nil {
|
|
return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err))
|
|
}
|
|
if fromRV > 0 {
|
|
options = append(options, clientv3.WithRev(int64(fromRV)))
|
|
}
|
|
returnedRV = int64(fromRV)
|
|
}
|
|
|
|
options = append(options, clientv3.WithPrefix())
|
|
}
|
|
|
|
// loop until we have filled the requested limit from etcd or there are no more results
|
|
var lastKey []byte
|
|
var hasMore bool
|
|
for {
|
|
getResp, err := s.client.KV.Get(ctx, key, options...)
|
|
if err != nil {
|
|
return interpretListError(err, len(pred.Continue) > 0, continueKey, keyPrefix)
|
|
}
|
|
hasMore = getResp.More
|
|
|
|
if len(getResp.Kvs) == 0 && getResp.More {
|
|
return fmt.Errorf("no results were found, but etcd indicated there were more values remaining")
|
|
}
|
|
|
|
// avoid small allocations for the result slice, since this can be called in many
|
|
// different contexts and we don't know how significantly the result will be filtered
|
|
if pred.Empty() {
|
|
growSlice(v, len(getResp.Kvs))
|
|
} else {
|
|
growSlice(v, 2048, len(getResp.Kvs))
|
|
}
|
|
|
|
// take items from the response until the bucket is full, filtering as we go
|
|
for _, kv := range getResp.Kvs {
|
|
if paging && int64(v.Len()) >= pred.Limit {
|
|
hasMore = true
|
|
break
|
|
}
|
|
lastKey = kv.Key
|
|
|
|
data, _, err := s.transformer.TransformFromStorage(kv.Value, authenticatedDataString(kv.Key))
|
|
if err != nil {
|
|
utilruntime.HandleError(fmt.Errorf("unable to transform key %q: %v", kv.Key, err))
|
|
continue
|
|
}
|
|
|
|
if err := appendListItem(v, data, uint64(kv.ModRevision), pred, s.codec, s.versioner); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// indicate to the client which resource version was returned
|
|
if returnedRV == 0 {
|
|
returnedRV = getResp.Header.Revision
|
|
}
|
|
|
|
// no more results remain or we didn't request paging
|
|
if !hasMore || !paging {
|
|
break
|
|
}
|
|
// we're paging but we have filled our bucket
|
|
if int64(v.Len()) >= pred.Limit {
|
|
break
|
|
}
|
|
key = string(lastKey) + "\x00"
|
|
}
|
|
|
|
// instruct the client to begin querying from immediately after the last key we returned
|
|
// we never return a key that the client wouldn't be allowed to see
|
|
if hasMore {
|
|
// we want to start immediately after the last key
|
|
next, err := encodeContinue(string(lastKey)+"\x00", keyPrefix, returnedRV)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return s.versioner.UpdateList(listObj, uint64(returnedRV), next)
|
|
}
|
|
|
|
// no continuation
|
|
return s.versioner.UpdateList(listObj, uint64(returnedRV), "")
|
|
}
|
|
|
|
// growSlice takes a slice value and grows its capacity up
|
|
// to the maximum of the passed sizes or maxCapacity, whichever
|
|
// is smaller. Above maxCapacity decisions about allocation are left
|
|
// to the Go runtime on append. This allows a caller to make an
|
|
// educated guess about the potential size of the total list while
|
|
// still avoiding overly aggressive initial allocation. If sizes
|
|
// is empty maxCapacity will be used as the size to grow.
|
|
func growSlice(v reflect.Value, maxCapacity int, sizes ...int) {
|
|
cap := v.Cap()
|
|
max := cap
|
|
for _, size := range sizes {
|
|
if size > max {
|
|
max = size
|
|
}
|
|
}
|
|
if len(sizes) == 0 || max > maxCapacity {
|
|
max = maxCapacity
|
|
}
|
|
if max <= cap {
|
|
return
|
|
}
|
|
if v.Len() > 0 {
|
|
extra := reflect.MakeSlice(v.Type(), 0, max)
|
|
reflect.Copy(extra, v)
|
|
v.Set(extra)
|
|
} else {
|
|
extra := reflect.MakeSlice(v.Type(), 0, max)
|
|
v.Set(extra)
|
|
}
|
|
}
|
|
|
|
// Watch implements storage.Interface.Watch.
|
|
func (s *store) Watch(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate) (watch.Interface, error) {
|
|
return s.watch(ctx, key, resourceVersion, pred, false)
|
|
}
|
|
|
|
// WatchList implements storage.Interface.WatchList.
|
|
func (s *store) WatchList(ctx context.Context, key string, resourceVersion string, pred storage.SelectionPredicate) (watch.Interface, error) {
|
|
return s.watch(ctx, key, resourceVersion, pred, true)
|
|
}
|
|
|
|
func (s *store) watch(ctx context.Context, key string, rv string, pred storage.SelectionPredicate, recursive bool) (watch.Interface, error) {
|
|
rev, err := s.versioner.ParseResourceVersion(rv)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
key = path.Join(s.pathPrefix, key)
|
|
return s.watcher.Watch(ctx, key, int64(rev), recursive, pred)
|
|
}
|
|
|
|
func (s *store) getState(getResp *clientv3.GetResponse, key string, v reflect.Value, ignoreNotFound bool) (*objState, error) {
|
|
state := &objState{
|
|
obj: reflect.New(v.Type()).Interface().(runtime.Object),
|
|
meta: &storage.ResponseMeta{},
|
|
}
|
|
if len(getResp.Kvs) == 0 {
|
|
if !ignoreNotFound {
|
|
return nil, storage.NewKeyNotFoundError(key, 0)
|
|
}
|
|
if err := runtime.SetZeroValue(state.obj); err != nil {
|
|
return nil, err
|
|
}
|
|
} else {
|
|
data, stale, err := s.transformer.TransformFromStorage(getResp.Kvs[0].Value, authenticatedDataString(key))
|
|
if err != nil {
|
|
return nil, storage.NewInternalError(err.Error())
|
|
}
|
|
state.rev = getResp.Kvs[0].ModRevision
|
|
state.meta.ResourceVersion = uint64(state.rev)
|
|
state.data = data
|
|
state.stale = stale
|
|
if err := decode(s.codec, s.versioner, state.data, state.obj, state.rev); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
return state, nil
|
|
}
|
|
|
|
func (s *store) getStateFromObject(obj runtime.Object) (*objState, error) {
|
|
state := &objState{
|
|
obj: obj,
|
|
meta: &storage.ResponseMeta{},
|
|
}
|
|
|
|
rv, err := s.versioner.ObjectResourceVersion(obj)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("couldn't get resource version: %v", err)
|
|
}
|
|
state.rev = int64(rv)
|
|
state.meta.ResourceVersion = uint64(state.rev)
|
|
|
|
// Compute the serialized form - for that we need to temporarily clean
|
|
// its resource version field (those are not stored in etcd).
|
|
if err := s.versioner.PrepareObjectForStorage(obj); err != nil {
|
|
return nil, fmt.Errorf("PrepareObjectForStorage failed: %v", err)
|
|
}
|
|
state.data, err = runtime.Encode(s.codec, obj)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
s.versioner.UpdateObject(state.obj, uint64(rv))
|
|
return state, nil
|
|
}
|
|
|
|
func (s *store) updateState(st *objState, userUpdate storage.UpdateFunc) (runtime.Object, uint64, error) {
|
|
ret, ttlPtr, err := userUpdate(st.obj, *st.meta)
|
|
if err != nil {
|
|
return nil, 0, err
|
|
}
|
|
|
|
if err := s.versioner.PrepareObjectForStorage(ret); err != nil {
|
|
return nil, 0, fmt.Errorf("PrepareObjectForStorage failed: %v", err)
|
|
}
|
|
var ttl uint64
|
|
if ttlPtr != nil {
|
|
ttl = *ttlPtr
|
|
}
|
|
return ret, ttl, nil
|
|
}
|
|
|
|
// ttlOpts returns client options based on given ttl.
|
|
// ttl: if ttl is non-zero, it will attach the key to a lease with ttl of roughly the same length
|
|
func (s *store) ttlOpts(ctx context.Context, ttl int64) ([]clientv3.OpOption, error) {
|
|
if ttl == 0 {
|
|
return nil, nil
|
|
}
|
|
id, err := s.leaseManager.GetLease(ctx, ttl)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return []clientv3.OpOption{clientv3.WithLease(id)}, nil
|
|
}
|
|
|
|
// decode decodes value of bytes into object. It will also set the object resource version to rev.
|
|
// On success, objPtr would be set to the object.
|
|
func decode(codec runtime.Codec, versioner storage.Versioner, value []byte, objPtr runtime.Object, rev int64) error {
|
|
if _, err := conversion.EnforcePtr(objPtr); err != nil {
|
|
panic("unable to convert output object to pointer")
|
|
}
|
|
_, _, err := codec.Decode(value, nil, objPtr)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// being unable to set the version does not prevent the object from being extracted
|
|
versioner.UpdateObject(objPtr, uint64(rev))
|
|
return nil
|
|
}
|
|
|
|
// appendListItem decodes and appends the object (if it passes filter) to v, which must be a slice.
|
|
func appendListItem(v reflect.Value, data []byte, rev uint64, pred storage.SelectionPredicate, codec runtime.Codec, versioner storage.Versioner) error {
|
|
obj, _, err := codec.Decode(data, nil, reflect.New(v.Type().Elem()).Interface().(runtime.Object))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
// being unable to set the version does not prevent the object from being extracted
|
|
versioner.UpdateObject(obj, rev)
|
|
if matched, err := pred.Matches(obj); err == nil && matched {
|
|
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func notFound(key string) clientv3.Cmp {
|
|
return clientv3.Compare(clientv3.ModRevision(key), "=", 0)
|
|
}
|