Expose REST resource for endpoints and watch on services/endpoints

Will allow kube-proxies to listen on endpoints.
pull/6/head
Clayton Coleman 2014-08-14 15:48:34 -04:00
parent b5e1e044bc
commit 083d81b6d7
10 changed files with 483 additions and 3 deletions

View File

@ -26,6 +26,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider" "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/binding" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/binding"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/controller" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/controller"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/endpoint"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/etcd" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/etcd"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/pod"
@ -54,6 +55,7 @@ type Master struct {
podRegistry pod.Registry podRegistry pod.Registry
controllerRegistry controller.Registry controllerRegistry controller.Registry
serviceRegistry service.Registry serviceRegistry service.Registry
endpointRegistry endpoint.Registry
minionRegistry minion.Registry minionRegistry minion.Registry
bindingRegistry binding.Registry bindingRegistry binding.Registry
storage map[string]apiserver.RESTStorage storage map[string]apiserver.RESTStorage
@ -68,6 +70,7 @@ func New(c *Config) *Master {
podRegistry: etcd.NewRegistry(etcdClient, minionRegistry), podRegistry: etcd.NewRegistry(etcdClient, minionRegistry),
controllerRegistry: etcd.NewRegistry(etcdClient, minionRegistry), controllerRegistry: etcd.NewRegistry(etcdClient, minionRegistry),
serviceRegistry: etcd.NewRegistry(etcdClient, minionRegistry), serviceRegistry: etcd.NewRegistry(etcdClient, minionRegistry),
endpointRegistry: etcd.NewRegistry(etcdClient, minionRegistry),
bindingRegistry: etcd.NewRegistry(etcdClient, minionRegistry), bindingRegistry: etcd.NewRegistry(etcdClient, minionRegistry),
minionRegistry: minionRegistry, minionRegistry: minionRegistry,
client: c.Client, client: c.Client,
@ -118,6 +121,7 @@ func (m *Master) init(cloud cloudprovider.Interface, podInfoGetter client.PodInf
}), }),
"replicationControllers": controller.NewRegistryStorage(m.controllerRegistry, m.podRegistry), "replicationControllers": controller.NewRegistryStorage(m.controllerRegistry, m.podRegistry),
"services": service.NewRegistryStorage(m.serviceRegistry, cloud, m.minionRegistry), "services": service.NewRegistryStorage(m.serviceRegistry, cloud, m.minionRegistry),
"endpoints": endpoint.NewStorage(m.endpointRegistry),
"minions": minion.NewRegistryStorage(m.minionRegistry), "minions": minion.NewRegistryStorage(m.minionRegistry),
// TODO: should appear only in scheduler API group. // TODO: should appear only in scheduler API group.

View File

@ -0,0 +1,30 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package endpoint
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
// Registry is an interface for things that know how to store endpoints.
type Registry interface {
GetEndpoints(name string) (*api.Endpoints, error)
WatchEndpoints(labels, fields labels.Selector, resourceVersion uint64) (watch.Interface, error)
UpdateEndpoints(e api.Endpoints) error
}

View File

@ -0,0 +1,74 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package endpoint
import (
"errors"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
// Storage adapts endpoints into apiserver's RESTStorage model.
type Storage struct {
registry Registry
}
// NewStorage returns a new Storage implementation for endpoints
func NewStorage(registry Registry) apiserver.RESTStorage {
return &Storage{
registry: registry,
}
}
// Get satisfies the RESTStorage interface but is unimplemented
func (rs *Storage) Get(id string) (interface{}, error) {
return rs.registry.GetEndpoints(id)
}
// List satisfies the RESTStorage interface but is unimplemented
func (rs *Storage) List(selector labels.Selector) (interface{}, error) {
return nil, errors.New("unimplemented")
}
// Watch returns Endpoint events via a watch.Interface.
// It implements apiserver.ResourceWatcher.
func (rs *Storage) Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
return rs.registry.WatchEndpoints(label, field, resourceVersion)
}
// Create satisfies the RESTStorage interface but is unimplemented
func (rs *Storage) Create(obj interface{}) (<-chan interface{}, error) {
return nil, errors.New("unimplemented")
}
// Update satisfies the RESTStorage interface but is unimplemented
func (rs *Storage) Update(obj interface{}) (<-chan interface{}, error) {
return nil, errors.New("unimplemented")
}
// Delete satisfies the RESTStorage interface but is unimplemented
func (rs *Storage) Delete(id string) (<-chan interface{}, error) {
return nil, errors.New("unimplemented")
}
// New implements the RESTStorage interface
func (rs Storage) New() interface{} {
return &api.Endpoints{}
}

