Extract EtcdHelper interface

pull/6/head
Wojciech Tyczynski 2015-07-21 11:19:11 +02:00
parent 1a49ba1bdb
commit fdb3f45077
7 changed files with 278 additions and 312 deletions

View File

@ -86,7 +86,6 @@ type APIServer struct {
EtcdServerList util.StringList
EtcdConfigFile string
EtcdPathPrefix string
OldEtcdPathPrefix string
CorsAllowedOriginList util.StringList
AllowPrivileged bool
ServiceClusterIPRange util.IPNet // TODO: make this a list
@ -187,7 +186,6 @@ func (s *APIServer) AddFlags(fs *pflag.FlagSet) {
fs.Var(&s.EtcdServerList, "etcd-servers", "List of etcd servers to watch (http://ip:port), comma separated. Mutually exclusive with -etcd-config")
fs.StringVar(&s.EtcdConfigFile, "etcd-config", s.EtcdConfigFile, "The config file for the etcd client. Mutually exclusive with -etcd-servers.")
fs.StringVar(&s.EtcdPathPrefix, "etcd-prefix", s.EtcdPathPrefix, "The prefix for all resource paths in etcd.")
fs.StringVar(&s.OldEtcdPathPrefix, "old-etcd-prefix", s.OldEtcdPathPrefix, "The previous prefix for all resource paths in etcd, if any.")
fs.Var(&s.CorsAllowedOriginList, "cors-allowed-origins", "List of allowed origins for CORS, comma separated. An allowed origin can be a regular expression to support subdomain matching. If this list is empty CORS will not be enabled.")
fs.BoolVar(&s.AllowPrivileged, "allow-privileged", s.AllowPrivileged, "If true, allow privileged containers.")
fs.Var(&s.ServiceClusterIPRange, "service-cluster-ip-range", "A CIDR notation IP range from which to assign service cluster IPs. This must not overlap with any IP ranges assigned to nodes for pods.")
@ -305,14 +303,6 @@ func (s *APIServer) Run(_ []string) error {
glog.Fatalf("Invalid storage version or misconfigured etcd: %v", err)
}
// TODO Is this the right place for migration to happen? Must *both* old and
// new etcd prefix params be supplied for this to be valid?
if s.OldEtcdPathPrefix != "" {
if err = helper.MigrateKeys(s.OldEtcdPathPrefix); err != nil {
glog.Fatalf("Migration of old etcd keys failed: %v", err)
}
}
n := net.IPNet(s.ServiceClusterIPRange)
// Default to the private server key for service account token signing

View File

@ -17,12 +17,8 @@ limitations under the License.
package tools
import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
"os/exec"
"path"
"reflect"
"strings"
@ -38,22 +34,24 @@ import (
"github.com/golang/glog"
)
const maxEtcdCacheEntries int = 50000
func init() {
metrics.Register()
func NewEtcdHelper(client EtcdClient, codec runtime.Codec, prefix string) EtcdHelper {
return EtcdHelper{
Client: client,
Codec: codec,
Versioner: APIObjectVersioner{},
Copier: api.Scheme,
PathPrefix: prefix,
cache: util.NewCache(maxEtcdCacheEntries),
}
}
func getTypeName(obj interface{}) string {
return reflect.TypeOf(obj).String()
}
// EtcdHelper offers common object marshalling/unmarshalling operations on an etcd client.
// EtcdHelper is the reference implementation of StorageInterface.
// TODO(wojtekt): Make it private and expose only StorageInterface to outside world.
type EtcdHelper struct {
Client EtcdClient
Codec runtime.Codec
Copier runtime.ObjectCopier
// optional, no atomic operations can be performed without this interface
// optional, has to be set to perform any atomic operations
Versioner EtcdVersioner
// prefix for all etcd keys
PathPrefix string
@ -68,53 +66,8 @@ type EtcdHelper struct {
cache util.Cache
}
// NewEtcdHelper creates a helper that works against objects that use the internal
// Kubernetes API objects.
// TODO: Refactor to take a runtiem.ObjectCopier
func NewEtcdHelper(client EtcdClient, codec runtime.Codec, prefix string) EtcdHelper {
return EtcdHelper{
Client: client,
Codec: codec,
Versioner: APIObjectVersioner{},
Copier: api.Scheme,
PathPrefix: prefix,
cache: util.NewCache(maxEtcdCacheEntries),
}
}
// IsEtcdNotFound returns true iff err is an etcd not found error.
func IsEtcdNotFound(err error) bool {
return isEtcdErrorNum(err, EtcdErrorCodeNotFound)
}
// IsEtcdNodeExist returns true iff err is an etcd node aleady exist error.
func IsEtcdNodeExist(err error) bool {
return isEtcdErrorNum(err, EtcdErrorCodeNodeExist)
}
// IsEtcdTestFailed returns true iff err is an etcd write conflict.
func IsEtcdTestFailed(err error) bool {
return isEtcdErrorNum(err, EtcdErrorCodeTestFailed)
}
// IsEtcdWatchStoppedByUser returns true iff err is a client triggered stop.
func IsEtcdWatchStoppedByUser(err error) bool {
return etcd.ErrWatchStoppedByUser == err
}
// isEtcdErrorNum returns true iff err is an etcd error, whose errorCode matches errorCode
func isEtcdErrorNum(err error, errorCode int) bool {
etcdError, ok := err.(*etcd.EtcdError)
return ok && etcdError != nil && etcdError.ErrorCode == errorCode
}
// etcdErrorIndex returns the index associated with the error message and whether the
// index was available.
func etcdErrorIndex(err error) (uint64, bool) {
if etcdError, ok := err.(*etcd.EtcdError); ok {
return etcdError.Index, true
}
return 0, false
func init() {
metrics.Register()
}
func (h *EtcdHelper) listEtcdNode(key string) ([]*etcd.Node, uint64, error) {
@ -181,6 +134,12 @@ type etcdCache interface {
addToCache(index uint64, obj runtime.Object)
}
const maxEtcdCacheEntries int = 50000
func getTypeName(obj interface{}) string {
return reflect.TypeOf(obj).String()
}
func (h *EtcdHelper) getFromCache(index uint64) (runtime.Object, bool) {
startTime := time.Now()
defer func() {
@ -218,8 +177,7 @@ func (h *EtcdHelper) addToCache(index uint64, obj runtime.Object) {
}
}
// ExtractToList works on a *List api object (an object that satisfies the runtime.IsList
// definition) and extracts a go object per etcd node into a slice with the resource version.
// Implements StorageInterface.
func (h *EtcdHelper) ExtractToList(key string, listObj runtime.Object) error {
trace := util.NewTrace("ExtractToList " + getTypeName(listObj))
defer trace.LogIfLong(time.Second)
@ -227,7 +185,7 @@ func (h *EtcdHelper) ExtractToList(key string, listObj runtime.Object) error {
if err != nil {
return err
}
key = h.PrefixEtcdKey(key)
key = h.prefixEtcdKey(key)
startTime := time.Now()
trace.Step("About to list etcd node")
nodes, index, err := h.listEtcdNode(key)
@ -248,15 +206,14 @@ func (h *EtcdHelper) ExtractToList(key string, listObj runtime.Object) error {
return nil
}
// ExtractObjToList unmarshals json found at key and opaques it into a *List api object
// (an object that satisfies the runtime.IsList definition).
// Implements StorageInterface.
func (h *EtcdHelper) ExtractObjToList(key string, listObj runtime.Object) error {
trace := util.NewTrace("ExtractObjToList " + getTypeName(listObj))
listPtr, err := runtime.GetItemsPtr(listObj)
if err != nil {
return err
}
key = h.PrefixEtcdKey(key)
key = h.prefixEtcdKey(key)
startTime := time.Now()
trace.Step("About to read etcd node")
response, err := h.Client.Get(key, false, false)
@ -284,11 +241,9 @@ func (h *EtcdHelper) ExtractObjToList(key string, listObj runtime.Object) error
return nil
}
// ExtractObj unmarshals json found at key into objPtr. On a not found error, will either return
// a zero object of the requested type, or an error, depending on ignoreNotFound. Treats
// empty responses and nil response nodes exactly like a not found error.
// Implements StorageInterface.
func (h *EtcdHelper) ExtractObj(key string, objPtr runtime.Object, ignoreNotFound bool) error {
key = h.PrefixEtcdKey(key)
key = h.prefixEtcdKey(key)
_, _, _, err := h.bodyAndExtractObj(key, objPtr, ignoreNotFound)
return err
}
@ -337,11 +292,10 @@ func (h *EtcdHelper) extractObj(response *etcd.Response, inErr error, objPtr run
return body, node, err
}
// CreateObj adds a new object at a key unless it already exists. 'ttl' is time-to-live in seconds,
// and 0 means forever. If no error is returned and out is not nil, out will be set to the read value
// from etcd.
// Implements StorageInterface.
func (h *EtcdHelper) CreateObj(key string, obj, out runtime.Object, ttl uint64) error {
key = h.PrefixEtcdKey(key)
key = h.prefixEtcdKey(key)
data, err := h.Codec.Encode(obj)
if err != nil {
return err
@ -367,18 +321,18 @@ func (h *EtcdHelper) CreateObj(key string, obj, out runtime.Object, ttl uint64)
return err
}
// Delete removes the specified key.
// Implements StorageInterface.
func (h *EtcdHelper) Delete(key string, recursive bool) error {
key = h.PrefixEtcdKey(key)
key = h.prefixEtcdKey(key)
startTime := time.Now()
_, err := h.Client.Delete(key, recursive)
metrics.RecordEtcdRequestLatency("delete", "UNKNOWN", startTime)
return err
}
// DeleteObj removes the specified key and returns the value that existed at that spot.
// Implements StorageInterface.
func (h *EtcdHelper) DeleteObj(key string, out runtime.Object) error {
key = h.PrefixEtcdKey(key)
key = h.prefixEtcdKey(key)
if _, err := conversion.EnforcePtr(out); err != nil {
panic("unable to convert output object to pointer")
}
@ -395,16 +349,14 @@ func (h *EtcdHelper) DeleteObj(key string, out runtime.Object) error {
return err
}
// SetObj marshals obj via json, and stores under key. Will do an atomic update if obj's ResourceVersion
// field is set. 'ttl' is time-to-live in seconds, and 0 means forever. If no error is returned and out is
// not nil, out will be set to the read value from etcd.
// Implements StorageInterface.
func (h *EtcdHelper) SetObj(key string, obj, out runtime.Object, ttl uint64) error {
var response *etcd.Response
data, err := h.Codec.Encode(obj)
if err != nil {
return err
}
key = h.PrefixEtcdKey(key)
key = h.prefixEtcdKey(key)
create := true
if h.Versioner != nil {
@ -438,63 +390,24 @@ func (h *EtcdHelper) SetObj(key string, obj, out runtime.Object, ttl uint64) err
return err
}
// ResponseMeta contains information about the etcd metadata that is associated with
// an object. It abstracts the actual underlying objects to prevent coupling with etcd
// and to improve testability.
type ResponseMeta struct {
// TTL is the time to live of the node that contained the returned object. It may be
// zero or negative in some cases (objects may be expired after the requested
// expiration time due to server lag).
TTL int64
// Expiration is the time at which the node that contained the returned object will expire and be deleted.
// This can be nil if there is no expiration time set for the node.
Expiration *time.Time
// The resource version of the node that contained the returned object.
ResourceVersion uint64
}
// Pass an EtcdUpdateFunc to EtcdHelper.GuaranteedUpdate to make an etcd update that is guaranteed to succeed.
// See the comment for GuaranteedUpdate for more detail.
type EtcdUpdateFunc func(input runtime.Object, res ResponseMeta) (output runtime.Object, ttl *uint64, err error)
type SimpleEtcdUpdateFunc func(runtime.Object) (runtime.Object, error)
// SimpleUpdateFunc converts SimpleEtcdUpdateFunc into EtcdUpdateFunc
func SimpleUpdate(fn SimpleEtcdUpdateFunc) EtcdUpdateFunc {
func SimpleUpdate(fn SimpleEtcdUpdateFunc) StorageUpdateFunc {
return func(input runtime.Object, _ ResponseMeta) (runtime.Object, *uint64, error) {
out, err := fn(input)
return out, nil, err
}
}
// GuaranteedUpdate calls "tryUpdate()" to update key "key" that is of type "ptrToType". It keeps
// calling tryUpdate() and retrying the update until success if there is etcd index conflict. Note that object
// passed to tryUpdate() may change across invocations of tryUpdate() if other writers are simultaneously
// updating it, so tryUpdate() needs to take into account the current contents of the object when
// deciding how the updated object (that it returns) should look.
//
// Example:
//
// h := &util.EtcdHelper{client, encoding, versioning}
// err := h.GuaranteedUpdate("myKey", &MyType{}, true, func(input runtime.Object, res ResponseMeta) (runtime.Object, *uint64, error) {
// // Before each invocation of the user-defined function, "input" is reset to etcd's current contents for "myKey".
//
// cur := input.(*MyType) // Guaranteed to succeed.
//
// // Make a *modification*.
// cur.Counter++
//
// // Return the modified object. Return an error to stop iterating. Return a uint64 to alter
// // the TTL on the object, or nil to keep it the same value.
// return cur, nil, nil
// })
//
func (h *EtcdHelper) GuaranteedUpdate(key string, ptrToType runtime.Object, ignoreNotFound bool, tryUpdate EtcdUpdateFunc) error {
// Implements StorageInterface.
func (h *EtcdHelper) GuaranteedUpdate(key string, ptrToType runtime.Object, ignoreNotFound bool, tryUpdate StorageUpdateFunc) error {
v, err := conversion.EnforcePtr(ptrToType)
if err != nil {
// Panic is appropriate, because this is a programming error.
panic("need ptr to type")
}
key = h.PrefixEtcdKey(key)
key = h.prefixEtcdKey(key)
for {
obj := reflect.New(v.Type()).Interface().(runtime.Object)
origBody, node, res, err := h.bodyAndExtractObj(key, obj, ignoreNotFound)
@ -564,132 +477,10 @@ func (h *EtcdHelper) GuaranteedUpdate(key string, ptrToType runtime.Object, igno
}
}
func (h *EtcdHelper) PrefixEtcdKey(key string) string {
func (h *EtcdHelper) prefixEtcdKey(key string) string {
if strings.HasPrefix(key, path.Join("/", h.PathPrefix)) {
return key
}
return path.Join("/", h.PathPrefix, key)
}
// Copies the key-value pairs from their old location to a new location based
// on this helper's etcd prefix. All old keys without the prefix are then deleted.
func (h *EtcdHelper) MigrateKeys(oldPathPrefix string) error {
// Check to see if a migration is necessary, i.e. is the oldPrefix different
// from the newPrefix?
if h.PathPrefix == oldPathPrefix {
return nil
}
// Get the root node
response, err := h.Client.Get(oldPathPrefix, false, true)
if err != nil {
glog.Infof("Couldn't get the existing etcd root node.")
return err
}
// Perform the migration
if err = h.migrateChildren(response.Node, oldPathPrefix); err != nil {
glog.Infof("Error performing the migration.")
return err
}
// Delete the old top-level entry recursively
// Quick sanity check: Did the process at least create a new top-level entry?
if _, err = h.Client.Get(h.PathPrefix, false, false); err != nil {
glog.Infof("Couldn't get the new etcd root node.")
return err
} else {
if _, err = h.Client.Delete(oldPathPrefix, true); err != nil {
glog.Infof("Couldn't delete the old etcd root node.")
return err
}
}
return nil
}
// This recurses through the etcd registry. Each key-value pair is copied with
// to a new pair with a prefixed key.
func (h *EtcdHelper) migrateChildren(parent *etcd.Node, oldPathPrefix string) error {
for _, child := range parent.Nodes {
if child.Dir && len(child.Nodes) > 0 {
// Descend into this directory
h.migrateChildren(child, oldPathPrefix)
// All children have been migrated, so this directory has
// already been automatically added.
continue
}
// Check if already prefixed (maybe we got interrupted in last attempt)
if strings.HasPrefix(child.Key, h.PathPrefix) {
// Skip this iteration
continue
}
// Create new entry
newKey := path.Join("/", h.PathPrefix, strings.TrimPrefix(child.Key, oldPathPrefix))
if _, err := h.Client.Create(newKey, child.Value, 0); err != nil {
// Assuming etcd is still available, this is due to the key
// already existing, in which case we can skip.
continue
}
}
return nil
}
// GetEtcdVersion performs a version check against the provided Etcd server,
// returning the string response, and error (if any).
func GetEtcdVersion(host string) (string, error) {
response, err := http.Get(host + "/version")
if err != nil {
return "", err
}
defer response.Body.Close()
if response.StatusCode != http.StatusOK {
return "", fmt.Errorf("unsuccessful response from etcd server %q: %v", host, err)
}
versionBytes, err := ioutil.ReadAll(response.Body)
if err != nil {
return "", err
}
return string(versionBytes), nil
}
func startEtcd() (*exec.Cmd, error) {
cmd := exec.Command("etcd")
err := cmd.Start()
if err != nil {
return nil, err
}
return cmd, nil
}
func NewEtcdClientStartServerIfNecessary(server string) (EtcdClient, error) {
_, err := GetEtcdVersion(server)
if err != nil {
glog.Infof("Failed to find etcd, attempting to start.")
_, err := startEtcd()
if err != nil {
return nil, err
}
}
servers := []string{server}
return etcd.NewClient(servers), nil
}
type etcdHealth struct {
// Note this has to be public so the json library can modify it.
Health string `json:health`
}
func EtcdHealthCheck(data []byte) error {
obj := etcdHealth{}
if err := json.Unmarshal(data, &obj); err != nil {
return err
}
if obj.Health != "true" {
return fmt.Errorf("Unhealthy status: %s", obj.Health)
}
return nil
}

View File

@ -846,13 +846,13 @@ func TestPrefixEtcdKey(t *testing.T) {
// Verify prefix is added
keyBefore := baseKey
keyAfter := helper.PrefixEtcdKey(keyBefore)
keyAfter := helper.prefixEtcdKey(keyBefore)
assert.Equal(t, keyAfter, path.Join(prefix, baseKey), "Prefix incorrectly added by EtcdHelper")
// Verify prefix is not added
keyBefore = path.Join(prefix, baseKey)
keyAfter = helper.PrefixEtcdKey(keyBefore)
keyAfter = helper.prefixEtcdKey(keyBefore)
assert.Equal(t, keyBefore, keyAfter, "Prefix incorrectly added by EtcdHelper")
}

View File

@ -71,7 +71,7 @@ func ParseWatchResourceVersion(resourceVersion, kind string) (uint64, error) {
// watch.Interface. resourceVersion may be used to specify what version to begin
// watching (e.g., for reconnecting without missing any updates).
func (h *EtcdHelper) WatchList(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) {
key = h.PrefixEtcdKey(key)
key = h.prefixEtcdKey(key)
w := newEtcdWatcher(true, exceptKey(key), filter, h.Codec, h.Versioner, nil, h)
go w.etcdWatch(h.Client, key, resourceVersion)
return w, nil
@ -81,7 +81,7 @@ func (h *EtcdHelper) WatchList(key string, resourceVersion uint64, filter Filter
// API objects and sent down the returned watch.Interface.
// Errors will be sent down the channel.
func (h *EtcdHelper) Watch(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) {
key = h.PrefixEtcdKey(key)
key = h.prefixEtcdKey(key)
w := newEtcdWatcher(false, nil, filter, h.Codec, h.Versioner, nil, h)
go w.etcdWatch(h.Client, key, resourceVersion)
return w, nil
@ -103,12 +103,12 @@ func (h *EtcdHelper) Watch(key string, resourceVersion uint64, filter FilterFunc
// })
//
// Errors will be sent down the channel.
func (h *EtcdHelper) WatchAndTransform(key string, resourceVersion uint64, transform TransformFunc) watch.Interface {
key = h.PrefixEtcdKey(key)
/*func (h *EtcdHelper) WatchAndTransform(key string, resourceVersion uint64, transform TransformFunc) watch.Interface {
key = h.prefixEtcdKey(key)
w := newEtcdWatcher(false, nil, Everything, h.Codec, h.Versioner, transform, h)
go w.etcdWatch(h.Client, key, resourceVersion)
return w
}
}*/
// TransformFunc attempts to convert an object to another object for use with a watcher.
type TransformFunc func(runtime.Object) (runtime.Object, error)

121
pkg/tools/etcd_util.go Normal file
View File

@ -0,0 +1,121 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package tools
import (
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"os/exec"
"github.com/coreos/go-etcd/etcd"
"github.com/golang/glog"
)
// IsEtcdNotFound returns true iff err is an etcd not found error.
func IsEtcdNotFound(err error) bool {
return isEtcdErrorNum(err, EtcdErrorCodeNotFound)
}
// IsEtcdNodeExist returns true iff err is an etcd node aleady exist error.
func IsEtcdNodeExist(err error) bool {
return isEtcdErrorNum(err, EtcdErrorCodeNodeExist)
}
// IsEtcdTestFailed returns true iff err is an etcd write conflict.
func IsEtcdTestFailed(err error) bool {
return isEtcdErrorNum(err, EtcdErrorCodeTestFailed)
}
// IsEtcdWatchStoppedByUser returns true iff err is a client triggered stop.
func IsEtcdWatchStoppedByUser(err error) bool {
return etcd.ErrWatchStoppedByUser == err
}
// isEtcdErrorNum returns true iff err is an etcd error, whose errorCode matches errorCode
func isEtcdErrorNum(err error, errorCode int) bool {
etcdError, ok := err.(*etcd.EtcdError)
return ok && etcdError != nil && etcdError.ErrorCode == errorCode
}
// etcdErrorIndex returns the index associated with the error message and whether the
// index was available.
func etcdErrorIndex(err error) (uint64, bool) {
if etcdError, ok := err.(*etcd.EtcdError); ok {
return etcdError.Index, true
}
return 0, false
}
// GetEtcdVersion performs a version check against the provided Etcd server,
// returning the string response, and error (if any).
func GetEtcdVersion(host string) (string, error) {
response, err := http.Get(host + "/version")
if err != nil {
return "", err
}
defer response.Body.Close()
if response.StatusCode != http.StatusOK {
return "", fmt.Errorf("unsuccessful response from etcd server %q: %v", host, err)
}
versionBytes, err := ioutil.ReadAll(response.Body)
if err != nil {
return "", err
}
return string(versionBytes), nil
}
func startEtcd() (*exec.Cmd, error) {
cmd := exec.Command("etcd")
err := cmd.Start()
if err != nil {
return nil, err
}
return cmd, nil
}
func NewEtcdClientStartServerIfNecessary(server string) (EtcdClient, error) {
_, err := GetEtcdVersion(server)
if err != nil {
glog.Infof("Failed to find etcd, attempting to start.")
_, err := startEtcd()
if err != nil {
return nil, err
}
}
servers := []string{server}
return etcd.NewClient(servers), nil
}
type etcdHealth struct {
// Note this has to be public so the json library can modify it.
Health string `json:health`
}
func EtcdHealthCheck(data []byte) error {
obj := etcdHealth{}
if err := json.Unmarshal(data, &obj); err != nil {
return err
}
if obj.Health != "true" {
return fmt.Errorf("Unhealthy status: %s", obj.Health)
}
return nil
}

View File

@ -17,9 +17,12 @@ limitations under the License.
package tools
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/coreos/go-etcd/etcd"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/coreos/go-etcd/etcd"
)
const (
@ -64,3 +67,108 @@ type EtcdVersioner interface {
// Should return an error if the specified object does not have a persistable version.
ObjectResourceVersion(obj runtime.Object) (uint64, error)
}
// ResponseMeta contains information about the etcd metadata that is associated with
// an object. It abstracts the actual underlying objects to prevent coupling with etcd
// and to improve testability.
type ResponseMeta struct {
// TTL is the time to live of the node that contained the returned object. It may be
// zero or negative in some cases (objects may be expired after the requested
// expiration time due to server lag).
TTL int64
// Expiration is the time at which the node that contained the returned object will expire and be deleted.
// This can be nil if there is no expiration time set for the node.
Expiration *time.Time
// The resource version of the node that contained the returned object.
ResourceVersion uint64
}
// Pass an StorageUpdateFunc to StorageInterface.GuaranteedUpdate to make an update
// that is guaranteed to succeed.
// See the comment for GuaranteedUpdate for more details.
type StorageUpdateFunc func(input runtime.Object, res ResponseMeta) (output runtime.Object, ttl *uint64, err error)
// StorageInterface offers a common interface for object marshaling/unmarshling operations and
// hids all the storage-related operations behind it.
type StorageInterface interface {
// CreateObj adds a new object at a key unless it already exists. 'ttl' is time-to-live
// in seconds (0 means forever). If no error is returned and out is not nil, out will be
// set to the read value from etcd.
//
// TODO(wojtekt): Rename to Create().
CreateObj(key string, obj, out runtime.Object, ttl uint64) error
// SetObj marshals obj via json and stores in etcd under key. Will do an atomic update
// if obj's ResourceVersion field is set. 'ttl' is time-to-live in seconds (0 means forever).
// If no error is returned and out is not nil, out will be set to the read value from etcd.
//
// TODO(wojtekt): Rename to Set() (or Update?).
SetObj(key string, obj, out runtime.Object, ttl uint64) error
// DeleteObj removes the specified key and returns the value that existed at that spot.
//
// TODO(wojtekt): Rename to Delete().
DeleteObj(key string, out runtime.Object) error
// Delete removes the specified key.
//
// TODO(wojtekt): Unify it with DeleteObj().
Delete(key string, recursive bool) error
// Watch begins watching the specified key. Events are decoded into API objects,
// and any items passing 'filter' are sent down to returned watch.Interface.
// resourceVersion may be used to specify what version to begin watching
// (e.g. reconnecting without missing any updates).
Watch(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error)
// WatchList begins watching the specified key's items. Items are decoded into API
// objects and any item passing 'filter' are sent down to returned watch.Interface.
// resourceVersion may be used to specify what version to begin watching
// (e.g. reconnecting without missing any updates).
WatchList(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error)
// ExtractObj unmarshals json found at key into objPtr. On a not found error, will either
// return a zero object of the requested type, or an error, depending on ignoreNotFound.
// Treats empty responses and nil response nodes exactly like a not found error.
//
// TODO(wojtekt): Rename to Get().
ExtractObj(key string, objPtr runtime.Object, ignoreNotFound bool) error
// ExtractObjToList unmarshals json found at key and opaque it into *List api object
// (an object that satisfies the runtime.IsList definition).
//
// TODO(wojtekt): Rename to GetToList().
ExtractObjToList(key string, listObj runtime.Object) error
// ExtractToList unmarshalls jsons found at directory defined by key and opaque them
// into *List api object (an object that satisfies runtime.IsList definition).
//
// TODO(wojtekt): Rename to List().
ExtractToList(key string, listObj runtime.Object) error
// GuaranteedUpdate keeps calling 'tryUpdate()' to update key 'key' (of type 'ptrToType')
// retrying the update until success if there is etcd index conflict.
// Note that object passed to tryUpdate may change acress incovations of tryUpdate() if
// other writers are simultanously updateing it, to tryUpdate() needs to take into account
// the current contents of the object when deciding how the update object should look.
//
// Exmaple:
//
// s := /* implementation of StorageInterface */
// err := s.GuaranteedUpdate(
// "myKey", &MyType{}, true,
// func(input runtime.Object, res ResponseMeta) (runtime.Object, *uint64, error) {
// // Before each incovation of the user defined function, "input" is reset to
// // etcd's current contents for "myKey".
// curr := input.(*MyType) // Guaranteed to succeed.
//
// // Make the modification
// curr.Counter++
//
// // Return the modified object - return an error to stop iterating. Return
// // a uint64 to alter the TTL on the object, or nil to keep it the same value.
// return cur, nil, nil
// }
// })
GuaranteedUpdate(key string, ptrToType runtime.Object, ignoreNotFound bool, tryUpdate StorageUpdateFunc) error
}

View File

@ -142,47 +142,3 @@ func TestWatch(t *testing.T) {
}
})
}
func TestMigrateKeys(t *testing.T) {
withEtcdKey(func(oldPrefix string) {
client := newEtcdClient()
helper := tools.NewEtcdHelper(client, testapi.Codec(), oldPrefix)
key1 := oldPrefix + "/obj1"
key2 := oldPrefix + "/foo/obj2"
key3 := oldPrefix + "/foo/bar/obj3"
// Create a new entres - these are the 'existing' entries with old prefix
_, _ = helper.Client.Create(key1, "foo", 0)
_, _ = helper.Client.Create(key2, "foo", 0)
_, _ = helper.Client.Create(key3, "foo", 0)
// Change the helper to a new prefix
newPrefix := "/kubernetes.io"
helper = tools.NewEtcdHelper(client, testapi.Codec(), newPrefix)
// Migrate the keys
err := helper.MigrateKeys(oldPrefix)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
// Check the resources are at the correct new location
newNames := []string{
newPrefix + "/obj1",
newPrefix + "/foo/obj2",
newPrefix + "/foo/bar/obj3",
}
for _, name := range newNames {
_, err := helper.Client.Get(name, false, false)
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
}
// Check the old locations are removed
if _, err := helper.Client.Get(oldPrefix, false, false); err == nil {
t.Fatalf("Old directory still exists.")
}
})
}