From d11ab964467e7643c05545dcc6e047225f68d4c8 Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Thu, 6 Aug 2015 10:46:45 +0200 Subject: [PATCH 1/2] Extract non-storage operations from service etcd --- pkg/master/master.go | 5 +- pkg/registry/etcd/etcd.go | 13 +---- pkg/registry/etcd/etcd_test.go | 70 ++------------------------- pkg/registry/registrytest/endpoint.go | 17 ++++++- pkg/registry/service/rest.go | 7 +++ 5 files changed, 28 insertions(+), 84 deletions(-) diff --git a/pkg/master/master.go b/pkg/master/master.go index 85bb7cdf8e..f591cf903f 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -448,10 +448,7 @@ func (m *Master) init(c *Config) { nodeStorage, nodeStatusStorage := nodeetcd.NewStorage(c.DatabaseStorage, c.KubeletClient) m.nodeRegistry = minion.NewRegistry(nodeStorage) - - // TODO: split me up into distinct storage registries - registry := etcd.NewRegistry(c.DatabaseStorage, m.endpointRegistry) - m.serviceRegistry = registry + m.serviceRegistry = etcd.NewRegistry(c.DatabaseStorage) var serviceClusterIPRegistry service.RangeRegistry serviceClusterIPAllocator := ipallocator.NewAllocatorCIDRRange(m.serviceClusterIPRange, func(max int, rangeSpec string) allocator.Interface { diff --git a/pkg/registry/etcd/etcd.go b/pkg/registry/etcd/etcd.go index 775557d12e..b71308289f 100644 --- a/pkg/registry/etcd/etcd.go +++ b/pkg/registry/etcd/etcd.go @@ -20,11 +20,9 @@ import ( "fmt" "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/api/errors" etcderr "k8s.io/kubernetes/pkg/api/errors/etcd" "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/labels" - "k8s.io/kubernetes/pkg/registry/endpoint" "k8s.io/kubernetes/pkg/storage" "k8s.io/kubernetes/pkg/watch" ) @@ -44,14 +42,12 @@ const ( // MinionRegistry, PodRegistry and ServiceRegistry, backed by etcd. type Registry struct { storage.Interface - endpoints endpoint.Registry } // NewRegistry creates an etcd registry. -func NewRegistry(storage storage.Interface, endpoints endpoint.Registry) *Registry { +func NewRegistry(storage storage.Interface) *Registry { registry := &Registry{ Interface: storage, - endpoints: endpoints, } return registry } @@ -132,13 +128,6 @@ func (r *Registry) DeleteService(ctx api.Context, name string) error { if err != nil { return etcderr.InterpretDeleteError(err, "service", name) } - - // TODO: can leave dangling endpoints, and potentially return incorrect - // endpoints if a new service is created with the same name - err = r.endpoints.DeleteEndpoints(ctx, name) - if err != nil && !errors.IsNotFound(err) { - return err - } return nil } diff --git a/pkg/registry/etcd/etcd_test.go b/pkg/registry/etcd/etcd_test.go index 522d79955e..972a3cb9ca 100644 --- a/pkg/registry/etcd/etcd_test.go +++ b/pkg/registry/etcd/etcd_test.go @@ -25,8 +25,6 @@ import ( "k8s.io/kubernetes/pkg/api/latest" "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/labels" - "k8s.io/kubernetes/pkg/registry/endpoint" - endpointetcd "k8s.io/kubernetes/pkg/registry/endpoint/etcd" etcdgeneric "k8s.io/kubernetes/pkg/registry/generic/etcd" "k8s.io/kubernetes/pkg/runtime" etcdstorage "k8s.io/kubernetes/pkg/storage/etcd" @@ -38,14 +36,13 @@ import ( func NewTestEtcdRegistry(client tools.EtcdClient) *Registry { storage := etcdstorage.NewEtcdStorage(client, latest.Codec, etcdtest.PathPrefix()) - registry := NewRegistry(storage, nil) + registry := NewRegistry(storage) return registry } func NewTestEtcdRegistryWithPods(client tools.EtcdClient) *Registry { etcdStorage := etcdstorage.NewEtcdStorage(client, latest.Codec, etcdtest.PathPrefix()) - endpointStorage := endpointetcd.NewStorage(etcdStorage) - registry := NewRegistry(etcdStorage, endpoint.NewRegistry(endpointStorage)) + registry := NewRegistry(etcdStorage) return registry } @@ -236,15 +233,12 @@ func TestEtcdDeleteService(t *testing.T) { t.Errorf("unexpected error: %v", err) } - if len(fakeClient.DeletedKeys) != 2 { + if len(fakeClient.DeletedKeys) != 1 { t.Errorf("Expected 2 delete, found %#v", fakeClient.DeletedKeys) } if fakeClient.DeletedKeys[0] != key { t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[0], key) } - if fakeClient.DeletedKeys[1] != endpointsKey { - t.Errorf("Unexpected key: %s, expected %s", fakeClient.DeletedKeys[1], endpointsKey) - } } func TestEtcdUpdateService(t *testing.T) { @@ -341,64 +335,6 @@ func TestEtcdWatchServicesBadSelector(t *testing.T) { } } -func TestEtcdWatchEndpoints(t *testing.T) { - ctx := api.NewDefaultContext() - fakeClient := tools.NewFakeEtcdClient(t) - registry := NewTestEtcdRegistryWithPods(fakeClient) - watching, err := registry.endpoints.WatchEndpoints( - ctx, - labels.Everything(), - fields.SelectorFromSet(fields.Set{"name": "foo"}), - "1", - ) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - fakeClient.WaitForWatchCompletion() - - select { - case _, ok := <-watching.ResultChan(): - if !ok { - t.Errorf("watching channel should be open") - } - default: - } - fakeClient.WatchInjectError <- nil - if _, ok := <-watching.ResultChan(); ok { - t.Errorf("watching channel should be closed") - } - watching.Stop() -} - -func TestEtcdWatchEndpointsAcrossNamespaces(t *testing.T) { - ctx := api.NewContext() - fakeClient := tools.NewFakeEtcdClient(t) - registry := NewTestEtcdRegistryWithPods(fakeClient) - watching, err := registry.endpoints.WatchEndpoints( - ctx, - labels.Everything(), - fields.Everything(), - "1", - ) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - fakeClient.WaitForWatchCompletion() - - select { - case _, ok := <-watching.ResultChan(): - if !ok { - t.Errorf("watching channel should be open") - } - default: - } - fakeClient.WatchInjectError <- nil - if _, ok := <-watching.ResultChan(); ok { - t.Errorf("watching channel should be closed") - } - watching.Stop() -} - // TODO We need a test for the compare and swap behavior. This basically requires two things: // 1) Add a per-operation synchronization channel to the fake etcd client, such that any operation waits on that // channel, this will enable us to orchestrate the flow of etcd requests in the test. diff --git a/pkg/registry/registrytest/endpoint.go b/pkg/registry/registrytest/endpoint.go index 22855f8a2b..a389caf27f 100644 --- a/pkg/registry/registrytest/endpoint.go +++ b/pkg/registry/registrytest/endpoint.go @@ -93,5 +93,20 @@ func (e *EndpointRegistry) UpdateEndpoints(ctx api.Context, endpoints *api.Endpo } func (e *EndpointRegistry) DeleteEndpoints(ctx api.Context, name string) error { - return fmt.Errorf("unimplemented!") + // TODO: support namespaces in this mock + e.lock.Lock() + defer e.lock.Unlock() + if e.Err != nil { + return e.Err + } + if e.Endpoints != nil { + var newList []api.Endpoints + for _, endpoint := range e.Endpoints.Items { + if endpoint.Name != name { + newList = append(newList, endpoint) + } + } + e.Endpoints.Items = newList + } + return nil } diff --git a/pkg/registry/service/rest.go b/pkg/registry/service/rest.go index 48ac719611..6b010fb3e8 100644 --- a/pkg/registry/service/rest.go +++ b/pkg/registry/service/rest.go @@ -144,6 +144,13 @@ func (rs *REST) Delete(ctx api.Context, id string) (runtime.Object, error) { return nil, err } + // TODO: can leave dangling endpoints, and potentially return incorrect + // endpoints if a new service is created with the same name + err = rs.endpoints.DeleteEndpoints(ctx, id) + if err != nil && !errors.IsNotFound(err) { + return nil, err + } + if api.IsServiceIPSet(service) { rs.serviceIPs.Release(net.ParseIP(service.Spec.ClusterIP)) } From 79125f460c5449f944840b072b70434c8b5c7e47 Mon Sep 17 00:00:00 2001 From: Wojciech Tyczynski Date: Thu, 6 Aug 2015 12:02:01 +0200 Subject: [PATCH 2/2] Services using standard REST storage --- contrib/mesos/pkg/scheduler/podtask/leaky.go | 4 +- pkg/master/master.go | 6 +- pkg/registry/etcd/doc.go | 19 -- pkg/registry/etcd/etcd.go | 166 ------------------ pkg/registry/registrytest/service.go | 2 +- pkg/registry/service/etcd/etcd.go | 75 ++++++++ .../service/ipallocator/controller/repair.go | 4 +- .../portallocator/controller/repair.go | 4 +- pkg/registry/service/registry.go | 55 +++++- .../etcd_test.go => service/registry_test.go} | 116 ++++++------ pkg/registry/service/rest.go | 19 +- 11 files changed, 194 insertions(+), 276 deletions(-) delete mode 100644 pkg/registry/etcd/doc.go delete mode 100644 pkg/registry/etcd/etcd.go create mode 100644 pkg/registry/service/etcd/etcd.go rename pkg/registry/{etcd/etcd_test.go => service/registry_test.go} (76%) diff --git a/contrib/mesos/pkg/scheduler/podtask/leaky.go b/contrib/mesos/pkg/scheduler/podtask/leaky.go index 653ba2f7b1..4105bb32d4 100644 --- a/contrib/mesos/pkg/scheduler/podtask/leaky.go +++ b/contrib/mesos/pkg/scheduler/podtask/leaky.go @@ -20,10 +20,10 @@ package podtask import ( "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/registry/etcd" + "k8s.io/kubernetes/pkg/registry/generic/etcd" ) // makePodKey constructs etcd paths to pod items enforcing namespace rules. func MakePodKey(ctx api.Context, id string) (string, error) { - return etcd.MakeEtcdItemKey(ctx, PodPath, id) + return etcd.NamespaceKeyFunc(ctx, PodPath, id) } diff --git a/pkg/master/master.go b/pkg/master/master.go index f591cf903f..2b8a9078b9 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -53,7 +53,6 @@ import ( controlleretcd "k8s.io/kubernetes/pkg/registry/controller/etcd" "k8s.io/kubernetes/pkg/registry/endpoint" endpointsetcd "k8s.io/kubernetes/pkg/registry/endpoint/etcd" - "k8s.io/kubernetes/pkg/registry/etcd" "k8s.io/kubernetes/pkg/registry/event" "k8s.io/kubernetes/pkg/registry/limitrange" "k8s.io/kubernetes/pkg/registry/minion" @@ -68,6 +67,7 @@ import ( secretetcd "k8s.io/kubernetes/pkg/registry/secret/etcd" "k8s.io/kubernetes/pkg/registry/service" etcdallocator "k8s.io/kubernetes/pkg/registry/service/allocator/etcd" + serviceetcd "k8s.io/kubernetes/pkg/registry/service/etcd" ipallocator "k8s.io/kubernetes/pkg/registry/service/ipallocator" serviceaccountetcd "k8s.io/kubernetes/pkg/registry/serviceaccount/etcd" "k8s.io/kubernetes/pkg/storage" @@ -448,7 +448,9 @@ func (m *Master) init(c *Config) { nodeStorage, nodeStatusStorage := nodeetcd.NewStorage(c.DatabaseStorage, c.KubeletClient) m.nodeRegistry = minion.NewRegistry(nodeStorage) - m.serviceRegistry = etcd.NewRegistry(c.DatabaseStorage) + + serviceStorage := serviceetcd.NewStorage(c.DatabaseStorage) + m.serviceRegistry = service.NewRegistry(serviceStorage) var serviceClusterIPRegistry service.RangeRegistry serviceClusterIPAllocator := ipallocator.NewAllocatorCIDRRange(m.serviceClusterIPRange, func(max int, rangeSpec string) allocator.Interface { diff --git a/pkg/registry/etcd/doc.go b/pkg/registry/etcd/doc.go deleted file mode 100644 index 134a37a622..0000000000 --- a/pkg/registry/etcd/doc.go +++ /dev/null @@ -1,19 +0,0 @@ -/* -Copyright 2014 The Kubernetes Authors All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// Package etcd provides etcd backend implementation for storing -// PodRegistry, ControllerRegistry and ServiceRegistry api objects. -package etcd diff --git a/pkg/registry/etcd/etcd.go b/pkg/registry/etcd/etcd.go deleted file mode 100644 index b71308289f..0000000000 --- a/pkg/registry/etcd/etcd.go +++ /dev/null @@ -1,166 +0,0 @@ -/* -Copyright 2014 The Kubernetes Authors All rights reserved. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package etcd - -import ( - "fmt" - - "k8s.io/kubernetes/pkg/api" - etcderr "k8s.io/kubernetes/pkg/api/errors/etcd" - "k8s.io/kubernetes/pkg/fields" - "k8s.io/kubernetes/pkg/labels" - "k8s.io/kubernetes/pkg/storage" - "k8s.io/kubernetes/pkg/watch" -) - -const ( - // ServicePath is the path to service resources in etcd - ServicePath string = "/services/specs" -) - -// TODO(wojtek-t): Change it to use rest.StandardStorage (as everything else) -// and move it to service/ directory. - -// TODO: Need to add a reconciler loop that makes sure that things in pods are reflected into -// kubelet (and vice versa) - -// Registry implements BindingRegistry, ControllerRegistry, EndpointRegistry, -// MinionRegistry, PodRegistry and ServiceRegistry, backed by etcd. -type Registry struct { - storage.Interface -} - -// NewRegistry creates an etcd registry. -func NewRegistry(storage storage.Interface) *Registry { - registry := &Registry{ - Interface: storage, - } - return registry -} - -// MakeEtcdListKey constructs etcd paths to resource directories enforcing namespace rules -func MakeEtcdListKey(ctx api.Context, prefix string) string { - key := prefix - ns, ok := api.NamespaceFrom(ctx) - if ok && len(ns) > 0 { - key = key + "/" + ns - } - return key -} - -// MakeEtcdItemKey constructs etcd paths to a resource relative to prefix enforcing namespace rules. If no namespace is on context, it errors. -func MakeEtcdItemKey(ctx api.Context, prefix string, id string) (string, error) { - key := MakeEtcdListKey(ctx, prefix) - ns, ok := api.NamespaceFrom(ctx) - if !ok || len(ns) == 0 { - return "", fmt.Errorf("invalid request. Namespace parameter required.") - } - if len(id) == 0 { - return "", fmt.Errorf("invalid request. Id parameter required.") - } - key = key + "/" + id - return key, nil -} - -// makePodListKey constructs etcd paths to service directories enforcing namespace rules. -func makeServiceListKey(ctx api.Context) string { - return MakeEtcdListKey(ctx, ServicePath) -} - -// makeServiceKey constructs etcd paths to service items enforcing namespace rules. -func makeServiceKey(ctx api.Context, name string) (string, error) { - return MakeEtcdItemKey(ctx, ServicePath, name) -} - -// ListServices obtains a list of Services. -func (r *Registry) ListServices(ctx api.Context) (*api.ServiceList, error) { - list := &api.ServiceList{} - err := r.List(makeServiceListKey(ctx), list) - return list, err -} - -// CreateService creates a new Service. -func (r *Registry) CreateService(ctx api.Context, svc *api.Service) (*api.Service, error) { - key, err := makeServiceKey(ctx, svc.Name) - if err != nil { - return nil, err - } - out := &api.Service{} - err = r.Create(key, svc, out, 0) - return out, etcderr.InterpretCreateError(err, "service", svc.Name) -} - -// GetService obtains a Service specified by its name. -func (r *Registry) GetService(ctx api.Context, name string) (*api.Service, error) { - key, err := makeServiceKey(ctx, name) - if err != nil { - return nil, err - } - var svc api.Service - err = r.Get(key, &svc, false) - if err != nil { - return nil, etcderr.InterpretGetError(err, "service", name) - } - return &svc, nil -} - -// DeleteService deletes a Service specified by its name. -func (r *Registry) DeleteService(ctx api.Context, name string) error { - key, err := makeServiceKey(ctx, name) - if err != nil { - return err - } - err = r.RecursiveDelete(key, true) - if err != nil { - return etcderr.InterpretDeleteError(err, "service", name) - } - return nil -} - -// UpdateService replaces an existing Service. -func (r *Registry) UpdateService(ctx api.Context, svc *api.Service) (*api.Service, error) { - key, err := makeServiceKey(ctx, svc.Name) - if err != nil { - return nil, err - } - out := &api.Service{} - err = r.Set(key, svc, out, 0) - return out, etcderr.InterpretUpdateError(err, "service", svc.Name) -} - -// WatchServices begins watching for new, changed, or deleted service configurations. -func (r *Registry) WatchServices(ctx api.Context, label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { - version, err := storage.ParseWatchResourceVersion(resourceVersion, "service") - if err != nil { - return nil, err - } - if !label.Empty() { - return nil, fmt.Errorf("label selectors are not supported on services") - } - if value, found := field.RequiresExactMatch("name"); found { - key, err := makeServiceKey(ctx, value) - if err != nil { - return nil, err - } - // TODO: use generic.SelectionPredicate - return r.Watch(key, version, storage.Everything) - } - if field.Empty() { - return r.WatchList(makeServiceListKey(ctx), version, storage.Everything) - } - return nil, fmt.Errorf("only the 'name' and default (everything) field selectors are supported") -} diff --git a/pkg/registry/registrytest/service.go b/pkg/registry/registrytest/service.go index 3b49d39eb8..56439454fa 100644 --- a/pkg/registry/registrytest/service.go +++ b/pkg/registry/registrytest/service.go @@ -46,7 +46,7 @@ func (r *ServiceRegistry) SetError(err error) { r.Err = err } -func (r *ServiceRegistry) ListServices(ctx api.Context) (*api.ServiceList, error) { +func (r *ServiceRegistry) ListServices(ctx api.Context, label labels.Selector, field fields.Selector) (*api.ServiceList, error) { r.mu.Lock() defer r.mu.Unlock() diff --git a/pkg/registry/service/etcd/etcd.go b/pkg/registry/service/etcd/etcd.go new file mode 100644 index 0000000000..e7ba1e374a --- /dev/null +++ b/pkg/registry/service/etcd/etcd.go @@ -0,0 +1,75 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package etcd + +import ( + "fmt" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/rest" + "k8s.io/kubernetes/pkg/fields" + "k8s.io/kubernetes/pkg/labels" + "k8s.io/kubernetes/pkg/registry/generic" + etcdgeneric "k8s.io/kubernetes/pkg/registry/generic/etcd" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/storage" +) + +type REST struct { + etcdgeneric.Etcd +} + +func NewStorage(s storage.Interface) *REST { + prefix := "/services/specs" + store := etcdgeneric.Etcd{ + NewFunc: func() runtime.Object { return &api.Service{} }, + NewListFunc: func() runtime.Object { return &api.ServiceList{} }, + KeyRootFunc: func(ctx api.Context) string { + return etcdgeneric.NamespaceKeyRootFunc(ctx, prefix) + }, + KeyFunc: func(ctx api.Context, name string) (string, error) { + return etcdgeneric.NamespaceKeyFunc(ctx, prefix, name) + }, + ObjectNameFunc: func(obj runtime.Object) (string, error) { + return obj.(*api.Service).Name, nil + }, + PredicateFunc: func(label labels.Selector, field fields.Selector) generic.Matcher { + return MatchServices(label, field) + }, + EndpointName: "services", + + CreateStrategy: rest.Services, + UpdateStrategy: rest.Services, + + Storage: s, + } + return &REST{store} +} + +func MatchServices(label labels.Selector, field fields.Selector) generic.Matcher { + return &generic.SelectionPredicate{label, field, ServiceAttributes} +} + +func ServiceAttributes(obj runtime.Object) (objLabels labels.Set, objFields fields.Set, err error) { + service, ok := obj.(*api.Service) + if !ok { + return nil, nil, fmt.Errorf("invalid object type %#v", obj) + } + return service.Labels, fields.Set{ + "metadata.name": service.Name, + }, nil +} diff --git a/pkg/registry/service/ipallocator/controller/repair.go b/pkg/registry/service/ipallocator/controller/repair.go index 3cc036cda3..2ebd06039d 100644 --- a/pkg/registry/service/ipallocator/controller/repair.go +++ b/pkg/registry/service/ipallocator/controller/repair.go @@ -22,6 +22,8 @@ import ( "time" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/fields" + "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/registry/service" "k8s.io/kubernetes/pkg/registry/service/ipallocator" "k8s.io/kubernetes/pkg/util" @@ -94,7 +96,7 @@ func (c *Repair) RunOnce() error { } ctx := api.WithNamespace(api.NewDefaultContext(), api.NamespaceAll) - list, err := c.registry.ListServices(ctx) + list, err := c.registry.ListServices(ctx, labels.Everything(), fields.Everything()) if err != nil { return fmt.Errorf("unable to refresh the service IP block: %v", err) } diff --git a/pkg/registry/service/portallocator/controller/repair.go b/pkg/registry/service/portallocator/controller/repair.go index 7ffb415ae7..936221e2d2 100644 --- a/pkg/registry/service/portallocator/controller/repair.go +++ b/pkg/registry/service/portallocator/controller/repair.go @@ -21,6 +21,8 @@ import ( "time" "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/fields" + "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/registry/service" "k8s.io/kubernetes/pkg/registry/service/portallocator" "k8s.io/kubernetes/pkg/util" @@ -79,7 +81,7 @@ func (c *Repair) RunOnce() error { } ctx := api.WithNamespace(api.NewDefaultContext(), api.NamespaceAll) - list, err := c.registry.ListServices(ctx) + list, err := c.registry.ListServices(ctx, labels.Everything(), fields.Everything()) if err != nil { return fmt.Errorf("unable to refresh the port block: %v", err) } diff --git a/pkg/registry/service/registry.go b/pkg/registry/service/registry.go index b918605bf1..779fbc2f4c 100644 --- a/pkg/registry/service/registry.go +++ b/pkg/registry/service/registry.go @@ -18,6 +18,7 @@ package service import ( "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/rest" "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/labels" "k8s.io/kubernetes/pkg/watch" @@ -25,7 +26,7 @@ import ( // Registry is an interface for things that know how to store services. type Registry interface { - ListServices(ctx api.Context) (*api.ServiceList, error) + ListServices(ctx api.Context, label labels.Selector, field fields.Selector) (*api.ServiceList, error) CreateService(ctx api.Context, svc *api.Service) (*api.Service, error) GetService(ctx api.Context, name string) (*api.Service, error) DeleteService(ctx api.Context, name string) error @@ -33,6 +34,58 @@ type Registry interface { WatchServices(ctx api.Context, labels labels.Selector, fields fields.Selector, resourceVersion string) (watch.Interface, error) } +// storage puts strong typing around storage calls +type storage struct { + rest.StandardStorage +} + +// NewRegistry returns a new Registry interface for the given Storage. Any mismatched +// types will panic. +func NewRegistry(s rest.StandardStorage) Registry { + return &storage{s} +} + +func (s *storage) ListServices(ctx api.Context, label labels.Selector, field fields.Selector) (*api.ServiceList, error) { + obj, err := s.List(ctx, label, field) + if err != nil { + return nil, err + } + return obj.(*api.ServiceList), nil +} + +func (s *storage) CreateService(ctx api.Context, svc *api.Service) (*api.Service, error) { + obj, err := s.Create(ctx, svc) + if err != nil { + return nil, err + } + return obj.(*api.Service), nil +} + +func (s *storage) GetService(ctx api.Context, name string) (*api.Service, error) { + obj, err := s.Get(ctx, name) + if err != nil { + return nil, err + } + return obj.(*api.Service), nil +} + +func (s *storage) DeleteService(ctx api.Context, name string) error { + _, err := s.Delete(ctx, name, nil) + return err +} + +func (s *storage) UpdateService(ctx api.Context, svc *api.Service) (*api.Service, error) { + obj, _, err := s.Update(ctx, svc) + if err != nil { + return nil, err + } + return obj.(*api.Service), nil +} + +func (s *storage) WatchServices(ctx api.Context, labels labels.Selector, fields fields.Selector, resourceVersion string) (watch.Interface, error) { + return s.Watch(ctx, labels, fields, resourceVersion) +} + // TODO: Move to a general location (as other components may need allocation in future; it's not service specific) // RangeRegistry is a registry that can retrieve or persist a RangeAllocation object. type RangeRegistry interface { diff --git a/pkg/registry/etcd/etcd_test.go b/pkg/registry/service/registry_test.go similarity index 76% rename from pkg/registry/etcd/etcd_test.go rename to pkg/registry/service/registry_test.go index 972a3cb9ca..1c166c628a 100644 --- a/pkg/registry/etcd/etcd_test.go +++ b/pkg/registry/service/registry_test.go @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package etcd +package service import ( "strconv" @@ -26,37 +26,47 @@ import ( "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/labels" etcdgeneric "k8s.io/kubernetes/pkg/registry/generic/etcd" + etcdservice "k8s.io/kubernetes/pkg/registry/service/etcd" "k8s.io/kubernetes/pkg/runtime" etcdstorage "k8s.io/kubernetes/pkg/storage/etcd" "k8s.io/kubernetes/pkg/tools" "k8s.io/kubernetes/pkg/tools/etcdtest" + "k8s.io/kubernetes/pkg/util" "github.com/coreos/go-etcd/etcd" ) -func NewTestEtcdRegistry(client tools.EtcdClient) *Registry { +func NewTestEtcdRegistry(client tools.EtcdClient) (Registry, *etcdservice.REST) { storage := etcdstorage.NewEtcdStorage(client, latest.Codec, etcdtest.PathPrefix()) - registry := NewRegistry(storage) - return registry + rest := etcdservice.NewStorage(storage) + registry := NewRegistry(rest) + return registry, rest } -func NewTestEtcdRegistryWithPods(client tools.EtcdClient) *Registry { - etcdStorage := etcdstorage.NewEtcdStorage(client, latest.Codec, etcdtest.PathPrefix()) - registry := NewRegistry(etcdStorage) - return registry +func makeTestService(name string) *api.Service { + return &api.Service{ + ObjectMeta: api.ObjectMeta{Name: name, Namespace: "default"}, + Spec: api.ServiceSpec{ + Ports: []api.ServicePort{ + {Name: "port", Protocol: api.ProtocolTCP, Port: 12345, TargetPort: util.NewIntOrStringFromInt(12345)}, + }, + Type: api.ServiceTypeClusterIP, + SessionAffinity: api.ServiceAffinityNone, + }, + } } func TestEtcdListServicesNotFound(t *testing.T) { fakeClient := tools.NewFakeEtcdClient(t) - registry := NewTestEtcdRegistry(fakeClient) + registry, rest := NewTestEtcdRegistry(fakeClient) ctx := api.NewDefaultContext() - key := makeServiceListKey(ctx) + key := rest.KeyRootFunc(ctx) key = etcdtest.AddPrefix(key) fakeClient.Data[key] = tools.EtcdResponseWithError{ R: &etcd.Response{}, E: tools.EtcdErrorNotFound, } - services, err := registry.ListServices(ctx) + services, err := registry.ListServices(ctx, labels.Everything(), fields.Everything()) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -69,25 +79,25 @@ func TestEtcdListServicesNotFound(t *testing.T) { func TestEtcdListServices(t *testing.T) { ctx := api.NewDefaultContext() fakeClient := tools.NewFakeEtcdClient(t) - registry := NewTestEtcdRegistry(fakeClient) - key := makeServiceListKey(ctx) + registry, rest := NewTestEtcdRegistry(fakeClient) + key := rest.KeyRootFunc(ctx) key = etcdtest.AddPrefix(key) fakeClient.Data[key] = tools.EtcdResponseWithError{ R: &etcd.Response{ Node: &etcd.Node{ Nodes: []*etcd.Node{ { - Value: runtime.EncodeOrDie(latest.Codec, &api.Service{ObjectMeta: api.ObjectMeta{Name: "foo"}}), + Value: runtime.EncodeOrDie(latest.Codec, makeTestService("foo")), }, { - Value: runtime.EncodeOrDie(latest.Codec, &api.Service{ObjectMeta: api.ObjectMeta{Name: "bar"}}), + Value: runtime.EncodeOrDie(latest.Codec, makeTestService("bar")), }, }, }, }, E: nil, } - services, err := registry.ListServices(ctx) + services, err := registry.ListServices(ctx, labels.Everything(), fields.Everything()) if err != nil { t.Errorf("unexpected error: %v", err) } @@ -100,15 +110,13 @@ func TestEtcdListServices(t *testing.T) { func TestEtcdCreateService(t *testing.T) { ctx := api.NewDefaultContext() fakeClient := tools.NewFakeEtcdClient(t) - registry := NewTestEtcdRegistry(fakeClient) - _, err := registry.CreateService(ctx, &api.Service{ - ObjectMeta: api.ObjectMeta{Name: "foo"}, - }) + registry, rest := NewTestEtcdRegistry(fakeClient) + _, err := registry.CreateService(ctx, makeTestService("foo")) if err != nil { t.Errorf("unexpected error: %v", err) } - key, _ := makeServiceKey(ctx, "foo") + key, _ := rest.KeyFunc(ctx, "foo") key = etcdtest.AddPrefix(key) resp, err := fakeClient.Get(key, false, false) if err != nil { @@ -129,13 +137,11 @@ func TestEtcdCreateService(t *testing.T) { func TestEtcdCreateServiceAlreadyExisting(t *testing.T) { ctx := api.NewDefaultContext() fakeClient := tools.NewFakeEtcdClient(t) - registry := NewTestEtcdRegistry(fakeClient) - key, _ := makeServiceKey(ctx, "foo") + registry, rest := NewTestEtcdRegistry(fakeClient) + key, _ := rest.KeyFunc(ctx, "foo") key = etcdtest.AddPrefix(key) - fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.Service{ObjectMeta: api.ObjectMeta{Name: "foo"}}), 0) - _, err := registry.CreateService(ctx, &api.Service{ - ObjectMeta: api.ObjectMeta{Name: "foo"}, - }) + fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, makeTestService("foo")), 0) + _, err := registry.CreateService(ctx, makeTestService("foo")) if !errors.IsAlreadyExists(err) { t.Errorf("expected already exists err, got %#v", err) } @@ -144,13 +150,13 @@ func TestEtcdCreateServiceAlreadyExisting(t *testing.T) { // TestEtcdGetServiceDifferentNamespace ensures same-name services in different namespaces do not clash func TestEtcdGetServiceDifferentNamespace(t *testing.T) { fakeClient := tools.NewFakeEtcdClient(t) - registry := NewTestEtcdRegistry(fakeClient) + registry, rest := NewTestEtcdRegistry(fakeClient) ctx1 := api.NewDefaultContext() ctx2 := api.WithNamespace(api.NewContext(), "other") - key1, _ := makeServiceKey(ctx1, "foo") - key2, _ := makeServiceKey(ctx2, "foo") + key1, _ := rest.KeyFunc(ctx1, "foo") + key2, _ := rest.KeyFunc(ctx2, "foo") key1 = etcdtest.AddPrefix(key1) key2 = etcdtest.AddPrefix(key2) @@ -185,10 +191,10 @@ func TestEtcdGetServiceDifferentNamespace(t *testing.T) { func TestEtcdGetService(t *testing.T) { ctx := api.NewDefaultContext() fakeClient := tools.NewFakeEtcdClient(t) - registry := NewTestEtcdRegistry(fakeClient) - key, _ := makeServiceKey(ctx, "foo") + registry, rest := NewTestEtcdRegistry(fakeClient) + key, _ := rest.KeyFunc(ctx, "foo") key = etcdtest.AddPrefix(key) - fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.Service{ObjectMeta: api.ObjectMeta{Name: "foo"}}), 0) + fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, makeTestService("foo")), 0) service, err := registry.GetService(ctx, "foo") if err != nil { t.Errorf("unexpected error: %v", err) @@ -202,8 +208,8 @@ func TestEtcdGetService(t *testing.T) { func TestEtcdGetServiceNotFound(t *testing.T) { ctx := api.NewDefaultContext() fakeClient := tools.NewFakeEtcdClient(t) - registry := NewTestEtcdRegistry(fakeClient) - key, _ := makeServiceKey(ctx, "foo") + registry, rest := NewTestEtcdRegistry(fakeClient) + key, _ := rest.KeyFunc(ctx, "foo") key = etcdtest.AddPrefix(key) fakeClient.Data[key] = tools.EtcdResponseWithError{ R: &etcd.Response{ @@ -220,10 +226,10 @@ func TestEtcdGetServiceNotFound(t *testing.T) { func TestEtcdDeleteService(t *testing.T) { ctx := api.NewDefaultContext() fakeClient := tools.NewFakeEtcdClient(t) - registry := NewTestEtcdRegistryWithPods(fakeClient) + registry, _ := NewTestEtcdRegistry(fakeClient) key, _ := etcdgeneric.NamespaceKeyFunc(ctx, "/services/specs", "foo") key = etcdtest.AddPrefix(key) - fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.Service{ObjectMeta: api.ObjectMeta{Name: "foo"}}), 0) + fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, makeTestService("foo")), 0) path, _ := etcdgeneric.NamespaceKeyFunc(ctx, "/services/endpoints", "foo") endpointsKey := etcdtest.AddPrefix(path) fakeClient.Set(endpointsKey, runtime.EncodeOrDie(latest.Codec, &api.Endpoints{ObjectMeta: api.ObjectMeta{Name: "foo"}}), 0) @@ -245,10 +251,10 @@ func TestEtcdUpdateService(t *testing.T) { ctx := api.NewDefaultContext() fakeClient := tools.NewFakeEtcdClient(t) fakeClient.TestIndex = true - registry := NewTestEtcdRegistry(fakeClient) - key, _ := makeServiceKey(ctx, "uniquefoo") + registry, rest := NewTestEtcdRegistry(fakeClient) + key, _ := rest.KeyFunc(ctx, "uniquefoo") key = etcdtest.AddPrefix(key) - resp, _ := fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, &api.Service{ObjectMeta: api.ObjectMeta{Name: "uniquefoo"}}), 0) + resp, _ := fakeClient.Set(key, runtime.EncodeOrDie(latest.Codec, makeTestService("uniquefoo")), 0) testService := api.Service{ ObjectMeta: api.ObjectMeta{ Name: "uniquefoo", @@ -258,6 +264,9 @@ func TestEtcdUpdateService(t *testing.T) { }, }, Spec: api.ServiceSpec{ + Ports: []api.ServicePort{ + {Name: "port", Protocol: api.ProtocolTCP, Port: 12345, TargetPort: util.NewIntOrStringFromInt(12345)}, + }, Selector: map[string]string{ "baz": "bar", }, @@ -285,7 +294,7 @@ func TestEtcdUpdateService(t *testing.T) { func TestEtcdWatchServices(t *testing.T) { ctx := api.NewDefaultContext() fakeClient := tools.NewFakeEtcdClient(t) - registry := NewTestEtcdRegistry(fakeClient) + registry, _ := NewTestEtcdRegistry(fakeClient) watching, err := registry.WatchServices(ctx, labels.Everything(), fields.SelectorFromSet(fields.Set{"name": "foo"}), @@ -310,31 +319,6 @@ func TestEtcdWatchServices(t *testing.T) { watching.Stop() } -func TestEtcdWatchServicesBadSelector(t *testing.T) { - ctx := api.NewDefaultContext() - fakeClient := tools.NewFakeEtcdClient(t) - registry := NewTestEtcdRegistry(fakeClient) - _, err := registry.WatchServices( - ctx, - labels.Everything(), - fields.SelectorFromSet(fields.Set{"Field.Selector": "foo"}), - "", - ) - if err == nil { - t.Errorf("unexpected non-error: %v", err) - } - - _, err = registry.WatchServices( - ctx, - labels.SelectorFromSet(labels.Set{"Label.Selector": "foo"}), - fields.Everything(), - "", - ) - if err == nil { - t.Errorf("unexpected non-error: %v", err) - } -} - // TODO We need a test for the compare and swap behavior. This basically requires two things: // 1) Add a per-operation synchronization channel to the fake etcd client, such that any operation waits on that // channel, this will enable us to orchestrate the flow of etcd requests in the test. diff --git a/pkg/registry/service/rest.go b/pkg/registry/service/rest.go index 6b010fb3e8..d2009402e1 100644 --- a/pkg/registry/service/rest.go +++ b/pkg/registry/service/rest.go @@ -167,26 +167,11 @@ func (rs *REST) Delete(ctx api.Context, id string) (runtime.Object, error) { } func (rs *REST) Get(ctx api.Context, id string) (runtime.Object, error) { - service, err := rs.registry.GetService(ctx, id) - if err != nil { - return nil, err - } - return service, err + return rs.registry.GetService(ctx, id) } func (rs *REST) List(ctx api.Context, label labels.Selector, field fields.Selector) (runtime.Object, error) { - list, err := rs.registry.ListServices(ctx) - if err != nil { - return nil, err - } - var filtered []api.Service - for _, service := range list.Items { - if label.Matches(labels.Set(service.Labels)) { - filtered = append(filtered, service) - } - } - list.Items = filtered - return list, err + return rs.registry.ListServices(ctx, label, field) } // Watch returns Services events via a watch.Interface.