View File

@ -0,0 +1,69 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package endpoint
import (
"reflect"
"testing"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
)
func TestGetEndpoints(t *testing.T) {
registry := &registrytest.ServiceRegistry{
Endpoints: api.Endpoints{
JSONBase: api.JSONBase{ID: "foo"},
Endpoints: []string{"127.0.0.1:9000"},
},
}
storage := NewStorage(registry)
obj, err := storage.Get("foo")
if err != nil {
t.Fatalf("unexpected error: %#v", err)
}
if !reflect.DeepEqual([]string{"127.0.0.1:9000"}, obj.(*api.Endpoints).Endpoints) {
t.Errorf("unexpected endpoints: %#v", obj)
}
}
func TestGetEndpointsMissingService(t *testing.T) {
registry := &registrytest.ServiceRegistry{
Err: apiserver.NewNotFoundErr("service", "foo"),
}
storage := NewStorage(registry)
// returns service not found
_, err := storage.Get("foo")
if !apiserver.IsNotFound(err) || !reflect.DeepEqual(err, apiserver.NewNotFoundErr("service", "foo")) {
t.Errorf("expected NotFound error, got %#v", err)
}
// returns empty endpoints
registry.Err = nil
registry.Service = &api.Service{
JSONBase: api.JSONBase{ID: "foo"},
}
obj, err := storage.Get("foo")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if obj.(*api.Endpoints).Endpoints != nil {
t.Errorf("unexpected endpoints: %#v", obj)
}
}

View File

@ -340,10 +340,54 @@ func (r *Registry) UpdateService(svc api.Service) error {
return r.SetObj(makeServiceKey(svc.ID), svc) return r.SetObj(makeServiceKey(svc.ID), svc)
} }
// WatchServices begins watching for new, changed, or deleted service configurations.
func (r *Registry) WatchServices(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
if !label.Empty() {
return nil, fmt.Errorf("label selectors are not supported on services")
}
if value, found := field.RequiresExactMatch("ID"); found {
return r.Watch(makeServiceKey(value), resourceVersion)
}
if field.Empty() {
return r.WatchList("/registry/services/specs", resourceVersion, tools.Everything)
}
return nil, fmt.Errorf("only the 'ID' and default (everything) field selectors are supported")
}
// GetEndpoints obtains endpoints specified by a service name
func (r *Registry) GetEndpoints(name string) (*api.Endpoints, error) {
obj := &api.Endpoints{}
if err := r.ExtractObj(makeServiceEndpointsKey(name), obj, false); err != nil {
if tools.IsEtcdNotFound(err) {
if _, err := r.GetService(name); err != nil && apiserver.IsNotFound(err) {
return nil, apiserver.NewNotFoundErr("service", name)
}
return obj, nil
}
return nil, err
}
return obj, nil
}
// UpdateEndpoints update Endpoints of a Service. // UpdateEndpoints update Endpoints of a Service.
func (r *Registry) UpdateEndpoints(e api.Endpoints) error { func (r *Registry) UpdateEndpoints(e api.Endpoints) error {
return r.AtomicUpdate(makeServiceEndpointsKey(e.ID), &api.Endpoints{}, return r.AtomicUpdate(makeServiceEndpointsKey(e.ID), &api.Endpoints{},
func(interface{}) (interface{}, error) { func(input interface{}) (interface{}, error) {
// TODO: racy - label query is returning different results for two simultaneous updaters
return e, nil return e, nil
}) })
} }
// WatchEndpoints begins watching for new, changed, or deleted endpoint configurations.
func (r *Registry) WatchEndpoints(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
if !label.Empty() {
return nil, fmt.Errorf("label selectors are not supported on endpoints")
}
if value, found := field.RequiresExactMatch("ID"); found {
return r.Watch(makeServiceEndpointsKey(value), resourceVersion)
}
if field.Empty() {
return r.WatchList("/registry/services/endpoints", resourceVersion, tools.Everything)
}
return nil, fmt.Errorf("only the 'ID' and default (everything) field selectors are supported")
}

