diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index f591b70da1..bb9c84f28c 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -86,7 +86,6 @@ type APIServer struct { EtcdServerList util.StringList EtcdConfigFile string EtcdPathPrefix string - OldEtcdPathPrefix string CorsAllowedOriginList util.StringList AllowPrivileged bool ServiceClusterIPRange util.IPNet // TODO: make this a list @@ -187,7 +186,6 @@ func (s *APIServer) AddFlags(fs *pflag.FlagSet) { fs.Var(&s.EtcdServerList, "etcd-servers", "List of etcd servers to watch (http://ip:port), comma separated. Mutually exclusive with -etcd-config") fs.StringVar(&s.EtcdConfigFile, "etcd-config", s.EtcdConfigFile, "The config file for the etcd client. Mutually exclusive with -etcd-servers.") fs.StringVar(&s.EtcdPathPrefix, "etcd-prefix", s.EtcdPathPrefix, "The prefix for all resource paths in etcd.") - fs.StringVar(&s.OldEtcdPathPrefix, "old-etcd-prefix", s.OldEtcdPathPrefix, "The previous prefix for all resource paths in etcd, if any.") fs.Var(&s.CorsAllowedOriginList, "cors-allowed-origins", "List of allowed origins for CORS, comma separated. An allowed origin can be a regular expression to support subdomain matching. If this list is empty CORS will not be enabled.") fs.BoolVar(&s.AllowPrivileged, "allow-privileged", s.AllowPrivileged, "If true, allow privileged containers.") fs.Var(&s.ServiceClusterIPRange, "service-cluster-ip-range", "A CIDR notation IP range from which to assign service cluster IPs. This must not overlap with any IP ranges assigned to nodes for pods.") @@ -305,14 +303,6 @@ func (s *APIServer) Run(_ []string) error { glog.Fatalf("Invalid storage version or misconfigured etcd: %v", err) } - // TODO Is this the right place for migration to happen? Must *both* old and - // new etcd prefix params be supplied for this to be valid? - if s.OldEtcdPathPrefix != "" { - if err = helper.MigrateKeys(s.OldEtcdPathPrefix); err != nil { - glog.Fatalf("Migration of old etcd keys failed: %v", err) - } - } - n := net.IPNet(s.ServiceClusterIPRange) // Default to the private server key for service account token signing diff --git a/pkg/tools/etcd_helper.go b/pkg/tools/etcd_helper.go index 17ff1af533..24aaf876d7 100644 --- a/pkg/tools/etcd_helper.go +++ b/pkg/tools/etcd_helper.go @@ -17,12 +17,8 @@ limitations under the License. package tools import ( - "encoding/json" "errors" "fmt" - "io/ioutil" - "net/http" - "os/exec" "path" "reflect" "strings" @@ -38,22 +34,24 @@ import ( "github.com/golang/glog" ) -const maxEtcdCacheEntries int = 50000 - -func init() { - metrics.Register() +func NewEtcdHelper(client EtcdClient, codec runtime.Codec, prefix string) EtcdHelper { + return EtcdHelper{ + Client: client, + Codec: codec, + Versioner: APIObjectVersioner{}, + Copier: api.Scheme, + PathPrefix: prefix, + cache: util.NewCache(maxEtcdCacheEntries), + } } -func getTypeName(obj interface{}) string { - return reflect.TypeOf(obj).String() -} - -// EtcdHelper offers common object marshalling/unmarshalling operations on an etcd client. +// EtcdHelper is the reference implementation of StorageInterface. +// TODO(wojtekt): Make it private and expose only StorageInterface to outside world. type EtcdHelper struct { Client EtcdClient Codec runtime.Codec Copier runtime.ObjectCopier - // optional, no atomic operations can be performed without this interface + // optional, has to be set to perform any atomic operations Versioner EtcdVersioner // prefix for all etcd keys PathPrefix string @@ -68,53 +66,8 @@ type EtcdHelper struct { cache util.Cache } -// NewEtcdHelper creates a helper that works against objects that use the internal -// Kubernetes API objects. -// TODO: Refactor to take a runtiem.ObjectCopier -func NewEtcdHelper(client EtcdClient, codec runtime.Codec, prefix string) EtcdHelper { - return EtcdHelper{ - Client: client, - Codec: codec, - Versioner: APIObjectVersioner{}, - Copier: api.Scheme, - PathPrefix: prefix, - cache: util.NewCache(maxEtcdCacheEntries), - } -} - -// IsEtcdNotFound returns true iff err is an etcd not found error. -func IsEtcdNotFound(err error) bool { - return isEtcdErrorNum(err, EtcdErrorCodeNotFound) -} - -// IsEtcdNodeExist returns true iff err is an etcd node aleady exist error. -func IsEtcdNodeExist(err error) bool { - return isEtcdErrorNum(err, EtcdErrorCodeNodeExist) -} - -// IsEtcdTestFailed returns true iff err is an etcd write conflict. -func IsEtcdTestFailed(err error) bool { - return isEtcdErrorNum(err, EtcdErrorCodeTestFailed) -} - -// IsEtcdWatchStoppedByUser returns true iff err is a client triggered stop. -func IsEtcdWatchStoppedByUser(err error) bool { - return etcd.ErrWatchStoppedByUser == err -} - -// isEtcdErrorNum returns true iff err is an etcd error, whose errorCode matches errorCode -func isEtcdErrorNum(err error, errorCode int) bool { - etcdError, ok := err.(*etcd.EtcdError) - return ok && etcdError != nil && etcdError.ErrorCode == errorCode -} - -// etcdErrorIndex returns the index associated with the error message and whether the -// index was available. -func etcdErrorIndex(err error) (uint64, bool) { - if etcdError, ok := err.(*etcd.EtcdError); ok { - return etcdError.Index, true - } - return 0, false +func init() { + metrics.Register() } func (h *EtcdHelper) listEtcdNode(key string) ([]*etcd.Node, uint64, error) { @@ -181,6 +134,12 @@ type etcdCache interface { addToCache(index uint64, obj runtime.Object) } +const maxEtcdCacheEntries int = 50000 + +func getTypeName(obj interface{}) string { + return reflect.TypeOf(obj).String() +} + func (h *EtcdHelper) getFromCache(index uint64) (runtime.Object, bool) { startTime := time.Now() defer func() { @@ -218,8 +177,7 @@ func (h *EtcdHelper) addToCache(index uint64, obj runtime.Object) { } } -// ExtractToList works on a *List api object (an object that satisfies the runtime.IsList -// definition) and extracts a go object per etcd node into a slice with the resource version. +// Implements StorageInterface. func (h *EtcdHelper) ExtractToList(key string, listObj runtime.Object) error { trace := util.NewTrace("ExtractToList " + getTypeName(listObj)) defer trace.LogIfLong(time.Second) @@ -227,7 +185,7 @@ func (h *EtcdHelper) ExtractToList(key string, listObj runtime.Object) error { if err != nil { return err } - key = h.PrefixEtcdKey(key) + key = h.prefixEtcdKey(key) startTime := time.Now() trace.Step("About to list etcd node") nodes, index, err := h.listEtcdNode(key) @@ -248,15 +206,14 @@ func (h *EtcdHelper) ExtractToList(key string, listObj runtime.Object) error { return nil } -// ExtractObjToList unmarshals json found at key and opaques it into a *List api object -// (an object that satisfies the runtime.IsList definition). +// Implements StorageInterface. func (h *EtcdHelper) ExtractObjToList(key string, listObj runtime.Object) error { trace := util.NewTrace("ExtractObjToList " + getTypeName(listObj)) listPtr, err := runtime.GetItemsPtr(listObj) if err != nil { return err } - key = h.PrefixEtcdKey(key) + key = h.prefixEtcdKey(key) startTime := time.Now() trace.Step("About to read etcd node") response, err := h.Client.Get(key, false, false) @@ -284,11 +241,9 @@ func (h *EtcdHelper) ExtractObjToList(key string, listObj runtime.Object) error return nil } -// ExtractObj unmarshals json found at key into objPtr. On a not found error, will either return -// a zero object of the requested type, or an error, depending on ignoreNotFound. Treats -// empty responses and nil response nodes exactly like a not found error. +// Implements StorageInterface. func (h *EtcdHelper) ExtractObj(key string, objPtr runtime.Object, ignoreNotFound bool) error { - key = h.PrefixEtcdKey(key) + key = h.prefixEtcdKey(key) _, _, _, err := h.bodyAndExtractObj(key, objPtr, ignoreNotFound) return err } @@ -337,11 +292,10 @@ func (h *EtcdHelper) extractObj(response *etcd.Response, inErr error, objPtr run return body, node, err } -// CreateObj adds a new object at a key unless it already exists. 'ttl' is time-to-live in seconds, -// and 0 means forever. If no error is returned and out is not nil, out will be set to the read value -// from etcd. + +// Implements StorageInterface. func (h *EtcdHelper) CreateObj(key string, obj, out runtime.Object, ttl uint64) error { - key = h.PrefixEtcdKey(key) + key = h.prefixEtcdKey(key) data, err := h.Codec.Encode(obj) if err != nil { return err @@ -367,18 +321,18 @@ func (h *EtcdHelper) CreateObj(key string, obj, out runtime.Object, ttl uint64) return err } -// Delete removes the specified key. +// Implements StorageInterface. func (h *EtcdHelper) Delete(key string, recursive bool) error { - key = h.PrefixEtcdKey(key) + key = h.prefixEtcdKey(key) startTime := time.Now() _, err := h.Client.Delete(key, recursive) metrics.RecordEtcdRequestLatency("delete", "UNKNOWN", startTime) return err } -// DeleteObj removes the specified key and returns the value that existed at that spot. +// Implements StorageInterface. func (h *EtcdHelper) DeleteObj(key string, out runtime.Object) error { - key = h.PrefixEtcdKey(key) + key = h.prefixEtcdKey(key) if _, err := conversion.EnforcePtr(out); err != nil { panic("unable to convert output object to pointer") } @@ -395,16 +349,14 @@ func (h *EtcdHelper) DeleteObj(key string, out runtime.Object) error { return err } -// SetObj marshals obj via json, and stores under key. Will do an atomic update if obj's ResourceVersion -// field is set. 'ttl' is time-to-live in seconds, and 0 means forever. If no error is returned and out is -// not nil, out will be set to the read value from etcd. +// Implements StorageInterface. func (h *EtcdHelper) SetObj(key string, obj, out runtime.Object, ttl uint64) error { var response *etcd.Response data, err := h.Codec.Encode(obj) if err != nil { return err } - key = h.PrefixEtcdKey(key) + key = h.prefixEtcdKey(key) create := true if h.Versioner != nil { @@ -438,63 +390,24 @@ func (h *EtcdHelper) SetObj(key string, obj, out runtime.Object, ttl uint64) err return err } -// ResponseMeta contains information about the etcd metadata that is associated with -// an object. It abstracts the actual underlying objects to prevent coupling with etcd -// and to improve testability. -type ResponseMeta struct { - // TTL is the time to live of the node that contained the returned object. It may be - // zero or negative in some cases (objects may be expired after the requested - // expiration time due to server lag). - TTL int64 - // Expiration is the time at which the node that contained the returned object will expire and be deleted. - // This can be nil if there is no expiration time set for the node. - Expiration *time.Time - // The resource version of the node that contained the returned object. - ResourceVersion uint64 -} - -// Pass an EtcdUpdateFunc to EtcdHelper.GuaranteedUpdate to make an etcd update that is guaranteed to succeed. -// See the comment for GuaranteedUpdate for more detail. -type EtcdUpdateFunc func(input runtime.Object, res ResponseMeta) (output runtime.Object, ttl *uint64, err error) type SimpleEtcdUpdateFunc func(runtime.Object) (runtime.Object, error) // SimpleUpdateFunc converts SimpleEtcdUpdateFunc into EtcdUpdateFunc -func SimpleUpdate(fn SimpleEtcdUpdateFunc) EtcdUpdateFunc { +func SimpleUpdate(fn SimpleEtcdUpdateFunc) StorageUpdateFunc { return func(input runtime.Object, _ ResponseMeta) (runtime.Object, *uint64, error) { out, err := fn(input) return out, nil, err } } -// GuaranteedUpdate calls "tryUpdate()" to update key "key" that is of type "ptrToType". It keeps -// calling tryUpdate() and retrying the update until success if there is etcd index conflict. Note that object -// passed to tryUpdate() may change across invocations of tryUpdate() if other writers are simultaneously -// updating it, so tryUpdate() needs to take into account the current contents of the object when -// deciding how the updated object (that it returns) should look. -// -// Example: -// -// h := &util.EtcdHelper{client, encoding, versioning} -// err := h.GuaranteedUpdate("myKey", &MyType{}, true, func(input runtime.Object, res ResponseMeta) (runtime.Object, *uint64, error) { -// // Before each invocation of the user-defined function, "input" is reset to etcd's current contents for "myKey". -// -// cur := input.(*MyType) // Guaranteed to succeed. -// -// // Make a *modification*. -// cur.Counter++ -// -// // Return the modified object. Return an error to stop iterating. Return a uint64 to alter -// // the TTL on the object, or nil to keep it the same value. -// return cur, nil, nil -// }) -// -func (h *EtcdHelper) GuaranteedUpdate(key string, ptrToType runtime.Object, ignoreNotFound bool, tryUpdate EtcdUpdateFunc) error { +// Implements StorageInterface. +func (h *EtcdHelper) GuaranteedUpdate(key string, ptrToType runtime.Object, ignoreNotFound bool, tryUpdate StorageUpdateFunc) error { v, err := conversion.EnforcePtr(ptrToType) if err != nil { // Panic is appropriate, because this is a programming error. panic("need ptr to type") } - key = h.PrefixEtcdKey(key) + key = h.prefixEtcdKey(key) for { obj := reflect.New(v.Type()).Interface().(runtime.Object) origBody, node, res, err := h.bodyAndExtractObj(key, obj, ignoreNotFound) @@ -564,132 +477,10 @@ func (h *EtcdHelper) GuaranteedUpdate(key string, ptrToType runtime.Object, igno } } -func (h *EtcdHelper) PrefixEtcdKey(key string) string { +func (h *EtcdHelper) prefixEtcdKey(key string) string { if strings.HasPrefix(key, path.Join("/", h.PathPrefix)) { return key } return path.Join("/", h.PathPrefix, key) } -// Copies the key-value pairs from their old location to a new location based -// on this helper's etcd prefix. All old keys without the prefix are then deleted. -func (h *EtcdHelper) MigrateKeys(oldPathPrefix string) error { - // Check to see if a migration is necessary, i.e. is the oldPrefix different - // from the newPrefix? - if h.PathPrefix == oldPathPrefix { - return nil - } - - // Get the root node - response, err := h.Client.Get(oldPathPrefix, false, true) - if err != nil { - glog.Infof("Couldn't get the existing etcd root node.") - return err - } - - // Perform the migration - if err = h.migrateChildren(response.Node, oldPathPrefix); err != nil { - glog.Infof("Error performing the migration.") - return err - } - - // Delete the old top-level entry recursively - // Quick sanity check: Did the process at least create a new top-level entry? - if _, err = h.Client.Get(h.PathPrefix, false, false); err != nil { - glog.Infof("Couldn't get the new etcd root node.") - return err - } else { - if _, err = h.Client.Delete(oldPathPrefix, true); err != nil { - glog.Infof("Couldn't delete the old etcd root node.") - return err - } - } - return nil -} - -// This recurses through the etcd registry. Each key-value pair is copied with -// to a new pair with a prefixed key. -func (h *EtcdHelper) migrateChildren(parent *etcd.Node, oldPathPrefix string) error { - for _, child := range parent.Nodes { - if child.Dir && len(child.Nodes) > 0 { - // Descend into this directory - h.migrateChildren(child, oldPathPrefix) - - // All children have been migrated, so this directory has - // already been automatically added. - continue - } - - // Check if already prefixed (maybe we got interrupted in last attempt) - if strings.HasPrefix(child.Key, h.PathPrefix) { - // Skip this iteration - continue - } - - // Create new entry - newKey := path.Join("/", h.PathPrefix, strings.TrimPrefix(child.Key, oldPathPrefix)) - if _, err := h.Client.Create(newKey, child.Value, 0); err != nil { - // Assuming etcd is still available, this is due to the key - // already existing, in which case we can skip. - continue - } - } - return nil -} - -// GetEtcdVersion performs a version check against the provided Etcd server, -// returning the string response, and error (if any). -func GetEtcdVersion(host string) (string, error) { - response, err := http.Get(host + "/version") - if err != nil { - return "", err - } - defer response.Body.Close() - if response.StatusCode != http.StatusOK { - return "", fmt.Errorf("unsuccessful response from etcd server %q: %v", host, err) - } - versionBytes, err := ioutil.ReadAll(response.Body) - if err != nil { - return "", err - } - return string(versionBytes), nil -} - -func startEtcd() (*exec.Cmd, error) { - cmd := exec.Command("etcd") - err := cmd.Start() - if err != nil { - return nil, err - } - return cmd, nil -} - -func NewEtcdClientStartServerIfNecessary(server string) (EtcdClient, error) { - _, err := GetEtcdVersion(server) - if err != nil { - glog.Infof("Failed to find etcd, attempting to start.") - _, err := startEtcd() - if err != nil { - return nil, err - } - } - - servers := []string{server} - return etcd.NewClient(servers), nil -} - -type etcdHealth struct { - // Note this has to be public so the json library can modify it. - Health string `json:health` -} - -func EtcdHealthCheck(data []byte) error { - obj := etcdHealth{} - if err := json.Unmarshal(data, &obj); err != nil { - return err - } - if obj.Health != "true" { - return fmt.Errorf("Unhealthy status: %s", obj.Health) - } - return nil -} diff --git a/pkg/tools/etcd_helper_test.go b/pkg/tools/etcd_helper_test.go index 95beb529ea..00cd140abd 100644 --- a/pkg/tools/etcd_helper_test.go +++ b/pkg/tools/etcd_helper_test.go @@ -846,13 +846,13 @@ func TestPrefixEtcdKey(t *testing.T) { // Verify prefix is added keyBefore := baseKey - keyAfter := helper.PrefixEtcdKey(keyBefore) + keyAfter := helper.prefixEtcdKey(keyBefore) assert.Equal(t, keyAfter, path.Join(prefix, baseKey), "Prefix incorrectly added by EtcdHelper") // Verify prefix is not added keyBefore = path.Join(prefix, baseKey) - keyAfter = helper.PrefixEtcdKey(keyBefore) + keyAfter = helper.prefixEtcdKey(keyBefore) assert.Equal(t, keyBefore, keyAfter, "Prefix incorrectly added by EtcdHelper") } diff --git a/pkg/tools/etcd_helper_watch.go b/pkg/tools/etcd_helper_watch.go index deca3b4ef0..317dfaba9d 100644 --- a/pkg/tools/etcd_helper_watch.go +++ b/pkg/tools/etcd_helper_watch.go @@ -71,7 +71,7 @@ func ParseWatchResourceVersion(resourceVersion, kind string) (uint64, error) { // watch.Interface. resourceVersion may be used to specify what version to begin // watching (e.g., for reconnecting without missing any updates). func (h *EtcdHelper) WatchList(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) { - key = h.PrefixEtcdKey(key) + key = h.prefixEtcdKey(key) w := newEtcdWatcher(true, exceptKey(key), filter, h.Codec, h.Versioner, nil, h) go w.etcdWatch(h.Client, key, resourceVersion) return w, nil @@ -81,7 +81,7 @@ func (h *EtcdHelper) WatchList(key string, resourceVersion uint64, filter Filter // API objects and sent down the returned watch.Interface. // Errors will be sent down the channel. func (h *EtcdHelper) Watch(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) { - key = h.PrefixEtcdKey(key) + key = h.prefixEtcdKey(key) w := newEtcdWatcher(false, nil, filter, h.Codec, h.Versioner, nil, h) go w.etcdWatch(h.Client, key, resourceVersion) return w, nil @@ -103,12 +103,12 @@ func (h *EtcdHelper) Watch(key string, resourceVersion uint64, filter FilterFunc // }) // // Errors will be sent down the channel. -func (h *EtcdHelper) WatchAndTransform(key string, resourceVersion uint64, transform TransformFunc) watch.Interface { - key = h.PrefixEtcdKey(key) +/*func (h *EtcdHelper) WatchAndTransform(key string, resourceVersion uint64, transform TransformFunc) watch.Interface { + key = h.prefixEtcdKey(key) w := newEtcdWatcher(false, nil, Everything, h.Codec, h.Versioner, transform, h) go w.etcdWatch(h.Client, key, resourceVersion) return w -} +}*/ // TransformFunc attempts to convert an object to another object for use with a watcher. type TransformFunc func(runtime.Object) (runtime.Object, error) diff --git a/pkg/tools/etcd_util.go b/pkg/tools/etcd_util.go new file mode 100644 index 0000000000..ebcab56ded --- /dev/null +++ b/pkg/tools/etcd_util.go @@ -0,0 +1,121 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package tools + +import ( + "encoding/json" + "fmt" + "io/ioutil" + "net/http" + "os/exec" + + "github.com/coreos/go-etcd/etcd" + + "github.com/golang/glog" +) + +// IsEtcdNotFound returns true iff err is an etcd not found error. +func IsEtcdNotFound(err error) bool { + return isEtcdErrorNum(err, EtcdErrorCodeNotFound) +} + +// IsEtcdNodeExist returns true iff err is an etcd node aleady exist error. +func IsEtcdNodeExist(err error) bool { + return isEtcdErrorNum(err, EtcdErrorCodeNodeExist) +} + +// IsEtcdTestFailed returns true iff err is an etcd write conflict. +func IsEtcdTestFailed(err error) bool { + return isEtcdErrorNum(err, EtcdErrorCodeTestFailed) +} + +// IsEtcdWatchStoppedByUser returns true iff err is a client triggered stop. +func IsEtcdWatchStoppedByUser(err error) bool { + return etcd.ErrWatchStoppedByUser == err +} + +// isEtcdErrorNum returns true iff err is an etcd error, whose errorCode matches errorCode +func isEtcdErrorNum(err error, errorCode int) bool { + etcdError, ok := err.(*etcd.EtcdError) + return ok && etcdError != nil && etcdError.ErrorCode == errorCode +} + +// etcdErrorIndex returns the index associated with the error message and whether the +// index was available. +func etcdErrorIndex(err error) (uint64, bool) { + if etcdError, ok := err.(*etcd.EtcdError); ok { + return etcdError.Index, true + } + return 0, false +} + +// GetEtcdVersion performs a version check against the provided Etcd server, +// returning the string response, and error (if any). +func GetEtcdVersion(host string) (string, error) { + response, err := http.Get(host + "/version") + if err != nil { + return "", err + } + defer response.Body.Close() + if response.StatusCode != http.StatusOK { + return "", fmt.Errorf("unsuccessful response from etcd server %q: %v", host, err) + } + versionBytes, err := ioutil.ReadAll(response.Body) + if err != nil { + return "", err + } + return string(versionBytes), nil +} + +func startEtcd() (*exec.Cmd, error) { + cmd := exec.Command("etcd") + err := cmd.Start() + if err != nil { + return nil, err + } + return cmd, nil +} + +func NewEtcdClientStartServerIfNecessary(server string) (EtcdClient, error) { + _, err := GetEtcdVersion(server) + if err != nil { + glog.Infof("Failed to find etcd, attempting to start.") + _, err := startEtcd() + if err != nil { + return nil, err + } + } + + servers := []string{server} + return etcd.NewClient(servers), nil +} + +type etcdHealth struct { + // Note this has to be public so the json library can modify it. + Health string `json:health` +} + +func EtcdHealthCheck(data []byte) error { + obj := etcdHealth{} + if err := json.Unmarshal(data, &obj); err != nil { + return err + } + if obj.Health != "true" { + return fmt.Errorf("Unhealthy status: %s", obj.Health) + } + return nil +} diff --git a/pkg/tools/interfaces.go b/pkg/tools/interfaces.go index 257abbfd52..51aa281397 100644 --- a/pkg/tools/interfaces.go +++ b/pkg/tools/interfaces.go @@ -17,9 +17,12 @@ limitations under the License. package tools import ( - "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" - "github.com/coreos/go-etcd/etcd" "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" + + "github.com/coreos/go-etcd/etcd" ) const ( @@ -64,3 +67,108 @@ type EtcdVersioner interface { // Should return an error if the specified object does not have a persistable version. ObjectResourceVersion(obj runtime.Object) (uint64, error) } + +// ResponseMeta contains information about the etcd metadata that is associated with +// an object. It abstracts the actual underlying objects to prevent coupling with etcd +// and to improve testability. +type ResponseMeta struct { + // TTL is the time to live of the node that contained the returned object. It may be + // zero or negative in some cases (objects may be expired after the requested + // expiration time due to server lag). + TTL int64 + // Expiration is the time at which the node that contained the returned object will expire and be deleted. + // This can be nil if there is no expiration time set for the node. + Expiration *time.Time + // The resource version of the node that contained the returned object. + ResourceVersion uint64 +} + +// Pass an StorageUpdateFunc to StorageInterface.GuaranteedUpdate to make an update +// that is guaranteed to succeed. +// See the comment for GuaranteedUpdate for more details. +type StorageUpdateFunc func(input runtime.Object, res ResponseMeta) (output runtime.Object, ttl *uint64, err error) + +// StorageInterface offers a common interface for object marshaling/unmarshling operations and +// hids all the storage-related operations behind it. +type StorageInterface interface { + // CreateObj adds a new object at a key unless it already exists. 'ttl' is time-to-live + // in seconds (0 means forever). If no error is returned and out is not nil, out will be + // set to the read value from etcd. + // + // TODO(wojtekt): Rename to Create(). + CreateObj(key string, obj, out runtime.Object, ttl uint64) error + + // SetObj marshals obj via json and stores in etcd under key. Will do an atomic update + // if obj's ResourceVersion field is set. 'ttl' is time-to-live in seconds (0 means forever). + // If no error is returned and out is not nil, out will be set to the read value from etcd. + // + // TODO(wojtekt): Rename to Set() (or Update?). + SetObj(key string, obj, out runtime.Object, ttl uint64) error + + // DeleteObj removes the specified key and returns the value that existed at that spot. + // + // TODO(wojtekt): Rename to Delete(). + DeleteObj(key string, out runtime.Object) error + + // Delete removes the specified key. + // + // TODO(wojtekt): Unify it with DeleteObj(). + Delete(key string, recursive bool) error + + // Watch begins watching the specified key. Events are decoded into API objects, + // and any items passing 'filter' are sent down to returned watch.Interface. + // resourceVersion may be used to specify what version to begin watching + // (e.g. reconnecting without missing any updates). + Watch(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) + + // WatchList begins watching the specified key's items. Items are decoded into API + // objects and any item passing 'filter' are sent down to returned watch.Interface. + // resourceVersion may be used to specify what version to begin watching + // (e.g. reconnecting without missing any updates). + WatchList(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) + + // ExtractObj unmarshals json found at key into objPtr. On a not found error, will either + // return a zero object of the requested type, or an error, depending on ignoreNotFound. + // Treats empty responses and nil response nodes exactly like a not found error. + // + // TODO(wojtekt): Rename to Get(). + ExtractObj(key string, objPtr runtime.Object, ignoreNotFound bool) error + + // ExtractObjToList unmarshals json found at key and opaque it into *List api object + // (an object that satisfies the runtime.IsList definition). + // + // TODO(wojtekt): Rename to GetToList(). + ExtractObjToList(key string, listObj runtime.Object) error + + // ExtractToList unmarshalls jsons found at directory defined by key and opaque them + // into *List api object (an object that satisfies runtime.IsList definition). + // + // TODO(wojtekt): Rename to List(). + ExtractToList(key string, listObj runtime.Object) error + + // GuaranteedUpdate keeps calling 'tryUpdate()' to update key 'key' (of type 'ptrToType') + // retrying the update until success if there is etcd index conflict. + // Note that object passed to tryUpdate may change acress incovations of tryUpdate() if + // other writers are simultanously updateing it, to tryUpdate() needs to take into account + // the current contents of the object when deciding how the update object should look. + // + // Exmaple: + // + // s := /* implementation of StorageInterface */ + // err := s.GuaranteedUpdate( + // "myKey", &MyType{}, true, + // func(input runtime.Object, res ResponseMeta) (runtime.Object, *uint64, error) { + // // Before each incovation of the user defined function, "input" is reset to + // // etcd's current contents for "myKey". + // curr := input.(*MyType) // Guaranteed to succeed. + // + // // Make the modification + // curr.Counter++ + // + // // Return the modified object - return an error to stop iterating. Return + // // a uint64 to alter the TTL on the object, or nil to keep it the same value. + // return cur, nil, nil + // } + // }) + GuaranteedUpdate(key string, ptrToType runtime.Object, ignoreNotFound bool, tryUpdate StorageUpdateFunc) error +} diff --git a/test/integration/etcd_tools_test.go b/test/integration/etcd_tools_test.go index 0197968eb5..ce3c999bb5 100644 --- a/test/integration/etcd_tools_test.go +++ b/test/integration/etcd_tools_test.go @@ -142,47 +142,3 @@ func TestWatch(t *testing.T) { } }) } - -func TestMigrateKeys(t *testing.T) { - withEtcdKey(func(oldPrefix string) { - client := newEtcdClient() - helper := tools.NewEtcdHelper(client, testapi.Codec(), oldPrefix) - - key1 := oldPrefix + "/obj1" - key2 := oldPrefix + "/foo/obj2" - key3 := oldPrefix + "/foo/bar/obj3" - - // Create a new entres - these are the 'existing' entries with old prefix - _, _ = helper.Client.Create(key1, "foo", 0) - _, _ = helper.Client.Create(key2, "foo", 0) - _, _ = helper.Client.Create(key3, "foo", 0) - - // Change the helper to a new prefix - newPrefix := "/kubernetes.io" - helper = tools.NewEtcdHelper(client, testapi.Codec(), newPrefix) - - // Migrate the keys - err := helper.MigrateKeys(oldPrefix) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - - // Check the resources are at the correct new location - newNames := []string{ - newPrefix + "/obj1", - newPrefix + "/foo/obj2", - newPrefix + "/foo/bar/obj3", - } - for _, name := range newNames { - _, err := helper.Client.Get(name, false, false) - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - } - - // Check the old locations are removed - if _, err := helper.Client.Get(oldPrefix, false, false); err == nil { - t.Fatalf("Old directory still exists.") - } - }) -}