diff --git a/pkg/master/master.go b/pkg/master/master.go index 035dd4f201..59e67c4867 100644 --- a/pkg/master/master.go +++ b/pkg/master/master.go @@ -26,6 +26,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/binding" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/controller" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/endpoint" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod" @@ -54,6 +55,7 @@ type Master struct { podRegistry pod.Registry controllerRegistry controller.Registry serviceRegistry service.Registry + endpointRegistry endpoint.Registry minionRegistry minion.Registry bindingRegistry binding.Registry storage map[string]apiserver.RESTStorage @@ -68,6 +70,7 @@ func New(c *Config) *Master { podRegistry: etcd.NewRegistry(etcdClient, minionRegistry), controllerRegistry: etcd.NewRegistry(etcdClient, minionRegistry), serviceRegistry: etcd.NewRegistry(etcdClient, minionRegistry), + endpointRegistry: etcd.NewRegistry(etcdClient, minionRegistry), bindingRegistry: etcd.NewRegistry(etcdClient, minionRegistry), minionRegistry: minionRegistry, client: c.Client, @@ -118,6 +121,7 @@ func (m *Master) init(cloud cloudprovider.Interface, podInfoGetter client.PodInf }), "replicationControllers": controller.NewRegistryStorage(m.controllerRegistry, m.podRegistry), "services": service.NewRegistryStorage(m.serviceRegistry, cloud, m.minionRegistry), + "endpoints": endpoint.NewStorage(m.endpointRegistry), "minions": minion.NewRegistryStorage(m.minionRegistry), // TODO: should appear only in scheduler API group. diff --git a/pkg/registry/endpoint/registry.go b/pkg/registry/endpoint/registry.go new file mode 100644 index 0000000000..5384563a48 --- /dev/null +++ b/pkg/registry/endpoint/registry.go @@ -0,0 +1,30 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package endpoint + +import ( + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" +) + +// Registry is an interface for things that know how to store endpoints. +type Registry interface { + GetEndpoints(name string) (*api.Endpoints, error) + WatchEndpoints(labels, fields labels.Selector, resourceVersion uint64) (watch.Interface, error) + UpdateEndpoints(e api.Endpoints) error +} diff --git a/pkg/registry/endpoint/storage.go b/pkg/registry/endpoint/storage.go new file mode 100644 index 0000000000..eb8956056c --- /dev/null +++ b/pkg/registry/endpoint/storage.go @@ -0,0 +1,74 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package endpoint + +import ( + "errors" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" +) + +// Storage adapts endpoints into apiserver's RESTStorage model. +type Storage struct { + registry Registry +} + +// NewStorage returns a new Storage implementation for endpoints +func NewStorage(registry Registry) apiserver.RESTStorage { + return &Storage{ + registry: registry, + } +} + +// Get satisfies the RESTStorage interface but is unimplemented +func (rs *Storage) Get(id string) (interface{}, error) { + return rs.registry.GetEndpoints(id) +} + +// List satisfies the RESTStorage interface but is unimplemented +func (rs *Storage) List(selector labels.Selector) (interface{}, error) { + return nil, errors.New("unimplemented") +} + +// Watch returns Endpoint events via a watch.Interface. +// It implements apiserver.ResourceWatcher. +func (rs *Storage) Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { + return rs.registry.WatchEndpoints(label, field, resourceVersion) +} + +// Create satisfies the RESTStorage interface but is unimplemented +func (rs *Storage) Create(obj interface{}) (<-chan interface{}, error) { + return nil, errors.New("unimplemented") +} + +// Update satisfies the RESTStorage interface but is unimplemented +func (rs *Storage) Update(obj interface{}) (<-chan interface{}, error) { + return nil, errors.New("unimplemented") +} + +// Delete satisfies the RESTStorage interface but is unimplemented +func (rs *Storage) Delete(id string) (<-chan interface{}, error) { + return nil, errors.New("unimplemented") +} + +// New implements the RESTStorage interface +func (rs Storage) New() interface{} { + return &api.Endpoints{} +} diff --git a/pkg/registry/endpoint/storage_test.go b/pkg/registry/endpoint/storage_test.go new file mode 100644 index 0000000000..105c26f300 --- /dev/null +++ b/pkg/registry/endpoint/storage_test.go @@ -0,0 +1,69 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package endpoint + +import ( + "reflect" + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest" +) + +func TestGetEndpoints(t *testing.T) { + registry := ®istrytest.ServiceRegistry{ + Endpoints: api.Endpoints{ + JSONBase: api.JSONBase{ID: "foo"}, + Endpoints: []string{"127.0.0.1:9000"}, + }, + } + storage := NewStorage(registry) + obj, err := storage.Get("foo") + if err != nil { + t.Fatalf("unexpected error: %#v", err) + } + if !reflect.DeepEqual([]string{"127.0.0.1:9000"}, obj.(*api.Endpoints).Endpoints) { + t.Errorf("unexpected endpoints: %#v", obj) + } +} + +func TestGetEndpointsMissingService(t *testing.T) { + registry := ®istrytest.ServiceRegistry{ + Err: apiserver.NewNotFoundErr("service", "foo"), + } + storage := NewStorage(registry) + + // returns service not found + _, err := storage.Get("foo") + if !apiserver.IsNotFound(err) || !reflect.DeepEqual(err, apiserver.NewNotFoundErr("service", "foo")) { + t.Errorf("expected NotFound error, got %#v", err) + } + + // returns empty endpoints + registry.Err = nil + registry.Service = &api.Service{ + JSONBase: api.JSONBase{ID: "foo"}, + } + obj, err := storage.Get("foo") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if obj.(*api.Endpoints).Endpoints != nil { + t.Errorf("unexpected endpoints: %#v", obj) + } +} diff --git a/pkg/registry/etcd/etcd.go b/pkg/registry/etcd/etcd.go index 41407b65c3..0c08c3b586 100644 --- a/pkg/registry/etcd/etcd.go +++ b/pkg/registry/etcd/etcd.go @@ -340,10 +340,54 @@ func (r *Registry) UpdateService(svc api.Service) error { return r.SetObj(makeServiceKey(svc.ID), svc) } +// WatchServices begins watching for new, changed, or deleted service configurations. +func (r *Registry) WatchServices(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { + if !label.Empty() { + return nil, fmt.Errorf("label selectors are not supported on services") + } + if value, found := field.RequiresExactMatch("ID"); found { + return r.Watch(makeServiceKey(value), resourceVersion) + } + if field.Empty() { + return r.WatchList("/registry/services/specs", resourceVersion, tools.Everything) + } + return nil, fmt.Errorf("only the 'ID' and default (everything) field selectors are supported") +} + +// GetEndpoints obtains endpoints specified by a service name +func (r *Registry) GetEndpoints(name string) (*api.Endpoints, error) { + obj := &api.Endpoints{} + if err := r.ExtractObj(makeServiceEndpointsKey(name), obj, false); err != nil { + if tools.IsEtcdNotFound(err) { + if _, err := r.GetService(name); err != nil && apiserver.IsNotFound(err) { + return nil, apiserver.NewNotFoundErr("service", name) + } + return obj, nil + } + return nil, err + } + return obj, nil +} + // UpdateEndpoints update Endpoints of a Service. func (r *Registry) UpdateEndpoints(e api.Endpoints) error { return r.AtomicUpdate(makeServiceEndpointsKey(e.ID), &api.Endpoints{}, - func(interface{}) (interface{}, error) { + func(input interface{}) (interface{}, error) { + // TODO: racy - label query is returning different results for two simultaneous updaters return e, nil }) } + +// WatchEndpoints begins watching for new, changed, or deleted endpoint configurations. +func (r *Registry) WatchEndpoints(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { + if !label.Empty() { + return nil, fmt.Errorf("label selectors are not supported on endpoints") + } + if value, found := field.RequiresExactMatch("ID"); found { + return r.Watch(makeServiceEndpointsKey(value), resourceVersion) + } + if field.Empty() { + return r.WatchList("/registry/services/endpoints", resourceVersion, tools.Everything) + } + return nil, fmt.Errorf("only the 'ID' and default (everything) field selectors are supported") +} diff --git a/pkg/registry/etcd/etcd_test.go b/pkg/registry/etcd/etcd_test.go index 1bf44bd501..e7d574962a 100644 --- a/pkg/registry/etcd/etcd_test.go +++ b/pkg/registry/etcd/etcd_test.go @@ -730,7 +730,7 @@ func TestEtcdGetService(t *testing.T) { } if service.ID != "foo" { - t.Errorf("Unexpected pod: %#v", service) + t.Errorf("Unexpected service: %#v", service) } } @@ -803,6 +803,23 @@ func TestEtcdUpdateService(t *testing.T) { } } +func TestEtcdGetEndpoints(t *testing.T) { + fakeClient := tools.NewFakeEtcdClient(t) + fakeClient.Set("/registry/services/endpoints/foo", api.EncodeOrDie(api.Endpoints{ + JSONBase: api.JSONBase{ID: "foo"}, + Endpoints: []string{"127.0.0.1:34855"}, + }), 0) + registry := NewTestEtcdRegistry(fakeClient, []string{"machine"}) + endpoints, err := registry.GetEndpoints("foo") + if err != nil { + t.Errorf("unexpected error: %v", err) + } + + if endpoints.ID != "foo" || !reflect.DeepEqual(endpoints.Endpoints, []string{"127.0.0.1:34855"}) { + t.Errorf("Unexpected endpoints: %#v", endpoints) + } +} + func TestEtcdUpdateEndpoints(t *testing.T) { fakeClient := tools.NewFakeEtcdClient(t) fakeClient.TestIndex = true @@ -830,6 +847,104 @@ func TestEtcdUpdateEndpoints(t *testing.T) { } } +func TestEtcdWatchServices(t *testing.T) { + fakeClient := tools.NewFakeEtcdClient(t) + registry := NewTestEtcdRegistry(fakeClient, []string{"machine"}) + watching, err := registry.WatchServices( + labels.Everything(), + labels.SelectorFromSet(labels.Set{"ID": "foo"}), + 1, + ) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + fakeClient.WaitForWatchCompletion() + + select { + case _, ok := <-watching.ResultChan(): + if !ok { + t.Errorf("watching channel should be open") + } + default: + } + fakeClient.WatchInjectError <- nil + if _, ok := <-watching.ResultChan(); ok { + t.Errorf("watching channel should be closed") + } + watching.Stop() +} + +func TestEtcdWatchServicesBadSelector(t *testing.T) { + fakeClient := tools.NewFakeEtcdClient(t) + registry := NewTestEtcdRegistry(fakeClient, []string{"machine"}) + _, err := registry.WatchServices( + labels.Everything(), + labels.SelectorFromSet(labels.Set{"Field.Selector": "foo"}), + 0, + ) + if err == nil { + t.Errorf("unexpected non-error: %v", err) + } + + _, err = registry.WatchServices( + labels.SelectorFromSet(labels.Set{"Label.Selector": "foo"}), + labels.Everything(), + 0, + ) + if err == nil { + t.Errorf("unexpected non-error: %v", err) + } +} + +func TestEtcdWatchEndpoints(t *testing.T) { + fakeClient := tools.NewFakeEtcdClient(t) + registry := NewTestEtcdRegistry(fakeClient, []string{"machine"}) + watching, err := registry.WatchEndpoints( + labels.Everything(), + labels.SelectorFromSet(labels.Set{"ID": "foo"}), + 1, + ) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + fakeClient.WaitForWatchCompletion() + + select { + case _, ok := <-watching.ResultChan(): + if !ok { + t.Errorf("watching channel should be open") + } + default: + } + fakeClient.WatchInjectError <- nil + if _, ok := <-watching.ResultChan(); ok { + t.Errorf("watching channel should be closed") + } + watching.Stop() +} + +func TestEtcdWatchEndpointsBadSelector(t *testing.T) { + fakeClient := tools.NewFakeEtcdClient(t) + registry := NewTestEtcdRegistry(fakeClient, []string{"machine"}) + _, err := registry.WatchEndpoints( + labels.Everything(), + labels.SelectorFromSet(labels.Set{"Field.Selector": "foo"}), + 0, + ) + if err == nil { + t.Errorf("unexpected non-error: %v", err) + } + + _, err = registry.WatchEndpoints( + labels.SelectorFromSet(labels.Set{"Label.Selector": "foo"}), + labels.Everything(), + 0, + ) + if err == nil { + t.Errorf("unexpected non-error: %v", err) + } +} + // TODO We need a test for the compare and swap behavior. This basically requires two things: // 1) Add a per-operation synchronization channel to the fake etcd client, such that any operation waits on that // channel, this will enable us to orchestrate the flow of etcd requests in the test. diff --git a/pkg/registry/registrytest/service.go b/pkg/registry/registrytest/service.go index 83a3ec8923..3353507728 100644 --- a/pkg/registry/registrytest/service.go +++ b/pkg/registry/registrytest/service.go @@ -18,6 +18,8 @@ package registrytest import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) func NewServiceRegistry() *ServiceRegistry { @@ -60,7 +62,19 @@ func (r *ServiceRegistry) UpdateService(svc api.Service) error { return r.Err } +func (r *ServiceRegistry) WatchServices(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { + return nil, r.Err +} + +func (r *ServiceRegistry) GetEndpoints(name string) (*api.Endpoints, error) { + return &r.Endpoints, r.Err +} + func (r *ServiceRegistry) UpdateEndpoints(e api.Endpoints) error { r.Endpoints = e return r.Err } + +func (r *ServiceRegistry) WatchEndpoints(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { + return nil, r.Err +} diff --git a/pkg/registry/service/registry.go b/pkg/registry/service/registry.go index 676ad46ab7..bcbfa46165 100644 --- a/pkg/registry/service/registry.go +++ b/pkg/registry/service/registry.go @@ -18,6 +18,9 @@ package service import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/endpoint" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) // Registry is an interface for things that know how to store services. @@ -27,5 +30,9 @@ type Registry interface { GetService(name string) (*api.Service, error) DeleteService(name string) error UpdateService(svc api.Service) error - UpdateEndpoints(e api.Endpoints) error + WatchServices(labels, fields labels.Selector, resourceVersion uint64) (watch.Interface, error) + + // TODO: endpoints and their implementation should be separated, setting endpoints should be + // supported via the API, and the endpoints-controller should use the API to update endpoints. + endpoint.Registry } diff --git a/pkg/registry/service/storage.go b/pkg/registry/service/storage.go index b73a17f731..494e0abc2e 100644 --- a/pkg/registry/service/storage.go +++ b/pkg/registry/service/storage.go @@ -27,6 +27,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) // RegistryStorage adapts a service registry into apiserver's RESTStorage model. @@ -123,6 +124,12 @@ func (rs *RegistryStorage) List(selector labels.Selector) (interface{}, error) { return list, err } +// Watch returns Services events via a watch.Interface. +// It implements apiserver.ResourceWatcher. +func (rs *RegistryStorage) Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) { + return rs.registry.WatchServices(label, field, resourceVersion) +} + func (rs RegistryStorage) New() interface{} { return &api.Service{} } diff --git a/pkg/tools/etcd_tools_watch_test.go b/pkg/tools/etcd_tools_watch_test.go index b5639445bb..0d0378cae4 100644 --- a/pkg/tools/etcd_tools_watch_test.go +++ b/pkg/tools/etcd_tools_watch_test.go @@ -241,6 +241,122 @@ func TestWatch(t *testing.T) { } } +func TestWatchEtcdState(t *testing.T) { + type T struct { + Type watch.EventType + Endpoints []string + } + testCases := map[string]struct { + Initial map[string]EtcdResponseWithError + Responses []*etcd.Response + From uint64 + Expected []*T + }{ + "from not found": { + Initial: map[string]EtcdResponseWithError{}, + Responses: []*etcd.Response{ + { + Action: "create", + Node: &etcd.Node{ + Value: string(api.EncodeOrDie(&api.Endpoints{JSONBase: api.JSONBase{ID: "foo"}, Endpoints: []string{}})), + }, + }, + }, + From: 1, + Expected: []*T{ + {watch.Added, nil}, + }, + }, + "from version 1": { + Responses: []*etcd.Response{ + { + Action: "compareAndSwap", + Node: &etcd.Node{ + Value: string(api.EncodeOrDie(&api.Endpoints{JSONBase: api.JSONBase{ID: "foo"}, Endpoints: []string{"127.0.0.1:9000"}})), + CreatedIndex: 1, + ModifiedIndex: 2, + }, + PrevNode: &etcd.Node{ + Value: string(api.EncodeOrDie(&api.Endpoints{JSONBase: api.JSONBase{ID: "foo"}, Endpoints: []string{}})), + CreatedIndex: 1, + ModifiedIndex: 1, + }, + }, + }, + From: 1, + Expected: []*T{ + {watch.Modified, []string{"127.0.0.1:9000"}}, + }, + }, + "from initial state": { + Initial: map[string]EtcdResponseWithError{ + "/somekey/foo": { + R: &etcd.Response{ + Action: "get", + Node: &etcd.Node{ + Value: string(api.EncodeOrDie(&api.Endpoints{JSONBase: api.JSONBase{ID: "foo"}, Endpoints: []string{}})), + CreatedIndex: 1, + ModifiedIndex: 1, + }, + EtcdIndex: 1, + }, + }, + }, + Responses: []*etcd.Response{ + nil, + { + Action: "compareAndSwap", + Node: &etcd.Node{ + Value: string(api.EncodeOrDie(&api.Endpoints{JSONBase: api.JSONBase{ID: "foo"}, Endpoints: []string{"127.0.0.1:9000"}})), + CreatedIndex: 1, + ModifiedIndex: 2, + }, + PrevNode: &etcd.Node{ + Value: string(api.EncodeOrDie(&api.Endpoints{JSONBase: api.JSONBase{ID: "foo"}, Endpoints: []string{}})), + CreatedIndex: 1, + ModifiedIndex: 1, + }, + }, + }, + Expected: []*T{ + {watch.Added, nil}, + {watch.Modified, []string{"127.0.0.1:9000"}}, + }, + }, + } + + for k, testCase := range testCases { + fakeClient := NewFakeEtcdClient(t) + for key, value := range testCase.Initial { + fakeClient.Data[key] = value + } + h := EtcdHelper{fakeClient, codec, versioner} + watching, err := h.Watch("/somekey/foo", testCase.From) + if err != nil { + t.Errorf("%s: unexpected error: %v", k, err) + continue + } + fakeClient.WaitForWatchCompletion() + + t.Logf("Testing %v", k) + for i := range testCase.Responses { + if testCase.Responses[i] != nil { + fakeClient.WatchResponse <- testCase.Responses[i] + } + event := <-watching.ResultChan() + if e, a := testCase.Expected[i].Type, event.Type; e != a { + t.Errorf("%s: expected type %v, got %v", k, e, a) + break + } + if e, a := testCase.Expected[i].Endpoints, event.Object.(*api.Endpoints).Endpoints; !reflect.DeepEqual(e, a) { + t.Errorf("%s: expected type %v, got %v", k, e, a) + break + } + } + watching.Stop() + } +} + func TestWatchFromZeroIndex(t *testing.T) { pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}