View File

@ -730,7 +730,7 @@ func TestEtcdGetService(t *testing.T) {
} }
if service.ID != "foo" { if service.ID != "foo" {
t.Errorf("Unexpected pod: %#v", service) t.Errorf("Unexpected service: %#v", service)
} }
} }
@ -803,6 +803,23 @@ func TestEtcdUpdateService(t *testing.T) {
} }
} }
func TestEtcdGetEndpoints(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.Set("/registry/services/endpoints/foo", api.EncodeOrDie(api.Endpoints{
JSONBase: api.JSONBase{ID: "foo"},
Endpoints: []string{"127.0.0.1:34855"},
}), 0)
registry := NewTestEtcdRegistry(fakeClient, []string{"machine"})
endpoints, err := registry.GetEndpoints("foo")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
if endpoints.ID != "foo" || !reflect.DeepEqual(endpoints.Endpoints, []string{"127.0.0.1:34855"}) {
t.Errorf("Unexpected endpoints: %#v", endpoints)
}
}
func TestEtcdUpdateEndpoints(t *testing.T) { func TestEtcdUpdateEndpoints(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t) fakeClient := tools.NewFakeEtcdClient(t)
fakeClient.TestIndex = true fakeClient.TestIndex = true
@ -830,6 +847,104 @@ func TestEtcdUpdateEndpoints(t *testing.T) {
} }
} }
func TestEtcdWatchServices(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t)
registry := NewTestEtcdRegistry(fakeClient, []string{"machine"})
watching, err := registry.WatchServices(
labels.Everything(),
labels.SelectorFromSet(labels.Set{"ID": "foo"}),
1,
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
select {
case _, ok := <-watching.ResultChan():
if !ok {
t.Errorf("watching channel should be open")
}
default:
}
fakeClient.WatchInjectError <- nil
if _, ok := <-watching.ResultChan(); ok {
t.Errorf("watching channel should be closed")
}
watching.Stop()
}
func TestEtcdWatchServicesBadSelector(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t)
registry := NewTestEtcdRegistry(fakeClient, []string{"machine"})
_, err := registry.WatchServices(
labels.Everything(),
labels.SelectorFromSet(labels.Set{"Field.Selector": "foo"}),
0,
)
if err == nil {
t.Errorf("unexpected non-error: %v", err)
}
_, err = registry.WatchServices(
labels.SelectorFromSet(labels.Set{"Label.Selector": "foo"}),
labels.Everything(),
0,
)
if err == nil {
t.Errorf("unexpected non-error: %v", err)
}
}
func TestEtcdWatchEndpoints(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t)
registry := NewTestEtcdRegistry(fakeClient, []string{"machine"})
watching, err := registry.WatchEndpoints(
labels.Everything(),
labels.SelectorFromSet(labels.Set{"ID": "foo"}),
1,
)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
fakeClient.WaitForWatchCompletion()
select {
case _, ok := <-watching.ResultChan():
if !ok {
t.Errorf("watching channel should be open")
}
default:
}
fakeClient.WatchInjectError <- nil
if _, ok := <-watching.ResultChan(); ok {
t.Errorf("watching channel should be closed")
}
watching.Stop()
}
func TestEtcdWatchEndpointsBadSelector(t *testing.T) {
fakeClient := tools.NewFakeEtcdClient(t)
registry := NewTestEtcdRegistry(fakeClient, []string{"machine"})
_, err := registry.WatchEndpoints(
labels.Everything(),
labels.SelectorFromSet(labels.Set{"Field.Selector": "foo"}),
0,
)
if err == nil {
t.Errorf("unexpected non-error: %v", err)
}
_, err = registry.WatchEndpoints(
labels.SelectorFromSet(labels.Set{"Label.Selector": "foo"}),
labels.Everything(),
0,
)
if err == nil {
t.Errorf("unexpected non-error: %v", err)
}
}
// TODO We need a test for the compare and swap behavior. This basically requires two things: // TODO We need a test for the compare and swap behavior. This basically requires two things:
// 1) Add a per-operation synchronization channel to the fake etcd client, such that any operation waits on that // 1) Add a per-operation synchronization channel to the fake etcd client, such that any operation waits on that
// channel, this will enable us to orchestrate the flow of etcd requests in the test. // channel, this will enable us to orchestrate the flow of etcd requests in the test.

