2014-06-17 23:23:52 +00:00
|
|
|
/*
|
2015-05-01 16:19:44 +00:00
|
|
|
Copyright 2014 The Kubernetes Authors All rights reserved.
|
2014-06-17 23:23:52 +00:00
|
|
|
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
|
you may not use this file except in compliance with the License.
|
|
|
|
You may obtain a copy of the License at
|
|
|
|
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
|
|
See the License for the specific language governing permissions and
|
|
|
|
limitations under the License.
|
|
|
|
*/
|
|
|
|
|
2015-07-30 11:27:18 +00:00
|
|
|
package etcd
|
2014-06-17 23:23:52 +00:00
|
|
|
|
|
|
|
import (
|
2014-08-03 17:07:40 +00:00
|
|
|
"errors"
|
2014-06-17 23:23:52 +00:00
|
|
|
"fmt"
|
2015-12-10 14:03:59 +00:00
|
|
|
"net"
|
2015-12-08 20:01:29 +00:00
|
|
|
"net/http"
|
2015-03-11 17:10:09 +00:00
|
|
|
"path"
|
2014-06-17 23:23:52 +00:00
|
|
|
"reflect"
|
2015-03-11 17:10:09 +00:00
|
|
|
"strings"
|
2015-05-06 11:23:22 +00:00
|
|
|
"time"
|
2014-06-17 23:23:52 +00:00
|
|
|
|
2015-08-05 22:03:47 +00:00
|
|
|
"k8s.io/kubernetes/pkg/api"
|
2015-11-12 10:45:42 +00:00
|
|
|
"k8s.io/kubernetes/pkg/api/meta"
|
2015-08-05 22:03:47 +00:00
|
|
|
"k8s.io/kubernetes/pkg/conversion"
|
|
|
|
"k8s.io/kubernetes/pkg/runtime"
|
|
|
|
"k8s.io/kubernetes/pkg/storage"
|
2015-11-10 11:23:51 +00:00
|
|
|
"k8s.io/kubernetes/pkg/storage/etcd/metrics"
|
2015-11-23 19:32:50 +00:00
|
|
|
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
|
2015-08-05 22:03:47 +00:00
|
|
|
"k8s.io/kubernetes/pkg/util"
|
|
|
|
"k8s.io/kubernetes/pkg/watch"
|
2014-10-27 17:04:39 +00:00
|
|
|
|
2015-12-10 14:03:59 +00:00
|
|
|
etcd "github.com/coreos/etcd/client"
|
2014-10-27 17:04:39 +00:00
|
|
|
"github.com/golang/glog"
|
2015-10-09 14:49:01 +00:00
|
|
|
"golang.org/x/net/context"
|
2014-06-17 23:23:52 +00:00
|
|
|
)
|
|
|
|
|
2015-12-08 20:01:29 +00:00
|
|
|
// storage.Config object for etcd.
|
|
|
|
type EtcdConfig struct {
|
|
|
|
ServerList []string
|
|
|
|
Codec runtime.Codec
|
|
|
|
Prefix string
|
2016-01-26 06:17:11 +00:00
|
|
|
Quorum bool
|
2015-12-08 20:01:29 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// implements storage.Config
|
|
|
|
func (c *EtcdConfig) GetType() string {
|
|
|
|
return "etcd"
|
|
|
|
}
|
|
|
|
|
|
|
|
// implements storage.Config
|
|
|
|
func (c *EtcdConfig) NewStorage() (storage.Interface, error) {
|
2015-12-10 14:03:59 +00:00
|
|
|
cfg := etcd.Config{
|
|
|
|
Endpoints: c.ServerList,
|
|
|
|
// TODO: Determine if transport needs optimization
|
|
|
|
Transport: &http.Transport{
|
|
|
|
Proxy: http.ProxyFromEnvironment,
|
|
|
|
Dial: (&net.Dialer{
|
|
|
|
Timeout: 30 * time.Second,
|
|
|
|
KeepAlive: 30 * time.Second,
|
|
|
|
}).Dial,
|
|
|
|
TLSHandshakeTimeout: 10 * time.Second,
|
|
|
|
MaxIdleConnsPerHost: 500,
|
2015-12-08 20:01:29 +00:00
|
|
|
},
|
|
|
|
}
|
2015-12-10 14:03:59 +00:00
|
|
|
etcdClient, err := etcd.New(cfg)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2016-01-26 06:17:11 +00:00
|
|
|
return NewEtcdStorage(etcdClient, c.Codec, c.Prefix, c.Quorum), nil
|
2015-12-08 20:01:29 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
// Creates a new storage interface from the client
|
|
|
|
// TODO: deprecate in favor of storage.Config abstraction over time
|
2016-01-26 06:17:11 +00:00
|
|
|
func NewEtcdStorage(client etcd.Client, codec runtime.Codec, prefix string, quorum bool) storage.Interface {
|
2015-07-24 11:09:49 +00:00
|
|
|
return &etcdHelper{
|
2015-12-10 14:03:59 +00:00
|
|
|
etcdclient: client,
|
|
|
|
client: etcd.NewKeysAPI(client),
|
2015-07-24 11:09:49 +00:00
|
|
|
codec: codec,
|
|
|
|
versioner: APIObjectVersioner{},
|
|
|
|
copier: api.Scheme,
|
2015-10-27 20:35:07 +00:00
|
|
|
pathPrefix: path.Join("/", prefix),
|
2016-01-26 06:17:11 +00:00
|
|
|
quorum: quorum,
|
2015-07-21 09:19:11 +00:00
|
|
|
cache: util.NewCache(maxEtcdCacheEntries),
|
|
|
|
}
|
2015-05-08 07:16:24 +00:00
|
|
|
}
|
|
|
|
|
2015-07-30 07:45:06 +00:00
|
|
|
// etcdHelper is the reference implementation of storage.Interface.
|
2015-07-24 11:09:49 +00:00
|
|
|
type etcdHelper struct {
|
2015-12-10 14:03:59 +00:00
|
|
|
etcdclient etcd.Client
|
|
|
|
client etcd.KeysAPI
|
|
|
|
codec runtime.Codec
|
|
|
|
copier runtime.ObjectCopier
|
2015-07-21 09:19:11 +00:00
|
|
|
// optional, has to be set to perform any atomic operations
|
2015-07-30 07:45:06 +00:00
|
|
|
versioner storage.Versioner
|
2015-03-11 17:10:09 +00:00
|
|
|
// prefix for all etcd keys
|
2015-07-24 11:09:49 +00:00
|
|
|
pathPrefix string
|
2016-01-26 06:17:11 +00:00
|
|
|
// if true, perform quorum read
|
|
|
|
quorum bool
|
2015-04-24 11:07:32 +00:00
|
|
|
|
|
|
|
// We cache objects stored in etcd. For keys we use Node.ModifiedIndex which is equivalent
|
|
|
|
// to resourceVersion.
|
|
|
|
// This depends on etcd's indexes being globally unique across all objects/types. This will
|
|
|
|
// have to revisited if we decide to do things like multiple etcd clusters, or etcd will
|
|
|
|
// support multi-object transaction that will result in many objects with the same index.
|
|
|
|
// Number of entries stored in the cache is controlled by maxEtcdCacheEntries constant.
|
|
|
|
// TODO: Measure how much this cache helps after the conversion code is optimized.
|
2015-05-08 07:16:24 +00:00
|
|
|
cache util.Cache
|
2015-03-05 05:17:29 +00:00
|
|
|
}
|
|
|
|
|
2015-07-21 09:19:11 +00:00
|
|
|
func init() {
|
|
|
|
metrics.Register()
|
2014-08-11 04:08:06 +00:00
|
|
|
}
|
|
|
|
|
2015-08-05 13:39:24 +00:00
|
|
|
// Codec provides access to the underlying codec being used by the implementation.
|
|
|
|
func (h *etcdHelper) Codec() runtime.Codec {
|
|
|
|
return h.codec
|
|
|
|
}
|
|
|
|
|
2015-07-30 07:45:06 +00:00
|
|
|
// Implements storage.Interface.
|
2015-10-09 14:49:01 +00:00
|
|
|
func (h *etcdHelper) Backends(ctx context.Context) []string {
|
|
|
|
if ctx == nil {
|
|
|
|
glog.Errorf("Context is nil")
|
|
|
|
}
|
2015-12-10 14:03:59 +00:00
|
|
|
membersAPI := etcd.NewMembersAPI(h.etcdclient)
|
|
|
|
members, err := membersAPI.List(ctx)
|
|
|
|
if err != nil {
|
|
|
|
glog.Errorf("Error obtaining etcd members list: %q", err)
|
|
|
|
return nil
|
|
|
|
}
|
2015-12-30 11:22:27 +00:00
|
|
|
mlist := []string{}
|
2015-12-10 14:03:59 +00:00
|
|
|
for _, member := range members {
|
|
|
|
mlist = append(mlist, member.ClientURLs...)
|
|
|
|
}
|
|
|
|
return mlist
|
2015-07-24 11:09:49 +00:00
|
|
|
}
|
|
|
|
|
2015-07-30 07:45:06 +00:00
|
|
|
// Implements storage.Interface.
|
|
|
|
func (h *etcdHelper) Versioner() storage.Versioner {
|
2015-07-24 11:09:49 +00:00
|
|
|
return h.versioner
|
|
|
|
}
|
|
|
|
|
2015-07-30 07:45:06 +00:00
|
|
|
// Implements storage.Interface.
|
2015-10-09 14:49:01 +00:00
|
|
|
func (h *etcdHelper) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
|
2016-01-26 16:14:30 +00:00
|
|
|
trace := util.NewTrace("etcdHelper::Create " + getTypeName(obj))
|
|
|
|
defer trace.LogIfLong(250 * time.Millisecond)
|
2015-10-09 14:49:01 +00:00
|
|
|
if ctx == nil {
|
|
|
|
glog.Errorf("Context is nil")
|
|
|
|
}
|
2015-07-24 07:19:08 +00:00
|
|
|
key = h.prefixEtcdKey(key)
|
2016-01-22 05:11:30 +00:00
|
|
|
data, err := runtime.Encode(h.codec, obj)
|
2016-01-26 16:14:30 +00:00
|
|
|
trace.Step("Object encoded")
|
2014-06-17 23:23:52 +00:00
|
|
|
if err != nil {
|
2015-07-24 07:19:08 +00:00
|
|
|
return err
|
2015-04-24 11:07:32 +00:00
|
|
|
}
|
2015-07-24 11:09:49 +00:00
|
|
|
if h.versioner != nil {
|
|
|
|
if version, err := h.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
|
2015-07-24 07:19:08 +00:00
|
|
|
return errors.New("resourceVersion may not be set on objects to be created")
|
2015-04-24 11:07:32 +00:00
|
|
|
}
|
|
|
|
}
|
2016-01-26 16:14:30 +00:00
|
|
|
trace.Step("Version checked")
|
2015-04-24 11:07:32 +00:00
|
|
|
|
2015-05-08 07:16:24 +00:00
|
|
|
startTime := time.Now()
|
2015-12-10 14:03:59 +00:00
|
|
|
opts := etcd.SetOptions{
|
|
|
|
TTL: time.Duration(ttl) * time.Second,
|
|
|
|
PrevExist: etcd.PrevNoExist,
|
|
|
|
}
|
|
|
|
response, err := h.client.Set(ctx, key, string(data), &opts)
|
2015-07-24 07:19:08 +00:00
|
|
|
metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime)
|
2016-01-26 16:14:30 +00:00
|
|
|
trace.Step("Object created")
|
2015-04-24 11:07:32 +00:00
|
|
|
if err != nil {
|
2015-07-24 07:19:08 +00:00
|
|
|
return err
|
2015-04-24 11:07:32 +00:00
|
|
|
}
|
2015-07-24 07:19:08 +00:00
|
|
|
if out != nil {
|
|
|
|
if _, err := conversion.EnforcePtr(out); err != nil {
|
|
|
|
panic("unable to convert output object to pointer")
|
|
|
|
}
|
|
|
|
_, _, err = h.extractObj(response, err, out, false, false)
|
2015-05-04 11:20:15 +00:00
|
|
|
}
|
2015-07-24 07:19:08 +00:00
|
|
|
return err
|
2015-04-24 11:07:32 +00:00
|
|
|
}
|
|
|
|
|
2015-07-30 07:45:06 +00:00
|
|
|
// Implements storage.Interface.
|
2015-10-09 14:49:01 +00:00
|
|
|
func (h *etcdHelper) Set(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
|
|
|
|
if ctx == nil {
|
|
|
|
glog.Errorf("Context is nil")
|
|
|
|
}
|
2015-07-24 07:19:08 +00:00
|
|
|
var response *etcd.Response
|
2016-01-22 05:11:30 +00:00
|
|
|
data, err := runtime.Encode(h.codec, obj)
|
2014-09-23 23:58:08 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2015-07-21 09:19:11 +00:00
|
|
|
key = h.prefixEtcdKey(key)
|
2015-07-24 07:19:08 +00:00
|
|
|
|
|
|
|
create := true
|
2015-07-24 11:09:49 +00:00
|
|
|
if h.versioner != nil {
|
|
|
|
if version, err := h.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
|
2015-07-24 07:19:08 +00:00
|
|
|
create = false
|
|
|
|
startTime := time.Now()
|
2015-12-10 14:03:59 +00:00
|
|
|
opts := etcd.SetOptions{
|
|
|
|
TTL: time.Duration(ttl) * time.Second,
|
|
|
|
PrevIndex: version,
|
|
|
|
}
|
|
|
|
response, err = h.client.Set(ctx, key, string(data), &opts)
|
2015-07-24 07:19:08 +00:00
|
|
|
metrics.RecordEtcdRequestLatency("compareAndSwap", getTypeName(obj), startTime)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2015-04-13 10:33:37 +00:00
|
|
|
}
|
|
|
|
}
|
2015-07-24 07:19:08 +00:00
|
|
|
if create {
|
|
|
|
// Create will fail if a key already exists.
|
|
|
|
startTime := time.Now()
|
2015-12-10 14:03:59 +00:00
|
|
|
opts := etcd.SetOptions{
|
|
|
|
TTL: time.Duration(ttl) * time.Second,
|
|
|
|
PrevExist: etcd.PrevNoExist,
|
|
|
|
}
|
|
|
|
response, err = h.client.Set(ctx, key, string(data), &opts)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2015-07-24 07:19:08 +00:00
|
|
|
metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime)
|
|
|
|
}
|
2015-04-13 10:33:37 +00:00
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2015-07-24 07:19:08 +00:00
|
|
|
if out != nil {
|
|
|
|
if _, err := conversion.EnforcePtr(out); err != nil {
|
|
|
|
panic("unable to convert output object to pointer")
|
2015-04-16 22:07:11 +00:00
|
|
|
}
|
2015-07-24 07:19:08 +00:00
|
|
|
_, _, err = h.extractObj(response, err, out, false, false)
|
2015-04-13 10:33:37 +00:00
|
|
|
}
|
|
|
|
|
2015-07-24 07:19:08 +00:00
|
|
|
return err
|
|
|
|
}
|
2015-04-13 10:33:37 +00:00
|
|
|
|
2015-07-30 07:45:06 +00:00
|
|
|
// Implements storage.Interface.
|
2015-10-09 14:49:01 +00:00
|
|
|
func (h *etcdHelper) Delete(ctx context.Context, key string, out runtime.Object) error {
|
|
|
|
if ctx == nil {
|
|
|
|
glog.Errorf("Context is nil")
|
|
|
|
}
|
2015-07-24 07:19:08 +00:00
|
|
|
key = h.prefixEtcdKey(key)
|
|
|
|
if _, err := conversion.EnforcePtr(out); err != nil {
|
|
|
|
panic("unable to convert output object to pointer")
|
2015-04-13 10:33:37 +00:00
|
|
|
}
|
2015-07-24 07:19:08 +00:00
|
|
|
|
|
|
|
startTime := time.Now()
|
2015-12-10 14:03:59 +00:00
|
|
|
response, err := h.client.Delete(ctx, key, nil)
|
2015-07-24 07:19:08 +00:00
|
|
|
metrics.RecordEtcdRequestLatency("delete", getTypeName(out), startTime)
|
2015-11-23 19:32:50 +00:00
|
|
|
if !etcdutil.IsEtcdNotFound(err) {
|
2015-07-24 07:19:08 +00:00
|
|
|
// if the object that existed prior to the delete is returned by etcd, update out.
|
|
|
|
if err != nil || response.PrevNode != nil {
|
|
|
|
_, _, err = h.extractObj(response, err, out, false, true)
|
2014-09-23 23:58:08 +00:00
|
|
|
}
|
|
|
|
}
|
2015-07-24 07:19:08 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2015-07-30 07:45:06 +00:00
|
|
|
// Implements storage.Interface.
|
2015-12-04 08:58:24 +00:00
|
|
|
func (h *etcdHelper) Watch(ctx context.Context, key string, resourceVersion string, filter storage.FilterFunc) (watch.Interface, error) {
|
2015-10-09 14:49:01 +00:00
|
|
|
if ctx == nil {
|
|
|
|
glog.Errorf("Context is nil")
|
|
|
|
}
|
2015-12-04 08:58:24 +00:00
|
|
|
watchRV, err := storage.ParseWatchResourceVersion(resourceVersion)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2015-07-27 07:54:07 +00:00
|
|
|
key = h.prefixEtcdKey(key)
|
2016-01-26 06:17:11 +00:00
|
|
|
w := newEtcdWatcher(false, h.quorum, nil, filter, h.codec, h.versioner, nil, h)
|
2015-12-10 14:03:59 +00:00
|
|
|
go w.etcdWatch(ctx, h.client, key, watchRV)
|
2015-07-27 07:54:07 +00:00
|
|
|
return w, nil
|
|
|
|
}
|
|
|
|
|
2015-07-30 07:45:06 +00:00
|
|
|
// Implements storage.Interface.
|
2015-12-04 08:58:24 +00:00
|
|
|
func (h *etcdHelper) WatchList(ctx context.Context, key string, resourceVersion string, filter storage.FilterFunc) (watch.Interface, error) {
|
2015-10-09 14:49:01 +00:00
|
|
|
if ctx == nil {
|
|
|
|
glog.Errorf("Context is nil")
|
|
|
|
}
|
2015-12-04 08:58:24 +00:00
|
|
|
watchRV, err := storage.ParseWatchResourceVersion(resourceVersion)
|
|
|
|
if err != nil {
|
|
|
|
return nil, err
|
|
|
|
}
|
2015-07-27 07:54:07 +00:00
|
|
|
key = h.prefixEtcdKey(key)
|
2016-01-26 06:17:11 +00:00
|
|
|
w := newEtcdWatcher(true, h.quorum, exceptKey(key), filter, h.codec, h.versioner, nil, h)
|
2015-12-10 14:03:59 +00:00
|
|
|
go w.etcdWatch(ctx, h.client, key, watchRV)
|
2015-07-27 07:54:07 +00:00
|
|
|
return w, nil
|
|
|
|
}
|
|
|
|
|
2015-07-30 07:45:06 +00:00
|
|
|
// Implements storage.Interface.
|
2015-10-09 14:49:01 +00:00
|
|
|
func (h *etcdHelper) Get(ctx context.Context, key string, objPtr runtime.Object, ignoreNotFound bool) error {
|
|
|
|
if ctx == nil {
|
|
|
|
glog.Errorf("Context is nil")
|
|
|
|
}
|
2015-07-21 09:19:11 +00:00
|
|
|
key = h.prefixEtcdKey(key)
|
2015-10-09 14:49:01 +00:00
|
|
|
_, _, _, err := h.bodyAndExtractObj(ctx, key, objPtr, ignoreNotFound)
|
2014-07-02 20:51:27 +00:00
|
|
|
return err
|
2014-06-27 19:54:45 +00:00
|
|
|
}
|
2014-06-27 17:55:05 +00:00
|
|
|
|
2015-05-23 00:17:49 +00:00
|
|
|
// bodyAndExtractObj performs the normal Get path to etcd, returning the parsed node and response for additional information
|
|
|
|
// about the response, like the current etcd index and the ttl.
|
2015-10-09 14:49:01 +00:00
|
|
|
func (h *etcdHelper) bodyAndExtractObj(ctx context.Context, key string, objPtr runtime.Object, ignoreNotFound bool) (body string, node *etcd.Node, res *etcd.Response, err error) {
|
|
|
|
if ctx == nil {
|
|
|
|
glog.Errorf("Context is nil")
|
|
|
|
}
|
2015-05-06 11:23:22 +00:00
|
|
|
startTime := time.Now()
|
2016-01-26 06:17:11 +00:00
|
|
|
|
|
|
|
opts := &etcd.GetOptions{
|
|
|
|
Quorum: h.quorum,
|
|
|
|
}
|
|
|
|
|
|
|
|
response, err := h.client.Get(ctx, key, opts)
|
2015-07-20 14:12:24 +00:00
|
|
|
metrics.RecordEtcdRequestLatency("get", getTypeName(objPtr), startTime)
|
2014-06-18 20:10:19 +00:00
|
|
|
|
2015-11-23 19:32:50 +00:00
|
|
|
if err != nil && !etcdutil.IsEtcdNotFound(err) {
|
2015-05-23 00:17:49 +00:00
|
|
|
return "", nil, nil, err
|
2014-06-17 23:23:52 +00:00
|
|
|
}
|
2015-05-23 00:17:49 +00:00
|
|
|
body, node, err = h.extractObj(response, err, objPtr, ignoreNotFound, false)
|
|
|
|
return body, node, response, err
|
2015-02-11 23:35:05 +00:00
|
|
|
}
|
|
|
|
|
2015-07-24 11:09:49 +00:00
|
|
|
func (h *etcdHelper) extractObj(response *etcd.Response, inErr error, objPtr runtime.Object, ignoreNotFound, prevNode bool) (body string, node *etcd.Node, err error) {
|
2015-02-11 23:35:05 +00:00
|
|
|
if response != nil {
|
|
|
|
if prevNode {
|
|
|
|
node = response.PrevNode
|
|
|
|
} else {
|
|
|
|
node = response.Node
|
|
|
|
}
|
|
|
|
}
|
|
|
|
if inErr != nil || node == nil || len(node.Value) == 0 {
|
2014-06-17 23:23:52 +00:00
|
|
|
if ignoreNotFound {
|
2014-10-28 08:02:29 +00:00
|
|
|
v, err := conversion.EnforcePtr(objPtr)
|
|
|
|
if err != nil {
|
2015-05-23 00:17:49 +00:00
|
|
|
return "", nil, err
|
2014-10-28 08:02:29 +00:00
|
|
|
}
|
|
|
|
v.Set(reflect.Zero(v.Type()))
|
2015-05-23 00:17:49 +00:00
|
|
|
return "", nil, nil
|
2015-02-11 23:35:05 +00:00
|
|
|
} else if inErr != nil {
|
2015-05-23 00:17:49 +00:00
|
|
|
return "", nil, inErr
|
2014-06-17 23:23:52 +00:00
|
|
|
}
|
2015-05-23 00:17:49 +00:00
|
|
|
return "", nil, fmt.Errorf("unable to locate a value on the response: %#v", response)
|
2014-06-27 03:24:10 +00:00
|
|
|
}
|
2015-02-11 23:35:05 +00:00
|
|
|
body = node.Value
|
2016-01-22 05:11:30 +00:00
|
|
|
out, gvk, err := h.codec.Decode([]byte(body), nil, objPtr)
|
|
|
|
if err != nil {
|
|
|
|
return body, nil, err
|
|
|
|
}
|
|
|
|
if out != objPtr {
|
|
|
|
return body, nil, fmt.Errorf("unable to decode object %s into %v", gvk.String(), reflect.TypeOf(objPtr))
|
|
|
|
}
|
2015-07-24 11:09:49 +00:00
|
|
|
if h.versioner != nil {
|
|
|
|
_ = h.versioner.UpdateObject(objPtr, node.Expiration, node.ModifiedIndex)
|
2014-08-04 00:23:56 +00:00
|
|
|
// being unable to set the version does not prevent the object from being extracted
|
2014-07-02 20:51:27 +00:00
|
|
|
}
|
2015-05-23 00:17:49 +00:00
|
|
|
return body, node, err
|
2014-06-27 03:24:10 +00:00
|
|
|
}
|
|
|
|
|
2015-07-30 07:45:06 +00:00
|
|
|
// Implements storage.Interface.
|
2015-10-09 14:49:01 +00:00
|
|
|
func (h *etcdHelper) GetToList(ctx context.Context, key string, filter storage.FilterFunc, listObj runtime.Object) error {
|
|
|
|
if ctx == nil {
|
|
|
|
glog.Errorf("Context is nil")
|
|
|
|
}
|
2015-07-27 09:59:09 +00:00
|
|
|
trace := util.NewTrace("GetToList " + getTypeName(listObj))
|
2015-11-12 10:45:42 +00:00
|
|
|
listPtr, err := meta.GetItemsPtr(listObj)
|
2015-02-11 23:35:05 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2015-07-24 07:19:08 +00:00
|
|
|
key = h.prefixEtcdKey(key)
|
|
|
|
startTime := time.Now()
|
|
|
|
trace.Step("About to read etcd node")
|
2016-01-26 06:17:11 +00:00
|
|
|
|
|
|
|
opts := &etcd.GetOptions{
|
|
|
|
Quorum: h.quorum,
|
|
|
|
}
|
|
|
|
response, err := h.client.Get(ctx, key, opts)
|
|
|
|
|
2015-07-24 07:19:08 +00:00
|
|
|
metrics.RecordEtcdRequestLatency("get", getTypeName(listPtr), startTime)
|
|
|
|
trace.Step("Etcd node read")
|
|
|
|
if err != nil {
|
2015-11-23 19:32:50 +00:00
|
|
|
if etcdutil.IsEtcdNotFound(err) {
|
2015-07-24 07:19:08 +00:00
|
|
|
return nil
|
2015-02-11 23:35:05 +00:00
|
|
|
}
|
2015-07-24 07:19:08 +00:00
|
|
|
return err
|
2015-02-11 23:35:05 +00:00
|
|
|
}
|
2015-03-11 17:10:09 +00:00
|
|
|
|
2015-07-24 07:19:08 +00:00
|
|
|
nodes := make([]*etcd.Node, 0)
|
|
|
|
nodes = append(nodes, response.Node)
|
|
|
|
|
2015-09-09 10:35:44 +00:00
|
|
|
if err := h.decodeNodeList(nodes, filter, listPtr); err != nil {
|
2015-02-11 23:35:05 +00:00
|
|
|
return err
|
|
|
|
}
|
2015-07-24 07:19:08 +00:00
|
|
|
trace.Step("Object decoded")
|
2015-07-24 11:09:49 +00:00
|
|
|
if h.versioner != nil {
|
2015-12-10 14:03:59 +00:00
|
|
|
if err := h.versioner.UpdateList(listObj, response.Index); err != nil {
|
2015-07-24 07:19:08 +00:00
|
|
|
return err
|
2015-02-27 14:59:49 +00:00
|
|
|
}
|
2015-02-11 23:35:05 +00:00
|
|
|
}
|
2015-07-24 07:19:08 +00:00
|
|
|
return nil
|
2014-08-06 02:23:22 +00:00
|
|
|
}
|
|
|
|
|
2015-07-24 07:19:08 +00:00
|
|
|
// decodeNodeList walks the tree of each node in the list and decodes into the specified object
|
2015-09-09 10:35:44 +00:00
|
|
|
func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, filter storage.FilterFunc, slicePtr interface{}) error {
|
2015-07-24 07:19:08 +00:00
|
|
|
trace := util.NewTrace("decodeNodeList " + getTypeName(slicePtr))
|
|
|
|
defer trace.LogIfLong(500 * time.Millisecond)
|
|
|
|
v, err := conversion.EnforcePtr(slicePtr)
|
|
|
|
if err != nil || v.Kind() != reflect.Slice {
|
|
|
|
// This should not happen at runtime.
|
|
|
|
panic("need ptr to slice")
|
2015-02-11 23:35:05 +00:00
|
|
|
}
|
2015-07-24 07:19:08 +00:00
|
|
|
for _, node := range nodes {
|
|
|
|
if node.Dir {
|
|
|
|
trace.Step("Decoding dir " + node.Key + " START")
|
2015-09-09 10:35:44 +00:00
|
|
|
if err := h.decodeNodeList(node.Nodes, filter, slicePtr); err != nil {
|
2015-07-24 07:19:08 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
trace.Step("Decoding dir " + node.Key + " END")
|
|
|
|
continue
|
|
|
|
}
|
2015-10-14 11:17:00 +00:00
|
|
|
if obj, found := h.getFromCache(node.ModifiedIndex, filter); found {
|
|
|
|
// obj != nil iff it matches the filter function.
|
|
|
|
if obj != nil {
|
2015-09-09 10:35:44 +00:00
|
|
|
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
|
|
|
|
}
|
2015-07-24 07:19:08 +00:00
|
|
|
} else {
|
2016-01-22 05:11:30 +00:00
|
|
|
obj, _, err := h.codec.Decode([]byte(node.Value), nil, reflect.New(v.Type().Elem()).Interface().(runtime.Object))
|
|
|
|
if err != nil {
|
2015-07-24 07:19:08 +00:00
|
|
|
return err
|
|
|
|
}
|
2015-07-24 11:09:49 +00:00
|
|
|
if h.versioner != nil {
|
2015-07-24 07:19:08 +00:00
|
|
|
// being unable to set the version does not prevent the object from being extracted
|
2016-01-22 05:11:30 +00:00
|
|
|
_ = h.versioner.UpdateObject(obj, node.Expiration, node.ModifiedIndex)
|
2015-07-24 07:19:08 +00:00
|
|
|
}
|
2016-01-22 05:11:30 +00:00
|
|
|
if filter(obj) {
|
|
|
|
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem()))
|
2015-09-09 10:35:44 +00:00
|
|
|
}
|
2015-07-24 07:19:08 +00:00
|
|
|
if node.ModifiedIndex != 0 {
|
2016-01-22 05:11:30 +00:00
|
|
|
h.addToCache(node.ModifiedIndex, obj)
|
2015-07-24 07:19:08 +00:00
|
|
|
}
|
2015-02-11 23:35:05 +00:00
|
|
|
}
|
|
|
|
}
|
2015-07-24 07:19:08 +00:00
|
|
|
trace.Step(fmt.Sprintf("Decoded %v nodes", len(nodes)))
|
|
|
|
return nil
|
2015-02-11 23:35:05 +00:00
|
|
|
}
|
|
|
|
|
2015-07-30 07:45:06 +00:00
|
|
|
// Implements storage.Interface.
|
2015-12-04 08:58:24 +00:00
|
|
|
func (h *etcdHelper) List(ctx context.Context, key string, resourceVersion string, filter storage.FilterFunc, listObj runtime.Object) error {
|
2015-10-09 14:49:01 +00:00
|
|
|
if ctx == nil {
|
|
|
|
glog.Errorf("Context is nil")
|
|
|
|
}
|
2015-07-27 09:59:09 +00:00
|
|
|
trace := util.NewTrace("List " + getTypeName(listObj))
|
2015-07-24 07:19:08 +00:00
|
|
|
defer trace.LogIfLong(time.Second)
|
2015-11-12 10:45:42 +00:00
|
|
|
listPtr, err := meta.GetItemsPtr(listObj)
|
2014-06-27 03:24:10 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
2014-06-17 23:23:52 +00:00
|
|
|
}
|
2015-07-21 09:19:11 +00:00
|
|
|
key = h.prefixEtcdKey(key)
|
2015-07-24 07:19:08 +00:00
|
|
|
startTime := time.Now()
|
|
|
|
trace.Step("About to list etcd node")
|
2015-10-09 14:49:01 +00:00
|
|
|
nodes, index, err := h.listEtcdNode(ctx, key)
|
2015-07-24 07:19:08 +00:00
|
|
|
metrics.RecordEtcdRequestLatency("list", getTypeName(listPtr), startTime)
|
|
|
|
trace.Step("Etcd node listed")
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2015-09-09 10:35:44 +00:00
|
|
|
if err := h.decodeNodeList(nodes, filter, listPtr); err != nil {
|
2015-07-24 07:19:08 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
trace.Step("Node list decoded")
|
2015-07-24 11:09:49 +00:00
|
|
|
if h.versioner != nil {
|
|
|
|
if err := h.versioner.UpdateList(listObj, index); err != nil {
|
2015-07-24 07:19:08 +00:00
|
|
|
return err
|
2015-02-27 14:59:49 +00:00
|
|
|
}
|
|
|
|
}
|
2015-07-24 07:19:08 +00:00
|
|
|
return nil
|
|
|
|
}
|
2015-02-27 14:59:49 +00:00
|
|
|
|
2015-10-09 14:49:01 +00:00
|
|
|
func (h *etcdHelper) listEtcdNode(ctx context.Context, key string) ([]*etcd.Node, uint64, error) {
|
|
|
|
if ctx == nil {
|
|
|
|
glog.Errorf("Context is nil")
|
|
|
|
}
|
2015-12-10 14:03:59 +00:00
|
|
|
opts := etcd.GetOptions{
|
|
|
|
Recursive: true,
|
|
|
|
Sort: true,
|
2016-01-26 06:17:11 +00:00
|
|
|
Quorum: h.quorum,
|
2015-12-10 14:03:59 +00:00
|
|
|
}
|
|
|
|
result, err := h.client.Get(ctx, key, &opts)
|
2015-02-27 14:59:49 +00:00
|
|
|
if err != nil {
|
2015-11-23 19:32:50 +00:00
|
|
|
var index uint64
|
2015-12-10 14:03:59 +00:00
|
|
|
if etcdError, ok := err.(etcd.Error); ok {
|
2015-11-23 19:32:50 +00:00
|
|
|
index = etcdError.Index
|
2015-07-24 07:19:08 +00:00
|
|
|
}
|
|
|
|
nodes := make([]*etcd.Node, 0)
|
2015-11-23 19:32:50 +00:00
|
|
|
if etcdutil.IsEtcdNotFound(err) {
|
2015-07-24 07:19:08 +00:00
|
|
|
return nodes, index, nil
|
|
|
|
} else {
|
|
|
|
return nodes, index, err
|
2014-08-04 00:23:56 +00:00
|
|
|
}
|
2014-06-30 19:00:14 +00:00
|
|
|
}
|
2015-12-10 14:03:59 +00:00
|
|
|
return result.Node.Nodes, result.Index, nil
|
2014-06-17 23:23:52 +00:00
|
|
|
}
|
|
|
|
|
2015-07-30 07:45:06 +00:00
|
|
|
// Implements storage.Interface.
|
2015-10-09 14:49:01 +00:00
|
|
|
func (h *etcdHelper) GuaranteedUpdate(ctx context.Context, key string, ptrToType runtime.Object, ignoreNotFound bool, tryUpdate storage.UpdateFunc) error {
|
|
|
|
if ctx == nil {
|
|
|
|
glog.Errorf("Context is nil")
|
|
|
|
}
|
2014-10-28 08:02:29 +00:00
|
|
|
v, err := conversion.EnforcePtr(ptrToType)
|
|
|
|
if err != nil {
|
2014-06-27 22:02:06 +00:00
|
|
|
// Panic is appropriate, because this is a programming error.
|
|
|
|
panic("need ptr to type")
|
|
|
|
}
|
2015-07-21 09:19:11 +00:00
|
|
|
key = h.prefixEtcdKey(key)
|
2014-06-27 17:55:05 +00:00
|
|
|
for {
|
2014-10-28 08:02:29 +00:00
|
|
|
obj := reflect.New(v.Type()).Interface().(runtime.Object)
|
2015-10-09 14:49:01 +00:00
|
|
|
origBody, node, res, err := h.bodyAndExtractObj(ctx, key, obj, ignoreNotFound)
|
2014-06-27 17:55:05 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
2015-07-30 07:27:38 +00:00
|
|
|
meta := storage.ResponseMeta{}
|
2015-05-23 00:17:49 +00:00
|
|
|
if node != nil {
|
|
|
|
meta.TTL = node.TTL
|
2015-06-19 00:42:01 +00:00
|
|
|
if node.Expiration != nil {
|
|
|
|
meta.Expiration = node.Expiration
|
|
|
|
}
|
|
|
|
meta.ResourceVersion = node.ModifiedIndex
|
2015-05-23 00:17:49 +00:00
|
|
|
}
|
2015-06-19 00:42:01 +00:00
|
|
|
// Get the object to be written by calling tryUpdate.
|
2015-05-23 00:17:49 +00:00
|
|
|
ret, newTTL, err := tryUpdate(obj, meta)
|
2014-06-27 17:55:05 +00:00
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2015-05-23 00:17:49 +00:00
|
|
|
index := uint64(0)
|
|
|
|
ttl := uint64(0)
|
|
|
|
if node != nil {
|
|
|
|
index = node.ModifiedIndex
|
2015-08-19 23:59:43 +00:00
|
|
|
if node.TTL != 0 {
|
2015-05-23 00:17:49 +00:00
|
|
|
ttl = uint64(node.TTL)
|
|
|
|
}
|
2015-08-19 23:59:43 +00:00
|
|
|
if node.Expiration != nil && ttl == 0 {
|
|
|
|
ttl = 1
|
|
|
|
}
|
2015-05-23 00:17:49 +00:00
|
|
|
} else if res != nil {
|
2015-12-10 14:03:59 +00:00
|
|
|
index = res.Index
|
2015-05-23 00:17:49 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
if newTTL != nil {
|
2015-08-19 23:59:43 +00:00
|
|
|
if ttl != 0 && *newTTL == 0 {
|
|
|
|
// TODO: remove this after we have verified this is no longer an issue
|
|
|
|
glog.V(4).Infof("GuaranteedUpdate is clearing TTL for %q, may not be intentional", key)
|
|
|
|
}
|
2015-05-23 00:17:49 +00:00
|
|
|
ttl = *newTTL
|
|
|
|
}
|
|
|
|
|
2016-01-22 05:11:30 +00:00
|
|
|
data, err := runtime.Encode(h.codec, ret)
|
2014-08-05 18:34:00 +00:00
|
|
|
if err != nil {
|
2014-07-30 10:05:10 +00:00
|
|
|
return err
|
|
|
|
}
|
2014-08-05 18:43:19 +00:00
|
|
|
|
|
|
|
// First time this key has been used, try creating new value.
|
|
|
|
if index == 0 {
|
2015-05-06 11:23:22 +00:00
|
|
|
startTime := time.Now()
|
2015-12-10 14:03:59 +00:00
|
|
|
opts := etcd.SetOptions{
|
|
|
|
TTL: time.Duration(ttl) * time.Second,
|
|
|
|
PrevExist: etcd.PrevNoExist,
|
|
|
|
}
|
|
|
|
response, err := h.client.Set(ctx, key, string(data), &opts)
|
2015-07-20 14:12:24 +00:00
|
|
|
metrics.RecordEtcdRequestLatency("create", getTypeName(ptrToType), startTime)
|
2015-11-23 19:32:50 +00:00
|
|
|
if etcdutil.IsEtcdNodeExist(err) {
|
2014-08-05 18:43:19 +00:00
|
|
|
continue
|
|
|
|
}
|
2015-03-03 21:16:50 +00:00
|
|
|
_, _, err = h.extractObj(response, err, ptrToType, false, false)
|
2014-08-05 18:43:19 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
2014-08-14 19:51:20 +00:00
|
|
|
if string(data) == origBody {
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
2015-05-06 11:23:22 +00:00
|
|
|
startTime := time.Now()
|
2015-06-19 00:42:01 +00:00
|
|
|
// Swap origBody with data, if origBody is the latest etcd data.
|
2015-12-10 14:03:59 +00:00
|
|
|
opts := etcd.SetOptions{
|
|
|
|
PrevValue: origBody,
|
|
|
|
PrevIndex: index,
|
|
|
|
TTL: time.Duration(ttl) * time.Second,
|
|
|
|
}
|
|
|
|
response, err := h.client.Set(ctx, key, string(data), &opts)
|
2015-07-20 14:12:24 +00:00
|
|
|
metrics.RecordEtcdRequestLatency("compareAndSwap", getTypeName(ptrToType), startTime)
|
2015-11-23 19:32:50 +00:00
|
|
|
if etcdutil.IsEtcdTestFailed(err) {
|
2015-06-19 00:42:01 +00:00
|
|
|
// Try again.
|
2014-06-27 19:54:45 +00:00
|
|
|
continue
|
|
|
|
}
|
2015-02-11 23:35:05 +00:00
|
|
|
_, _, err = h.extractObj(response, err, ptrToType, false, false)
|
2014-06-17 23:23:52 +00:00
|
|
|
return err
|
|
|
|
}
|
|
|
|
}
|
2014-10-27 17:04:39 +00:00
|
|
|
|
2015-07-24 11:09:49 +00:00
|
|
|
func (h *etcdHelper) prefixEtcdKey(key string) string {
|
2015-10-27 20:35:07 +00:00
|
|
|
if strings.HasPrefix(key, h.pathPrefix) {
|
2015-03-11 17:10:09 +00:00
|
|
|
return key
|
|
|
|
}
|
2015-10-27 20:35:07 +00:00
|
|
|
return path.Join(h.pathPrefix, key)
|
2015-03-11 17:10:09 +00:00
|
|
|
}
|
|
|
|
|
2015-07-24 07:19:08 +00:00
|
|
|
// etcdCache defines interface used for caching objects stored in etcd. Objects are keyed by
|
|
|
|
// their Node.ModifiedIndex, which is unique across all types.
|
|
|
|
// All implementations must be thread-safe.
|
|
|
|
type etcdCache interface {
|
2015-10-14 11:17:00 +00:00
|
|
|
getFromCache(index uint64, filter storage.FilterFunc) (runtime.Object, bool)
|
2015-07-24 07:19:08 +00:00
|
|
|
addToCache(index uint64, obj runtime.Object)
|
|
|
|
}
|
|
|
|
|
|
|
|
// maxEtcdCacheEntries is the size passed to util.NewCache when the helper's
// object cache is created.
const maxEtcdCacheEntries int = 50000
|
|
|
|
|
|
|
|
// getTypeName returns the name of obj's dynamic type as reported by the
// reflect package (for example "*api.Pod"). It is used to label traces and
// metrics. A nil interface value has no type; "<nil>" is returned for it
// instead of panicking.
func getTypeName(obj interface{}) string {
	t := reflect.TypeOf(obj)
	if t == nil {
		// reflect.TypeOf returns a nil Type for a nil interface value, and
		// calling String on it would panic.
		return "<nil>"
	}
	return t.String()
}
|
|
|
|
|
2015-10-14 11:17:00 +00:00
|
|
|
func (h *etcdHelper) getFromCache(index uint64, filter storage.FilterFunc) (runtime.Object, bool) {
|
2015-07-24 07:19:08 +00:00
|
|
|
startTime := time.Now()
|
|
|
|
defer func() {
|
|
|
|
metrics.ObserveGetCache(startTime)
|
|
|
|
}()
|
|
|
|
obj, found := h.cache.Get(index)
|
|
|
|
if found {
|
2015-10-14 11:17:00 +00:00
|
|
|
if !filter(obj.(runtime.Object)) {
|
|
|
|
return nil, true
|
|
|
|
}
|
2015-08-08 21:29:57 +00:00
|
|
|
// We should not return the object itself to avoid polluting the cache if someone
|
2015-07-24 07:19:08 +00:00
|
|
|
// modifies returned values.
|
2015-07-24 11:09:49 +00:00
|
|
|
objCopy, err := h.copier.Copy(obj.(runtime.Object))
|
2015-07-24 07:19:08 +00:00
|
|
|
if err != nil {
|
|
|
|
glog.Errorf("Error during DeepCopy of cached object: %q", err)
|
2015-10-14 11:17:00 +00:00
|
|
|
// We can't return a copy, thus we report the object as not found.
|
2015-07-24 07:19:08 +00:00
|
|
|
return nil, false
|
|
|
|
}
|
|
|
|
metrics.ObserveCacheHit()
|
|
|
|
return objCopy.(runtime.Object), true
|
|
|
|
}
|
|
|
|
metrics.ObserveCacheMiss()
|
|
|
|
return nil, false
|
|
|
|
}
|
|
|
|
|
2015-07-24 11:09:49 +00:00
|
|
|
func (h *etcdHelper) addToCache(index uint64, obj runtime.Object) {
|
2015-07-24 07:19:08 +00:00
|
|
|
startTime := time.Now()
|
|
|
|
defer func() {
|
|
|
|
metrics.ObserveAddCache(startTime)
|
|
|
|
}()
|
2015-07-24 11:09:49 +00:00
|
|
|
objCopy, err := h.copier.Copy(obj)
|
2015-07-24 07:19:08 +00:00
|
|
|
if err != nil {
|
|
|
|
glog.Errorf("Error during DeepCopy of cached object: %q", err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
isOverwrite := h.cache.Add(index, objCopy)
|
|
|
|
if !isOverwrite {
|
|
|
|
metrics.ObserveNewEntry()
|
|
|
|
}
|
|
|
|
}
|