diff --git a/cluster/addons/dns/kube2sky/kube2sky.go b/cluster/addons/dns/kube2sky/kube2sky.go index 6a6b267941..ea298bb8a4 100644 --- a/cluster/addons/dns/kube2sky/kube2sky.go +++ b/cluster/addons/dns/kube2sky/kube2sky.go @@ -37,7 +37,7 @@ import ( kclientcmd "github.com/GoogleCloudPlatform/kubernetes/pkg/client/clientcmd" kframework "github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework" kSelector "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" - tools "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + etcdstorage "github.com/GoogleCloudPlatform/kubernetes/pkg/storage/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/wait" etcd "github.com/coreos/go-etcd/etcd" @@ -354,7 +354,7 @@ func newEtcdClient(etcdServer string) (*etcd.Client, error) { err error ) for attempt := 1; attempt <= maxConnectAttempts; attempt++ { - if _, err = tools.GetEtcdVersion(etcdServer); err == nil { + if _, err = etcdstorage.GetEtcdVersion(etcdServer); err == nil { break } if attempt == maxConnectAttempts { diff --git a/cmd/kube-apiserver/app/server.go b/cmd/kube-apiserver/app/server.go index b9c811d82c..9c737d2380 100644 --- a/cmd/kube-apiserver/app/server.go +++ b/cmd/kube-apiserver/app/server.go @@ -38,6 +38,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" "github.com/GoogleCloudPlatform/kubernetes/pkg/master" "github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports" + "github.com/GoogleCloudPlatform/kubernetes/pkg/storage" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" forked "github.com/GoogleCloudPlatform/kubernetes/third_party/forked/coreos/go-etcd/etcd" @@ -216,7 +217,7 @@ func (s *APIServer) verifyClusterIPFlags() { } } -func newEtcd(etcdConfigFile string, etcdServerList util.StringList, storageVersion string, pathPrefix string) (etcdStorage tools.StorageInterface, err error) { +func newEtcd(etcdConfigFile string, etcdServerList util.StringList, storageVersion string, pathPrefix string) (etcdStorage storage.Interface, err error) { var client tools.EtcdClient if etcdConfigFile != "" { client, err = etcd.NewClientFromFile(etcdConfigFile) diff --git a/cmd/kubernetes/kubernetes.go b/cmd/kubernetes/kubernetes.go index c664fa7c50..d510e2f3f3 100644 --- a/cmd/kubernetes/kubernetes.go +++ b/cmd/kubernetes/kubernetes.go @@ -41,6 +41,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" "github.com/GoogleCloudPlatform/kubernetes/pkg/master" "github.com/GoogleCloudPlatform/kubernetes/pkg/service" + etcdstorage "github.com/GoogleCloudPlatform/kubernetes/pkg/storage/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler" @@ -167,7 +168,7 @@ func main() { defer util.FlushLogs() glog.Infof("Creating etcd client pointing to %v", *etcdServer) - etcdClient, err := tools.NewEtcdClientStartServerIfNecessary(*etcdServer) + etcdClient, err := etcdstorage.NewEtcdClientStartServerIfNecessary(*etcdServer) if err != nil { glog.Fatalf("Failed to connect to etcd: %v", err) } diff --git a/contrib/mesos/pkg/election/etcd_master.go b/contrib/mesos/pkg/election/etcd_master.go index 59183a9058..81b435ee98 100644 --- a/contrib/mesos/pkg/election/etcd_master.go +++ b/contrib/mesos/pkg/election/etcd_master.go @@ -20,6 +20,7 @@ import ( "fmt" "time" + etcdstorage "github.com/GoogleCloudPlatform/kubernetes/pkg/storage/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" @@ -90,10 +91,10 @@ func (e *etcdMasterElector) extendMaster(path, id string, ttl uint64, res *etcd. // We don't handle the TTL delete w/o a write case here, it's handled in the next loop // iteration. _, err := e.etcd.CompareAndSwap(path, id, ttl, "", res.Node.ModifiedIndex) - if err != nil && !tools.IsEtcdTestFailed(err) { + if err != nil && !etcdstorage.IsEtcdTestFailed(err) { return "", err } - if err != nil && tools.IsEtcdTestFailed(err) { + if err != nil && etcdstorage.IsEtcdTestFailed(err) { return "", nil } return id, nil @@ -105,11 +106,11 @@ func (e *etcdMasterElector) extendMaster(path, id string, ttl uint64, res *etcd. // returns "", err if an error occurred func (e *etcdMasterElector) becomeMaster(path, id string, ttl uint64) (string, error) { _, err := e.etcd.Create(path, id, ttl) - if err != nil && !tools.IsEtcdNodeExist(err) { + if err != nil && !etcdstorage.IsEtcdNodeExist(err) { // unexpected error return "", err } - if err != nil && tools.IsEtcdNodeExist(err) { + if err != nil && etcdstorage.IsEtcdNodeExist(err) { return "", nil } return id, nil @@ -124,12 +125,12 @@ func (e *etcdMasterElector) handleMaster(path, id string, ttl uint64) (string, e res, err := e.etcd.Get(path, false, false) // Unexpected error, bail out - if err != nil && !tools.IsEtcdNotFound(err) { + if err != nil && !etcdstorage.IsEtcdNotFound(err) { return "", err } // There is no master, try to become the master. - if err != nil && tools.IsEtcdNotFound(err) { + if err != nil && etcdstorage.IsEtcdNotFound(err) { return e.becomeMaster(path, id, ttl) } diff --git a/contrib/mesos/pkg/scheduler/service/service.go b/contrib/mesos/pkg/scheduler/service/service.go index 9fc8b9e3ff..ffbf53e9d7 100644 --- a/contrib/mesos/pkg/scheduler/service/service.go +++ b/contrib/mesos/pkg/scheduler/service/service.go @@ -49,6 +49,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/clientauth" "github.com/GoogleCloudPlatform/kubernetes/pkg/master/ports" + etcdstorage "github.com/GoogleCloudPlatform/kubernetes/pkg/storage/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/coreos/go-etcd/etcd" @@ -786,7 +787,7 @@ func (s *SchedulerServer) buildFrameworkInfo() (info *mesos.FrameworkInfo, cred func (s *SchedulerServer) fetchFrameworkID(client tools.EtcdClient) (*mesos.FrameworkID, error) { if s.FailoverTimeout > 0 { if response, err := client.Get(meta.FrameworkIDKey, false, false); err != nil { - if !tools.IsEtcdNotFound(err) { + if !etcdstorage.IsEtcdNotFound(err) { return nil, fmt.Errorf("unexpected failure attempting to load framework ID from etcd: %v", err) } log.V(1).Infof("did not find framework ID in etcd") @@ -797,7 +798,7 @@ func (s *SchedulerServer) fetchFrameworkID(client tools.EtcdClient) (*mesos.Fram } else { //TODO(jdef) this seems like a totally hackish way to clean up the framework ID if _, err := client.Delete(meta.FrameworkIDKey, true); err != nil { - if !tools.IsEtcdNotFound(err) { + if !etcdstorage.IsEtcdNotFound(err) { return nil, fmt.Errorf("failed to delete framework ID from etcd: %v", err) } log.V(1).Infof("nothing to delete: did not find framework ID in etcd") diff --git a/contrib/pod-master/podmaster.go b/contrib/pod-master/podmaster.go index 71d3f4a2c4..08b94e4cb2 100644 --- a/contrib/pod-master/podmaster.go +++ b/contrib/pod-master/podmaster.go @@ -27,7 +27,7 @@ import ( "strings" "time" - "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + etcdstorage "github.com/GoogleCloudPlatform/kubernetes/pkg/storage/etcd" "github.com/coreos/go-etcd/etcd" "github.com/golang/glog" @@ -72,7 +72,7 @@ func (c *Config) leaseAndUpdateLoop(etcdClient *etcd.Client) { func (c *Config) acquireOrRenewLease(etcdClient *etcd.Client) (bool, error) { result, err := etcdClient.Get(c.key, false, false) if err != nil { - if tools.IsEtcdNotFound(err) { + if etcdstorage.IsEtcdNotFound(err) { // there is no current master, try to become master, create will fail if the key already exists _, err := etcdClient.Create(c.key, c.whoami, c.ttl) if err != nil { diff --git a/docs/proposals/apiserver_watch.md b/docs/proposals/apiserver_watch.md index ce866b6d68..5610ccbc68 100644 --- a/docs/proposals/apiserver_watch.md +++ b/docs/proposals/apiserver_watch.md @@ -163,7 +163,7 @@ resource type. However, this watch can potentially expire at any time and reconnecting can return "too old resource version". In that case relisting is necessary. In such case, to avoid LIST requests coming from all watchers at the same time, we can introduce an additional etcd event type: -[EtcdResync](../../pkg/tools/etcd_watcher.go#L36) +[EtcdResync](../../pkg/storage/etcd/etcd_watcher.go#L36) Whenever reslisting will be done to refresh the internal watch to etcd, EtcdResync event will be send to all the watchers. It will contain the diff --git a/pkg/api/errors/etcd/etcd.go b/pkg/api/errors/etcd/etcd.go index 4a6656c74e..3b85aaffcd 100644 --- a/pkg/api/errors/etcd/etcd.go +++ b/pkg/api/errors/etcd/etcd.go @@ -18,14 +18,14 @@ package etcd import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" - "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + etcdstorage "github.com/GoogleCloudPlatform/kubernetes/pkg/storage/etcd" ) // InterpretGetError converts a generic etcd error on a retrieval // operation into the appropriate API error. func InterpretGetError(err error, kind, name string) error { switch { - case tools.IsEtcdNotFound(err): + case etcdstorage.IsEtcdNotFound(err): return errors.NewNotFound(kind, name) default: return err @@ -36,7 +36,7 @@ func InterpretGetError(err error, kind, name string) error { // operation into the appropriate API error. func InterpretCreateError(err error, kind, name string) error { switch { - case tools.IsEtcdNodeExist(err): + case etcdstorage.IsEtcdNodeExist(err): return errors.NewAlreadyExists(kind, name) default: return err @@ -47,7 +47,7 @@ func InterpretCreateError(err error, kind, name string) error { // operation into the appropriate API error. func InterpretUpdateError(err error, kind, name string) error { switch { - case tools.IsEtcdTestFailed(err), tools.IsEtcdNodeExist(err): + case etcdstorage.IsEtcdTestFailed(err), etcdstorage.IsEtcdNodeExist(err): return errors.NewConflict(kind, name, err) default: return err @@ -58,7 +58,7 @@ func InterpretUpdateError(err error, kind, name string) error { // operation into the appropriate API error. func InterpretDeleteError(err error, kind, name string) error { switch { - case tools.IsEtcdNotFound(err): + case etcdstorage.IsEtcdNotFound(err): return errors.NewNotFound(kind, name) default: return err diff --git a/pkg/apiserver/authn.go b/pkg/apiserver/authn.go index eb71b37295..d40aa3eb0e 100644 --- a/pkg/apiserver/authn.go +++ b/pkg/apiserver/authn.go @@ -22,7 +22,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/auth/authenticator" "github.com/GoogleCloudPlatform/kubernetes/pkg/auth/authenticator/bearertoken" "github.com/GoogleCloudPlatform/kubernetes/pkg/serviceaccount" - "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + "github.com/GoogleCloudPlatform/kubernetes/pkg/storage" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/auth/authenticator/password/passwordfile" "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/auth/authenticator/request/basicauth" @@ -32,7 +32,7 @@ import ( ) // NewAuthenticator returns an authenticator.Request or an error -func NewAuthenticator(basicAuthFile, clientCAFile, tokenFile, serviceAccountKeyFile string, serviceAccountLookup bool, storage tools.StorageInterface) (authenticator.Request, error) { +func NewAuthenticator(basicAuthFile, clientCAFile, tokenFile, serviceAccountKeyFile string, serviceAccountLookup bool, storage storage.Interface) (authenticator.Request, error) { var authenticators []authenticator.Request if len(basicAuthFile) > 0 { @@ -104,7 +104,7 @@ func newAuthenticatorFromTokenFile(tokenAuthFile string) (authenticator.Request, } // newServiceAccountAuthenticator returns an authenticator.Request or an error -func newServiceAccountAuthenticator(keyfile string, lookup bool, storage tools.StorageInterface) (authenticator.Request, error) { +func newServiceAccountAuthenticator(keyfile string, lookup bool, storage storage.Interface) (authenticator.Request, error) { publicKey, err := serviceaccount.ReadPublicKey(keyfile) if err != nil { return nil, err diff --git a/pkg/apiserver/errors.go b/pkg/apiserver/errors.go index e234f0472a..503d45ac5b 100644 --- a/pkg/apiserver/errors.go +++ b/pkg/apiserver/errors.go @@ -21,7 +21,7 @@ import ( "net/http" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + etcdstorage "github.com/GoogleCloudPlatform/kubernetes/pkg/storage/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) @@ -52,7 +52,7 @@ func errToAPIStatus(err error) *api.Status { status := http.StatusInternalServerError switch { //TODO: replace me with NewConflictErr - case tools.IsEtcdTestFailed(err): + case etcdstorage.IsEtcdTestFailed(err): status = http.StatusConflict } // Log errors that were not converted to an error status diff --git a/pkg/master/master.go b/pkg/master/master.go index d7cf999a78..43a5c47e85 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -69,6 +69,8 @@ import ( etcdallocator "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/allocator/etcd" ipallocator "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/ipallocator" serviceaccountetcd "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/serviceaccount/etcd" + "github.com/GoogleCloudPlatform/kubernetes/pkg/storage" + etcdstorage "github.com/GoogleCloudPlatform/kubernetes/pkg/storage/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/ui" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" @@ -87,7 +89,7 @@ const ( // Config is a structure used to configure a Master. type Config struct { - DatabaseStorage tools.StorageInterface + DatabaseStorage storage.Interface EventTTL time.Duration MinionRegexp string KubeletClient client.KubeletClient @@ -223,9 +225,9 @@ type Master struct { clock util.Clock } -// NewEtcdStorage returns a StorageInterface for the provided arguments or an error if the version +// NewEtcdStorage returns a storage.Interface for the provided arguments or an error if the version // is incorrect. -func NewEtcdStorage(client tools.EtcdClient, version string, prefix string) (etcdStorage tools.StorageInterface, err error) { +func NewEtcdStorage(client tools.EtcdClient, version string, prefix string) (etcdStorage storage.Interface, err error) { if version == "" { version = latest.Version } @@ -233,7 +235,7 @@ func NewEtcdStorage(client tools.EtcdClient, version string, prefix string) (etc if err != nil { return etcdStorage, err } - return tools.NewEtcdStorage(client, versionInterfaces.Codec, prefix), nil + return etcdstorage.NewEtcdStorage(client, versionInterfaces.Codec, prefix), nil } // setDefaults fills in any fields not set that are required to have valid data. @@ -721,7 +723,7 @@ func (m *Master) getServersToValidate(c *Config) map[string]apiserver.Server { addr = etcdUrl.Host port = 4001 } - serversToValidate[fmt.Sprintf("etcd-%d", ix)] = apiserver.Server{Addr: addr, Port: port, Path: "/health", Validate: tools.EtcdHealthCheck} + serversToValidate[fmt.Sprintf("etcd-%d", ix)] = apiserver.Server{Addr: addr, Port: port, Path: "/health", Validate: etcdstorage.EtcdHealthCheck} } return serversToValidate } diff --git a/pkg/master/master_test.go b/pkg/master/master_test.go index 16ddfcce4f..6dfd56eb5a 100644 --- a/pkg/master/master_test.go +++ b/pkg/master/master_test.go @@ -22,6 +22,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" + etcdstorage "github.com/GoogleCloudPlatform/kubernetes/pkg/storage/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools/etcdtest" ) @@ -31,7 +32,7 @@ func TestGetServersToValidate(t *testing.T) { config := Config{} fakeClient := tools.NewFakeEtcdClient(t) fakeClient.Machines = []string{"http://machine1:4001", "http://machine2", "http://machine3:4003"} - config.DatabaseStorage = tools.NewEtcdStorage(fakeClient, latest.Codec, etcdtest.PathPrefix()) + config.DatabaseStorage = etcdstorage.NewEtcdStorage(fakeClient, latest.Codec, etcdtest.PathPrefix()) master.nodeRegistry = registrytest.NewMinionRegistry([]string{"node1", "node2"}, api.NodeResources{}) diff --git a/pkg/registry/controller/etcd/etcd.go b/pkg/registry/controller/etcd/etcd.go index 6fa75e4b11..75a08e1f02 100644 --- a/pkg/registry/controller/etcd/etcd.go +++ b/pkg/registry/controller/etcd/etcd.go @@ -24,7 +24,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic" etcdgeneric "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" - "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + "github.com/GoogleCloudPlatform/kubernetes/pkg/storage" ) // rest implements a RESTStorage for replication controllers against etcd @@ -37,7 +37,7 @@ type REST struct { var controllerPrefix = "/controllers" // NewREST returns a RESTStorage object that will work against replication controllers. -func NewREST(s tools.StorageInterface) *REST { +func NewREST(s storage.Interface) *REST { store := &etcdgeneric.Etcd{ NewFunc: func() runtime.Object { return &api.ReplicationController{} }, diff --git a/pkg/registry/controller/etcd/etcd_test.go b/pkg/registry/controller/etcd/etcd_test.go index b2c309a2f1..3477892a86 100644 --- a/pkg/registry/controller/etcd/etcd_test.go +++ b/pkg/registry/controller/etcd/etcd_test.go @@ -30,6 +30,8 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" etcdgeneric "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/storage" + etcdstorage "github.com/GoogleCloudPlatform/kubernetes/pkg/storage/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools/etcdtest" "github.com/coreos/go-etcd/etcd" @@ -40,10 +42,10 @@ const ( FAIL ) -func newEtcdStorage(t *testing.T) (*tools.FakeEtcdClient, tools.StorageInterface) { +func newEtcdStorage(t *testing.T) (*tools.FakeEtcdClient, storage.Interface) { fakeEtcdClient := tools.NewFakeEtcdClient(t) fakeEtcdClient.TestIndex = true - etcdStorage := tools.NewEtcdStorage(fakeEtcdClient, latest.Codec, etcdtest.PathPrefix()) + etcdStorage := etcdstorage.NewEtcdStorage(fakeEtcdClient, latest.Codec, etcdtest.PathPrefix()) return fakeEtcdClient, etcdStorage } @@ -624,10 +626,10 @@ func TestEtcdWatchControllersFields(t *testing.T) { }, } testEtcdActions := []string{ - tools.EtcdCreate, - tools.EtcdSet, - tools.EtcdCAS, - tools.EtcdDelete} + etcdstorage.EtcdCreate, + etcdstorage.EtcdSet, + etcdstorage.EtcdCAS, + etcdstorage.EtcdDelete} controller := &api.ReplicationController{ ObjectMeta: api.ObjectMeta{ @@ -653,7 +655,7 @@ func TestEtcdWatchControllersFields(t *testing.T) { node := &etcd.Node{ Value: string(controllerBytes), } - if action == tools.EtcdDelete { + if action == etcdstorage.EtcdDelete { prevNode = node } fakeClient.WaitForWatchCompletion() diff --git a/pkg/registry/endpoint/etcd/etcd.go b/pkg/registry/endpoint/etcd/etcd.go index 7ca422d742..0068487a33 100644 --- a/pkg/registry/endpoint/etcd/etcd.go +++ b/pkg/registry/endpoint/etcd/etcd.go @@ -24,7 +24,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic" etcdgeneric "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" - "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + "github.com/GoogleCloudPlatform/kubernetes/pkg/storage" ) // rest implements a RESTStorage for endpoints against etcd @@ -33,7 +33,7 @@ type REST struct { } // NewStorage returns a RESTStorage object that will work against endpoints. -func NewStorage(s tools.StorageInterface) *REST { +func NewStorage(s storage.Interface) *REST { prefix := "/services/endpoints" return &REST{ &etcdgeneric.Etcd{ diff --git a/pkg/registry/endpoint/etcd/etcd_test.go b/pkg/registry/endpoint/etcd/etcd_test.go index 45adbe7c3b..9ad3a9cd62 100644 --- a/pkg/registry/endpoint/etcd/etcd_test.go +++ b/pkg/registry/endpoint/etcd/etcd_test.go @@ -25,6 +25,8 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/storage" + etcdstorage "github.com/GoogleCloudPlatform/kubernetes/pkg/storage/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools/etcdtest" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" @@ -32,10 +34,10 @@ import ( "github.com/coreos/go-etcd/etcd" ) -func newEtcdStorage(t *testing.T) (*tools.FakeEtcdClient, tools.StorageInterface) { +func newEtcdStorage(t *testing.T) (*tools.FakeEtcdClient, storage.Interface) { fakeEtcdClient := tools.NewFakeEtcdClient(t) fakeEtcdClient.TestIndex = true - etcdStorage := tools.NewEtcdStorage(fakeEtcdClient, latest.Codec, etcdtest.PathPrefix()) + etcdStorage := etcdstorage.NewEtcdStorage(fakeEtcdClient, latest.Codec, etcdtest.PathPrefix()) return fakeEtcdClient, etcdStorage } diff --git a/pkg/registry/etcd/etcd.go b/pkg/registry/etcd/etcd.go index 16dab56f1e..b9acedff0c 100644 --- a/pkg/registry/etcd/etcd.go +++ b/pkg/registry/etcd/etcd.go @@ -26,7 +26,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/endpoint" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod" - "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + "github.com/GoogleCloudPlatform/kubernetes/pkg/storage" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) @@ -44,17 +44,17 @@ const ( // Registry implements BindingRegistry, ControllerRegistry, EndpointRegistry, // MinionRegistry, PodRegistry and ServiceRegistry, backed by etcd. type Registry struct { - tools.StorageInterface + storage.Interface pods pod.Registry endpoints endpoint.Registry } // NewRegistry creates an etcd registry. -func NewRegistry(storage tools.StorageInterface, pods pod.Registry, endpoints endpoint.Registry) *Registry { +func NewRegistry(storage storage.Interface, pods pod.Registry, endpoints endpoint.Registry) *Registry { registry := &Registry{ - StorageInterface: storage, - pods: pods, - endpoints: endpoints, + Interface: storage, + pods: pods, + endpoints: endpoints, } return registry } @@ -158,7 +158,7 @@ func (r *Registry) UpdateService(ctx api.Context, svc *api.Service) (*api.Servic // WatchServices begins watching for new, changed, or deleted service configurations. func (r *Registry) WatchServices(ctx api.Context, label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { - version, err := tools.ParseWatchResourceVersion(resourceVersion, "service") + version, err := storage.ParseWatchResourceVersion(resourceVersion, "service") if err != nil { return nil, err } @@ -171,10 +171,10 @@ func (r *Registry) WatchServices(ctx api.Context, label labels.Selector, field f return nil, err } // TODO: use generic.SelectionPredicate - return r.Watch(key, version, tools.Everything) + return r.Watch(key, version, storage.Everything) } if field.Empty() { - return r.WatchList(makeServiceListKey(ctx), version, tools.Everything) + return r.WatchList(makeServiceListKey(ctx), version, storage.Everything) } return nil, fmt.Errorf("only the 'name' and default (everything) field selectors are supported") } diff --git a/pkg/registry/etcd/etcd_test.go b/pkg/registry/etcd/etcd_test.go index 8771273ac0..57ad7bc101 100644 --- a/pkg/registry/etcd/etcd_test.go +++ b/pkg/registry/etcd/etcd_test.go @@ -31,6 +31,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod" podetcd "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + etcdstorage "github.com/GoogleCloudPlatform/kubernetes/pkg/storage/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools/etcdtest" @@ -38,13 +39,13 @@ import ( ) func NewTestEtcdRegistry(client tools.EtcdClient) *Registry { - storage := tools.NewEtcdStorage(client, latest.Codec, etcdtest.PathPrefix()) + storage := etcdstorage.NewEtcdStorage(client, latest.Codec, etcdtest.PathPrefix()) registry := NewRegistry(storage, nil, nil) return registry } func NewTestEtcdRegistryWithPods(client tools.EtcdClient) *Registry { - etcdStorage := tools.NewEtcdStorage(client, latest.Codec, etcdtest.PathPrefix()) + etcdStorage := etcdstorage.NewEtcdStorage(client, latest.Codec, etcdtest.PathPrefix()) podStorage := podetcd.NewStorage(etcdStorage, nil) endpointStorage := endpointetcd.NewStorage(etcdStorage) registry := NewRegistry(etcdStorage, pod.NewRegistry(podStorage.Pod), endpoint.NewRegistry(endpointStorage)) diff --git a/pkg/registry/event/registry.go b/pkg/registry/event/registry.go index 3856dbd9c6..c3025fd2ba 100644 --- a/pkg/registry/event/registry.go +++ b/pkg/registry/event/registry.go @@ -21,7 +21,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic" etcdgeneric "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" - "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + "github.com/GoogleCloudPlatform/kubernetes/pkg/storage" ) // registry implements custom changes to generic.Etcd. @@ -31,7 +31,7 @@ type registry struct { // NewEtcdRegistry returns a registry which will store Events in the given // EtcdStorage. ttl is the time that Events will be retained by the system. -func NewEtcdRegistry(s tools.StorageInterface, ttl uint64) generic.Registry { +func NewEtcdRegistry(s storage.Interface, ttl uint64) generic.Registry { prefix := "/events" return registry{ Etcd: &etcdgeneric.Etcd{ diff --git a/pkg/registry/event/registry_test.go b/pkg/registry/event/registry_test.go index 7ea78814ff..c54fbad9e2 100644 --- a/pkg/registry/event/registry_test.go +++ b/pkg/registry/event/registry_test.go @@ -26,6 +26,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic" etcdgeneric "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + etcdstorage "github.com/GoogleCloudPlatform/kubernetes/pkg/storage/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools/etcdtest" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" @@ -39,7 +40,7 @@ func NewTestEventEtcdRegistry(t *testing.T) (*tools.FakeEtcdClient, generic.Regi f := tools.NewFakeEtcdClient(t) f.TestIndex = true - s := tools.NewEtcdStorage(f, testapi.Codec(), etcdtest.PathPrefix()) + s := etcdstorage.NewEtcdStorage(f, testapi.Codec(), etcdtest.PathPrefix()) return f, NewEtcdRegistry(s, testTTL) } diff --git a/pkg/registry/generic/etcd/etcd.go b/pkg/registry/generic/etcd/etcd.go index 21db4d79ce..2429c340b5 100644 --- a/pkg/registry/generic/etcd/etcd.go +++ b/pkg/registry/generic/etcd/etcd.go @@ -29,7 +29,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" - "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + "github.com/GoogleCloudPlatform/kubernetes/pkg/storage" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" @@ -103,7 +103,7 @@ type Etcd struct { ReturnDeletedObject bool // Used for all etcd access functions - Storage tools.StorageInterface + Storage storage.Interface } // NamespaceKeyRootFunc is the default function for constructing etcd paths to resource directories enforcing namespace rules. @@ -282,7 +282,7 @@ func (e *Etcd) Update(ctx api.Context, obj runtime.Object) (runtime.Object, bool // TODO: expose TTL creating := false out := e.NewFunc() - err = e.Storage.GuaranteedUpdate(key, out, true, func(existing runtime.Object, res tools.ResponseMeta) (runtime.Object, *uint64, error) { + err = e.Storage.GuaranteedUpdate(key, out, true, func(existing runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) { version, err := e.Storage.Versioner().ObjectResourceVersion(existing) if err != nil { return nil, nil, err @@ -455,7 +455,7 @@ func (e *Etcd) Watch(ctx api.Context, label labels.Selector, field fields.Select // WatchPredicate starts a watch for the items that m matches. func (e *Etcd) WatchPredicate(ctx api.Context, m generic.Matcher, resourceVersion string) (watch.Interface, error) { - version, err := tools.ParseWatchResourceVersion(resourceVersion, e.EndpointName) + version, err := storage.ParseWatchResourceVersion(resourceVersion, e.EndpointName) if err != nil { return nil, err } diff --git a/pkg/registry/generic/etcd/etcd_test.go b/pkg/registry/generic/etcd/etcd_test.go index 85e08d39a1..7b0b960326 100644 --- a/pkg/registry/generic/etcd/etcd_test.go +++ b/pkg/registry/generic/etcd/etcd_test.go @@ -26,6 +26,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + etcdstorage "github.com/GoogleCloudPlatform/kubernetes/pkg/storage/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools/etcdtest" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" @@ -69,7 +70,7 @@ func hasCreated(t *testing.T, pod *api.Pod) func(runtime.Object) bool { func NewTestGenericEtcdRegistry(t *testing.T) (*tools.FakeEtcdClient, *Etcd) { f := tools.NewFakeEtcdClient(t) f.TestIndex = true - s := tools.NewEtcdStorage(f, testapi.Codec(), etcdtest.PathPrefix()) + s := etcdstorage.NewEtcdStorage(f, testapi.Codec(), etcdtest.PathPrefix()) strategy := &testRESTStrategy{api.Scheme, api.SimpleNameGenerator, true, false, true} podPrefix := "/pods" return f, &Etcd{ diff --git a/pkg/registry/limitrange/registry.go b/pkg/registry/limitrange/registry.go index 45fd524d2e..b6efa9e6fe 100644 --- a/pkg/registry/limitrange/registry.go +++ b/pkg/registry/limitrange/registry.go @@ -21,7 +21,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic" etcdgeneric "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" - "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + "github.com/GoogleCloudPlatform/kubernetes/pkg/storage" ) // registry implements custom changes to generic.Etcd. @@ -30,7 +30,7 @@ type registry struct { } // NewEtcdRegistry returns a registry which will store LimitRange in the given storage -func NewEtcdRegistry(s tools.StorageInterface) generic.Registry { +func NewEtcdRegistry(s storage.Interface) generic.Registry { prefix := "/limitranges" return registry{ Etcd: &etcdgeneric.Etcd{ diff --git a/pkg/registry/limitrange/registry_test.go b/pkg/registry/limitrange/registry_test.go index 900da79d09..ea5d6020a6 100644 --- a/pkg/registry/limitrange/registry_test.go +++ b/pkg/registry/limitrange/registry_test.go @@ -27,6 +27,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic" etcdgeneric "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + etcdstorage "github.com/GoogleCloudPlatform/kubernetes/pkg/storage/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools/etcdtest" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" @@ -37,7 +38,7 @@ import ( func NewTestLimitRangeEtcdRegistry(t *testing.T) (*tools.FakeEtcdClient, generic.Registry) { f := tools.NewFakeEtcdClient(t) f.TestIndex = true - s := tools.NewEtcdStorage(f, testapi.Codec(), etcdtest.PathPrefix()) + s := etcdstorage.NewEtcdStorage(f, testapi.Codec(), etcdtest.PathPrefix()) return f, NewEtcdRegistry(s) } diff --git a/pkg/registry/minion/etcd/etcd.go b/pkg/registry/minion/etcd/etcd.go index 9c8bd7c73e..48388ed6ed 100644 --- a/pkg/registry/minion/etcd/etcd.go +++ b/pkg/registry/minion/etcd/etcd.go @@ -26,7 +26,7 @@ import ( etcdgeneric "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" - "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + "github.com/GoogleCloudPlatform/kubernetes/pkg/storage" ) type REST struct { @@ -49,7 +49,7 @@ func (r *StatusREST) Update(ctx api.Context, obj runtime.Object) (runtime.Object } // NewStorage returns a RESTStorage object that will work against nodes. -func NewStorage(s tools.StorageInterface, connection client.ConnectionInfoGetter) (*REST, *StatusREST) { +func NewStorage(s storage.Interface, connection client.ConnectionInfoGetter) (*REST, *StatusREST) { prefix := "/minions" store := &etcdgeneric.Etcd{ NewFunc: func() runtime.Object { return &api.Node{} }, diff --git a/pkg/registry/minion/etcd/etcd_test.go b/pkg/registry/minion/etcd/etcd_test.go index 6fe9ed4723..884c4fa027 100644 --- a/pkg/registry/minion/etcd/etcd_test.go +++ b/pkg/registry/minion/etcd/etcd_test.go @@ -29,6 +29,8 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/storage" + etcdstorage "github.com/GoogleCloudPlatform/kubernetes/pkg/storage/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools/etcdtest" @@ -47,10 +49,10 @@ func (fakeConnectionInfoGetter) GetConnectionInfo(host string) (string, uint, ht return "http", 12345, nil, nil } -func newEtcdStorage(t *testing.T) (*tools.FakeEtcdClient, tools.StorageInterface) { +func newEtcdStorage(t *testing.T) (*tools.FakeEtcdClient, storage.Interface) { fakeEtcdClient := tools.NewFakeEtcdClient(t) fakeEtcdClient.TestIndex = true - etcdStorage := tools.NewEtcdStorage(fakeEtcdClient, latest.Codec, etcdtest.PathPrefix()) + etcdStorage := etcdstorage.NewEtcdStorage(fakeEtcdClient, latest.Codec, etcdtest.PathPrefix()) return fakeEtcdClient, etcdStorage } diff --git a/pkg/registry/namespace/etcd/etcd.go b/pkg/registry/namespace/etcd/etcd.go index aad1511668..e5c77f6ca2 100644 --- a/pkg/registry/namespace/etcd/etcd.go +++ b/pkg/registry/namespace/etcd/etcd.go @@ -28,7 +28,7 @@ import ( etcdgeneric "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/namespace" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" - "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + "github.com/GoogleCloudPlatform/kubernetes/pkg/storage" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) @@ -49,7 +49,7 @@ type FinalizeREST struct { } // NewStorage returns a RESTStorage object that will work against namespaces -func NewStorage(s tools.StorageInterface) (*REST, *StatusREST, *FinalizeREST) { +func NewStorage(s storage.Interface) (*REST, *StatusREST, *FinalizeREST) { prefix := "/namespaces" store := &etcdgeneric.Etcd{ NewFunc: func() runtime.Object { return &api.Namespace{} }, diff --git a/pkg/registry/namespace/etcd/etcd_test.go b/pkg/registry/namespace/etcd/etcd_test.go index c1cbbd9fe6..864210e192 100644 --- a/pkg/registry/namespace/etcd/etcd_test.go +++ b/pkg/registry/namespace/etcd/etcd_test.go @@ -26,6 +26,8 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/namespace" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/storage" + etcdstorage "github.com/GoogleCloudPlatform/kubernetes/pkg/storage/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools/etcdtest" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" @@ -33,14 +35,14 @@ import ( "github.com/coreos/go-etcd/etcd" ) -func newEtcdStorage(t *testing.T) (*tools.FakeEtcdClient, tools.StorageInterface) { +func newEtcdStorage(t *testing.T) (*tools.FakeEtcdClient, storage.Interface) { fakeEtcdClient := tools.NewFakeEtcdClient(t) fakeEtcdClient.TestIndex = true - etcdStorage := tools.NewEtcdStorage(fakeEtcdClient, latest.Codec, etcdtest.PathPrefix()) + etcdStorage := etcdstorage.NewEtcdStorage(fakeEtcdClient, latest.Codec, etcdtest.PathPrefix()) return fakeEtcdClient, etcdStorage } -func newStorage(t *testing.T) (*REST, *tools.FakeEtcdClient, tools.StorageInterface) { +func newStorage(t *testing.T) (*REST, *tools.FakeEtcdClient, storage.Interface) { fakeEtcdClient, s := newEtcdStorage(t) storage, _, _ := NewStorage(s) return storage, fakeEtcdClient, s diff --git a/pkg/registry/persistentvolume/etcd/etcd.go b/pkg/registry/persistentvolume/etcd/etcd.go index dcb4c30564..acb3362280 100644 --- a/pkg/registry/persistentvolume/etcd/etcd.go +++ b/pkg/registry/persistentvolume/etcd/etcd.go @@ -26,7 +26,7 @@ import ( etcdgeneric "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/persistentvolume" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" - "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + "github.com/GoogleCloudPlatform/kubernetes/pkg/storage" ) // rest implements a RESTStorage for persistentvolumes against etcd @@ -35,7 +35,7 @@ type REST struct { } // NewREST returns a RESTStorage object that will work against PersistentVolume objects. -func NewStorage(s tools.StorageInterface) (*REST, *StatusREST) { +func NewStorage(s storage.Interface) (*REST, *StatusREST) { prefix := "/persistentvolumes" store := &etcdgeneric.Etcd{ NewFunc: func() runtime.Object { return &api.PersistentVolume{} }, diff --git a/pkg/registry/persistentvolume/etcd/etcd_test.go b/pkg/registry/persistentvolume/etcd/etcd_test.go index b1ea67d791..bfd71f0e1d 100644 --- a/pkg/registry/persistentvolume/etcd/etcd_test.go +++ b/pkg/registry/persistentvolume/etcd/etcd_test.go @@ -27,6 +27,8 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/storage" + etcdstorage "github.com/GoogleCloudPlatform/kubernetes/pkg/storage/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools/etcdtest" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" @@ -38,10 +40,10 @@ type testRegistry struct { *registrytest.GenericRegistry } -func newStorage(t *testing.T) (*REST, *StatusREST, *tools.FakeEtcdClient, tools.StorageInterface) { +func newStorage(t *testing.T) (*REST, *StatusREST, *tools.FakeEtcdClient, storage.Interface) { fakeEtcdClient := tools.NewFakeEtcdClient(t) fakeEtcdClient.TestIndex = true - etcdStorage := tools.NewEtcdStorage(fakeEtcdClient, latest.Codec, etcdtest.PathPrefix()) + etcdStorage := etcdstorage.NewEtcdStorage(fakeEtcdClient, latest.Codec, etcdtest.PathPrefix()) storage, statusStorage := NewStorage(etcdStorage) return storage, statusStorage, fakeEtcdClient, etcdStorage } diff --git a/pkg/registry/persistentvolumeclaim/etcd/etcd.go b/pkg/registry/persistentvolumeclaim/etcd/etcd.go index 43c43b0e54..384601976e 100644 --- a/pkg/registry/persistentvolumeclaim/etcd/etcd.go +++ b/pkg/registry/persistentvolumeclaim/etcd/etcd.go @@ -24,7 +24,7 @@ import ( etcdgeneric "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/persistentvolumeclaim" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" - "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + "github.com/GoogleCloudPlatform/kubernetes/pkg/storage" ) // rest implements a RESTStorage for persistentvolumeclaims against etcd @@ -33,7 +33,7 @@ type REST struct { } // NewREST returns a RESTStorage object that will work against PersistentVolumeClaim objects. -func NewStorage(s tools.StorageInterface) (*REST, *StatusREST) { +func NewStorage(s storage.Interface) (*REST, *StatusREST) { prefix := "/persistentvolumeclaims" store := &etcdgeneric.Etcd{ NewFunc: func() runtime.Object { return &api.PersistentVolumeClaim{} }, diff --git a/pkg/registry/persistentvolumeclaim/etcd/etcd_test.go b/pkg/registry/persistentvolumeclaim/etcd/etcd_test.go index 3325abc448..6f298d5a73 100644 --- a/pkg/registry/persistentvolumeclaim/etcd/etcd_test.go +++ b/pkg/registry/persistentvolumeclaim/etcd/etcd_test.go @@ -27,6 +27,8 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/storage" + etcdstorage "github.com/GoogleCloudPlatform/kubernetes/pkg/storage/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools/etcdtest" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" @@ -38,10 +40,10 @@ type testRegistry struct { *registrytest.GenericRegistry } -func newStorage(t *testing.T) (*REST, *StatusREST, *tools.FakeEtcdClient, tools.StorageInterface) { +func newStorage(t *testing.T) (*REST, *StatusREST, *tools.FakeEtcdClient, storage.Interface) { fakeEtcdClient := tools.NewFakeEtcdClient(t) fakeEtcdClient.TestIndex = true - etcdStorage := tools.NewEtcdStorage(fakeEtcdClient, latest.Codec, etcdtest.PathPrefix()) + etcdStorage := etcdstorage.NewEtcdStorage(fakeEtcdClient, latest.Codec, etcdtest.PathPrefix()) storage, statusStorage := NewStorage(etcdStorage) return storage, statusStorage, fakeEtcdClient, etcdStorage } diff --git a/pkg/registry/pod/etcd/etcd.go b/pkg/registry/pod/etcd/etcd.go index bb7b4e2f71..a830a17e1a 100644 --- a/pkg/registry/pod/etcd/etcd.go +++ b/pkg/registry/pod/etcd/etcd.go @@ -35,7 +35,7 @@ import ( genericrest "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic/rest" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" - "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + "github.com/GoogleCloudPlatform/kubernetes/pkg/storage" "github.com/GoogleCloudPlatform/kubernetes/pkg/util/fielderrors" ) @@ -56,7 +56,7 @@ type REST struct { } // NewStorage returns a RESTStorage object that will work against pods. -func NewStorage(s tools.StorageInterface, k client.ConnectionInfoGetter) PodStorage { +func NewStorage(s storage.Interface, k client.ConnectionInfoGetter) PodStorage { prefix := "/pods" store := &etcdgeneric.Etcd{ NewFunc: func() runtime.Object { return &api.Pod{} }, @@ -143,7 +143,7 @@ func (r *BindingREST) setPodHostAndAnnotations(ctx api.Context, podID, oldMachin if err != nil { return nil, err } - err = r.store.Storage.GuaranteedUpdate(podKey, &api.Pod{}, false, tools.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) { + err = r.store.Storage.GuaranteedUpdate(podKey, &api.Pod{}, false, storage.SimpleUpdate(func(obj runtime.Object) (runtime.Object, error) { pod, ok := obj.(*api.Pod) if !ok { return nil, fmt.Errorf("unexpected object: %#v", obj) diff --git a/pkg/registry/pod/etcd/etcd_test.go b/pkg/registry/pod/etcd/etcd_test.go index e89ab1513f..e8999e63a6 100644 --- a/pkg/registry/pod/etcd/etcd_test.go +++ b/pkg/registry/pod/etcd/etcd_test.go @@ -33,6 +33,8 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/securitycontext" + "github.com/GoogleCloudPlatform/kubernetes/pkg/storage" + etcdstorage "github.com/GoogleCloudPlatform/kubernetes/pkg/storage/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools/etcdtest" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" @@ -40,14 +42,14 @@ import ( "github.com/coreos/go-etcd/etcd" ) -func newEtcdStorage(t *testing.T) (*tools.FakeEtcdClient, tools.StorageInterface) { +func newEtcdStorage(t *testing.T) (*tools.FakeEtcdClient, storage.Interface) { fakeEtcdClient := tools.NewFakeEtcdClient(t) fakeEtcdClient.TestIndex = true - etcdStorage := tools.NewEtcdStorage(fakeEtcdClient, latest.Codec, etcdtest.PathPrefix()) + etcdStorage := etcdstorage.NewEtcdStorage(fakeEtcdClient, latest.Codec, etcdtest.PathPrefix()) return fakeEtcdClient, etcdStorage } -func newStorage(t *testing.T) (*REST, *BindingREST, *StatusREST, *tools.FakeEtcdClient, tools.StorageInterface) { +func newStorage(t *testing.T) (*REST, *BindingREST, *StatusREST, *tools.FakeEtcdClient, storage.Interface) { fakeEtcdClient, etcdStorage := newEtcdStorage(t) storage := NewStorage(etcdStorage, nil) return storage.Pod, storage.Binding, storage.Status, fakeEtcdClient, etcdStorage diff --git a/pkg/registry/podtemplate/etcd/etcd.go b/pkg/registry/podtemplate/etcd/etcd.go index 4140c0702a..96d49695b9 100644 --- a/pkg/registry/podtemplate/etcd/etcd.go +++ b/pkg/registry/podtemplate/etcd/etcd.go @@ -24,7 +24,7 @@ import ( etcdgeneric "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/podtemplate" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" - "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + "github.com/GoogleCloudPlatform/kubernetes/pkg/storage" ) // rest implements a RESTStorage for pod templates against etcd @@ -33,7 +33,7 @@ type REST struct { } // NewREST returns a RESTStorage object that will work against pod templates. -func NewREST(s tools.StorageInterface) *REST { +func NewREST(s storage.Interface) *REST { prefix := "/podtemplates" store := etcdgeneric.Etcd{ NewFunc: func() runtime.Object { return &api.PodTemplate{} }, diff --git a/pkg/registry/podtemplate/etcd/etcd_test.go b/pkg/registry/podtemplate/etcd/etcd_test.go index 0ac4adf5f2..f6277c9ae8 100644 --- a/pkg/registry/podtemplate/etcd/etcd_test.go +++ b/pkg/registry/podtemplate/etcd/etcd_test.go @@ -22,14 +22,16 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest/resttest" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi" + "github.com/GoogleCloudPlatform/kubernetes/pkg/storage" + etcdstorage "github.com/GoogleCloudPlatform/kubernetes/pkg/storage/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools/etcdtest" ) -func newEtcdStorage(t *testing.T) (*tools.FakeEtcdClient, tools.StorageInterface) { +func newEtcdStorage(t *testing.T) (*tools.FakeEtcdClient, storage.Interface) { fakeEtcdClient := tools.NewFakeEtcdClient(t) fakeEtcdClient.TestIndex = true - etcdStorage := tools.NewEtcdStorage(fakeEtcdClient, testapi.Codec(), etcdtest.PathPrefix()) + etcdStorage := etcdstorage.NewEtcdStorage(fakeEtcdClient, testapi.Codec(), etcdtest.PathPrefix()) return fakeEtcdClient, etcdStorage } diff --git a/pkg/registry/resourcequota/etcd/etcd.go b/pkg/registry/resourcequota/etcd/etcd.go index 9b5b1c9eba..11225fcf0a 100644 --- a/pkg/registry/resourcequota/etcd/etcd.go +++ b/pkg/registry/resourcequota/etcd/etcd.go @@ -24,7 +24,7 @@ import ( etcdgeneric "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/resourcequota" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" - "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + "github.com/GoogleCloudPlatform/kubernetes/pkg/storage" ) // rest implements a RESTStorage for resourcequotas against etcd @@ -33,7 +33,7 @@ type REST struct { } // NewStorage returns a RESTStorage object that will work against ResourceQuota objects. -func NewStorage(s tools.StorageInterface) (*REST, *StatusREST) { +func NewStorage(s storage.Interface) (*REST, *StatusREST) { prefix := "/resourcequotas" store := &etcdgeneric.Etcd{ NewFunc: func() runtime.Object { return &api.ResourceQuota{} }, diff --git a/pkg/registry/resourcequota/etcd/etcd_test.go b/pkg/registry/resourcequota/etcd/etcd_test.go index c7b7d5f16e..1b195d50a6 100644 --- a/pkg/registry/resourcequota/etcd/etcd_test.go +++ b/pkg/registry/resourcequota/etcd/etcd_test.go @@ -31,6 +31,8 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/resourcequota" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/storage" + etcdstorage "github.com/GoogleCloudPlatform/kubernetes/pkg/storage/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools/etcdtest" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" @@ -38,14 +40,14 @@ import ( "github.com/coreos/go-etcd/etcd" ) -func newEtcdStorage(t *testing.T) (*tools.FakeEtcdClient, tools.StorageInterface) { +func newEtcdStorage(t *testing.T) (*tools.FakeEtcdClient, storage.Interface) { fakeEtcdClient := tools.NewFakeEtcdClient(t) fakeEtcdClient.TestIndex = true - etcdStorage := tools.NewEtcdStorage(fakeEtcdClient, latest.Codec, etcdtest.PathPrefix()) + etcdStorage := etcdstorage.NewEtcdStorage(fakeEtcdClient, latest.Codec, etcdtest.PathPrefix()) return fakeEtcdClient, etcdStorage } -func newStorage(t *testing.T) (*REST, *StatusREST, *tools.FakeEtcdClient, tools.StorageInterface) { +func newStorage(t *testing.T) (*REST, *StatusREST, *tools.FakeEtcdClient, storage.Interface) { fakeEtcdClient, h := newEtcdStorage(t) storage, statusStorage := NewStorage(h) return storage, statusStorage, fakeEtcdClient, h diff --git a/pkg/registry/secret/etcd/etcd.go b/pkg/registry/secret/etcd/etcd.go index c9d6b0d7ca..6d678c351e 100644 --- a/pkg/registry/secret/etcd/etcd.go +++ b/pkg/registry/secret/etcd/etcd.go @@ -24,7 +24,7 @@ import ( etcdgeneric "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/secret" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" - "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + "github.com/GoogleCloudPlatform/kubernetes/pkg/storage" ) // REST implements a RESTStorage for secrets against etcd @@ -33,7 +33,7 @@ type REST struct { } // NewStorage returns a registry which will store Secret in the given etcdStorage -func NewStorage(s tools.StorageInterface) *REST { +func NewStorage(s storage.Interface) *REST { prefix := "/secrets" store := &etcdgeneric.Etcd{ diff --git a/pkg/registry/secret/etcd/etcd_test.go b/pkg/registry/secret/etcd/etcd_test.go index 83047bdc95..6a32e5dee6 100644 --- a/pkg/registry/secret/etcd/etcd_test.go +++ b/pkg/registry/secret/etcd/etcd_test.go @@ -22,14 +22,16 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest/resttest" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi" + "github.com/GoogleCloudPlatform/kubernetes/pkg/storage" + etcdstorage "github.com/GoogleCloudPlatform/kubernetes/pkg/storage/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools/etcdtest" ) -func newEtcdStorage(t *testing.T) (*tools.FakeEtcdClient, tools.StorageInterface) { +func newEtcdStorage(t *testing.T) (*tools.FakeEtcdClient, storage.Interface) { fakeEtcdClient := tools.NewFakeEtcdClient(t) fakeEtcdClient.TestIndex = true - etcdStorage := tools.NewEtcdStorage(fakeEtcdClient, testapi.Codec(), etcdtest.PathPrefix()) + etcdStorage := etcdstorage.NewEtcdStorage(fakeEtcdClient, testapi.Codec(), etcdtest.PathPrefix()) return fakeEtcdClient, etcdStorage } diff --git a/pkg/registry/service/allocator/etcd/etcd.go b/pkg/registry/service/allocator/etcd/etcd.go index 924391a83e..57d45721b6 100644 --- a/pkg/registry/service/allocator/etcd/etcd.go +++ b/pkg/registry/service/allocator/etcd/etcd.go @@ -27,7 +27,8 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/allocator" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" - "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + "github.com/GoogleCloudPlatform/kubernetes/pkg/storage" + etcdstorage "github.com/GoogleCloudPlatform/kubernetes/pkg/storage/etcd" ) var ( @@ -42,7 +43,7 @@ type Etcd struct { lock sync.Mutex alloc allocator.Snapshottable - storage tools.StorageInterface + storage storage.Interface last string baseKey string @@ -55,7 +56,7 @@ var _ service.RangeRegistry = &Etcd{} // NewEtcd returns an allocator that is backed by Etcd and can manage // persisting the snapshot state of allocation after each allocation is made. -func NewEtcd(alloc allocator.Snapshottable, baseKey string, kind string, storage tools.StorageInterface) *Etcd { +func NewEtcd(alloc allocator.Snapshottable, baseKey string, kind string, storage storage.Interface) *Etcd { return &Etcd{ alloc: alloc, storage: storage, @@ -141,7 +142,7 @@ func (e *Etcd) Release(item int) error { // tryUpdate performs a read-update to persist the latest snapshot state of allocation. func (e *Etcd) tryUpdate(fn func() error) error { err := e.storage.GuaranteedUpdate(e.baseKey, &api.RangeAllocation{}, true, - tools.SimpleUpdate(func(input runtime.Object) (output runtime.Object, err error) { + storage.SimpleUpdate(func(input runtime.Object) (output runtime.Object, err error) { existing := input.(*api.RangeAllocation) if len(existing.ResourceVersion) == 0 { return nil, fmt.Errorf("cannot allocate resources of type %s at this time", e.kind) @@ -171,7 +172,7 @@ func (e *Etcd) Refresh() (*api.RangeAllocation, error) { existing := &api.RangeAllocation{} if err := e.storage.Get(e.baseKey, existing, false); err != nil { - if tools.IsEtcdNotFound(err) { + if etcdstorage.IsEtcdNotFound(err) { return nil, nil } return nil, etcderr.InterpretGetError(err, e.kind, "") @@ -198,7 +199,7 @@ func (e *Etcd) CreateOrUpdate(snapshot *api.RangeAllocation) error { last := "" err := e.storage.GuaranteedUpdate(e.baseKey, &api.RangeAllocation{}, true, - tools.SimpleUpdate(func(input runtime.Object) (output runtime.Object, err error) { + storage.SimpleUpdate(func(input runtime.Object) (output runtime.Object, err error) { existing := input.(*api.RangeAllocation) switch { case len(snapshot.ResourceVersion) != 0 && len(existing.ResourceVersion) != 0: diff --git a/pkg/registry/service/allocator/etcd/etcd_test.go b/pkg/registry/service/allocator/etcd/etcd_test.go index a3d3c48b19..a3ff425db5 100644 --- a/pkg/registry/service/allocator/etcd/etcd_test.go +++ b/pkg/registry/service/allocator/etcd/etcd_test.go @@ -26,14 +26,16 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/allocator" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/storage" + etcdstorage "github.com/GoogleCloudPlatform/kubernetes/pkg/storage/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools/etcdtest" ) -func newEtcdStorage(t *testing.T) (*tools.FakeEtcdClient, tools.StorageInterface) { +func newEtcdStorage(t *testing.T) (*tools.FakeEtcdClient, storage.Interface) { fakeEtcdClient := tools.NewFakeEtcdClient(t) fakeEtcdClient.TestIndex = true - etcdStorage := tools.NewEtcdStorage(fakeEtcdClient, testapi.Codec(), etcdtest.PathPrefix()) + etcdStorage := etcdstorage.NewEtcdStorage(fakeEtcdClient, testapi.Codec(), etcdtest.PathPrefix()) return fakeEtcdClient, etcdStorage } diff --git a/pkg/registry/service/ipallocator/etcd/etcd_test.go b/pkg/registry/service/ipallocator/etcd/etcd_test.go index 9c7620ecf5..e85d9119b8 100644 --- a/pkg/registry/service/ipallocator/etcd/etcd_test.go +++ b/pkg/registry/service/ipallocator/etcd/etcd_test.go @@ -29,14 +29,16 @@ import ( allocator_etcd "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/allocator/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/service/ipallocator" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/storage" + etcdstorage "github.com/GoogleCloudPlatform/kubernetes/pkg/storage/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools/etcdtest" ) -func newEtcdStorage(t *testing.T) (*tools.FakeEtcdClient, tools.StorageInterface) { +func newEtcdStorage(t *testing.T) (*tools.FakeEtcdClient, storage.Interface) { fakeEtcdClient := tools.NewFakeEtcdClient(t) fakeEtcdClient.TestIndex = true - etcdStorage := tools.NewEtcdStorage(fakeEtcdClient, testapi.Codec(), etcdtest.PathPrefix()) + etcdStorage := etcdstorage.NewEtcdStorage(fakeEtcdClient, testapi.Codec(), etcdtest.PathPrefix()) return fakeEtcdClient, etcdStorage } diff --git a/pkg/registry/serviceaccount/etcd/etcd.go b/pkg/registry/serviceaccount/etcd/etcd.go index c160c3be8f..98e3bed856 100644 --- a/pkg/registry/serviceaccount/etcd/etcd.go +++ b/pkg/registry/serviceaccount/etcd/etcd.go @@ -24,7 +24,7 @@ import ( etcdgeneric "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/generic/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/serviceaccount" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" - "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + "github.com/GoogleCloudPlatform/kubernetes/pkg/storage" ) // REST implements a RESTStorage for service accounts against etcd @@ -35,7 +35,7 @@ type REST struct { const Prefix = "/serviceaccounts" // NewStorage returns a RESTStorage object that will work against service accounts objects. -func NewStorage(s tools.StorageInterface) *REST { +func NewStorage(s storage.Interface) *REST { store := &etcdgeneric.Etcd{ NewFunc: func() runtime.Object { return &api.ServiceAccount{} }, NewListFunc: func() runtime.Object { return &api.ServiceAccountList{} }, diff --git a/pkg/registry/serviceaccount/etcd/etcd_test.go b/pkg/registry/serviceaccount/etcd/etcd_test.go index 3eef9c627d..9f74985bd6 100644 --- a/pkg/registry/serviceaccount/etcd/etcd_test.go +++ b/pkg/registry/serviceaccount/etcd/etcd_test.go @@ -22,14 +22,16 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/rest/resttest" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi" + "github.com/GoogleCloudPlatform/kubernetes/pkg/storage" + etcdstorage "github.com/GoogleCloudPlatform/kubernetes/pkg/storage/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools/etcdtest" ) -func newEtcdStorage(t *testing.T) (*tools.FakeEtcdClient, tools.StorageInterface) { +func newEtcdStorage(t *testing.T) (*tools.FakeEtcdClient, storage.Interface) { fakeEtcdClient := tools.NewFakeEtcdClient(t) fakeEtcdClient.TestIndex = true - etcdStorage := tools.NewEtcdStorage(fakeEtcdClient, testapi.Codec(), etcdtest.PathPrefix()) + etcdStorage := etcdstorage.NewEtcdStorage(fakeEtcdClient, testapi.Codec(), etcdtest.PathPrefix()) return fakeEtcdClient, etcdStorage } diff --git a/pkg/serviceaccount/tokengetter.go b/pkg/serviceaccount/tokengetter.go index 869cc92211..458f9ab6c4 100644 --- a/pkg/serviceaccount/tokengetter.go +++ b/pkg/serviceaccount/tokengetter.go @@ -23,7 +23,7 @@ import ( secretetcd "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/secret/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/serviceaccount" serviceaccountetcd "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/serviceaccount/etcd" - "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + "github.com/GoogleCloudPlatform/kubernetes/pkg/storage" ) // ServiceAccountTokenGetter defines functions to retrieve a named service account and secret @@ -73,7 +73,7 @@ func (r *registryGetter) GetSecret(namespace, name string) (*api.Secret, error) // NewGetterFromStorageInterface returns a ServiceAccountTokenGetter that // uses the specified storage to retrieve service accounts and secrets. -func NewGetterFromStorageInterface(storage tools.StorageInterface) ServiceAccountTokenGetter { +func NewGetterFromStorageInterface(storage storage.Interface) ServiceAccountTokenGetter { return NewGetterFromRegistries( serviceaccount.NewRegistry(serviceaccountetcd.NewStorage(storage)), secret.NewRegistry(secretetcd.NewStorage(storage)), diff --git a/pkg/storage/doc.go b/pkg/storage/doc.go new file mode 100644 index 0000000000..dca0d5b709 --- /dev/null +++ b/pkg/storage/doc.go @@ -0,0 +1,18 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Interfaces for database-related operations. +package storage diff --git a/pkg/tools/etcd_object.go b/pkg/storage/etcd/api_object_versioner.go similarity index 88% rename from pkg/tools/etcd_object.go rename to pkg/storage/etcd/api_object_versioner.go index eda9532be6..41a605f33e 100644 --- a/pkg/tools/etcd_object.go +++ b/pkg/storage/etcd/api_object_versioner.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package tools +package etcd import ( "strconv" @@ -22,6 +22,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/storage" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) @@ -29,7 +30,7 @@ import ( // for objects that have an embedded ObjectMeta or ListMeta field. type APIObjectVersioner struct{} -// UpdateObject implements StorageVersioner +// UpdateObject implements Versioner func (a APIObjectVersioner) UpdateObject(obj runtime.Object, expiration *time.Time, resourceVersion uint64) error { objectMeta, err := api.ObjectMetaFor(obj) if err != nil { @@ -46,7 +47,7 @@ func (a APIObjectVersioner) UpdateObject(obj runtime.Object, expiration *time.Ti return nil } -// UpdateList implements StorageVersioner +// UpdateList implements Versioner func (a APIObjectVersioner) UpdateList(obj runtime.Object, resourceVersion uint64) error { listMeta, err := api.ListMetaFor(obj) if err != nil || listMeta == nil { @@ -60,7 +61,7 @@ func (a APIObjectVersioner) UpdateList(obj runtime.Object, resourceVersion uint6 return nil } -// ObjectResourceVersion implements StorageVersioner +// ObjectResourceVersion implements Versioner func (a APIObjectVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, error) { meta, err := api.ObjectMetaFor(obj) if err != nil { @@ -73,5 +74,5 @@ func (a APIObjectVersioner) ObjectResourceVersion(obj runtime.Object) (uint64, e return strconv.ParseUint(version, 10, 64) } -// APIObjectVersioner implements StorageVersioner -var _ StorageVersioner = APIObjectVersioner{} +// APIObjectVersioner implements Versioner +var _ storage.Versioner = APIObjectVersioner{} diff --git a/pkg/tools/etcd_object_test.go b/pkg/storage/etcd/api_object_versioner_test.go similarity index 99% rename from pkg/tools/etcd_object_test.go rename to pkg/storage/etcd/api_object_versioner_test.go index 6675fe3501..3e3129ce1a 100644 --- a/pkg/tools/etcd_object_test.go +++ b/pkg/storage/etcd/api_object_versioner_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package tools +package etcd import ( "testing" diff --git a/pkg/storage/etcd/doc.go b/pkg/storage/etcd/doc.go new file mode 100644 index 0000000000..44a2b9d445 --- /dev/null +++ b/pkg/storage/etcd/doc.go @@ -0,0 +1,17 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package etcd diff --git a/pkg/tools/etcd_helper.go b/pkg/storage/etcd/etcd_helper.go similarity index 92% rename from pkg/tools/etcd_helper.go rename to pkg/storage/etcd/etcd_helper.go index 730faf3a61..53a9e302e7 100644 --- a/pkg/tools/etcd_helper.go +++ b/pkg/storage/etcd/etcd_helper.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package tools +package etcd import ( "errors" @@ -27,6 +27,8 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/conversion" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/storage" + "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools/metrics" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" @@ -35,7 +37,7 @@ import ( "github.com/golang/glog" ) -func NewEtcdStorage(client EtcdClient, codec runtime.Codec, prefix string) StorageInterface { +func NewEtcdStorage(client tools.EtcdClient, codec runtime.Codec, prefix string) storage.Interface { return &etcdHelper{ client: client, codec: codec, @@ -46,13 +48,13 @@ func NewEtcdStorage(client EtcdClient, codec runtime.Codec, prefix string) Stora } } -// etcdHelper is the reference implementation of StorageInterface. +// etcdHelper is the reference implementation of storage.Interface. type etcdHelper struct { - client EtcdClient + client tools.EtcdClient codec runtime.Codec copier runtime.ObjectCopier // optional, has to be set to perform any atomic operations - versioner StorageVersioner + versioner storage.Versioner // prefix for all etcd keys pathPrefix string @@ -70,17 +72,17 @@ func init() { metrics.Register() } -// Implements StorageInterface. +// Implements storage.Interface. func (h *etcdHelper) Backends() []string { return h.client.GetCluster() } -// Implements StorageInterface. -func (h *etcdHelper) Versioner() StorageVersioner { +// Implements storage.Interface. +func (h *etcdHelper) Versioner() storage.Versioner { return h.versioner } -// Implements StorageInterface. +// Implements storage.Interface. func (h *etcdHelper) Create(key string, obj, out runtime.Object, ttl uint64) error { key = h.prefixEtcdKey(key) data, err := h.codec.Encode(obj) @@ -108,7 +110,7 @@ func (h *etcdHelper) Create(key string, obj, out runtime.Object, ttl uint64) err return err } -// Implements StorageInterface. +// Implements storage.Interface. func (h *etcdHelper) Set(key string, obj, out runtime.Object, ttl uint64) error { var response *etcd.Response data, err := h.codec.Encode(obj) @@ -149,7 +151,7 @@ func (h *etcdHelper) Set(key string, obj, out runtime.Object, ttl uint64) error return err } -// Implements StorageInterface. +// Implements storage.Interface. func (h *etcdHelper) Delete(key string, out runtime.Object) error { key = h.prefixEtcdKey(key) if _, err := conversion.EnforcePtr(out); err != nil { @@ -168,7 +170,7 @@ func (h *etcdHelper) Delete(key string, out runtime.Object) error { return err } -// Implements StorageInterface. +// Implements storage.Interface. func (h *etcdHelper) RecursiveDelete(key string, recursive bool) error { key = h.prefixEtcdKey(key) startTime := time.Now() @@ -177,23 +179,23 @@ func (h *etcdHelper) RecursiveDelete(key string, recursive bool) error { return err } -// Implements StorageInterface. -func (h *etcdHelper) Watch(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) { +// Implements storage.Interface. +func (h *etcdHelper) Watch(key string, resourceVersion uint64, filter storage.FilterFunc) (watch.Interface, error) { key = h.prefixEtcdKey(key) w := newEtcdWatcher(false, nil, filter, h.codec, h.versioner, nil, h) go w.etcdWatch(h.client, key, resourceVersion) return w, nil } -// Implements StorageInterface. -func (h *etcdHelper) WatchList(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) { +// Implements storage.Interface. +func (h *etcdHelper) WatchList(key string, resourceVersion uint64, filter storage.FilterFunc) (watch.Interface, error) { key = h.prefixEtcdKey(key) w := newEtcdWatcher(true, exceptKey(key), filter, h.codec, h.versioner, nil, h) go w.etcdWatch(h.client, key, resourceVersion) return w, nil } -// Implements StorageInterface. +// Implements storage.Interface. func (h *etcdHelper) Get(key string, objPtr runtime.Object, ignoreNotFound bool) error { key = h.prefixEtcdKey(key) _, _, _, err := h.bodyAndExtractObj(key, objPtr, ignoreNotFound) @@ -244,7 +246,7 @@ func (h *etcdHelper) extractObj(response *etcd.Response, inErr error, objPtr run return body, node, err } -// Implements StorageInterface. +// Implements storage.Interface. func (h *etcdHelper) GetToList(key string, listObj runtime.Object) error { trace := util.NewTrace("GetToList " + getTypeName(listObj)) listPtr, err := runtime.GetItemsPtr(listObj) @@ -318,7 +320,7 @@ func (h *etcdHelper) decodeNodeList(nodes []*etcd.Node, slicePtr interface{}) er return nil } -// Implements StorageInterface. +// Implements storage.Interface. func (h *etcdHelper) List(key string, listObj runtime.Object) error { trace := util.NewTrace("List " + getTypeName(listObj)) defer trace.LogIfLong(time.Second) @@ -364,18 +366,8 @@ func (h *etcdHelper) listEtcdNode(key string) ([]*etcd.Node, uint64, error) { return result.Node.Nodes, result.EtcdIndex, nil } -type SimpleEtcdUpdateFunc func(runtime.Object) (runtime.Object, error) - -// SimpleUpdateFunc converts SimpleEtcdUpdateFunc into EtcdUpdateFunc -func SimpleUpdate(fn SimpleEtcdUpdateFunc) StorageUpdateFunc { - return func(input runtime.Object, _ ResponseMeta) (runtime.Object, *uint64, error) { - out, err := fn(input) - return out, nil, err - } -} - -// Implements StorageInterface. -func (h *etcdHelper) GuaranteedUpdate(key string, ptrToType runtime.Object, ignoreNotFound bool, tryUpdate StorageUpdateFunc) error { +// Implements storage.Interface. +func (h *etcdHelper) GuaranteedUpdate(key string, ptrToType runtime.Object, ignoreNotFound bool, tryUpdate storage.UpdateFunc) error { v, err := conversion.EnforcePtr(ptrToType) if err != nil { // Panic is appropriate, because this is a programming error. @@ -388,7 +380,7 @@ func (h *etcdHelper) GuaranteedUpdate(key string, ptrToType runtime.Object, igno if err != nil { return err } - meta := ResponseMeta{} + meta := storage.ResponseMeta{} if node != nil { meta.TTL = node.TTL if node.Expiration != nil { diff --git a/pkg/tools/etcd_helper_test.go b/pkg/storage/etcd/etcd_helper_test.go similarity index 91% rename from pkg/tools/etcd_helper_test.go rename to pkg/storage/etcd/etcd_helper_test.go index 32bcf3d2f9..4fa597fb0e 100644 --- a/pkg/tools/etcd_helper_test.go +++ b/pkg/storage/etcd/etcd_helper_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package tools +package etcd import ( "errors" @@ -34,6 +34,8 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi" "github.com/GoogleCloudPlatform/kubernetes/pkg/conversion" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/storage" + "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools/etcdtest" "github.com/coreos/go-etcd/etcd" "github.com/stretchr/testify/assert" @@ -65,7 +67,7 @@ func init() { ) } -func newEtcdHelper(client EtcdClient, codec runtime.Codec, prefix string) etcdHelper { +func newEtcdHelper(client tools.EtcdClient, codec runtime.Codec, prefix string) etcdHelper { return *NewEtcdStorage(client, codec, prefix).(*etcdHelper) } @@ -75,7 +77,7 @@ func TestIsEtcdNotFound(t *testing.T) { t.Errorf("Expected %#v to return %v, but it did not", err, isNotFound) } } - try(EtcdErrorNotFound, true) + try(tools.EtcdErrorNotFound, true) try(&etcd.EtcdError{ErrorCode: 101}, false) try(nil, false) try(fmt.Errorf("some other kind of error"), false) @@ -90,10 +92,10 @@ func getEncodedPod(name string) string { } func TestList(t *testing.T) { - fakeClient := NewFakeEtcdClient(t) + fakeClient := tools.NewFakeEtcdClient(t) helper := newEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix()) key := etcdtest.AddPrefix("/some/key") - fakeClient.Data[key] = EtcdResponseWithError{ + fakeClient.Data[key] = tools.EtcdResponseWithError{ R: &etcd.Response{ EtcdIndex: 10, Node: &etcd.Node{ @@ -160,10 +162,10 @@ func TestList(t *testing.T) { // TestListAcrossDirectories ensures that the client excludes directories and flattens tree-response - simulates cross-namespace query func TestListAcrossDirectories(t *testing.T) { - fakeClient := NewFakeEtcdClient(t) + fakeClient := tools.NewFakeEtcdClient(t) helper := newEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix()) key := etcdtest.AddPrefix("/some/key") - fakeClient.Data[key] = EtcdResponseWithError{ + fakeClient.Data[key] = tools.EtcdResponseWithError{ R: &etcd.Response{ EtcdIndex: 10, Node: &etcd.Node{ @@ -243,10 +245,10 @@ func TestListAcrossDirectories(t *testing.T) { } func TestListExcludesDirectories(t *testing.T) { - fakeClient := NewFakeEtcdClient(t) + fakeClient := tools.NewFakeEtcdClient(t) helper := newEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix()) key := etcdtest.AddPrefix("/some/key") - fakeClient.Data[key] = EtcdResponseWithError{ + fakeClient.Data[key] = tools.EtcdResponseWithError{ R: &etcd.Response{ EtcdIndex: 10, Node: &etcd.Node{ @@ -314,7 +316,7 @@ func TestListExcludesDirectories(t *testing.T) { } func TestGet(t *testing.T) { - fakeClient := NewFakeEtcdClient(t) + fakeClient := tools.NewFakeEtcdClient(t) helper := newEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix()) key := etcdtest.AddPrefix("/some/key") expect := api.Pod{ @@ -336,10 +338,10 @@ func TestGet(t *testing.T) { } func TestGetNotFoundErr(t *testing.T) { - fakeClient := NewFakeEtcdClient(t) + fakeClient := tools.NewFakeEtcdClient(t) helper := newEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix()) key1 := etcdtest.AddPrefix("/some/key") - fakeClient.Data[key1] = EtcdResponseWithError{ + fakeClient.Data[key1] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: nil, }, @@ -348,13 +350,13 @@ func TestGetNotFoundErr(t *testing.T) { }, } key2 := etcdtest.AddPrefix("/some/key2") - fakeClient.Data[key2] = EtcdResponseWithError{ + fakeClient.Data[key2] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: nil, }, } key3 := etcdtest.AddPrefix("/some/key3") - fakeClient.Data[key3] = EtcdResponseWithError{ + fakeClient.Data[key3] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: &etcd.Node{ Value: "", @@ -380,7 +382,7 @@ func TestGetNotFoundErr(t *testing.T) { func TestCreate(t *testing.T) { obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}} - fakeClient := NewFakeEtcdClient(t) + fakeClient := tools.NewFakeEtcdClient(t) helper := newEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix()) returnedObj := &api.Pod{} err := helper.Create("/some/key", obj, returnedObj, 5) @@ -406,7 +408,7 @@ func TestCreate(t *testing.T) { func TestCreateNilOutParam(t *testing.T) { obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}} - fakeClient := NewFakeEtcdClient(t) + fakeClient := tools.NewFakeEtcdClient(t) helper := newEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix()) err := helper.Create("/some/key", obj, nil, 5) if err != nil { @@ -416,7 +418,7 @@ func TestCreateNilOutParam(t *testing.T) { func TestSet(t *testing.T) { obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}} - fakeClient := NewFakeEtcdClient(t) + fakeClient := tools.NewFakeEtcdClient(t) helper := newEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix()) returnedObj := &api.Pod{} err := helper.Set("/some/key", obj, returnedObj, 5) @@ -443,7 +445,7 @@ func TestSet(t *testing.T) { func TestSetFailCAS(t *testing.T) { obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo", ResourceVersion: "1"}} - fakeClient := NewFakeEtcdClient(t) + fakeClient := tools.NewFakeEtcdClient(t) fakeClient.CasErr = fakeClient.NewError(123) helper := newEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix()) err := helper.Set("/some/key", obj, nil, 5) @@ -454,11 +456,11 @@ func TestSetFailCAS(t *testing.T) { func TestSetWithVersion(t *testing.T) { obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo", ResourceVersion: "1"}} - fakeClient := NewFakeEtcdClient(t) + fakeClient := tools.NewFakeEtcdClient(t) fakeClient.TestIndex = true helper := newEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix()) key := etcdtest.AddPrefix("/some/key") - fakeClient.Data[key] = EtcdResponseWithError{ + fakeClient.Data[key] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: &etcd.Node{ Value: runtime.EncodeOrDie(testapi.Codec(), obj), @@ -491,7 +493,7 @@ func TestSetWithVersion(t *testing.T) { func TestSetWithoutResourceVersioner(t *testing.T) { obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}} - fakeClient := NewFakeEtcdClient(t) + fakeClient := tools.NewFakeEtcdClient(t) helper := newEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix()) helper.versioner = nil returnedObj := &api.Pod{} @@ -519,7 +521,7 @@ func TestSetWithoutResourceVersioner(t *testing.T) { func TestSetNilOutParam(t *testing.T) { obj := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}} - fakeClient := NewFakeEtcdClient(t) + fakeClient := tools.NewFakeEtcdClient(t) helper := newEtcdHelper(fakeClient, testapi.Codec(), etcdtest.PathPrefix()) helper.versioner = nil err := helper.Set("/some/key", obj, nil, 3) @@ -529,7 +531,7 @@ func TestSetNilOutParam(t *testing.T) { } func TestGuaranteedUpdate(t *testing.T) { - fakeClient := NewFakeEtcdClient(t) + fakeClient := tools.NewFakeEtcdClient(t) fakeClient.TestIndex = true helper := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix()) key := etcdtest.AddPrefix("/some/key") @@ -537,7 +539,7 @@ func TestGuaranteedUpdate(t *testing.T) { // Create a new node. fakeClient.ExpectNotFoundGet(key) obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1} - err := helper.GuaranteedUpdate("/some/key", &TestResource{}, true, SimpleUpdate(func(in runtime.Object) (runtime.Object, error) { + err := helper.GuaranteedUpdate("/some/key", &TestResource{}, true, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) { return obj, nil })) if err != nil { @@ -556,7 +558,7 @@ func TestGuaranteedUpdate(t *testing.T) { // Update an existing node. callbackCalled := false objUpdate := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 2} - err = helper.GuaranteedUpdate("/some/key", &TestResource{}, true, SimpleUpdate(func(in runtime.Object) (runtime.Object, error) { + err = helper.GuaranteedUpdate("/some/key", &TestResource{}, true, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) { callbackCalled = true if in.(*TestResource).Value != 1 { @@ -584,7 +586,7 @@ func TestGuaranteedUpdate(t *testing.T) { } func TestGuaranteedUpdateTTL(t *testing.T) { - fakeClient := NewFakeEtcdClient(t) + fakeClient := tools.NewFakeEtcdClient(t) fakeClient.TestIndex = true helper := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix()) key := etcdtest.AddPrefix("/some/key") @@ -592,7 +594,7 @@ func TestGuaranteedUpdateTTL(t *testing.T) { // Create a new node. fakeClient.ExpectNotFoundGet(key) obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1} - err := helper.GuaranteedUpdate("/some/key", &TestResource{}, true, func(in runtime.Object, res ResponseMeta) (runtime.Object, *uint64, error) { + err := helper.GuaranteedUpdate("/some/key", &TestResource{}, true, func(in runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) { if res.TTL != 0 { t.Fatalf("unexpected response meta: %#v", res) } @@ -618,7 +620,7 @@ func TestGuaranteedUpdateTTL(t *testing.T) { // Update an existing node. callbackCalled := false objUpdate := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 2} - err = helper.GuaranteedUpdate("/some/key", &TestResource{}, true, func(in runtime.Object, res ResponseMeta) (runtime.Object, *uint64, error) { + err = helper.GuaranteedUpdate("/some/key", &TestResource{}, true, func(in runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) { if res.TTL != 10 { t.Fatalf("unexpected response meta: %#v", res) } @@ -650,7 +652,7 @@ func TestGuaranteedUpdateTTL(t *testing.T) { // Update an existing node and change ttl callbackCalled = false objUpdate = &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 3} - err = helper.GuaranteedUpdate("/some/key", &TestResource{}, true, func(in runtime.Object, res ResponseMeta) (runtime.Object, *uint64, error) { + err = helper.GuaranteedUpdate("/some/key", &TestResource{}, true, func(in runtime.Object, res storage.ResponseMeta) (runtime.Object, *uint64, error) { if res.TTL != 10 { t.Fatalf("unexpected response meta: %#v", res) } @@ -685,7 +687,7 @@ func TestGuaranteedUpdateTTL(t *testing.T) { } func TestGuaranteedUpdateNoChange(t *testing.T) { - fakeClient := NewFakeEtcdClient(t) + fakeClient := tools.NewFakeEtcdClient(t) fakeClient.TestIndex = true helper := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix()) key := etcdtest.AddPrefix("/some/key") @@ -693,7 +695,7 @@ func TestGuaranteedUpdateNoChange(t *testing.T) { // Create a new node. fakeClient.ExpectNotFoundGet(key) obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1} - err := helper.GuaranteedUpdate("/some/key", &TestResource{}, true, SimpleUpdate(func(in runtime.Object) (runtime.Object, error) { + err := helper.GuaranteedUpdate("/some/key", &TestResource{}, true, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) { return obj, nil })) if err != nil { @@ -703,7 +705,7 @@ func TestGuaranteedUpdateNoChange(t *testing.T) { // Update an existing node with the same data callbackCalled := false objUpdate := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1} - err = helper.GuaranteedUpdate("/some/key", &TestResource{}, true, SimpleUpdate(func(in runtime.Object) (runtime.Object, error) { + err = helper.GuaranteedUpdate("/some/key", &TestResource{}, true, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) { fakeClient.Err = errors.New("should not be called") callbackCalled = true return objUpdate, nil @@ -717,7 +719,7 @@ func TestGuaranteedUpdateNoChange(t *testing.T) { } func TestGuaranteedUpdateKeyNotFound(t *testing.T) { - fakeClient := NewFakeEtcdClient(t) + fakeClient := tools.NewFakeEtcdClient(t) fakeClient.TestIndex = true helper := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix()) key := etcdtest.AddPrefix("/some/key") @@ -726,7 +728,7 @@ func TestGuaranteedUpdateKeyNotFound(t *testing.T) { fakeClient.ExpectNotFoundGet(key) obj := &TestResource{ObjectMeta: api.ObjectMeta{Name: "foo"}, Value: 1} - f := SimpleUpdate(func(in runtime.Object) (runtime.Object, error) { + f := storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) { return obj, nil }) @@ -744,7 +746,7 @@ func TestGuaranteedUpdateKeyNotFound(t *testing.T) { } func TestGuaranteedUpdate_CreateCollision(t *testing.T) { - fakeClient := NewFakeEtcdClient(t) + fakeClient := tools.NewFakeEtcdClient(t) fakeClient.TestIndex = true helper := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix()) key := etcdtest.AddPrefix("/some/key") @@ -763,7 +765,7 @@ func TestGuaranteedUpdate_CreateCollision(t *testing.T) { defer wgDone.Done() firstCall := true - err := helper.GuaranteedUpdate("/some/key", &TestResource{}, true, SimpleUpdate(func(in runtime.Object) (runtime.Object, error) { + err := helper.GuaranteedUpdate("/some/key", &TestResource{}, true, storage.SimpleUpdate(func(in runtime.Object) (runtime.Object, error) { defer func() { firstCall = false }() if firstCall { @@ -842,7 +844,7 @@ func TestGetEtcdVersion_NotListening(t *testing.T) { } func TestPrefixEtcdKey(t *testing.T) { - fakeClient := NewFakeEtcdClient(t) + fakeClient := tools.NewFakeEtcdClient(t) prefix := path.Join("/", etcdtest.PathPrefix()) helper := newEtcdHelper(fakeClient, testapi.Codec(), prefix) diff --git a/pkg/tools/etcd_util.go b/pkg/storage/etcd/etcd_util.go similarity index 83% rename from pkg/tools/etcd_util.go rename to pkg/storage/etcd/etcd_util.go index ebcab56ded..c5f0231d05 100644 --- a/pkg/tools/etcd_util.go +++ b/pkg/storage/etcd/etcd_util.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package tools +package etcd import ( "encoding/json" @@ -23,41 +23,42 @@ import ( "net/http" "os/exec" - "github.com/coreos/go-etcd/etcd" + "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + goetcd "github.com/coreos/go-etcd/etcd" "github.com/golang/glog" ) // IsEtcdNotFound returns true iff err is an etcd not found error. func IsEtcdNotFound(err error) bool { - return isEtcdErrorNum(err, EtcdErrorCodeNotFound) + return isEtcdErrorNum(err, tools.EtcdErrorCodeNotFound) } // IsEtcdNodeExist returns true iff err is an etcd node aleady exist error. func IsEtcdNodeExist(err error) bool { - return isEtcdErrorNum(err, EtcdErrorCodeNodeExist) + return isEtcdErrorNum(err, tools.EtcdErrorCodeNodeExist) } // IsEtcdTestFailed returns true iff err is an etcd write conflict. func IsEtcdTestFailed(err error) bool { - return isEtcdErrorNum(err, EtcdErrorCodeTestFailed) + return isEtcdErrorNum(err, tools.EtcdErrorCodeTestFailed) } // IsEtcdWatchStoppedByUser returns true iff err is a client triggered stop. func IsEtcdWatchStoppedByUser(err error) bool { - return etcd.ErrWatchStoppedByUser == err + return goetcd.ErrWatchStoppedByUser == err } // isEtcdErrorNum returns true iff err is an etcd error, whose errorCode matches errorCode func isEtcdErrorNum(err error, errorCode int) bool { - etcdError, ok := err.(*etcd.EtcdError) + etcdError, ok := err.(*goetcd.EtcdError) return ok && etcdError != nil && etcdError.ErrorCode == errorCode } // etcdErrorIndex returns the index associated with the error message and whether the // index was available. func etcdErrorIndex(err error) (uint64, bool) { - if etcdError, ok := err.(*etcd.EtcdError); ok { + if etcdError, ok := err.(*goetcd.EtcdError); ok { return etcdError.Index, true } return 0, false @@ -90,7 +91,7 @@ func startEtcd() (*exec.Cmd, error) { return cmd, nil } -func NewEtcdClientStartServerIfNecessary(server string) (EtcdClient, error) { +func NewEtcdClientStartServerIfNecessary(server string) (tools.EtcdClient, error) { _, err := GetEtcdVersion(server) if err != nil { glog.Infof("Failed to find etcd, attempting to start.") @@ -101,7 +102,7 @@ func NewEtcdClientStartServerIfNecessary(server string) (EtcdClient, error) { } servers := []string{server} - return etcd.NewClient(servers), nil + return goetcd.NewClient(servers), nil } type etcdHealth struct { diff --git a/pkg/tools/etcd_watcher.go b/pkg/storage/etcd/etcd_watcher.go similarity index 86% rename from pkg/tools/etcd_watcher.go rename to pkg/storage/etcd/etcd_watcher.go index ab1ac597b8..403709712b 100644 --- a/pkg/tools/etcd_watcher.go +++ b/pkg/storage/etcd/etcd_watcher.go @@ -14,18 +14,17 @@ See the License for the specific language governing permissions and limitations under the License. */ -package tools +package etcd import ( - "strconv" "sync" "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/storage" + "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" - "github.com/GoogleCloudPlatform/kubernetes/pkg/util/fielderrors" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/coreos/go-etcd/etcd" @@ -41,31 +40,6 @@ const ( EtcdDelete = "delete" ) -// FilterFunc is a predicate which takes an API object and returns true -// iff the object should remain in the set. -type FilterFunc func(obj runtime.Object) bool - -// Everything is a FilterFunc which accepts all objects. -func Everything(runtime.Object) bool { - return true -} - -// ParseWatchResourceVersion takes a resource version argument and converts it to -// the etcd version we should pass to helper.Watch(). Because resourceVersion is -// an opaque value, the default watch behavior for non-zero watch is to watch -// the next value (if you pass "1", you will see updates from "2" onwards). -func ParseWatchResourceVersion(resourceVersion, kind string) (uint64, error) { - if resourceVersion == "" || resourceVersion == "0" { - return 0, nil - } - version, err := strconv.ParseUint(resourceVersion, 10, 64) - if err != nil { - // TODO: Does this need to be a ValidationErrorList? I can't convince myself it does. - return 0, errors.NewInvalid(kind, "", fielderrors.ValidationErrorList{fielderrors.NewFieldInvalid("resourceVersion", resourceVersion, err.Error())}) - } - return version + 1, nil -} - // TransformFunc attempts to convert an object to another object for use with a watcher. type TransformFunc func(runtime.Object) (runtime.Object, error) @@ -82,12 +56,12 @@ func exceptKey(except string) includeFunc { // etcdWatcher converts a native etcd watch to a watch.Interface. type etcdWatcher struct { encoding runtime.Codec - versioner StorageVersioner + versioner storage.Versioner transform TransformFunc list bool // If we're doing a recursive watch, should be true. include includeFunc - filter FilterFunc + filter storage.FilterFunc etcdIncoming chan *etcd.Response etcdError chan error @@ -110,7 +84,7 @@ const watchWaitDuration = 100 * time.Millisecond // newEtcdWatcher returns a new etcdWatcher; if list is true, watch sub-nodes. If you provide a transform // and a versioner, the versioner must be able to handle the objects that transform creates. -func newEtcdWatcher(list bool, include includeFunc, filter FilterFunc, encoding runtime.Codec, versioner StorageVersioner, transform TransformFunc, cache etcdCache) *etcdWatcher { +func newEtcdWatcher(list bool, include includeFunc, filter storage.FilterFunc, encoding runtime.Codec, versioner storage.Versioner, transform TransformFunc, cache etcdCache) *etcdWatcher { w := &etcdWatcher{ encoding: encoding, versioner: versioner, @@ -142,7 +116,7 @@ func newEtcdWatcher(list bool, include includeFunc, filter FilterFunc, encoding // etcdWatch calls etcd's Watch function, and handles any errors. Meant to be called // as a goroutine. -func (w *etcdWatcher) etcdWatch(client EtcdClient, key string, resourceVersion uint64) { +func (w *etcdWatcher) etcdWatch(client tools.EtcdClient, key string, resourceVersion uint64) { defer util.HandleCrash() defer close(w.etcdError) if resourceVersion == 0 { @@ -160,7 +134,7 @@ func (w *etcdWatcher) etcdWatch(client EtcdClient, key string, resourceVersion u } // etcdGetInitialWatchState turns an etcd Get request into a watch equivalent -func etcdGetInitialWatchState(client EtcdClient, key string, recursive bool, incoming chan<- *etcd.Response) (resourceVersion uint64, err error) { +func etcdGetInitialWatchState(client tools.EtcdClient, key string, recursive bool, incoming chan<- *etcd.Response) (resourceVersion uint64, err error) { resp, err := client.Get(key, false, recursive) if err != nil { if !IsEtcdNotFound(err) { diff --git a/pkg/tools/etcd_watcher_test.go b/pkg/storage/etcd/etcd_watcher_test.go similarity index 87% rename from pkg/tools/etcd_watcher_test.go rename to pkg/storage/etcd/etcd_watcher_test.go index a6e9612994..5edfde8d69 100644 --- a/pkg/tools/etcd_watcher_test.go +++ b/pkg/storage/etcd/etcd_watcher_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package tools +package etcd import ( "fmt" @@ -22,9 +22,10 @@ import ( "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/storage" + "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools/etcdtest" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/coreos/go-etcd/etcd" @@ -165,7 +166,7 @@ func TestWatchInterpretations(t *testing.T) { } func TestWatchInterpretation_ResponseNotSet(t *testing.T) { - w := newEtcdWatcher(false, nil, Everything, codec, versioner, nil, &fakeEtcdCache{}) + w := newEtcdWatcher(false, nil, storage.Everything, codec, versioner, nil, &fakeEtcdCache{}) w.emit = func(e watch.Event) { t.Errorf("Unexpected emit: %v", e) } @@ -179,7 +180,7 @@ func TestWatchInterpretation_ResponseNotSet(t *testing.T) { func TestWatchInterpretation_ResponseNoNode(t *testing.T) { actions := []string{"create", "set", "compareAndSwap", "delete"} for _, action := range actions { - w := newEtcdWatcher(false, nil, Everything, codec, versioner, nil, &fakeEtcdCache{}) + w := newEtcdWatcher(false, nil, storage.Everything, codec, versioner, nil, &fakeEtcdCache{}) w.emit = func(e watch.Event) { t.Errorf("Unexpected emit: %v", e) } @@ -193,7 +194,7 @@ func TestWatchInterpretation_ResponseNoNode(t *testing.T) { func TestWatchInterpretation_ResponseBadData(t *testing.T) { actions := []string{"create", "set", "compareAndSwap", "delete"} for _, action := range actions { - w := newEtcdWatcher(false, nil, Everything, codec, versioner, nil, &fakeEtcdCache{}) + w := newEtcdWatcher(false, nil, storage.Everything, codec, versioner, nil, &fakeEtcdCache{}) w.emit = func(e watch.Event) { t.Errorf("Unexpected emit: %v", e) } @@ -215,12 +216,12 @@ func TestWatchInterpretation_ResponseBadData(t *testing.T) { func TestWatchEtcdError(t *testing.T) { codec := latest.Codec - fakeClient := NewFakeEtcdClient(t) - fakeClient.expectNotFoundGetSet["/some/key"] = struct{}{} + fakeClient := tools.NewFakeEtcdClient(t) + fakeClient.ExpectNotFoundGet("/some/key") fakeClient.WatchImmediateError = fmt.Errorf("immediate error") h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix()) - watching, err := h.Watch("/some/key", 4, Everything) + watching, err := h.Watch("/some/key", 4, storage.Everything) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -244,13 +245,13 @@ func TestWatchEtcdError(t *testing.T) { func TestWatch(t *testing.T) { codec := latest.Codec - fakeClient := NewFakeEtcdClient(t) + fakeClient := tools.NewFakeEtcdClient(t) key := "/some/key" prefixedKey := etcdtest.AddPrefix(key) - fakeClient.expectNotFoundGetSet[prefixedKey] = struct{}{} + fakeClient.ExpectNotFoundGet(prefixedKey) h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix()) - watching, err := h.Watch(key, 0, Everything) + watching, err := h.Watch(key, 0, storage.Everything) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -322,13 +323,13 @@ func TestWatchEtcdState(t *testing.T) { Endpoints []api.EndpointSubset } testCases := map[string]struct { - Initial map[string]EtcdResponseWithError + Initial map[string]tools.EtcdResponseWithError Responses []*etcd.Response From uint64 Expected []*T }{ "from not found": { - Initial: map[string]EtcdResponseWithError{}, + Initial: map[string]tools.EtcdResponseWithError{}, Responses: []*etcd.Response{ { Action: "create", @@ -373,7 +374,7 @@ func TestWatchEtcdState(t *testing.T) { }, }, "from initial state": { - Initial: map[string]EtcdResponseWithError{ + Initial: map[string]tools.EtcdResponseWithError{ prefixedKey: { R: &etcd.Response{ Action: "get", @@ -419,13 +420,13 @@ func TestWatchEtcdState(t *testing.T) { } for k, testCase := range testCases { - fakeClient := NewFakeEtcdClient(t) + fakeClient := tools.NewFakeEtcdClient(t) for key, value := range testCase.Initial { fakeClient.Data[key] = value } h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix()) - watching, err := h.Watch(baseKey, testCase.From, Everything) + watching, err := h.Watch(baseKey, testCase.From, storage.Everything) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -456,12 +457,12 @@ func TestWatchFromZeroIndex(t *testing.T) { pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}} testCases := map[string]struct { - Response EtcdResponseWithError + Response tools.EtcdResponseWithError ExpectedVersion string ExpectedType watch.EventType }{ "get value created": { - EtcdResponseWithError{ + tools.EtcdResponseWithError{ R: &etcd.Response{ Node: &etcd.Node{ Value: runtime.EncodeOrDie(codec, pod), @@ -476,7 +477,7 @@ func TestWatchFromZeroIndex(t *testing.T) { watch.Added, }, "get value modified": { - EtcdResponseWithError{ + tools.EtcdResponseWithError{ R: &etcd.Response{ Node: &etcd.Node{ Value: runtime.EncodeOrDie(codec, pod), @@ -493,13 +494,13 @@ func TestWatchFromZeroIndex(t *testing.T) { } for k, testCase := range testCases { - fakeClient := NewFakeEtcdClient(t) + fakeClient := tools.NewFakeEtcdClient(t) key := "/some/key" prefixedKey := etcdtest.AddPrefix(key) fakeClient.Data[prefixedKey] = testCase.Response h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix()) - watching, err := h.Watch(key, 0, Everything) + watching, err := h.Watch(key, 0, storage.Everything) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -534,8 +535,8 @@ func TestWatchListFromZeroIndex(t *testing.T) { pod := &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}} key := "/some/key" prefixedKey := etcdtest.AddPrefix(key) - fakeClient := NewFakeEtcdClient(t) - fakeClient.Data[prefixedKey] = EtcdResponseWithError{ + fakeClient := tools.NewFakeEtcdClient(t) + fakeClient.Data[prefixedKey] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: &etcd.Node{ Dir: true, @@ -560,7 +561,7 @@ func TestWatchListFromZeroIndex(t *testing.T) { } h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix()) - watching, err := h.WatchList(key, 0, Everything) + watching, err := h.WatchList(key, 0, storage.Everything) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -597,10 +598,10 @@ func TestWatchListIgnoresRootKey(t *testing.T) { key := "/some/key" prefixedKey := etcdtest.AddPrefix(key) - fakeClient := NewFakeEtcdClient(t) + fakeClient := tools.NewFakeEtcdClient(t) h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix()) - watching, err := h.WatchList(key, 1, Everything) + watching, err := h.WatchList(key, 1, storage.Everything) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -639,10 +640,10 @@ func TestWatchListIgnoresRootKey(t *testing.T) { } func TestWatchFromNotFound(t *testing.T) { - fakeClient := NewFakeEtcdClient(t) + fakeClient := tools.NewFakeEtcdClient(t) key := "/some/key" prefixedKey := etcdtest.AddPrefix(key) - fakeClient.Data[prefixedKey] = EtcdResponseWithError{ + fakeClient.Data[prefixedKey] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: nil, }, @@ -653,7 +654,7 @@ func TestWatchFromNotFound(t *testing.T) { } h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix()) - watching, err := h.Watch(key, 0, Everything) + watching, err := h.Watch(key, 0, storage.Everything) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -666,10 +667,10 @@ func TestWatchFromNotFound(t *testing.T) { } func TestWatchFromOtherError(t *testing.T) { - fakeClient := NewFakeEtcdClient(t) + fakeClient := tools.NewFakeEtcdClient(t) key := "/some/key" prefixedKey := etcdtest.AddPrefix(key) - fakeClient.Data[prefixedKey] = EtcdResponseWithError{ + fakeClient.Data[prefixedKey] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: nil, }, @@ -680,7 +681,7 @@ func TestWatchFromOtherError(t *testing.T) { } h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix()) - watching, err := h.Watch(key, 0, Everything) + watching, err := h.Watch(key, 0, storage.Everything) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -708,15 +709,15 @@ func TestWatchFromOtherError(t *testing.T) { } func TestWatchPurposefulShutdown(t *testing.T) { - fakeClient := NewFakeEtcdClient(t) + fakeClient := tools.NewFakeEtcdClient(t) h := newEtcdHelper(fakeClient, codec, etcdtest.PathPrefix()) key := "/some/key" prefixedKey := etcdtest.AddPrefix(key) - fakeClient.expectNotFoundGetSet[prefixedKey] = struct{}{} + fakeClient.ExpectNotFoundGet(prefixedKey) // Test purposeful shutdown - watching, err := h.Watch(key, 0, Everything) + watching, err := h.Watch(key, 0, storage.Everything) if err != nil { t.Fatalf("Unexpected error: %v", err) } @@ -732,38 +733,3 @@ func TestWatchPurposefulShutdown(t *testing.T) { t.Errorf("An injected error did not cause a graceful shutdown") } } - -func TestEtcdParseWatchResourceVersion(t *testing.T) { - testCases := []struct { - Version string - Kind string - ExpectVersion uint64 - Err bool - }{ - {Version: "", ExpectVersion: 0}, - {Version: "a", Err: true}, - {Version: " ", Err: true}, - {Version: "1", ExpectVersion: 2}, - {Version: "10", ExpectVersion: 11}, - } - for _, testCase := range testCases { - version, err := ParseWatchResourceVersion(testCase.Version, testCase.Kind) - switch { - case testCase.Err: - if err == nil { - t.Errorf("%s: unexpected non-error", testCase.Version) - continue - } - if !errors.IsInvalid(err) { - t.Errorf("%s: unexpected error: %v", testCase.Version, err) - continue - } - case !testCase.Err && err != nil: - t.Errorf("%s: unexpected error: %v", testCase.Version, err) - continue - } - if version != testCase.ExpectVersion { - t.Errorf("%s: expected version %d but was %d", testCase.Version, testCase.ExpectVersion, version) - } - } -} diff --git a/pkg/storage/interfaces.go b/pkg/storage/interfaces.go new file mode 100644 index 0000000000..36829f99b7 --- /dev/null +++ b/pkg/storage/interfaces.go @@ -0,0 +1,149 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storage + +import ( + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" +) + +// Versioner abstracts setting and retrieving metadata fields from database response +// onto the object ot list. +type Versioner interface { + // UpdateObject sets storage metadata into an API object. Returns an error if the object + // cannot be updated correctly. May return nil if the requested object does not need metadata + // from database. + UpdateObject(obj runtime.Object, expiration *time.Time, resourceVersion uint64) error + // UpdateList sets the resource version into an API list object. Returns an error if the object + // cannot be updated correctly. May return nil if the requested object does not need metadata + // from database. + UpdateList(obj runtime.Object, resourceVersion uint64) error + // ObjectResourceVersion returns the resource version (for persistence) of the specified object. + // Should return an error if the specified object does not have a persistable version. + ObjectResourceVersion(obj runtime.Object) (uint64, error) +} + +// ResponseMeta contains information about the database metadata that is associated with +// an object. It abstracts the actual underlying objects to prevent coupling with concrete +// database and to improve testability. +type ResponseMeta struct { + // TTL is the time to live of the node that contained the returned object. It may be + // zero or negative in some cases (objects may be expired after the requested + // expiration time due to server lag). + TTL int64 + // Expiration is the time at which the node that contained the returned object will expire and be deleted. + // This can be nil if there is no expiration time set for the node. + Expiration *time.Time + // The resource version of the node that contained the returned object. + ResourceVersion uint64 +} + +// FilterFunc is a predicate which takes an API object and returns true +// iff the object should remain in the set. +type FilterFunc func(obj runtime.Object) bool + +// Everything is a FilterFunc which accepts all objects. +func Everything(runtime.Object) bool { + return true +} + +// Pass an UpdateFunc to Interface.GuaranteedUpdate to make an update +// that is guaranteed to succeed. +// See the comment for GuaranteedUpdate for more details. +type UpdateFunc func(input runtime.Object, res ResponseMeta) (output runtime.Object, ttl *uint64, err error) + +// Interface offers a common interface for object marshaling/unmarshling operations and +// hids all the storage-related operations behind it. +type Interface interface { + // Returns list of servers addresses of the underyling database. + // TODO: This method is used only in a single place. Consider refactoring and getting rid + // of this method from the interface. + Backends() []string + + // Returns Versioner associated with this interface. + Versioner() Versioner + + // Create adds a new object at a key unless it already exists. 'ttl' is time-to-live + // in seconds (0 means forever). If no error is returned and out is not nil, out will be + // set to the read value from database. + Create(key string, obj, out runtime.Object, ttl uint64) error + + // Set marshals obj via json and stores in database under key. Will do an atomic update + // if obj's ResourceVersion field is set. 'ttl' is time-to-live in seconds (0 means forever). + // If no error is returned and out is not nil, out will be set to the read value from database. + Set(key string, obj, out runtime.Object, ttl uint64) error + + // Delete removes the specified key and returns the value that existed at that spot. + Delete(key string, out runtime.Object) error + + // RecursiveDelete removes the specified key. + // TODO: Get rid of this method and use Delete() instead. + RecursiveDelete(key string, recursive bool) error + + // Watch begins watching the specified key. Events are decoded into API objects, + // and any items passing 'filter' are sent down to returned watch.Interface. + // resourceVersion may be used to specify what version to begin watching + // (e.g. reconnecting without missing any updates). + Watch(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) + + // WatchList begins watching the specified key's items. Items are decoded into API + // objects and any item passing 'filter' are sent down to returned watch.Interface. + // resourceVersion may be used to specify what version to begin watching + // (e.g. reconnecting without missing any updates). + WatchList(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) + + // Get unmarshals json found at key into objPtr. On a not found error, will either + // return a zero object of the requested type, or an error, depending on ignoreNotFound. + // Treats empty responses and nil response nodes exactly like a not found error. + Get(key string, objPtr runtime.Object, ignoreNotFound bool) error + + // GetToList unmarshals json found at key and opaque it into *List api object + // (an object that satisfies the runtime.IsList definition). + GetToList(key string, listObj runtime.Object) error + + // List unmarshalls jsons found at directory defined by key and opaque them + // into *List api object (an object that satisfies runtime.IsList definition). + List(key string, listObj runtime.Object) error + + // GuaranteedUpdate keeps calling 'tryUpdate()' to update key 'key' (of type 'ptrToType') + // retrying the update until success if there is index conflict. + // Note that object passed to tryUpdate may change acress incovations of tryUpdate() if + // other writers are simultanously updateing it, to tryUpdate() needs to take into account + // the current contents of the object when deciding how the update object should look. + // + // Exmaple: + // + // s := /* implementation of Interface */ + // err := s.GuaranteedUpdate( + // "myKey", &MyType{}, true, + // func(input runtime.Object, res ResponseMeta) (runtime.Object, *uint64, error) { + // // Before each incovation of the user defined function, "input" is reset to + // // current contents for "myKey" in database. + // curr := input.(*MyType) // Guaranteed to succeed. + // + // // Make the modification + // curr.Counter++ + // + // // Return the modified object - return an error to stop iterating. Return + // // a uint64 to alter the TTL on the object, or nil to keep it the same value. + // return cur, nil, nil + // } + // }) + GuaranteedUpdate(key string, ptrToType runtime.Object, ignoreNotFound bool, tryUpdate UpdateFunc) error +} diff --git a/pkg/storage/util.go b/pkg/storage/util.go new file mode 100644 index 0000000000..e95735e2ac --- /dev/null +++ b/pkg/storage/util.go @@ -0,0 +1,51 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storage + +import ( + "strconv" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util/fielderrors" +) + +type SimpleUpdateFunc func(runtime.Object) (runtime.Object, error) + +// SimpleUpdateFunc converts SimpleUpdateFunc into UpdateFunc +func SimpleUpdate(fn SimpleUpdateFunc) UpdateFunc { + return func(input runtime.Object, _ ResponseMeta) (runtime.Object, *uint64, error) { + out, err := fn(input) + return out, nil, err + } +} + +// ParseWatchResourceVersion takes a resource version argument and converts it to +// the etcd version we should pass to helper.Watch(). Because resourceVersion is +// an opaque value, the default watch behavior for non-zero watch is to watch +// the next value (if you pass "1", you will see updates from "2" onwards). +func ParseWatchResourceVersion(resourceVersion, kind string) (uint64, error) { + if resourceVersion == "" || resourceVersion == "0" { + return 0, nil + } + version, err := strconv.ParseUint(resourceVersion, 10, 64) + if err != nil { + // TODO: Does this need to be a ValidationErrorList? I can't convince myself it does. + return 0, errors.NewInvalid(kind, "", fielderrors.ValidationErrorList{fielderrors.NewFieldInvalid("resourceVersion", resourceVersion, err.Error())}) + } + return version + 1, nil +} diff --git a/pkg/storage/util_test.go b/pkg/storage/util_test.go new file mode 100644 index 0000000000..4445b6c1db --- /dev/null +++ b/pkg/storage/util_test.go @@ -0,0 +1,58 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storage + +import ( + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors" +) + +func TestEtcdParseWatchResourceVersion(t *testing.T) { + testCases := []struct { + Version string + Kind string + ExpectVersion uint64 + Err bool + }{ + {Version: "", ExpectVersion: 0}, + {Version: "a", Err: true}, + {Version: " ", Err: true}, + {Version: "1", ExpectVersion: 2}, + {Version: "10", ExpectVersion: 11}, + } + for _, testCase := range testCases { + version, err := ParseWatchResourceVersion(testCase.Version, testCase.Kind) + switch { + case testCase.Err: + if err == nil { + t.Errorf("%s: unexpected non-error", testCase.Version) + continue + } + if !errors.IsInvalid(err) { + t.Errorf("%s: unexpected error: %v", testCase.Version, err) + continue + } + case !testCase.Err && err != nil: + t.Errorf("%s: unexpected error: %v", testCase.Version, err) + continue + } + if version != testCase.ExpectVersion { + t.Errorf("%s: expected version %d but was %d", testCase.Version, testCase.ExpectVersion, version) + } + } +} diff --git a/pkg/tools/fake_etcd_client.go b/pkg/tools/fake_etcd_client.go index 45232f5d8b..bf4b7fdd1f 100644 --- a/pkg/tools/fake_etcd_client.go +++ b/pkg/tools/fake_etcd_client.go @@ -281,7 +281,8 @@ func (f *FakeEtcdClient) Delete(key string, recursive bool) (*etcd.Response, err Index: f.ChangeIndex, } } - if IsEtcdNotFound(existing.E) { + etcdError, ok := existing.E.(*etcd.EtcdError) + if ok && etcdError != nil && etcdError.ErrorCode == EtcdErrorCodeNotFound { f.DeletedKeys = append(f.DeletedKeys, key) return existing.R, existing.E } diff --git a/pkg/tools/interfaces.go b/pkg/tools/interfaces.go index 589a0ca343..02569cb28b 100644 --- a/pkg/tools/interfaces.go +++ b/pkg/tools/interfaces.go @@ -17,11 +17,6 @@ limitations under the License. package tools import ( - "time" - - "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" - "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" - "github.com/coreos/go-etcd/etcd" ) @@ -51,119 +46,3 @@ type EtcdClient interface { // the etcd client interface which doesn't, and it doesn't seem worth it to wrap the api. Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) } - -// StorageVersioner abstracts setting and retrieving metadata fields from the etcd response onto the object -// or list. -type StorageVersioner interface { - // UpdateObject sets etcd storage metadata into an API object. Returns an error if the object - // cannot be updated correctly. May return nil if the requested object does not need metadata - // from etcd. - UpdateObject(obj runtime.Object, expiration *time.Time, resourceVersion uint64) error - // UpdateList sets the resource version into an API list object. Returns an error if the object - // cannot be updated correctly. May return nil if the requested object does not need metadata - // from etcd. - UpdateList(obj runtime.Object, resourceVersion uint64) error - // ObjectResourceVersion returns the resource version (for persistence) of the specified object. - // Should return an error if the specified object does not have a persistable version. - ObjectResourceVersion(obj runtime.Object) (uint64, error) -} - -// ResponseMeta contains information about the etcd metadata that is associated with -// an object. It abstracts the actual underlying objects to prevent coupling with etcd -// and to improve testability. -type ResponseMeta struct { - // TTL is the time to live of the node that contained the returned object. It may be - // zero or negative in some cases (objects may be expired after the requested - // expiration time due to server lag). - TTL int64 - // Expiration is the time at which the node that contained the returned object will expire and be deleted. - // This can be nil if there is no expiration time set for the node. - Expiration *time.Time - // The resource version of the node that contained the returned object. - ResourceVersion uint64 -} - -// Pass an StorageUpdateFunc to StorageInterface.GuaranteedUpdate to make an update -// that is guaranteed to succeed. -// See the comment for GuaranteedUpdate for more details. -type StorageUpdateFunc func(input runtime.Object, res ResponseMeta) (output runtime.Object, ttl *uint64, err error) - -// StorageInterface offers a common interface for object marshaling/unmarshling operations and -// hids all the storage-related operations behind it. -type StorageInterface interface { - // Returns list of servers addresses of the underyling database. - // TODO: This method is used only in a single place. Consider refactoring and getting rid - // of this method from the interface. - Backends() []string - - // Returns StorageVersioner associated with this interface. - Versioner() StorageVersioner - - // Create adds a new object at a key unless it already exists. 'ttl' is time-to-live - // in seconds (0 means forever). If no error is returned and out is not nil, out will be - // set to the read value from etcd. - Create(key string, obj, out runtime.Object, ttl uint64) error - - // Set marshals obj via json and stores in etcd under key. Will do an atomic update - // if obj's ResourceVersion field is set. 'ttl' is time-to-live in seconds (0 means forever). - // If no error is returned and out is not nil, out will be set to the read value from etcd. - Set(key string, obj, out runtime.Object, ttl uint64) error - - // Delete removes the specified key and returns the value that existed at that spot. - Delete(key string, out runtime.Object) error - - // RecursiveDelete removes the specified key. - // TODO: Get rid of this method and use Delete() instead. - RecursiveDelete(key string, recursive bool) error - - // Watch begins watching the specified key. Events are decoded into API objects, - // and any items passing 'filter' are sent down to returned watch.Interface. - // resourceVersion may be used to specify what version to begin watching - // (e.g. reconnecting without missing any updates). - Watch(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) - - // WatchList begins watching the specified key's items. Items are decoded into API - // objects and any item passing 'filter' are sent down to returned watch.Interface. - // resourceVersion may be used to specify what version to begin watching - // (e.g. reconnecting without missing any updates). - WatchList(key string, resourceVersion uint64, filter FilterFunc) (watch.Interface, error) - - // Get unmarshals json found at key into objPtr. On a not found error, will either - // return a zero object of the requested type, or an error, depending on ignoreNotFound. - // Treats empty responses and nil response nodes exactly like a not found error. - Get(key string, objPtr runtime.Object, ignoreNotFound bool) error - - // GetToList unmarshals json found at key and opaque it into *List api object - // (an object that satisfies the runtime.IsList definition). - GetToList(key string, listObj runtime.Object) error - - // List unmarshalls jsons found at directory defined by key and opaque them - // into *List api object (an object that satisfies runtime.IsList definition). - List(key string, listObj runtime.Object) error - - // GuaranteedUpdate keeps calling 'tryUpdate()' to update key 'key' (of type 'ptrToType') - // retrying the update until success if there is etcd index conflict. - // Note that object passed to tryUpdate may change acress incovations of tryUpdate() if - // other writers are simultanously updateing it, to tryUpdate() needs to take into account - // the current contents of the object when deciding how the update object should look. - // - // Exmaple: - // - // s := /* implementation of StorageInterface */ - // err := s.GuaranteedUpdate( - // "myKey", &MyType{}, true, - // func(input runtime.Object, res ResponseMeta) (runtime.Object, *uint64, error) { - // // Before each incovation of the user defined function, "input" is reset to - // // etcd's current contents for "myKey". - // curr := input.(*MyType) // Guaranteed to succeed. - // - // // Make the modification - // curr.Counter++ - // - // // Return the modified object - return an error to stop iterating. Return - // // a uint64 to alter the TTL on the object, or nil to keep it the same value. - // return cur, nil, nil - // } - // }) - GuaranteedUpdate(key string, ptrToType runtime.Object, ignoreNotFound bool, tryUpdate StorageUpdateFunc) error -} diff --git a/test/integration/etcd_tools_test.go b/test/integration/etcd_tools_test.go index 258b20787b..f28fda0eb5 100644 --- a/test/integration/etcd_tools_test.go +++ b/test/integration/etcd_tools_test.go @@ -25,7 +25,8 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" - "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + "github.com/GoogleCloudPlatform/kubernetes/pkg/storage" + "github.com/GoogleCloudPlatform/kubernetes/pkg/storage/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools/etcdtest" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/GoogleCloudPlatform/kubernetes/test/integration/framework" @@ -33,7 +34,7 @@ import ( func TestSet(t *testing.T) { client := framework.NewEtcdClient() - etcdStorage := tools.NewEtcdStorage(client, testapi.Codec(), "") + etcdStorage := etcd.NewEtcdStorage(client, testapi.Codec(), "") framework.WithEtcdKey(func(key string) { testObject := api.ServiceAccount{ObjectMeta: api.ObjectMeta{Name: "foo"}} if err := etcdStorage.Set(key, &testObject, nil, 0); err != nil { @@ -56,7 +57,7 @@ func TestSet(t *testing.T) { func TestGet(t *testing.T) { client := framework.NewEtcdClient() - etcdStorage := tools.NewEtcdStorage(client, testapi.Codec(), "") + etcdStorage := etcd.NewEtcdStorage(client, testapi.Codec(), "") framework.WithEtcdKey(func(key string) { testObject := api.ServiceAccount{ObjectMeta: api.ObjectMeta{Name: "foo"}} coded, err := testapi.Codec().Encode(&testObject) @@ -81,7 +82,7 @@ func TestGet(t *testing.T) { func TestWatch(t *testing.T) { client := framework.NewEtcdClient() - etcdStorage := tools.NewEtcdStorage(client, testapi.Codec(), etcdtest.PathPrefix()) + etcdStorage := etcd.NewEtcdStorage(client, testapi.Codec(), etcdtest.PathPrefix()) framework.WithEtcdKey(func(key string) { key = etcdtest.AddPrefix(key) resp, err := client.Set(key, runtime.EncodeOrDie(testapi.Codec(), &api.Pod{ObjectMeta: api.ObjectMeta{Name: "foo"}}), 0) @@ -91,7 +92,7 @@ func TestWatch(t *testing.T) { expectedVersion := resp.Node.ModifiedIndex // watch should load the object at the current index - w, err := etcdStorage.Watch(key, 0, tools.Everything) + w, err := etcdStorage.Watch(key, 0, storage.Everything) if err != nil { t.Fatalf("Unexpected error: %v", err) } diff --git a/test/integration/framework/etcd_utils.go b/test/integration/framework/etcd_utils.go index a46bd7bac2..2104a7bf3c 100644 --- a/test/integration/framework/etcd_utils.go +++ b/test/integration/framework/etcd_utils.go @@ -24,7 +24,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi" "github.com/GoogleCloudPlatform/kubernetes/pkg/master" - "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + "github.com/GoogleCloudPlatform/kubernetes/pkg/storage" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools/etcdtest" "github.com/coreos/go-etcd/etcd" "github.com/golang/glog" @@ -41,7 +41,7 @@ func NewEtcdClient() *etcd.Client { return etcd.NewClient([]string{}) } -func NewEtcdStorage() (tools.StorageInterface, error) { +func NewEtcdStorage() (storage.Interface, error) { return master.NewEtcdStorage(NewEtcdClient(), testapi.Version(), etcdtest.PathPrefix()) } diff --git a/test/integration/framework/master_utils.go b/test/integration/framework/master_utils.go index 719972a126..3d6a2a590d 100644 --- a/test/integration/framework/master_utils.go +++ b/test/integration/framework/master_utils.go @@ -35,7 +35,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/kubectl" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/master" - "github.com/GoogleCloudPlatform/kubernetes/pkg/tools" + "github.com/GoogleCloudPlatform/kubernetes/pkg/storage" "github.com/GoogleCloudPlatform/kubernetes/pkg/tools/etcdtest" "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/admission/admit" "github.com/golang/glog" @@ -72,7 +72,7 @@ type MasterComponents struct { // Used to stop master components individually, and via MasterComponents.Stop once sync.Once // Kubernetes etcd storage, has embedded etcd client - EtcdStorage tools.StorageInterface + EtcdStorage storage.Interface } // Config is a struct of configuration directives for NewMasterComponents. @@ -119,13 +119,13 @@ func NewMasterComponents(c *Config) *MasterComponents { } // startMasterOrDie starts a kubernetes master and an httpserver to handle api requests -func startMasterOrDie(masterConfig *master.Config) (*master.Master, *httptest.Server, tools.StorageInterface) { +func startMasterOrDie(masterConfig *master.Config) (*master.Master, *httptest.Server, storage.Interface) { var m *master.Master s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { m.Handler.ServeHTTP(w, req) })) - var etcdStorage tools.StorageInterface + var etcdStorage storage.Interface var err error if masterConfig == nil { etcdStorage, err = master.NewEtcdStorage(NewEtcdClient(), "", etcdtest.PathPrefix())