View File

@ -18,6 +18,8 @@ package registrytest
import ( import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
) )
func NewServiceRegistry() *ServiceRegistry { func NewServiceRegistry() *ServiceRegistry {
@ -60,7 +62,19 @@ func (r *ServiceRegistry) UpdateService(svc api.Service) error {
return r.Err return r.Err
} }
func (r *ServiceRegistry) WatchServices(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
return nil, r.Err
}
func (r *ServiceRegistry) GetEndpoints(name string) (*api.Endpoints, error) {
return &r.Endpoints, r.Err
}
func (r *ServiceRegistry) UpdateEndpoints(e api.Endpoints) error { func (r *ServiceRegistry) UpdateEndpoints(e api.Endpoints) error {
r.Endpoints = e r.Endpoints = e
return r.Err return r.Err
} }
func (r *ServiceRegistry) WatchEndpoints(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
return nil, r.Err
}

View File

@ -18,6 +18,9 @@ package service
import ( import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/endpoint"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
) )
// Registry is an interface for things that know how to store services. // Registry is an interface for things that know how to store services.
@ -27,5 +30,9 @@ type Registry interface {
GetService(name string) (*api.Service, error) GetService(name string) (*api.Service, error)
DeleteService(name string) error DeleteService(name string) error
UpdateService(svc api.Service) error UpdateService(svc api.Service) error
UpdateEndpoints(e api.Endpoints) error WatchServices(labels, fields labels.Selector, resourceVersion uint64) (watch.Interface, error)
// TODO: endpoints and their implementation should be separated, setting endpoints should be
// supported via the API, and the endpoints-controller should use the API to update endpoints.
endpoint.Registry
} }

View File

@ -27,6 +27,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion" "github.com/GoogleCloudPlatform/kubernetes/pkg/registry/minion"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
) )
// RegistryStorage adapts a service registry into apiserver's RESTStorage model. // RegistryStorage adapts a service registry into apiserver's RESTStorage model.
@ -123,6 +124,12 @@ func (rs *RegistryStorage) List(selector labels.Selector) (interface{}, error) {
return list, err return list, err
} }
// Watch returns Services events via a watch.Interface.
// It implements apiserver.ResourceWatcher.
func (rs *RegistryStorage) Watch(label, field labels.Selector, resourceVersion uint64) (watch.Interface, error) {
return rs.registry.WatchServices(label, field, resourceVersion)
}
func (rs RegistryStorage) New() interface{} { func (rs RegistryStorage) New() interface{} {
return &api.Service{} return &api.Service{}
} }

View File

