2014-06-17 23:23:52 +00:00
/*
2015-05-01 16:19:44 +00:00
Copyright 2014 The Kubernetes Authors All rights reserved.
2014-06-17 23:23:52 +00:00
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
    http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
2015-07-30 11:27:18 +00:00
package etcd
2014-06-17 23:23:52 +00:00
import (
2014-08-03 17:07:40 +00:00
"errors"
2014-06-17 23:23:52 +00:00
"fmt"
2015-12-10 14:03:59 +00:00
"net"
2015-12-08 20:01:29 +00:00
"net/http"
2015-03-11 17:10:09 +00:00
"path"
2014-06-17 23:23:52 +00:00
"reflect"
2015-03-11 17:10:09 +00:00
"strings"
2015-05-06 11:23:22 +00:00
"time"
2014-06-17 23:23:52 +00:00
2015-08-05 22:03:47 +00:00
"k8s.io/kubernetes/pkg/api"
2015-11-12 10:45:42 +00:00
"k8s.io/kubernetes/pkg/api/meta"
2015-08-05 22:03:47 +00:00
"k8s.io/kubernetes/pkg/conversion"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/storage"
2015-11-10 11:23:51 +00:00
"k8s.io/kubernetes/pkg/storage/etcd/metrics"
2015-11-23 19:32:50 +00:00
etcdutil "k8s.io/kubernetes/pkg/storage/etcd/util"
2015-08-05 22:03:47 +00:00
"k8s.io/kubernetes/pkg/util"
2016-03-09 06:43:30 +00:00
utilcache "k8s.io/kubernetes/pkg/util/cache"
2016-03-21 07:01:20 +00:00
utilnet "k8s.io/kubernetes/pkg/util/net"
2015-08-05 22:03:47 +00:00
"k8s.io/kubernetes/pkg/watch"
2014-10-27 17:04:39 +00:00
2015-12-10 14:03:59 +00:00
etcd "github.com/coreos/etcd/client"
2016-03-11 16:21:16 +00:00
"github.com/coreos/etcd/pkg/transport"
2014-10-27 17:04:39 +00:00
"github.com/golang/glog"
2015-10-09 14:49:01 +00:00
"golang.org/x/net/context"
2014-06-17 23:23:52 +00:00
)
2015-12-08 20:01:29 +00:00
// storage.Config object for etcd.
2016-03-11 16:21:16 +00:00
type EtcdStorageConfig struct {
Config EtcdConfig
Codec runtime . Codec
2015-12-08 20:01:29 +00:00
}
// implements storage.Config
2016-03-11 16:21:16 +00:00
func ( c * EtcdStorageConfig ) GetType ( ) string {
2015-12-08 20:01:29 +00:00
return "etcd"
}
// implements storage.Config
2016-03-11 16:21:16 +00:00
func ( c * EtcdStorageConfig ) NewStorage ( ) ( storage . Interface , error ) {
etcdClient , err := c . Config . newEtcdClient ( )
if err != nil {
return nil , err
}
return NewEtcdStorage ( etcdClient , c . Codec , c . Config . Prefix , c . Config . Quorum ) , nil
}
// EtcdConfig holds the settings used to construct an etcd client
// (etcd.Config) and the storage layered on top of it.
type EtcdConfig struct {
	// Prefix is the key prefix under which all objects are stored.
	Prefix string
	// ServerList is the list of etcd endpoints the client talks to.
	ServerList []string
	// KeyFile, CertFile and CAFile are handed to transport.TLSInfo to build
	// the client's TLS configuration.
	KeyFile  string
	CertFile string
	CAFile   string
	// Quorum, when true, makes reads quorum reads.
	Quorum bool
}
func ( c * EtcdConfig ) newEtcdClient ( ) ( etcd . Client , error ) {
t , err := c . newHttpTransport ( )
if err != nil {
return nil , err
}
cli , err := etcd . New ( etcd . Config {
2015-12-10 14:03:59 +00:00
Endpoints : c . ServerList ,
2016-03-11 16:21:16 +00:00
Transport : t ,
} )
if err != nil {
return nil , err
}
return cli , nil
}
func ( c * EtcdConfig ) newHttpTransport ( ) ( * http . Transport , error ) {
info := transport . TLSInfo {
CertFile : c . CertFile ,
KeyFile : c . KeyFile ,
CAFile : c . CAFile ,
}
cfg , err := info . ClientConfig ( )
2015-12-10 14:03:59 +00:00
if err != nil {
return nil , err
}
2016-03-11 16:21:16 +00:00
// Copied from etcd.DefaultTransport declaration.
// TODO: Determine if transport needs optimization
2016-03-15 19:29:08 +00:00
tr := utilnet . SetTransportDefaults ( & http . Transport {
2016-03-11 16:21:16 +00:00
Proxy : http . ProxyFromEnvironment ,
Dial : ( & net . Dialer {
Timeout : 30 * time . Second ,
KeepAlive : 30 * time . Second ,
} ) . Dial ,
TLSHandshakeTimeout : 10 * time . Second ,
MaxIdleConnsPerHost : 500 ,
TLSClientConfig : cfg ,
2016-03-15 19:29:08 +00:00
} )
2016-03-11 16:21:16 +00:00
return tr , nil
2015-12-08 20:01:29 +00:00
}
// Creates a new storage interface from the client
// TODO: deprecate in favor of storage.Config abstraction over time
2016-01-26 06:17:11 +00:00
func NewEtcdStorage ( client etcd . Client , codec runtime . Codec , prefix string , quorum bool ) storage . Interface {
2015-07-24 11:09:49 +00:00
return & etcdHelper {
2016-03-11 16:21:16 +00:00
etcdMembersAPI : etcd . NewMembersAPI ( client ) ,
etcdKeysAPI : etcd . NewKeysAPI ( client ) ,
codec : codec ,
versioner : APIObjectVersioner { } ,
copier : api . Scheme ,
pathPrefix : path . Join ( "/" , prefix ) ,
quorum : quorum ,
2016-03-09 06:43:30 +00:00
cache : utilcache . NewCache ( maxEtcdCacheEntries ) ,
2015-07-21 09:19:11 +00:00
}
2015-05-08 07:16:24 +00:00
}
2015-07-30 07:45:06 +00:00
// etcdHelper is the reference implementation of storage.Interface.
2015-07-24 11:09:49 +00:00
type etcdHelper struct {
2016-03-11 16:21:16 +00:00
etcdMembersAPI etcd . MembersAPI
etcdKeysAPI etcd . KeysAPI
codec runtime . Codec
copier runtime . ObjectCopier
2016-02-15 10:12:02 +00:00
// Note that versioner is required for etcdHelper to work correctly.
// The public constructors (NewStorage & NewEtcdStorage) are setting it
// correctly, so be careful when manipulating with it manually.
2016-03-11 16:21:16 +00:00
// optional, has to be set to perform any atomic operations
2015-07-30 07:45:06 +00:00
versioner storage . Versioner
2015-03-11 17:10:09 +00:00
// prefix for all etcd keys
2015-07-24 11:09:49 +00:00
pathPrefix string
2016-01-26 06:17:11 +00:00
// if true, perform quorum read
quorum bool
2015-04-24 11:07:32 +00:00
// We cache objects stored in etcd. For keys we use Node.ModifiedIndex which is equivalent
// to resourceVersion.
// This depends on etcd's indexes being globally unique across all objects/types. This will
// have to revisited if we decide to do things like multiple etcd clusters, or etcd will
// support multi-object transaction that will result in many objects with the same index.
// Number of entries stored in the cache is controlled by maxEtcdCacheEntries constant.
// TODO: Measure how much this cache helps after the conversion code is optimized.
2016-03-09 06:43:30 +00:00
cache utilcache . Cache
2015-03-05 05:17:29 +00:00
}
2015-07-21 09:19:11 +00:00
func init ( ) {
metrics . Register ( )
2014-08-11 04:08:06 +00:00
}
2015-08-05 13:39:24 +00:00
// Codec returns the runtime.Codec this helper uses to encode objects written
// to etcd and decode objects read from it.
func (h *etcdHelper) Codec() runtime.Codec {
	codec := h.codec
	return codec
}
2015-07-30 07:45:06 +00:00
// Implements storage.Interface.
2015-10-09 14:49:01 +00:00
func ( h * etcdHelper ) Backends ( ctx context . Context ) [ ] string {
if ctx == nil {
glog . Errorf ( "Context is nil" )
}
2016-03-11 16:21:16 +00:00
members , err := h . etcdMembersAPI . List ( ctx )
2015-12-10 14:03:59 +00:00
if err != nil {
glog . Errorf ( "Error obtaining etcd members list: %q" , err )
return nil
}
2015-12-30 11:22:27 +00:00
mlist := [ ] string { }
2015-12-10 14:03:59 +00:00
for _ , member := range members {
mlist = append ( mlist , member . ClientURLs ... )
}
return mlist
2015-07-24 11:09:49 +00:00
}
2015-07-30 07:45:06 +00:00
// Implements storage.Interface.
func ( h * etcdHelper ) Versioner ( ) storage . Versioner {
2015-07-24 11:09:49 +00:00
return h . versioner
}
2015-07-30 07:45:06 +00:00
// Implements storage.Interface.
2015-10-09 14:49:01 +00:00
func ( h * etcdHelper ) Create ( ctx context . Context , key string , obj , out runtime . Object , ttl uint64 ) error {
2016-01-26 16:14:30 +00:00
trace := util . NewTrace ( "etcdHelper::Create " + getTypeName ( obj ) )
defer trace . LogIfLong ( 250 * time . Millisecond )
2015-10-09 14:49:01 +00:00
if ctx == nil {
glog . Errorf ( "Context is nil" )
}
2015-07-24 07:19:08 +00:00
key = h . prefixEtcdKey ( key )
2016-01-22 05:11:30 +00:00
data , err := runtime . Encode ( h . codec , obj )
2016-01-26 16:14:30 +00:00
trace . Step ( "Object encoded" )
2014-06-17 23:23:52 +00:00
if err != nil {
2015-07-24 07:19:08 +00:00
return err
2015-04-24 11:07:32 +00:00
}
2016-02-15 10:12:02 +00:00
if version , err := h . versioner . ObjectResourceVersion ( obj ) ; err == nil && version != 0 {
return errors . New ( "resourceVersion may not be set on objects to be created" )
2015-04-24 11:07:32 +00:00
}
2016-01-26 16:14:30 +00:00
trace . Step ( "Version checked" )
2015-04-24 11:07:32 +00:00
2015-05-08 07:16:24 +00:00
startTime := time . Now ( )
2015-12-10 14:03:59 +00:00
opts := etcd . SetOptions {
TTL : time . Duration ( ttl ) * time . Second ,
PrevExist : etcd . PrevNoExist ,
}
2016-03-11 16:21:16 +00:00
response , err := h . etcdKeysAPI . Set ( ctx , key , string ( data ) , & opts )
2015-07-24 07:19:08 +00:00
metrics . RecordEtcdRequestLatency ( "create" , getTypeName ( obj ) , startTime )
2016-01-26 16:14:30 +00:00
trace . Step ( "Object created" )
2015-04-24 11:07:32 +00:00
if err != nil {
2016-03-21 20:14:59 +00:00
return toStorageErr ( err , key , 0 )
2015-04-24 11:07:32 +00:00
}
2015-07-24 07:19:08 +00:00
if out != nil {
if _ , err := conversion . EnforcePtr ( out ) ; err != nil {
panic ( "unable to convert output object to pointer" )
}
_ , _ , err = h . extractObj ( response , err , out , false , false )
2015-05-04 11:20:15 +00:00
}
2015-07-24 07:19:08 +00:00
return err
2015-04-24 11:07:32 +00:00
}
2015-07-30 07:45:06 +00:00
// Implements storage.Interface.
2015-10-09 14:49:01 +00:00
func ( h * etcdHelper ) Set ( ctx context . Context , key string , obj , out runtime . Object , ttl uint64 ) error {
if ctx == nil {
glog . Errorf ( "Context is nil" )
}
2016-02-09 13:06:52 +00:00
version := uint64 ( 0 )
2016-02-15 10:12:02 +00:00
var err error
if version , err = h . versioner . ObjectResourceVersion ( obj ) ; err != nil {
return errors . New ( "couldn't get resourceVersion from object" )
}
if version != 0 {
// We cannot store object with resourceVersion in etcd, we need to clear it here.
if err := h . versioner . UpdateObject ( obj , nil , 0 ) ; err != nil {
return errors . New ( "resourceVersion cannot be set on objects store in etcd" )
2016-02-09 13:06:52 +00:00
}
}
2015-07-24 07:19:08 +00:00
var response * etcd . Response
2016-01-22 05:11:30 +00:00
data , err := runtime . Encode ( h . codec , obj )
2014-09-23 23:58:08 +00:00
if err != nil {
return err
}
2015-07-21 09:19:11 +00:00
key = h . prefixEtcdKey ( key )
2015-07-24 07:19:08 +00:00
create := true
2016-02-15 10:12:02 +00:00
if version != 0 {
create = false
startTime := time . Now ( )
opts := etcd . SetOptions {
TTL : time . Duration ( ttl ) * time . Second ,
PrevIndex : version ,
}
2016-03-11 16:21:16 +00:00
response , err = h . etcdKeysAPI . Set ( ctx , key , string ( data ) , & opts )
2016-02-15 10:12:02 +00:00
metrics . RecordEtcdRequestLatency ( "compareAndSwap" , getTypeName ( obj ) , startTime )
if err != nil {
2016-03-21 20:14:59 +00:00
return toStorageErr ( err , key , int64 ( version ) )
2015-04-13 10:33:37 +00:00
}
}
2015-07-24 07:19:08 +00:00
if create {
// Create will fail if a key already exists.
startTime := time . Now ( )
2015-12-10 14:03:59 +00:00
opts := etcd . SetOptions {
TTL : time . Duration ( ttl ) * time . Second ,
PrevExist : etcd . PrevNoExist ,
}
2016-03-11 16:21:16 +00:00
response , err = h . etcdKeysAPI . Set ( ctx , key , string ( data ) , & opts )
2015-12-10 14:03:59 +00:00
if err != nil {
2016-03-21 20:14:59 +00:00
return toStorageErr ( err , key , 0 )
2015-12-10 14:03:59 +00:00
}
2015-07-24 07:19:08 +00:00
metrics . RecordEtcdRequestLatency ( "create" , getTypeName ( obj ) , startTime )
}
2015-04-13 10:33:37 +00:00
2015-07-24 07:19:08 +00:00
if out != nil {
if _ , err := conversion . EnforcePtr ( out ) ; err != nil {
panic ( "unable to convert output object to pointer" )
2015-04-16 22:07:11 +00:00
}
2015-07-24 07:19:08 +00:00
_ , _ , err = h . extractObj ( response , err , out , false , false )
2015-04-13 10:33:37 +00:00
}
2015-07-24 07:19:08 +00:00
return err
}
2015-04-13 10:33:37 +00:00
2016-03-21 06:15:00 +00:00
// checkPreconditions verifies that out satisfies preconditions. A nil
// preconditions value always passes. An object whose metadata cannot be read
// yields an internal error. A UID mismatch is reported as an etcd error with
// code ErrorCodeTestFailed, presumably so that toStorageErr (via
// etcdutil.IsEtcdTestFailed) treats it like a failed compare-and-swap; verify
// against etcdutil.
func checkPreconditions(preconditions *storage.Preconditions, out runtime.Object) error {
	if preconditions == nil {
		return nil
	}
	objMeta, err := api.ObjectMetaFor(out)
	if err != nil {
		return storage.NewInternalErrorf("can't enforce preconditions %v on un-introspectable object %v, got error: %v", *preconditions, out, err)
	}
	wantUID := preconditions.UID
	if wantUID == nil || *wantUID == objMeta.UID {
		return nil
	}
	return etcd.Error{
		Code:    etcd.ErrorCodeTestFailed,
		Message: fmt.Sprintf("the UID in the precondition (%s) does not match the UID in record (%s). The object might have been deleted and then recreated", *wantUID, objMeta.UID),
	}
}
2015-07-30 07:45:06 +00:00
// Implements storage.Interface.
2016-03-21 06:15:00 +00:00
func ( h * etcdHelper ) Delete ( ctx context . Context , key string , out runtime . Object , preconditions * storage . Preconditions ) error {
2015-10-09 14:49:01 +00:00
if ctx == nil {
glog . Errorf ( "Context is nil" )
}
2015-07-24 07:19:08 +00:00
key = h . prefixEtcdKey ( key )
2016-03-21 06:15:00 +00:00
v , err := conversion . EnforcePtr ( out )
if err != nil {
2015-07-24 07:19:08 +00:00
panic ( "unable to convert output object to pointer" )
2015-04-13 10:33:37 +00:00
}
2015-07-24 07:19:08 +00:00
2016-03-21 06:15:00 +00:00
if preconditions == nil {
startTime := time . Now ( )
response , err := h . etcdKeysAPI . Delete ( ctx , key , nil )
metrics . RecordEtcdRequestLatency ( "delete" , getTypeName ( out ) , startTime )
if ! etcdutil . IsEtcdNotFound ( err ) {
// if the object that existed prior to the delete is returned by etcd, update the out object.
if err != nil || response . PrevNode != nil {
_ , _ , err = h . extractObj ( response , err , out , false , true )
}
}
return toStorageErr ( err , key , 0 )
}
// Check the preconditions match.
obj := reflect . New ( v . Type ( ) ) . Interface ( ) . ( runtime . Object )
for {
_ , node , res , err := h . bodyAndExtractObj ( ctx , key , obj , false )
if err != nil {
return toStorageErr ( err , key , 0 )
}
if err := checkPreconditions ( preconditions , obj ) ; err != nil {
return toStorageErr ( err , key , 0 )
}
index := uint64 ( 0 )
if node != nil {
index = node . ModifiedIndex
} else if res != nil {
index = res . Index
}
opt := etcd . DeleteOptions { PrevIndex : index }
startTime := time . Now ( )
response , err := h . etcdKeysAPI . Delete ( ctx , key , & opt )
metrics . RecordEtcdRequestLatency ( "delete" , getTypeName ( out ) , startTime )
if etcdutil . IsEtcdTestFailed ( err ) {
glog . Infof ( "deletion of %s failed because of a conflict, going to retry" , key )
} else {
if ! etcdutil . IsEtcdNotFound ( err ) {
// if the object that existed prior to the delete is returned by etcd, update the out object.
if err != nil || response . PrevNode != nil {
_ , _ , err = h . extractObj ( response , err , out , false , true )
}
}
return toStorageErr ( err , key , 0 )
2014-09-23 23:58:08 +00:00
}
}
2015-07-24 07:19:08 +00:00
}
2015-07-30 07:45:06 +00:00
// Implements storage.Interface.
2015-12-04 08:58:24 +00:00
func ( h * etcdHelper ) Watch ( ctx context . Context , key string , resourceVersion string , filter storage . FilterFunc ) ( watch . Interface , error ) {
2015-10-09 14:49:01 +00:00
if ctx == nil {
glog . Errorf ( "Context is nil" )
}
2015-12-04 08:58:24 +00:00
watchRV , err := storage . ParseWatchResourceVersion ( resourceVersion )
if err != nil {
return nil , err
}
2015-07-27 07:54:07 +00:00
key = h . prefixEtcdKey ( key )
2016-01-26 06:17:11 +00:00
w := newEtcdWatcher ( false , h . quorum , nil , filter , h . codec , h . versioner , nil , h )
2016-03-11 16:21:16 +00:00
go w . etcdWatch ( ctx , h . etcdKeysAPI , key , watchRV )
2015-07-27 07:54:07 +00:00
return w , nil
}
2015-07-30 07:45:06 +00:00
// Implements storage.Interface.
2015-12-04 08:58:24 +00:00
func ( h * etcdHelper ) WatchList ( ctx context . Context , key string , resourceVersion string , filter storage . FilterFunc ) ( watch . Interface , error ) {
2015-10-09 14:49:01 +00:00
if ctx == nil {
glog . Errorf ( "Context is nil" )
}
2015-12-04 08:58:24 +00:00
watchRV , err := storage . ParseWatchResourceVersion ( resourceVersion )
if err != nil {
return nil , err
}
2015-07-27 07:54:07 +00:00
key = h . prefixEtcdKey ( key )
2016-01-26 06:17:11 +00:00
w := newEtcdWatcher ( true , h . quorum , exceptKey ( key ) , filter , h . codec , h . versioner , nil , h )
2016-03-11 16:21:16 +00:00
go w . etcdWatch ( ctx , h . etcdKeysAPI , key , watchRV )
2015-07-27 07:54:07 +00:00
return w , nil
}
2015-07-30 07:45:06 +00:00
// Implements storage.Interface.
2015-10-09 14:49:01 +00:00
func ( h * etcdHelper ) Get ( ctx context . Context , key string , objPtr runtime . Object , ignoreNotFound bool ) error {
if ctx == nil {
glog . Errorf ( "Context is nil" )
}
2015-07-21 09:19:11 +00:00
key = h . prefixEtcdKey ( key )
2015-10-09 14:49:01 +00:00
_ , _ , _ , err := h . bodyAndExtractObj ( ctx , key , objPtr , ignoreNotFound )
2014-07-02 20:51:27 +00:00
return err
2014-06-27 19:54:45 +00:00
}
2014-06-27 17:55:05 +00:00
2015-05-23 00:17:49 +00:00
// bodyAndExtractObj performs the normal Get path to etcd, returning the parsed node and response for additional information
// about the response, like the current etcd index and the ttl.
2015-10-09 14:49:01 +00:00
func ( h * etcdHelper ) bodyAndExtractObj ( ctx context . Context , key string , objPtr runtime . Object , ignoreNotFound bool ) ( body string , node * etcd . Node , res * etcd . Response , err error ) {
if ctx == nil {
glog . Errorf ( "Context is nil" )
}
2015-05-06 11:23:22 +00:00
startTime := time . Now ( )
2016-01-26 06:17:11 +00:00
opts := & etcd . GetOptions {
Quorum : h . quorum ,
}
2016-03-11 16:21:16 +00:00
response , err := h . etcdKeysAPI . Get ( ctx , key , opts )
2015-07-20 14:12:24 +00:00
metrics . RecordEtcdRequestLatency ( "get" , getTypeName ( objPtr ) , startTime )
2015-11-23 19:32:50 +00:00
if err != nil && ! etcdutil . IsEtcdNotFound ( err ) {
2016-03-21 20:14:59 +00:00
return "" , nil , nil , toStorageErr ( err , key , 0 )
2014-06-17 23:23:52 +00:00
}
2015-05-23 00:17:49 +00:00
body , node , err = h . extractObj ( response , err , objPtr , ignoreNotFound , false )
2016-03-21 20:14:59 +00:00
return body , node , response , toStorageErr ( err , key , 0 )
2015-02-11 23:35:05 +00:00
}
2015-07-24 11:09:49 +00:00
func ( h * etcdHelper ) extractObj ( response * etcd . Response , inErr error , objPtr runtime . Object , ignoreNotFound , prevNode bool ) ( body string , node * etcd . Node , err error ) {
2015-02-11 23:35:05 +00:00
if response != nil {
if prevNode {
node = response . PrevNode
} else {
node = response . Node
}
}
if inErr != nil || node == nil || len ( node . Value ) == 0 {
2014-06-17 23:23:52 +00:00
if ignoreNotFound {
2014-10-28 08:02:29 +00:00
v , err := conversion . EnforcePtr ( objPtr )
if err != nil {
2015-05-23 00:17:49 +00:00
return "" , nil , err
2014-10-28 08:02:29 +00:00
}
v . Set ( reflect . Zero ( v . Type ( ) ) )
2015-05-23 00:17:49 +00:00
return "" , nil , nil
2015-02-11 23:35:05 +00:00
} else if inErr != nil {
2015-05-23 00:17:49 +00:00
return "" , nil , inErr
2014-06-17 23:23:52 +00:00
}
2015-05-23 00:17:49 +00:00
return "" , nil , fmt . Errorf ( "unable to locate a value on the response: %#v" , response )
2014-06-27 03:24:10 +00:00
}
2015-02-11 23:35:05 +00:00
body = node . Value
2016-01-22 05:11:30 +00:00
out , gvk , err := h . codec . Decode ( [ ] byte ( body ) , nil , objPtr )
if err != nil {
return body , nil , err
}
if out != objPtr {
return body , nil , fmt . Errorf ( "unable to decode object %s into %v" , gvk . String ( ) , reflect . TypeOf ( objPtr ) )
}
2016-02-15 10:12:02 +00:00
// being unable to set the version does not prevent the object from being extracted
_ = h . versioner . UpdateObject ( objPtr , node . Expiration , node . ModifiedIndex )
2015-05-23 00:17:49 +00:00
return body , node , err
2014-06-27 03:24:10 +00:00
}
2015-07-30 07:45:06 +00:00
// Implements storage.Interface.
2015-10-09 14:49:01 +00:00
func ( h * etcdHelper ) GetToList ( ctx context . Context , key string , filter storage . FilterFunc , listObj runtime . Object ) error {
if ctx == nil {
glog . Errorf ( "Context is nil" )
}
2015-07-27 09:59:09 +00:00
trace := util . NewTrace ( "GetToList " + getTypeName ( listObj ) )
2015-11-12 10:45:42 +00:00
listPtr , err := meta . GetItemsPtr ( listObj )
2015-02-11 23:35:05 +00:00
if err != nil {
return err
}
2015-07-24 07:19:08 +00:00
key = h . prefixEtcdKey ( key )
startTime := time . Now ( )
trace . Step ( "About to read etcd node" )
2016-01-26 06:17:11 +00:00
opts := & etcd . GetOptions {
Quorum : h . quorum ,
}
2016-03-11 16:21:16 +00:00
response , err := h . etcdKeysAPI . Get ( ctx , key , opts )
2016-01-26 06:17:11 +00:00
2015-07-24 07:19:08 +00:00
metrics . RecordEtcdRequestLatency ( "get" , getTypeName ( listPtr ) , startTime )
trace . Step ( "Etcd node read" )
if err != nil {
2015-11-23 19:32:50 +00:00
if etcdutil . IsEtcdNotFound ( err ) {
2015-07-24 07:19:08 +00:00
return nil
2015-02-11 23:35:05 +00:00
}
2016-03-21 20:14:59 +00:00
return toStorageErr ( err , key , 0 )
2015-02-11 23:35:05 +00:00
}
2015-03-11 17:10:09 +00:00
2015-07-24 07:19:08 +00:00
nodes := make ( [ ] * etcd . Node , 0 )
nodes = append ( nodes , response . Node )
2015-09-09 10:35:44 +00:00
if err := h . decodeNodeList ( nodes , filter , listPtr ) ; err != nil {
2015-02-11 23:35:05 +00:00
return err
}
2015-07-24 07:19:08 +00:00
trace . Step ( "Object decoded" )
2016-02-15 10:12:02 +00:00
if err := h . versioner . UpdateList ( listObj , response . Index ) ; err != nil {
return err
2015-02-11 23:35:05 +00:00
}
2015-07-24 07:19:08 +00:00
return nil
2014-08-06 02:23:22 +00:00
}
2015-07-24 07:19:08 +00:00
// decodeNodeList walks the tree of each node in the list and decodes into the specified object
2015-09-09 10:35:44 +00:00
func ( h * etcdHelper ) decodeNodeList ( nodes [ ] * etcd . Node , filter storage . FilterFunc , slicePtr interface { } ) error {
2015-07-24 07:19:08 +00:00
trace := util . NewTrace ( "decodeNodeList " + getTypeName ( slicePtr ) )
defer trace . LogIfLong ( 500 * time . Millisecond )
v , err := conversion . EnforcePtr ( slicePtr )
if err != nil || v . Kind ( ) != reflect . Slice {
// This should not happen at runtime.
panic ( "need ptr to slice" )
2015-02-11 23:35:05 +00:00
}
2015-07-24 07:19:08 +00:00
for _ , node := range nodes {
if node . Dir {
trace . Step ( "Decoding dir " + node . Key + " START" )
2015-09-09 10:35:44 +00:00
if err := h . decodeNodeList ( node . Nodes , filter , slicePtr ) ; err != nil {
2015-07-24 07:19:08 +00:00
return err
}
trace . Step ( "Decoding dir " + node . Key + " END" )
continue
}
2015-10-14 11:17:00 +00:00
if obj , found := h . getFromCache ( node . ModifiedIndex , filter ) ; found {
// obj != nil iff it matches the filter function.
if obj != nil {
2015-09-09 10:35:44 +00:00
v . Set ( reflect . Append ( v , reflect . ValueOf ( obj ) . Elem ( ) ) )
}
2015-07-24 07:19:08 +00:00
} else {
2016-01-22 05:11:30 +00:00
obj , _ , err := h . codec . Decode ( [ ] byte ( node . Value ) , nil , reflect . New ( v . Type ( ) . Elem ( ) ) . Interface ( ) . ( runtime . Object ) )
if err != nil {
2015-07-24 07:19:08 +00:00
return err
}
2016-02-15 10:12:02 +00:00
// being unable to set the version does not prevent the object from being extracted
_ = h . versioner . UpdateObject ( obj , node . Expiration , node . ModifiedIndex )
2016-01-22 05:11:30 +00:00
if filter ( obj ) {
v . Set ( reflect . Append ( v , reflect . ValueOf ( obj ) . Elem ( ) ) )
2015-09-09 10:35:44 +00:00
}
2015-07-24 07:19:08 +00:00
if node . ModifiedIndex != 0 {
2016-01-22 05:11:30 +00:00
h . addToCache ( node . ModifiedIndex , obj )
2015-07-24 07:19:08 +00:00
}
2015-02-11 23:35:05 +00:00
}
}
2015-07-24 07:19:08 +00:00
trace . Step ( fmt . Sprintf ( "Decoded %v nodes" , len ( nodes ) ) )
return nil
2015-02-11 23:35:05 +00:00
}
2015-07-30 07:45:06 +00:00
// Implements storage.Interface.
2015-12-04 08:58:24 +00:00
func ( h * etcdHelper ) List ( ctx context . Context , key string , resourceVersion string , filter storage . FilterFunc , listObj runtime . Object ) error {
2015-10-09 14:49:01 +00:00
if ctx == nil {
glog . Errorf ( "Context is nil" )
}
2015-07-27 09:59:09 +00:00
trace := util . NewTrace ( "List " + getTypeName ( listObj ) )
2015-07-24 07:19:08 +00:00
defer trace . LogIfLong ( time . Second )
2015-11-12 10:45:42 +00:00
listPtr , err := meta . GetItemsPtr ( listObj )
2014-06-27 03:24:10 +00:00
if err != nil {
return err
2014-06-17 23:23:52 +00:00
}
2015-07-21 09:19:11 +00:00
key = h . prefixEtcdKey ( key )
2015-07-24 07:19:08 +00:00
startTime := time . Now ( )
trace . Step ( "About to list etcd node" )
2015-10-09 14:49:01 +00:00
nodes , index , err := h . listEtcdNode ( ctx , key )
2015-07-24 07:19:08 +00:00
metrics . RecordEtcdRequestLatency ( "list" , getTypeName ( listPtr ) , startTime )
trace . Step ( "Etcd node listed" )
if err != nil {
return err
}
2015-09-09 10:35:44 +00:00
if err := h . decodeNodeList ( nodes , filter , listPtr ) ; err != nil {
2015-07-24 07:19:08 +00:00
return err
}
trace . Step ( "Node list decoded" )
2016-02-15 10:12:02 +00:00
if err := h . versioner . UpdateList ( listObj , index ) ; err != nil {
return err
2015-02-27 14:59:49 +00:00
}
2015-07-24 07:19:08 +00:00
return nil
}
2015-02-27 14:59:49 +00:00
2015-10-09 14:49:01 +00:00
func ( h * etcdHelper ) listEtcdNode ( ctx context . Context , key string ) ( [ ] * etcd . Node , uint64 , error ) {
if ctx == nil {
glog . Errorf ( "Context is nil" )
}
2015-12-10 14:03:59 +00:00
opts := etcd . GetOptions {
Recursive : true ,
Sort : true ,
2016-01-26 06:17:11 +00:00
Quorum : h . quorum ,
2015-12-10 14:03:59 +00:00
}
2016-03-11 16:21:16 +00:00
result , err := h . etcdKeysAPI . Get ( ctx , key , & opts )
2015-02-27 14:59:49 +00:00
if err != nil {
2015-11-23 19:32:50 +00:00
var index uint64
2015-12-10 14:03:59 +00:00
if etcdError , ok := err . ( etcd . Error ) ; ok {
2015-11-23 19:32:50 +00:00
index = etcdError . Index
2015-07-24 07:19:08 +00:00
}
nodes := make ( [ ] * etcd . Node , 0 )
2015-11-23 19:32:50 +00:00
if etcdutil . IsEtcdNotFound ( err ) {
2015-07-24 07:19:08 +00:00
return nodes , index , nil
} else {
2016-03-21 20:14:59 +00:00
return nodes , index , toStorageErr ( err , key , 0 )
2014-08-04 00:23:56 +00:00
}
2014-06-30 19:00:14 +00:00
}
2015-12-10 14:03:59 +00:00
return result . Node . Nodes , result . Index , nil
2014-06-17 23:23:52 +00:00
}
2015-07-30 07:45:06 +00:00
// GuaranteedUpdate performs an optimistic read-modify-write of key. It reads
// the current object, asks tryUpdate for the new version, and writes that
// version back with a conditional etcd Set. If a concurrent writer wins the
// race, the whole cycle is retried. It implements storage.Interface.
//
// ptrToType must be a pointer (otherwise this panics). On success it holds
// the object as stored in etcd. When ignoreNotFound is true, a missing key is
// passed to tryUpdate as a zero-valued object. preconditions, if non-nil, are
// re-checked against the freshly read object on every attempt. The existing
// node's TTL is kept unless tryUpdate returns a new one.
2016-03-21 06:15:00 +00:00
func ( h * etcdHelper ) GuaranteedUpdate ( ctx context . Context , key string , ptrToType runtime . Object , ignoreNotFound bool , preconditions * storage . Preconditions , tryUpdate storage . UpdateFunc ) error {
2015-10-09 14:49:01 +00:00
if ctx == nil {
glog . Errorf ( "Context is nil" )
}
2014-10-28 08:02:29 +00:00
v , err := conversion . EnforcePtr ( ptrToType )
if err != nil {
2014-06-27 22:02:06 +00:00
// Panic is appropriate, because this is a programming error.
panic ( "need ptr to type" )
}
2015-07-21 09:19:11 +00:00
key = h . prefixEtcdKey ( key )
2014-06-27 17:55:05 +00:00
// Each iteration is one read-modify-write attempt; `continue` restarts from a fresh read.
for {
2014-10-28 08:02:29 +00:00
// A fresh object per attempt, so nothing from a previous read leaks in.
obj := reflect . New ( v . Type ( ) ) . Interface ( ) . ( runtime . Object )
2015-10-09 14:49:01 +00:00
origBody , node , res , err := h . bodyAndExtractObj ( ctx , key , obj , ignoreNotFound )
2014-06-27 17:55:05 +00:00
if err != nil {
2016-03-21 06:15:00 +00:00
return toStorageErr ( err , key , 0 )
}
if err := checkPreconditions ( preconditions , obj ) ; err != nil {
return toStorageErr ( err , key , 0 )
2014-06-27 17:55:05 +00:00
}
2015-07-30 07:27:38 +00:00
// Describe the current node (TTL, expiration, resourceVersion) to tryUpdate.
// NOTE(review): this local shadows the imported `meta` package inside the loop.
meta := storage . ResponseMeta { }
2015-05-23 00:17:49 +00:00
if node != nil {
meta . TTL = node . TTL
2015-06-19 00:42:01 +00:00
if node . Expiration != nil {
meta . Expiration = node . Expiration
}
meta . ResourceVersion = node . ModifiedIndex
2015-05-23 00:17:49 +00:00
}
2015-06-19 00:42:01 +00:00
// Get the object to be written by calling tryUpdate.
2015-05-23 00:17:49 +00:00
ret , newTTL , err := tryUpdate ( obj , meta )
2014-06-27 17:55:05 +00:00
if err != nil {
2016-03-21 06:15:00 +00:00
return toStorageErr ( err , key , 0 )
2014-06-27 17:55:05 +00:00
}
2015-05-23 00:17:49 +00:00
// index is the etcd index the write is conditioned on (0 selects the
// create-only path below). ttl starts from the existing node's TTL.
index := uint64 ( 0 )
ttl := uint64 ( 0 )
if node != nil {
index = node . ModifiedIndex
2015-08-19 23:59:43 +00:00
if node . TTL != 0 {
2015-05-23 00:17:49 +00:00
ttl = uint64 ( node . TTL )
}
2015-08-19 23:59:43 +00:00
// The node has an Expiration but reports TTL 0 (presumably about to
// expire). Keep a minimal 1s TTL rather than making the write permanent.
if node . Expiration != nil && ttl == 0 {
ttl = 1
}
2015-05-23 00:17:49 +00:00
} else if res != nil {
2015-12-10 14:03:59 +00:00
index = res . Index
2015-05-23 00:17:49 +00:00
}
// A TTL returned by tryUpdate overrides the inherited one.
if newTTL != nil {
2015-08-19 23:59:43 +00:00
if ttl != 0 && * newTTL == 0 {
// TODO: remove this after we have verified this is no longer an issue
glog . V ( 4 ) . Infof ( "GuaranteedUpdate is clearing TTL for %q, may not be intentional" , key )
}
2015-05-23 00:17:49 +00:00
ttl = * newTTL
}
2016-02-09 13:06:52 +00:00
// Since update object may have a resourceVersion set, we need to clear it here.
2016-02-15 10:12:02 +00:00
if err := h . versioner . UpdateObject ( ret , meta . Expiration , 0 ) ; err != nil {
return errors . New ( "resourceVersion cannot be set on objects store in etcd" )
2016-02-09 13:06:52 +00:00
}
2016-01-22 05:11:30 +00:00
data , err := runtime . Encode ( h . codec , ret )
2014-08-05 18:34:00 +00:00
if err != nil {
2014-07-30 10:05:10 +00:00
return err
}
2014-08-05 18:43:19 +00:00
// First time this key has been used, try creating new value.
// If someone else created it concurrently (node exists), retry from a fresh read.
if index == 0 {
2015-05-06 11:23:22 +00:00
startTime := time . Now ( )
2015-12-10 14:03:59 +00:00
opts := etcd . SetOptions {
TTL : time . Duration ( ttl ) * time . Second ,
PrevExist : etcd . PrevNoExist ,
}
2016-03-11 16:21:16 +00:00
response , err := h . etcdKeysAPI . Set ( ctx , key , string ( data ) , & opts )
2015-07-20 14:12:24 +00:00
metrics . RecordEtcdRequestLatency ( "create" , getTypeName ( ptrToType ) , startTime )
2015-11-23 19:32:50 +00:00
if etcdutil . IsEtcdNodeExist ( err ) {
2014-08-05 18:43:19 +00:00
continue
}
2015-03-03 21:16:50 +00:00
_ , _ , err = h . extractObj ( response , err , ptrToType , false , false )
2016-03-21 20:14:59 +00:00
return toStorageErr ( err , key , 0 )
2014-08-05 18:43:19 +00:00
}
2014-08-14 19:51:20 +00:00
// The encoded update is byte-identical to what is stored, so skip the write.
if string ( data ) == origBody {
2016-02-09 13:06:52 +00:00
// If we don't send an update, we simply return the currently existing
// version of the object.
_ , _ , err := h . extractObj ( res , nil , ptrToType , ignoreNotFound , false )
return err
2014-08-14 19:51:20 +00:00
}
2015-05-06 11:23:22 +00:00
startTime := time . Now ( )
2015-06-19 00:42:01 +00:00
// Swap origBody with data, if origBody is the latest etcd data.
// Both the previous value and the previous index must still match.
2015-12-10 14:03:59 +00:00
opts := etcd . SetOptions {
PrevValue : origBody ,
PrevIndex : index ,
TTL : time . Duration ( ttl ) * time . Second ,
}
2016-03-11 16:21:16 +00:00
response , err := h . etcdKeysAPI . Set ( ctx , key , string ( data ) , & opts )
2015-07-20 14:12:24 +00:00
metrics . RecordEtcdRequestLatency ( "compareAndSwap" , getTypeName ( ptrToType ) , startTime )
2015-11-23 19:32:50 +00:00
if etcdutil . IsEtcdTestFailed ( err ) {
2015-06-19 00:42:01 +00:00
// Try again.
2014-06-27 19:54:45 +00:00
continue
}
2015-02-11 23:35:05 +00:00
_ , _ , err = h . extractObj ( response , err , ptrToType , false , false )
2016-03-21 20:14:59 +00:00
return toStorageErr ( err , key , int64 ( index ) )
2014-06-17 23:23:52 +00:00
}
}
2014-10-27 17:04:39 +00:00
2015-07-24 11:09:49 +00:00
func ( h * etcdHelper ) prefixEtcdKey ( key string ) string {
2015-10-27 20:35:07 +00:00
if strings . HasPrefix ( key , h . pathPrefix ) {
2015-03-11 17:10:09 +00:00
return key
}
2015-10-27 20:35:07 +00:00
return path . Join ( h . pathPrefix , key )
2015-03-11 17:10:09 +00:00
}
2015-07-24 07:19:08 +00:00
// etcdCache defines interface used for caching objects stored in etcd. Objects are keyed by
// their Node.ModifiedIndex, which is unique across all types.
// All implementations must be thread-safe.
type etcdCache interface {
2015-10-14 11:17:00 +00:00
getFromCache ( index uint64 , filter storage . FilterFunc ) ( runtime . Object , bool )
2015-07-24 07:19:08 +00:00
addToCache ( index uint64 , obj runtime . Object )
}
// maxEtcdCacheEntries is the capacity passed to utilcache.NewCache for
// etcdHelper's object cache.
const maxEtcdCacheEntries = int(50000)
// getTypeName returns the dynamic type of obj as a string, as used to label
// metrics and traces. A nil interface yields "<nil>" (matching fmt's %T verb)
// instead of panicking on a nil reflect.Type.
func getTypeName(obj interface{}) string {
	t := reflect.TypeOf(obj)
	if t == nil {
		return "<nil>"
	}
	return t.String()
}
2015-10-14 11:17:00 +00:00
func ( h * etcdHelper ) getFromCache ( index uint64 , filter storage . FilterFunc ) ( runtime . Object , bool ) {
2015-07-24 07:19:08 +00:00
startTime := time . Now ( )
defer func ( ) {
metrics . ObserveGetCache ( startTime )
} ( )
obj , found := h . cache . Get ( index )
if found {
2015-10-14 11:17:00 +00:00
if ! filter ( obj . ( runtime . Object ) ) {
return nil , true
}
2015-08-08 21:29:57 +00:00
// We should not return the object itself to avoid polluting the cache if someone
2015-07-24 07:19:08 +00:00
// modifies returned values.
2015-07-24 11:09:49 +00:00
objCopy , err := h . copier . Copy ( obj . ( runtime . Object ) )
2015-07-24 07:19:08 +00:00
if err != nil {
glog . Errorf ( "Error during DeepCopy of cached object: %q" , err )
2015-10-14 11:17:00 +00:00
// We can't return a copy, thus we report the object as not found.
2015-07-24 07:19:08 +00:00
return nil , false
}
metrics . ObserveCacheHit ( )
return objCopy . ( runtime . Object ) , true
}
metrics . ObserveCacheMiss ( )
return nil , false
}
2015-07-24 11:09:49 +00:00
func ( h * etcdHelper ) addToCache ( index uint64 , obj runtime . Object ) {
2015-07-24 07:19:08 +00:00
startTime := time . Now ( )
defer func ( ) {
metrics . ObserveAddCache ( startTime )
} ( )
2015-07-24 11:09:49 +00:00
objCopy , err := h . copier . Copy ( obj )
2015-07-24 07:19:08 +00:00
if err != nil {
glog . Errorf ( "Error during DeepCopy of cached object: %q" , err )
return
}
isOverwrite := h . cache . Add ( index , objCopy )
if ! isOverwrite {
metrics . ObserveNewEntry ( )
}
}
2016-03-21 20:14:59 +00:00
// toStorageErr translates an etcd client error into the matching storage
// package error for key, recording rv as the resource version. The mappings
// are: not found, node exists, test failed (conflict) and unreachable. nil
// stays nil, and any other error is returned unchanged.
func toStorageErr(err error, key string, rv int64) error {
	switch {
	case err == nil:
		return nil
	case etcdutil.IsEtcdNotFound(err):
		return storage.NewKeyNotFoundError(key, rv)
	case etcdutil.IsEtcdNodeExist(err):
		return storage.NewKeyExistsError(key, rv)
	case etcdutil.IsEtcdTestFailed(err):
		return storage.NewResourceVersionConflictsError(key, rv)
	case etcdutil.IsEtcdUnreachable(err):
		return storage.NewUnreachableError(key, rv)
	}
	return err
}