@ -241,6 +241,122 @@ func TestWatch(t *testing.T) {
} }
} }
func TestWatchEtcdState(t *testing.T) {
type T struct {
Type watch.EventType
Endpoints []string
}
testCases := map[string]struct {
Initial map[string]EtcdResponseWithError
Responses []*etcd.Response
From uint64
Expected []*T
}{
"from not found": {
Initial: map[string]EtcdResponseWithError{},
Responses: []*etcd.Response{
{
Action: "create",
Node: &etcd.Node{
Value: string(api.EncodeOrDie(&api.Endpoints{JSONBase: api.JSONBase{ID: "foo"}, Endpoints: []string{}})),
},
},
},
From: 1,
Expected: []*T{
{watch.Added, nil},
},
},
"from version 1": {
Responses: []*etcd.Response{
{
Action: "compareAndSwap",
Node: &etcd.Node{
Value: string(api.EncodeOrDie(&api.Endpoints{JSONBase: api.JSONBase{ID: "foo"}, Endpoints: []string{"127.0.0.1:9000"}})),
CreatedIndex: 1,
ModifiedIndex: 2,
},
PrevNode: &etcd.Node{
Value: string(api.EncodeOrDie(&api.Endpoints{JSONBase: api.JSONBase{ID: "foo"}, Endpoints: []string{}})),
CreatedIndex: 1,
ModifiedIndex: 1,
},
},
},
From: 1,
Expected: []*T{
{watch.Modified, []string{"127.0.0.1:9000"}},
},
},
"from initial state": {
Initial: map[string]EtcdResponseWithError{
"/somekey/foo": {
R: &etcd.Response{
Action: "get",
Node: &etcd.Node{
Value: string(api.EncodeOrDie(&api.Endpoints{JSONBase: api.JSONBase{ID: "foo"}, Endpoints: []string{}})),
CreatedIndex: 1,
ModifiedIndex: 1,
},
EtcdIndex: 1,
},
},
},
Responses: []*etcd.Response{
nil,
{
Action: "compareAndSwap",
Node: &etcd.Node{
Value: string(api.EncodeOrDie(&api.Endpoints{JSONBase: api.JSONBase{ID: "foo"}, Endpoints: []string{"127.0.0.1:9000"}})),
CreatedIndex: 1,
ModifiedIndex: 2,
},
PrevNode: &etcd.Node{
Value: string(api.EncodeOrDie(&api.Endpoints{JSONBase: api.JSONBase{ID: "foo"}, Endpoints: []string{}})),
CreatedIndex: 1,
ModifiedIndex: 1,
},
},
},
Expected: []*T{
{watch.Added, nil},
{watch.Modified, []string{"127.0.0.1:9000"}},
},
},
}
for k, testCase := range testCases {
fakeClient := NewFakeEtcdClient(t)
for key, value := range testCase.Initial {
fakeClient.Data[key] = value
}
h := EtcdHelper{fakeClient, codec, versioner}
watching, err := h.Watch("/somekey/foo", testCase.From)
if err != nil {
t.Errorf("%s: unexpected error: %v", k, err)
continue
}
fakeClient.WaitForWatchCompletion()
t.Logf("Testing %v", k)
for i := range testCase.Responses {
if testCase.Responses[i] != nil {
fakeClient.WatchResponse <- testCase.Responses[i]
}
event := <-watching.ResultChan()
if e, a := testCase.Expected[i].Type, event.Type; e != a {
t.Errorf("%s: expected type %v, got %v", k, e, a)
break
}
if e, a := testCase.Expected[i].Endpoints, event.Object.(*api.Endpoints).Endpoints; !reflect.DeepEqual(e, a) {
t.Errorf("%s: expected type %v, got %v", k, e, a)
break
}
}
watching.Stop()
}
}
func TestWatchFromZeroIndex(t *testing.T) { func TestWatchFromZeroIndex(t *testing.T) {
pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}} pod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}