From 8a17ea4894b6d921e4d57065782014245274744d Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Thu, 16 Apr 2015 16:25:04 -0700 Subject: [PATCH 1/4] Add Delete method to endpoints client. --- pkg/client/endpoints.go | 6 ++++++ pkg/client/testclient/fake_endpoints.go | 5 +++++ 2 files changed, 11 insertions(+) diff --git a/pkg/client/endpoints.go b/pkg/client/endpoints.go index a3684468fb..ce634a3cb2 100644 --- a/pkg/client/endpoints.go +++ b/pkg/client/endpoints.go @@ -35,6 +35,7 @@ type EndpointsInterface interface { Create(endpoints *api.Endpoints) (*api.Endpoints, error) List(selector labels.Selector) (*api.EndpointsList, error) Get(name string) (*api.Endpoints, error) + Delete(name string) error Update(endpoints *api.Endpoints) (*api.Endpoints, error) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) } @@ -76,6 +77,11 @@ func (c *endpoints) Get(name string) (result *api.Endpoints, err error) { return } +// Delete takes the name of the endpoint, and returns an error if one occurs +func (c *endpoints) Delete(name string) error { + return c.r.Delete().Namespace(c.ns).Resource("endpoints").Name(name).Do().Error() +} + // Watch returns a watch.Interface that watches the requested endpoints for a service. func (c *endpoints) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { return c.r.Get(). diff --git a/pkg/client/testclient/fake_endpoints.go b/pkg/client/testclient/fake_endpoints.go index 617b9f757b..891dc9664a 100644 --- a/pkg/client/testclient/fake_endpoints.go +++ b/pkg/client/testclient/fake_endpoints.go @@ -45,6 +45,11 @@ func (c *FakeEndpoints) Get(name string) (*api.Endpoints, error) { return obj.(*api.Endpoints), err } +func (c *FakeEndpoints) Delete(name string) error { + _, err := c.Fake.Invokes(FakeAction{Action: "delete-endpoints", Value: name}, &api.Endpoints{}) + return err +} + func (c *FakeEndpoints) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { c.Fake.Actions = append(c.Fake.Actions, FakeAction{Action: "watch-endpoints", Value: resourceVersion}) return c.Fake.Watch, c.Fake.Err From da5111891cc980b14620265c8455a166c2f1cc5e Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Thu, 16 Apr 2015 16:25:54 -0700 Subject: [PATCH 2/4] Fix selector handling in listers.go. --- pkg/client/cache/listers.go | 42 +++++++++++++++++++++++++++++--- pkg/client/cache/listers_test.go | 31 +++++++++++++++++++++++ 2 files changed, 70 insertions(+), 3 deletions(-) diff --git a/pkg/client/cache/listers.go b/pkg/client/cache/listers.go index 4e1dd78272..0afd64b866 100644 --- a/pkg/client/cache/listers.go +++ b/pkg/client/cache/listers.go @@ -39,10 +39,16 @@ type StoreToPodLister struct { Store } -// TODO Get rid of the selector because that is confusing because the user might not realize that there has already been -// some selection at the caching stage. Also, consistency will facilitate code generation. However, the pkg/client -// is inconsistent too. +// Please note that selector is filtering among the pods that have gotten into +// the store; there may have been some filtering that already happened before +// that. +// +// TODO: converge on the interface in pkg/client. func (s *StoreToPodLister) List(selector labels.Selector) (pods []*api.Pod, err error) { + // TODO: it'd be great to just call + // s.Pods(api.NamespaceAll).List(selector), however then we'd have to + // remake the list.Items as a []*api.Pod. So leave this separate for + // now. for _, m := range s.Store.List() { pod := m.(*api.Pod) if selector.Matches(labels.Set(pod.Labels)) { @@ -52,6 +58,32 @@ func (s *StoreToPodLister) List(selector labels.Selector) (pods []*api.Pod, err return pods, nil } +// Pods is taking baby steps to be more like the api in pkg/client +func (s *StoreToPodLister) Pods(namespace string) storePodsNamespacer { + return storePodsNamespacer{s.Store, namespace} +} + +type storePodsNamespacer struct { + store Store + namespace string +} + +// Please note that selector is filtering among the pods that have gotten into +// the store; there may have been some filtering that already happened before +// that. +func (s storePodsNamespacer) List(selector labels.Selector) (pods api.PodList, err error) { + list := api.PodList{} + for _, m := range s.store.List() { + pod := m.(*api.Pod) + if s.namespace == api.NamespaceAll || s.namespace == pod.Namespace { + if selector.Matches(labels.Set(pod.Labels)) { + list.Items = append(list.Items, *pod) + } + } + } + return list, nil +} + // Exists returns true if a pod matching the namespace/name of the given pod exists in the store. func (s *StoreToPodLister) Exists(pod *api.Pod) (bool, error) { _, exists, err := s.Store.Get(pod) @@ -116,6 +148,10 @@ func (s *StoreToServiceLister) GetPodServices(pod *api.Pod) (services []api.Serv if service.Namespace != pod.Namespace { continue } + if service.Spec.Selector == nil { + // services with nil selectors match nothing, not everything. + continue + } selector = labels.Set(service.Spec.Selector).AsSelector() if selector.Matches(labels.Set(pod.Labels)) { services = append(services, service) diff --git a/pkg/client/cache/listers_test.go b/pkg/client/cache/listers_test.go index 3f759c3c71..5e57208559 100644 --- a/pkg/client/cache/listers_test.go +++ b/pkg/client/cache/listers_test.go @@ -90,3 +90,34 @@ func TestStoreToPodLister(t *testing.T) { t.Errorf("Unexpected pod exists") } } + +func TestStoreToServiceLister(t *testing.T) { + store := NewStore(MetaNamespaceKeyFunc) + store.Add(&api.Service{ + ObjectMeta: api.ObjectMeta{Name: "foo"}, + Spec: api.ServiceSpec{ + Selector: map[string]string{}, + }, + }) + store.Add(&api.Service{ObjectMeta: api.ObjectMeta{Name: "bar"}}) + ssl := StoreToServiceLister{store} + + pod := &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "foopod", + Labels: map[string]string{"role": "foo"}, + }, + } + + services, err := ssl.GetPodServices(pod) + if err != nil { + t.Errorf("Unexpected error: %v", err) + } + + if len(services) != 1 { + t.Fatalf("Expected 1 service, got %v", len(services)) + } + if e, a := "foo", services[0].Name; e != a { + t.Errorf("Expected service %q, got %q", e, a) + } +} From 24a8cceb5cf31827640c691c64cddfd46a73b057 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Thu, 16 Apr 2015 14:45:57 -0700 Subject: [PATCH 3/4] add work queue; test coverage 100% --- pkg/util/workqueue/doc.go | 26 +++++++ pkg/util/workqueue/queue.go | 128 +++++++++++++++++++++++++++++++ pkg/util/workqueue/queue_test.go | 115 +++++++++++++++++++++++++++ 3 files changed, 269 insertions(+) create mode 100644 pkg/util/workqueue/doc.go create mode 100644 pkg/util/workqueue/queue.go create mode 100644 pkg/util/workqueue/queue_test.go diff --git a/pkg/util/workqueue/doc.go b/pkg/util/workqueue/doc.go new file mode 100644 index 0000000000..53000cfba7 --- /dev/null +++ b/pkg/util/workqueue/doc.go @@ -0,0 +1,26 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package workqueue provides a simple queue that supports the following +// features: +// * Fair: items processed in the order in which they are added. +// * Stingy: a single item will not be processed multiple times concurrently, +// and if an item is added multiple times before it can be processed, it +// will only be processed once. +// * Multiple consumers and producers. In particular, it is allowed for an +// item to be reenqueued while it is being processed. +// * Shutdown notifications. +package workqueue diff --git a/pkg/util/workqueue/queue.go b/pkg/util/workqueue/queue.go new file mode 100644 index 0000000000..794e4488f4 --- /dev/null +++ b/pkg/util/workqueue/queue.go @@ -0,0 +1,128 @@ +/* +Copyright 2015 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workqueue + +import ( + "sync" +) + +// New constructs a new workqueue (see the package comment). +func New() *Type { + return &Type{ + dirty: set{}, + processing: set{}, + cond: sync.NewCond(&sync.Mutex{}), + } +} + +// Type is a work queue (see the package comment). +type Type struct { + // queue defines the order in which we will work on items. Every + // element of queue should be in the dirty set and not in the + // processing set. + queue []t + + // dirty defines all of the items that need to be processed. + dirty set + + // Things that are currently being processed are in the processing set. + // These things may be simultaneously in the dirty set. When we finish + // processing something and remove it from this set, we'll check if + // it's in the dirty set, and if so, add it to the queue. + processing set + + cond *sync.Cond + + shuttingDown bool +} + +type empty struct{} +type t interface{} +type set map[t]empty + +func (s set) has(item t) bool { + _, exists := s[item] + return exists +} + +func (s set) insert(item t) { + s[item] = empty{} +} + +func (s set) delete(item t) { + delete(s, item) +} + +// Add marks item as needing processing. +func (q *Type) Add(item interface{}) { + q.cond.L.Lock() + defer q.cond.L.Unlock() + if q.shuttingDown { + return + } + if q.dirty.has(item) { + return + } + q.dirty.insert(item) + if q.processing.has(item) { + return + } + q.queue = append(q.queue, item) + q.cond.Signal() +} + +// Get blocks until it can return an item to be processed. If shutdown = true, +// the caller should end their goroutine. You must call Done with item when you +// have finished processing it. +func (q *Type) Get() (item interface{}, shutdown bool) { + q.cond.L.Lock() + defer q.cond.L.Unlock() + for len(q.queue) == 0 && !q.shuttingDown { + q.cond.Wait() + } + if len(q.queue) == 0 { + // We must be shutting down. + return nil, true + } + item, q.queue = q.queue[0], q.queue[1:] + q.processing.insert(item) + q.dirty.delete(item) + return item, false +} + +// Done marks item as done processing, and if it has been marked as dirty again +// while it was being processed, it will be re-added to the queue for +// re-processing. +func (q *Type) Done(item interface{}) { + q.cond.L.Lock() + defer q.cond.L.Unlock() + q.processing.delete(item) + if q.dirty.has(item) { + q.queue = append(q.queue, item) + q.cond.Signal() + } +} + +// Shutdown will cause q to ignore all new items added to it. As soon as the +// worker goroutines have drained the existing items in the queue, they will be +// instructed to exit. +func (q *Type) ShutDown() { + q.cond.L.Lock() + defer q.cond.L.Unlock() + q.shuttingDown = true + q.cond.Broadcast() +} diff --git a/pkg/util/workqueue/queue_test.go b/pkg/util/workqueue/queue_test.go new file mode 100644 index 0000000000..f040764a1a --- /dev/null +++ b/pkg/util/workqueue/queue_test.go @@ -0,0 +1,115 @@ +/* +Copyright 2015 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package workqueue_test + +import ( + "sync" + "testing" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/util/workqueue" +) + +func TestBasic(t *testing.T) { + // If something is seriously wrong this test will never complete. + q := workqueue.New() + + // Start producers + const producers = 50 + producerWG := sync.WaitGroup{} + producerWG.Add(producers) + for i := 0; i < producers; i++ { + go func(i int) { + defer producerWG.Done() + for j := 0; j < 50; j++ { + q.Add(i) + time.Sleep(time.Millisecond) + } + }(i) + } + + // Start consumers + const consumers = 10 + consumerWG := sync.WaitGroup{} + consumerWG.Add(consumers) + for i := 0; i < consumers; i++ { + go func(i int) { + defer consumerWG.Done() + for { + item, quit := q.Get() + if item == "added after shutdown!" { + t.Errorf("Got an item added after shutdown.") + } + if quit { + return + } + t.Logf("Worker %v: begin processing %v", i, item) + time.Sleep(3 * time.Millisecond) + t.Logf("Worker %v: done processing %v", i, item) + q.Done(item) + } + }(i) + } + + producerWG.Wait() + q.ShutDown() + q.Add("added after shutdown!") + consumerWG.Wait() +} + +func TestAddWhileProcessing(t *testing.T) { + q := workqueue.New() + + // Start producers + const producers = 50 + producerWG := sync.WaitGroup{} + producerWG.Add(producers) + for i := 0; i < producers; i++ { + go func(i int) { + defer producerWG.Done() + q.Add(i) + }(i) + } + + // Start consumers + const consumers = 10 + consumerWG := sync.WaitGroup{} + consumerWG.Add(consumers) + for i := 0; i < consumers; i++ { + go func(i int) { + defer consumerWG.Done() + // Every worker will re-add every item up to two times. + // This tests the dirty-while-processing case. + counters := map[interface{}]int{} + for { + item, quit := q.Get() + if quit { + return + } + counters[item]++ + if counters[item] < 2 { + q.Add(item) + } + q.Done(item) + } + }(i) + } + + producerWG.Wait() + q.ShutDown() + consumerWG.Wait() +} From a2953fdc7ef5a9705f9e595a8c332eab055af4a2 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Thu, 16 Apr 2015 16:18:02 -0700 Subject: [PATCH 4/4] Make endpoint controller use framework --- cmd/integration/integration.go | 4 +- .../app/controllermanager.go | 5 +- cmd/kubernetes/kubernetes.go | 2 +- pkg/client/cache/store.go | 24 + pkg/service/endpoints_controller.go | 418 +++++++++++++----- pkg/service/endpoints_controller_test.go | 403 +++++++---------- pkg/util/set.go | 18 + 7 files changed, 516 insertions(+), 358 deletions(-) diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index c5976cfde9..676deee188 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -208,7 +208,7 @@ func startComponents(firstManifestURL, secondManifestURL, apiVersion string) (st endpoints := service.NewEndpointController(cl) // ensure the service endpoints are sync'd several times within the window that the integration tests wait - go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*4) + go endpoints.Run(3, util.NeverStop) controllerManager := replicationControllerPkg.NewReplicationManager(cl) @@ -285,7 +285,7 @@ func endpointsSet(c *client.Client, serviceNamespace, serviceID string, endpoint return func() (bool, error) { endpoints, err := c.Endpoints(serviceNamespace).Get(serviceID) if err != nil { - glog.Infof("Error on creating endpoints: %v", err) + glog.Infof("Error getting endpoints: %v", err) return false, nil } count := 0 diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 060c26239c..8423f761a2 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -50,6 +50,7 @@ type CMServer struct { ClientConfig client.Config CloudProvider string CloudConfigFile string + ConcurrentEndpointSyncs int MinionRegexp string NodeSyncPeriod time.Duration ResourceQuotaSyncPeriod time.Duration @@ -79,6 +80,7 @@ func NewCMServer() *CMServer { s := CMServer{ Port: ports.ControllerManagerPort, Address: util.IP(net.ParseIP("127.0.0.1")), + ConcurrentEndpointSyncs: 5, NodeSyncPeriod: 10 * time.Second, ResourceQuotaSyncPeriod: 10 * time.Second, NamespaceSyncPeriod: 5 * time.Minute, @@ -101,6 +103,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) { client.BindClientConfigFlags(fs, &s.ClientConfig) fs.StringVar(&s.CloudProvider, "cloud_provider", s.CloudProvider, "The provider for cloud services. Empty string for no provider.") fs.StringVar(&s.CloudConfigFile, "cloud_config", s.CloudConfigFile, "The path to the cloud provider configuration file. Empty string for no configuration file.") + fs.IntVar(&s.ConcurrentEndpointSyncs, "concurrent_endpoint_syncs", s.ConcurrentEndpointSyncs, "The number of endpoint syncing operations that will be done concurrently. Larger number = faster endpoint updating, but more CPU (and network) load") fs.StringVar(&s.MinionRegexp, "minion_regexp", s.MinionRegexp, "If non empty, and --cloud_provider is specified, a regular expression for matching minion VMs.") fs.DurationVar(&s.NodeSyncPeriod, "node_sync_period", s.NodeSyncPeriod, ""+ "The period for syncing nodes from cloudprovider. Longer periods will result in "+ @@ -171,7 +174,7 @@ func (s *CMServer) Run(_ []string) error { }() endpoints := service.NewEndpointController(kubeClient) - go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10) + go endpoints.Run(s.ConcurrentEndpointSyncs, util.NeverStop) controllerManager := replicationControllerPkg.NewReplicationManager(kubeClient) controllerManager.Run(replicationControllerPkg.DefaultSyncPeriod) diff --git a/cmd/kubernetes/kubernetes.go b/cmd/kubernetes/kubernetes.go index 8ec247d9b3..65cd939b8c 100644 --- a/cmd/kubernetes/kubernetes.go +++ b/cmd/kubernetes/kubernetes.go @@ -139,7 +139,7 @@ func runControllerManager(machineList []string, cl *client.Client, nodeMilliCPU, } endpoints := service.NewEndpointController(cl) - go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10) + go endpoints.Run(5, util.NeverStop) controllerManager := controller.NewReplicationManager(cl) controllerManager.Run(controller.DefaultSyncPeriod) diff --git a/pkg/client/cache/store.go b/pkg/client/cache/store.go index b6031d43ca..40aceec467 100644 --- a/pkg/client/cache/store.go +++ b/pkg/client/cache/store.go @@ -18,6 +18,8 @@ package cache import ( "fmt" + "strings" + "github.com/GoogleCloudPlatform/kubernetes/pkg/api/meta" ) @@ -67,6 +69,9 @@ type ExplicitKey string // keys for API objects which implement meta.Interface. // The key uses the format / unless is empty, then // it's just . +// +// TODO: replace key-as-string with a key-as-struct so that this +// packing/unpacking won't be necessary. func MetaNamespaceKeyFunc(obj interface{}) (string, error) { if key, ok := obj.(ExplicitKey); ok { return string(key), nil @@ -81,6 +86,25 @@ func MetaNamespaceKeyFunc(obj interface{}) (string, error) { return meta.Name(), nil } +// SplitMetaNamespaceKey returns the namespace and name that +// MetaNamespaceKeyFunc encoded into key. +// +// TODO: replace key-as-string with a key-as-struct so that this +// packing/unpacking won't be necessary. +func SplitMetaNamespaceKey(key string) (namespace, name string, err error) { + parts := strings.Split(key, "/") + switch len(parts) { + case 1: + // name only, no namespace + return "", parts[0], nil + case 2: + // name and namespace + return parts[0], parts[1], nil + } + + return "", "", fmt.Errorf("unexpected key format: %q", key) +} + // cache responsibilities are limited to: // 1. Computing keys for objects via keyFunc // 2. Invoking methods of a ThreadSafeStorage interface diff --git a/pkg/service/endpoints_controller.go b/pkg/service/endpoints_controller.go index 12f1255798..1be624dc97 100644 --- a/pkg/service/endpoints_controller.go +++ b/pkg/service/endpoints_controller.go @@ -19,6 +19,7 @@ package service import ( "fmt" "reflect" + "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/endpoints" @@ -26,135 +27,354 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta2" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" + "github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework" + "github.com/GoogleCloudPlatform/kubernetes/pkg/fields" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util/workqueue" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/golang/glog" ) -// EndpointController manages selector-based service endpoints. -type EndpointController struct { - client *client.Client -} +const ( + // We'll attempt to recompute EVERY service's endpoints at least this + // often. Higher numbers = lower CPU/network load; lower numbers = + // shorter amount of time before a mistaken endpoint is corrected. + FullServiceResyncPeriod = 30 * time.Second + + // We'll keep pod watches open up to this long. In the unlikely case + // that a watch misdelivers info about a pod, it'll take this long for + // that mistake to be rectified. + PodRelistPeriod = 5 * time.Minute +) + +var ( + keyFunc = framework.DeletionHandlingMetaNamespaceKeyFunc +) // NewEndpointController returns a new *EndpointController. func NewEndpointController(client *client.Client) *EndpointController { - return &EndpointController{ + e := &EndpointController{ client: client, + queue: workqueue.New(), + } + + e.serviceStore.Store, e.serviceController = framework.NewInformer( + &cache.ListWatch{ + ListFunc: func() (runtime.Object, error) { + return e.client.Services(api.NamespaceAll).List(labels.Everything()) + }, + WatchFunc: func(rv string) (watch.Interface, error) { + return e.client.Services(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv) + }, + }, + &api.Service{}, + FullServiceResyncPeriod, + framework.ResourceEventHandlerFuncs{ + AddFunc: e.enqueueService, + UpdateFunc: func(old, cur interface{}) { + e.enqueueService(cur) + }, + DeleteFunc: e.enqueueService, + }, + ) + + e.podStore.Store, e.podController = framework.NewInformer( + &cache.ListWatch{ + ListFunc: func() (runtime.Object, error) { + return e.client.Pods(api.NamespaceAll).List(labels.Everything()) + }, + WatchFunc: func(rv string) (watch.Interface, error) { + return e.client.Pods(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv) + }, + }, + &api.Pod{}, + PodRelistPeriod, + framework.ResourceEventHandlerFuncs{ + AddFunc: e.addPod, + UpdateFunc: e.updatePod, + DeleteFunc: e.deletePod, + }, + ) + + return e +} + +// EndpointController manages selector-based service endpoints. +type EndpointController struct { + client *client.Client + + serviceStore cache.StoreToServiceLister + podStore cache.StoreToPodLister + + // Services that need to be updated. A channel is inappropriate here, + // because it allows services with lots of pods to be serviced much + // more often than services with few pods; it also would cause a + // service that's inserted multiple times to be processed more than + // necessary. + queue *workqueue.Type + + // Since we join two objects, we'll watch both of them with + // controllers. + serviceController *framework.Controller + podController *framework.Controller +} + +// Runs e; will not return until stopCh is closed. workers determines how many +// endpoints will be handled in parallel. +func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) { + defer util.HandleCrash() + go e.serviceController.Run(stopCh) + go e.podController.Run(stopCh) + for i := 0; i < workers; i++ { + go util.Until(e.worker, time.Second, stopCh) + } + <-stopCh + e.queue.ShutDown() +} + +func (e *EndpointController) getPodServiceMemberships(pod *api.Pod) (util.StringSet, error) { + set := util.StringSet{} + services, err := e.serviceStore.GetPodServices(pod) + if err != nil { + // don't log this error because this function makes pointless + // errors when no services match. + return set, nil + } + for i := range services { + key, err := keyFunc(&services[i]) + if err != nil { + return nil, err + } + set.Insert(key) + } + return set, nil +} + +// When a pod is added, figure out what services it will be a member of and +// enqueue them. obj must have *api.Pod type. +func (e *EndpointController) addPod(obj interface{}) { + pod := obj.(*api.Pod) + services, err := e.getPodServiceMemberships(pod) + if err != nil { + glog.Errorf("Unable to get pod %v/%v's service memberships: %v", pod.Namespace, pod.Name, err) + return + } + for key := range services { + e.queue.Add(key) } } -// SyncServiceEndpoints syncs endpoints for services with selectors. -func (e *EndpointController) SyncServiceEndpoints() error { - services, err := e.client.Services(api.NamespaceAll).List(labels.Everything()) - if err != nil { - glog.Errorf("Failed to list services: %v", err) - return err +// When a pod is updated, figure out what services it used to be a member of +// and what services it will be a member of, and enqueue the union of these. +// old and cur must be *api.Pod types. +func (e *EndpointController) updatePod(old, cur interface{}) { + if api.Semantic.DeepEqual(old, cur) { + return + } + newPod := old.(*api.Pod) + services, err := e.getPodServiceMemberships(newPod) + if err != nil { + glog.Errorf("Unable to get pod %v/%v's service memberships: %v", newPod.Namespace, newPod.Name, err) + return } - var resultErr error - for i := range services.Items { - service := &services.Items[i] - if service.Spec.Selector == nil { - // services without a selector receive no endpoints from this controller; - // these services will receive the endpoints that are created out-of-band via the REST API. - continue - } - - glog.V(5).Infof("About to update endpoints for service %s/%s", service.Namespace, service.Name) - pods, err := e.client.Pods(service.Namespace).List(labels.Set(service.Spec.Selector).AsSelector()) + oldPod := cur.(*api.Pod) + // Only need to get the old services if the labels changed. + if !reflect.DeepEqual(newPod.Labels, oldPod.Labels) { + oldServices, err := e.getPodServiceMemberships(oldPod) if err != nil { - glog.Errorf("Error syncing service: %s/%s, skipping", service.Namespace, service.Name) - resultErr = err - continue + glog.Errorf("Unable to get pod %v/%v's service memberships: %v", oldPod.Namespace, oldPod.Name, err) + return } + services = services.Union(oldServices) + } + for key := range services { + e.queue.Add(key) + } +} - subsets := []api.EndpointSubset{} - for i := range pods.Items { - pod := &pods.Items[i] +// When a pod is deleted, enqueue the services the pod used to be a member of. +// obj could be an *api.Pod, or a DeletionFinalStateUnknown marker item. +func (e *EndpointController) deletePod(obj interface{}) { + if _, ok := obj.(*api.Pod); ok { + // Enqueue all the services that the pod used to be a member + // of. This happens to be exactly the same thing we do when a + // pod is added. + e.addPod(obj) + return + } + podKey, err := keyFunc(obj) + if err != nil { + glog.Errorf("Couldn't get key for object %+v: %v", obj, err) + } + glog.Infof("Pod %q was deleted but we don't have a record of its final state, so it will take up to %v before it will be removed from all endpoint records.", podKey, FullServiceResyncPeriod) - for i := range service.Spec.Ports { - servicePort := &service.Spec.Ports[i] + // TODO: keep a map of pods to services to handle this condition. +} - // TODO: Once v1beta1 and v1beta2 are EOL'ed, - // this can safely assume that TargetPort is - // populated, and findPort() can be removed. - _ = v1beta1.Dependency - _ = v1beta2.Dependency +// obj could be an *api.Service, or a DeletionFinalStateUnknown marker item. +func (e *EndpointController) enqueueService(obj interface{}) { + key, err := keyFunc(obj) + if err != nil { + glog.Errorf("Couldn't get key for object %+v: %v", obj, err) + } - portName := servicePort.Name - portProto := servicePort.Protocol - portNum, err := findPort(pod, servicePort) - if err != nil { - glog.Errorf("Failed to find port for service %s/%s: %v", service.Namespace, service.Name, err) - continue - } - if len(pod.Status.PodIP) == 0 { - glog.Errorf("Failed to find an IP for pod %s/%s", pod.Namespace, pod.Name) - continue - } + e.queue.Add(key) +} - inService := false - for _, c := range pod.Status.Conditions { - if c.Type == api.PodReady && c.Status == api.ConditionTrue { - inService = true - break - } - } - if !inService { - glog.V(5).Infof("Pod is out of service: %v/%v", pod.Namespace, pod.Name) - continue - } - - epp := api.EndpointPort{Name: portName, Port: portNum, Protocol: portProto} - epa := api.EndpointAddress{IP: pod.Status.PodIP, TargetRef: &api.ObjectReference{ - Kind: "Pod", - Namespace: pod.ObjectMeta.Namespace, - Name: pod.ObjectMeta.Name, - UID: pod.ObjectMeta.UID, - ResourceVersion: pod.ObjectMeta.ResourceVersion, - }} - subsets = append(subsets, api.EndpointSubset{Addresses: []api.EndpointAddress{epa}, Ports: []api.EndpointPort{epp}}) +// worker runs a worker thread that just dequeues items, processes them, and +// marks them done. You may run as many of these in parallel as you wish; the +// workqueue guarantees that they will not end up processing the same service +// at the same time. +func (e *EndpointController) worker() { + for { + func() { + key, quit := e.queue.Get() + if quit { + return } - } - subsets = endpoints.RepackSubsets(subsets) + // Use defer: in the unlikely event that there's a + // panic, we'd still like this to get marked done-- + // otherwise the controller will not be able to sync + // this service again until it is restarted. + defer e.queue.Done(key) + e.syncService(key.(string)) + }() + } +} - // See if there's actually an update here. - currentEndpoints, err := e.client.Endpoints(service.Namespace).Get(service.Name) +func (e *EndpointController) syncService(key string) { + startTime := time.Now() + defer func() { + glog.V(4).Infof("Finished syncing service %q endpoints. (%v)", key, time.Now().Sub(startTime)) + }() + obj, exists, err := e.serviceStore.Store.GetByKey(key) + if err != nil || !exists { + // Delete the corresponding endpoint, as the service has been deleted. + // TODO: Please note that this will delete an endpoint when a + // service is deleted. However, if we're down at the time when + // the service is deleted, we will miss that deletion, so this + // doesn't completely solve the problem. See #6877. + namespace, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { - if errors.IsNotFound(err) { - currentEndpoints = &api.Endpoints{ - ObjectMeta: api.ObjectMeta{ - Name: service.Name, - Labels: service.Labels, - }, - } - } else { - glog.Errorf("Error getting endpoints: %v", err) + glog.Errorf("Need to delete endpoint with key %q, but couldn't understand the key: %v", key, err) + // Don't retry, as the key isn't going to magically become understandable. + return + } + err = e.client.Endpoints(namespace).Delete(name) + if err != nil && !errors.IsNotFound(err) { + glog.Errorf("Error deleting endpoint %q: %v", key, err) + e.queue.Add(key) // Retry + } + return + } + + service := obj.(*api.Service) + if service.Spec.Selector == nil { + // services without a selector receive no endpoints from this controller; + // these services will receive the endpoints that are created out-of-band via the REST API. + return + } + + glog.V(5).Infof("About to update endpoints for service %q", key) + pods, err := e.podStore.Pods(service.Namespace).List(labels.Set(service.Spec.Selector).AsSelector()) + if err != nil { + // Since we're getting stuff from a local cache, it is + // basically impossible to get this error. + glog.Errorf("Error syncing service %q: %v", key, err) + e.queue.Add(key) // Retry + return + } + + subsets := []api.EndpointSubset{} + for i := range pods.Items { + pod := &pods.Items[i] + + for i := range service.Spec.Ports { + servicePort := &service.Spec.Ports[i] + + // TODO: Once v1beta1 and v1beta2 are EOL'ed, + // this can safely assume that TargetPort is + // populated, and findPort() can be removed. + _ = v1beta1.Dependency + _ = v1beta2.Dependency + + portName := servicePort.Name + portProto := servicePort.Protocol + portNum, err := findPort(pod, servicePort) + if err != nil { + glog.Errorf("Failed to find port for service %s/%s: %v", service.Namespace, service.Name, err) + continue + } + if len(pod.Status.PodIP) == 0 { + glog.Errorf("Failed to find an IP for pod %s/%s", pod.Namespace, pod.Name) continue } - } - if reflect.DeepEqual(currentEndpoints.Subsets, subsets) && reflect.DeepEqual(currentEndpoints.Labels, service.Labels) { - glog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name) - continue - } - newEndpoints := currentEndpoints - newEndpoints.Subsets = subsets - newEndpoints.Labels = service.Labels - if len(currentEndpoints.ResourceVersion) == 0 { - // No previous endpoints, create them - _, err = e.client.Endpoints(service.Namespace).Create(newEndpoints) - } else { - // Pre-existing - _, err = e.client.Endpoints(service.Namespace).Update(newEndpoints) - } - if err != nil { - glog.Errorf("Error updating endpoints: %v", err) - continue + inService := false + for _, c := range pod.Status.Conditions { + if c.Type == api.PodReady && c.Status == api.ConditionTrue { + inService = true + break + } + } + if !inService { + glog.V(5).Infof("Pod is out of service: %v/%v", pod.Namespace, pod.Name) + continue + } + + epp := api.EndpointPort{Name: portName, Port: portNum, Protocol: portProto} + epa := api.EndpointAddress{IP: pod.Status.PodIP, TargetRef: &api.ObjectReference{ + Kind: "Pod", + Namespace: pod.ObjectMeta.Namespace, + Name: pod.ObjectMeta.Name, + UID: pod.ObjectMeta.UID, + ResourceVersion: pod.ObjectMeta.ResourceVersion, + }} + subsets = append(subsets, api.EndpointSubset{Addresses: []api.EndpointAddress{epa}, Ports: []api.EndpointPort{epp}}) } } - return resultErr + subsets = endpoints.RepackSubsets(subsets) + + // See if there's actually an update here. + currentEndpoints, err := e.client.Endpoints(service.Namespace).Get(service.Name) + if err != nil { + if errors.IsNotFound(err) { + currentEndpoints = &api.Endpoints{ + ObjectMeta: api.ObjectMeta{ + Name: service.Name, + Labels: service.Labels, + }, + } + } else { + glog.Errorf("Error getting endpoints: %v", err) + e.queue.Add(key) // Retry + return + } + } + if reflect.DeepEqual(currentEndpoints.Subsets, subsets) && reflect.DeepEqual(currentEndpoints.Labels, service.Labels) { + glog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name) + return + } + newEndpoints := currentEndpoints + newEndpoints.Subsets = subsets + newEndpoints.Labels = service.Labels + + if len(currentEndpoints.ResourceVersion) == 0 { + // No previous endpoints, create them + _, err = e.client.Endpoints(service.Namespace).Create(newEndpoints) + } else { + // Pre-existing + _, err = e.client.Endpoints(service.Namespace).Update(newEndpoints) + } + if err != nil { + glog.Errorf("Error updating endpoints: %v", err) + e.queue.Add(key) // Retry + } } func findDefaultPort(pod *api.Pod, servicePort int, proto api.Protocol) int { diff --git a/pkg/service/endpoints_controller_test.go b/pkg/service/endpoints_controller_test.go index 0794b1b56e..ade1ce34c7 100644 --- a/pkg/service/endpoints_controller_test.go +++ b/pkg/service/endpoints_controller_test.go @@ -27,16 +27,20 @@ import ( _ "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) -func newPodList(nPods int, nPorts int) *api.PodList { - pods := []api.Pod{} +func addPods(store cache.Store, namespace string, nPods int, nPorts int) { for i := 0; i < nPods; i++ { - p := api.Pod{ - TypeMeta: api.TypeMeta{APIVersion: testapi.Version()}, - ObjectMeta: api.ObjectMeta{Name: fmt.Sprintf("pod%d", i)}, + p := &api.Pod{ + TypeMeta: api.TypeMeta{APIVersion: testapi.Version()}, + ObjectMeta: api.ObjectMeta{ + Namespace: namespace, + Name: fmt.Sprintf("pod%d", i), + Labels: map[string]string{"foo": "bar"}, + }, Spec: api.PodSpec{ Containers: []api.Container{{Ports: []api.ContainerPort{}}}, }, @@ -54,11 +58,7 @@ func newPodList(nPods int, nPorts int) *api.PodList { p.Spec.Containers[0].Ports = append(p.Spec.Containers[0].Ports, api.ContainerPort{Name: fmt.Sprintf("port%d", i), ContainerPort: 8080 + j}) } - pods = append(pods, p) - } - return &api.PodList{ - TypeMeta: api.TypeMeta{APIVersion: testapi.Version(), Kind: "PodList"}, - Items: pods, + store.Add(p) } } @@ -222,22 +222,12 @@ type serverResponse struct { obj interface{} } -func makeTestServer(t *testing.T, namespace string, podResponse, serviceResponse, endpointsResponse serverResponse) (*httptest.Server, *util.FakeHandler) { - fakePodHandler := util.FakeHandler{ - StatusCode: podResponse.statusCode, - ResponseBody: runtime.EncodeOrDie(testapi.Codec(), podResponse.obj.(runtime.Object)), - } - fakeServiceHandler := util.FakeHandler{ - StatusCode: serviceResponse.statusCode, - ResponseBody: runtime.EncodeOrDie(testapi.Codec(), serviceResponse.obj.(runtime.Object)), - } +func makeTestServer(t *testing.T, namespace string, endpointsResponse serverResponse) (*httptest.Server, *util.FakeHandler) { fakeEndpointsHandler := util.FakeHandler{ StatusCode: endpointsResponse.statusCode, ResponseBody: runtime.EncodeOrDie(testapi.Codec(), endpointsResponse.obj.(runtime.Object)), } mux := http.NewServeMux() - mux.Handle(testapi.ResourcePath("pods", namespace, ""), &fakePodHandler) - mux.Handle(testapi.ResourcePath("services", "", ""), &fakeServiceHandler) mux.Handle(testapi.ResourcePath("endpoints", namespace, ""), &fakeEndpointsHandler) mux.Handle(testapi.ResourcePath("endpoints/", namespace, ""), &fakeEndpointsHandler) mux.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) { @@ -247,47 +237,13 @@ func makeTestServer(t *testing.T, namespace string, podResponse, serviceResponse return httptest.NewServer(mux), &fakeEndpointsHandler } -func TestSyncEndpointsEmpty(t *testing.T) { - testServer, _ := makeTestServer(t, api.NamespaceDefault, - serverResponse{http.StatusOK, newPodList(0, 0)}, - serverResponse{http.StatusOK, &api.ServiceList{}}, - serverResponse{http.StatusOK, &api.Endpoints{}}) - defer testServer.Close() - client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) - endpoints := NewEndpointController(client) - if err := endpoints.SyncServiceEndpoints(); err != nil { - t.Errorf("unexpected error: %v", err) - } -} - -func TestSyncEndpointsError(t *testing.T) { - testServer, _ := makeTestServer(t, api.NamespaceDefault, - serverResponse{http.StatusOK, newPodList(0, 0)}, - serverResponse{http.StatusInternalServerError, &api.ServiceList{}}, - serverResponse{http.StatusOK, &api.Endpoints{}}) - defer testServer.Close() - client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) - endpoints := NewEndpointController(client) - if err := endpoints.SyncServiceEndpoints(); err == nil { - t.Errorf("unexpected non-error") - } -} - func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) { - serviceList := api.ServiceList{ - Items: []api.Service{ - { - ObjectMeta: api.ObjectMeta{Name: "foo"}, - Spec: api.ServiceSpec{Ports: []api.ServicePort{{Port: 80}}}, - }, - }, - } - testServer, endpointsHandler := makeTestServer(t, api.NamespaceDefault, - serverResponse{http.StatusOK, newPodList(0, 0)}, - serverResponse{http.StatusOK, &serviceList}, + ns := api.NamespaceDefault + testServer, endpointsHandler := makeTestServer(t, ns, serverResponse{http.StatusOK, &api.Endpoints{ ObjectMeta: api.ObjectMeta{ Name: "foo", + Namespace: ns, ResourceVersion: "1", }, Subsets: []api.EndpointSubset{{ @@ -298,30 +254,21 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) { defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) endpoints := NewEndpointController(client) - if err := endpoints.SyncServiceEndpoints(); err != nil { - t.Errorf("unexpected error: %v", err) - } + endpoints.serviceStore.Store.Add(&api.Service{ + ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns}, + Spec: api.ServiceSpec{Ports: []api.ServicePort{{Port: 80}}}, + }) + endpoints.syncService(ns + "/foo") endpointsHandler.ValidateRequestCount(t, 0) } func TestSyncEndpointsProtocolTCP(t *testing.T) { - serviceList := api.ServiceList{ - Items: []api.Service{ - { - ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "other"}, - Spec: api.ServiceSpec{ - Selector: map[string]string{}, - Ports: []api.ServicePort{{Port: 80}}, - }, - }, - }, - } - testServer, endpointsHandler := makeTestServer(t, "other", - serverResponse{http.StatusOK, newPodList(0, 0)}, - serverResponse{http.StatusOK, &serviceList}, + ns := "other" + testServer, endpointsHandler := makeTestServer(t, ns, serverResponse{http.StatusOK, &api.Endpoints{ ObjectMeta: api.ObjectMeta{ Name: "foo", + Namespace: ns, ResourceVersion: "1", }, Subsets: []api.EndpointSubset{{ @@ -332,30 +279,24 @@ func TestSyncEndpointsProtocolTCP(t *testing.T) { defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) endpoints := NewEndpointController(client) - if err := endpoints.SyncServiceEndpoints(); err != nil { - t.Errorf("unexpected error: %v", err) - } + endpoints.serviceStore.Store.Add(&api.Service{ + ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns}, + Spec: api.ServiceSpec{ + Selector: map[string]string{}, + Ports: []api.ServicePort{{Port: 80}}, + }, + }) + endpoints.syncService(ns + "/foo") endpointsHandler.ValidateRequestCount(t, 0) } func TestSyncEndpointsProtocolUDP(t *testing.T) { - serviceList := api.ServiceList{ - Items: []api.Service{ - { - ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "other"}, - Spec: api.ServiceSpec{ - Selector: map[string]string{}, - Ports: []api.ServicePort{{Port: 80}}, - }, - }, - }, - } - testServer, endpointsHandler := makeTestServer(t, "other", - serverResponse{http.StatusOK, newPodList(0, 0)}, - serverResponse{http.StatusOK, &serviceList}, + ns := "other" + testServer, endpointsHandler := makeTestServer(t, ns, serverResponse{http.StatusOK, &api.Endpoints{ ObjectMeta: api.ObjectMeta{ Name: "foo", + Namespace: ns, ResourceVersion: "1", }, Subsets: []api.EndpointSubset{{ @@ -366,30 +307,24 @@ func TestSyncEndpointsProtocolUDP(t *testing.T) { defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) endpoints := NewEndpointController(client) - if err := endpoints.SyncServiceEndpoints(); err != nil { - t.Errorf("unexpected error: %v", err) - } + endpoints.serviceStore.Store.Add(&api.Service{ + ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns}, + Spec: api.ServiceSpec{ + Selector: map[string]string{}, + Ports: []api.ServicePort{{Port: 80}}, + }, + }) + endpoints.syncService(ns + "/foo") endpointsHandler.ValidateRequestCount(t, 0) } func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) { - serviceList := api.ServiceList{ - Items: []api.Service{ - { - ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "other"}, - Spec: api.ServiceSpec{ - Selector: map[string]string{}, - Ports: []api.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}}, - }, - }, - }, - } - testServer, endpointsHandler := makeTestServer(t, "other", - serverResponse{http.StatusOK, newPodList(1, 1)}, - serverResponse{http.StatusOK, &serviceList}, + ns := "other" + testServer, endpointsHandler := makeTestServer(t, ns, serverResponse{http.StatusOK, &api.Endpoints{ ObjectMeta: api.ObjectMeta{ Name: "foo", + Namespace: ns, ResourceVersion: "1", }, Subsets: []api.EndpointSubset{}, @@ -397,40 +332,36 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) { defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) endpoints := NewEndpointController(client) - if err := endpoints.SyncServiceEndpoints(); err != nil { - t.Errorf("unexpected error: %v", err) - } + addPods(endpoints.podStore.Store, ns, 1, 1) + endpoints.serviceStore.Store.Add(&api.Service{ + ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns}, + Spec: api.ServiceSpec{ + Selector: map[string]string{}, + Ports: []api.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}}, + }, + }) + endpoints.syncService(ns + "/foo") data := runtime.EncodeOrDie(testapi.Codec(), &api.Endpoints{ ObjectMeta: api.ObjectMeta{ Name: "foo", + Namespace: ns, ResourceVersion: "1", }, Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "1.2.3.4", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0"}}}, + Addresses: []api.EndpointAddress{{IP: "1.2.3.4", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}}, Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}}, }}, }) - endpointsHandler.ValidateRequest(t, testapi.ResourcePathWithQueryParams("endpoints", "other", "foo"), "PUT", &data) + endpointsHandler.ValidateRequest(t, testapi.ResourcePathWithQueryParams("endpoints", ns, "foo"), "PUT", &data) } func TestSyncEndpointsItemsPreexisting(t *testing.T) { - serviceList := api.ServiceList{ - Items: []api.Service{ - { - ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "bar"}, - Spec: api.ServiceSpec{ - Selector: map[string]string{"foo": "bar"}, - Ports: []api.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}}, - }, - }, - }, - } - testServer, endpointsHandler := makeTestServer(t, "bar", - serverResponse{http.StatusOK, newPodList(1, 1)}, - serverResponse{http.StatusOK, &serviceList}, + ns := "bar" + testServer, endpointsHandler := makeTestServer(t, ns, serverResponse{http.StatusOK, &api.Endpoints{ ObjectMeta: api.ObjectMeta{ Name: "foo", + Namespace: ns, ResourceVersion: "1", }, Subsets: []api.EndpointSubset{{ @@ -441,85 +372,83 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) { defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) endpoints := NewEndpointController(client) - if err := endpoints.SyncServiceEndpoints(); err != nil { - t.Errorf("unexpected error: %v", err) - } + addPods(endpoints.podStore.Store, ns, 1, 1) + endpoints.serviceStore.Store.Add(&api.Service{ + ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns}, + Spec: api.ServiceSpec{ + Selector: map[string]string{"foo": "bar"}, + Ports: []api.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}}, + }, + }) + endpoints.syncService(ns + "/foo") data := runtime.EncodeOrDie(testapi.Codec(), &api.Endpoints{ ObjectMeta: api.ObjectMeta{ Name: "foo", + Namespace: ns, ResourceVersion: "1", }, Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "1.2.3.4", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0"}}}, + Addresses: []api.EndpointAddress{{IP: "1.2.3.4", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}}, Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}}, }}, }) - endpointsHandler.ValidateRequest(t, testapi.ResourcePathWithQueryParams("endpoints", "bar", "foo"), "PUT", &data) + endpointsHandler.ValidateRequest(t, testapi.ResourcePathWithQueryParams("endpoints", ns, "foo"), "PUT", &data) } func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) { - serviceList := api.ServiceList{ - Items: []api.Service{ - { - ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault}, - Spec: api.ServiceSpec{ - Selector: map[string]string{"foo": "bar"}, - Ports: []api.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}}, - }, - }, - }, - } + ns := api.NamespaceDefault testServer, endpointsHandler := makeTestServer(t, api.NamespaceDefault, - serverResponse{http.StatusOK, newPodList(1, 1)}, - serverResponse{http.StatusOK, &serviceList}, serverResponse{http.StatusOK, &api.Endpoints{ ObjectMeta: api.ObjectMeta{ ResourceVersion: "1", + Name: "foo", + Namespace: ns, }, Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "1.2.3.4", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0"}}}, + Addresses: []api.EndpointAddress{{IP: "1.2.3.4", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}}, Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}}, }}, }}) defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) endpoints := NewEndpointController(client) - if err := endpoints.SyncServiceEndpoints(); err != nil { - t.Errorf("unexpected error: %v", err) - } + addPods(endpoints.podStore.Store, api.NamespaceDefault, 1, 1) + endpoints.serviceStore.Store.Add(&api.Service{ + ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault}, + Spec: api.ServiceSpec{ + Selector: map[string]string{"foo": "bar"}, + Ports: []api.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}}, + }, + }) + endpoints.syncService(ns + "/foo") endpointsHandler.ValidateRequest(t, testapi.ResourcePathWithQueryParams("endpoints", api.NamespaceDefault, "foo"), "GET", nil) } func TestSyncEndpointsItems(t *testing.T) { - serviceList := api.ServiceList{ - Items: []api.Service{ - { - ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "other"}, - Spec: api.ServiceSpec{ - Selector: map[string]string{"foo": "bar"}, - Ports: []api.ServicePort{ - {Name: "port0", Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}, - {Name: "port1", Port: 88, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8088)}, - }, - }, - }, - }, - } - testServer, endpointsHandler := makeTestServer(t, "other", - serverResponse{http.StatusOK, newPodList(3, 2)}, - serverResponse{http.StatusOK, &serviceList}, + ns := "other" + testServer, endpointsHandler := makeTestServer(t, ns, serverResponse{http.StatusOK, &api.Endpoints{}}) defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) endpoints := NewEndpointController(client) - if err := endpoints.SyncServiceEndpoints(); err != nil { - t.Errorf("unexpected error: %v", err) - } + addPods(endpoints.podStore.Store, ns, 3, 2) + addPods(endpoints.podStore.Store, "blah", 5, 2) // make sure these aren't found! + endpoints.serviceStore.Store.Add(&api.Service{ + ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns}, + Spec: api.ServiceSpec{ + Selector: map[string]string{"foo": "bar"}, + Ports: []api.ServicePort{ + {Name: "port0", Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}, + {Name: "port1", Port: 88, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8088)}, + }, + }, + }) + endpoints.syncService("other/foo") expectedSubsets := []api.EndpointSubset{{ Addresses: []api.EndpointAddress{ - {IP: "1.2.3.4", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0"}}, - {IP: "1.2.3.5", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod1"}}, - {IP: "1.2.3.6", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod2"}}, + {IP: "1.2.3.4", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}, + {IP: "1.2.3.5", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod1", Namespace: ns}}, + {IP: "1.2.3.6", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod2", Namespace: ns}}, }, Ports: []api.EndpointPort{ {Name: "port0", Port: 8080, Protocol: "TCP"}, @@ -534,69 +463,38 @@ func TestSyncEndpointsItems(t *testing.T) { }) // endpointsHandler should get 2 requests - one for "GET" and the next for "POST". endpointsHandler.ValidateRequestCount(t, 2) - endpointsHandler.ValidateRequest(t, testapi.ResourcePathWithQueryParams("endpoints", "other", ""), "POST", &data) -} - -func TestSyncEndpointsPodError(t *testing.T) { - serviceList := api.ServiceList{ - Items: []api.Service{ - { - ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault}, - Spec: api.ServiceSpec{ - Selector: map[string]string{"foo": "bar"}, - Ports: []api.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}}, - }, - }, - }, - } - testServer, _ := makeTestServer(t, api.NamespaceDefault, - serverResponse{http.StatusInternalServerError, &api.PodList{}}, - serverResponse{http.StatusOK, &serviceList}, - serverResponse{http.StatusOK, &api.Endpoints{}}) - defer testServer.Close() - client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) - endpoints := NewEndpointController(client) - if err := endpoints.SyncServiceEndpoints(); err == nil { - t.Error("Unexpected non-error") - } + endpointsHandler.ValidateRequest(t, testapi.ResourcePathWithQueryParams("endpoints", ns, ""), "POST", &data) } func TestSyncEndpointsItemsWithLabels(t *testing.T) { - serviceList := api.ServiceList{ - Items: []api.Service{ - { - ObjectMeta: api.ObjectMeta{ - Name: "foo", - Namespace: "other", - Labels: map[string]string{ - "foo": "bar", - }, - }, - Spec: api.ServiceSpec{ - Selector: map[string]string{"foo": "bar"}, - Ports: []api.ServicePort{ - {Name: "port0", Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}, - {Name: "port1", Port: 88, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8088)}, - }, - }, - }, - }, - } - testServer, endpointsHandler := makeTestServer(t, "other", - serverResponse{http.StatusOK, newPodList(3, 2)}, - serverResponse{http.StatusOK, &serviceList}, + ns := "other" + testServer, endpointsHandler := makeTestServer(t, ns, serverResponse{http.StatusOK, &api.Endpoints{}}) defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) endpoints := NewEndpointController(client) - if err := endpoints.SyncServiceEndpoints(); err != nil { - t.Errorf("unexpected error: %v", err) - } + addPods(endpoints.podStore.Store, ns, 3, 2) + serviceLabels := map[string]string{"foo": "bar"} + endpoints.serviceStore.Store.Add(&api.Service{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Namespace: ns, + Labels: serviceLabels, + }, + Spec: api.ServiceSpec{ + Selector: map[string]string{"foo": "bar"}, + Ports: []api.ServicePort{ + {Name: "port0", Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}, + {Name: "port1", Port: 88, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8088)}, + }, + }, + }) + endpoints.syncService(ns + "/foo") expectedSubsets := []api.EndpointSubset{{ Addresses: []api.EndpointAddress{ - {IP: "1.2.3.4", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0"}}, - {IP: "1.2.3.5", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod1"}}, - {IP: "1.2.3.6", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod2"}}, + {IP: "1.2.3.4", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}, + {IP: "1.2.3.5", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod1", Namespace: ns}}, + {IP: "1.2.3.6", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod2", Namespace: ns}}, }, Ports: []api.EndpointPort{ {Name: "port0", Port: 8080, Protocol: "TCP"}, @@ -606,39 +504,22 @@ func TestSyncEndpointsItemsWithLabels(t *testing.T) { data := runtime.EncodeOrDie(testapi.Codec(), &api.Endpoints{ ObjectMeta: api.ObjectMeta{ ResourceVersion: "", - Labels: serviceList.Items[0].Labels, + Labels: serviceLabels, }, Subsets: endptspkg.SortSubsets(expectedSubsets), }) // endpointsHandler should get 2 requests - one for "GET" and the next for "POST". endpointsHandler.ValidateRequestCount(t, 2) - endpointsHandler.ValidateRequest(t, testapi.ResourcePathWithQueryParams("endpoints", "other", ""), "POST", &data) + endpointsHandler.ValidateRequest(t, testapi.ResourcePathWithQueryParams("endpoints", ns, ""), "POST", &data) } func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) { - serviceList := api.ServiceList{ - Items: []api.Service{ - { - ObjectMeta: api.ObjectMeta{ - Name: "foo", - Namespace: "bar", - Labels: map[string]string{ - "baz": "blah", - }, - }, - Spec: api.ServiceSpec{ - Selector: map[string]string{"foo": "bar"}, - Ports: []api.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}}, - }, - }, - }, - } - testServer, endpointsHandler := makeTestServer(t, "bar", - serverResponse{http.StatusOK, newPodList(1, 1)}, - serverResponse{http.StatusOK, &serviceList}, + ns := "bar" + testServer, endpointsHandler := makeTestServer(t, ns, serverResponse{http.StatusOK, &api.Endpoints{ ObjectMeta: api.ObjectMeta{ Name: "foo", + Namespace: ns, ResourceVersion: "1", Labels: map[string]string{ "foo": "bar", @@ -652,19 +533,31 @@ func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) { defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) endpoints := NewEndpointController(client) - if err := endpoints.SyncServiceEndpoints(); err != nil { - t.Errorf("unexpected error: %v", err) - } + addPods(endpoints.podStore.Store, ns, 1, 1) + serviceLabels := map[string]string{"baz": "blah"} + endpoints.serviceStore.Store.Add(&api.Service{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Namespace: ns, + Labels: serviceLabels, + }, + Spec: api.ServiceSpec{ + Selector: map[string]string{"foo": "bar"}, + Ports: []api.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}}, + }, + }) + endpoints.syncService(ns + "/foo") data := runtime.EncodeOrDie(testapi.Codec(), &api.Endpoints{ ObjectMeta: api.ObjectMeta{ Name: "foo", + Namespace: ns, ResourceVersion: "1", - Labels: serviceList.Items[0].Labels, + Labels: serviceLabels, }, Subsets: []api.EndpointSubset{{ - Addresses: []api.EndpointAddress{{IP: "1.2.3.4", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0"}}}, + Addresses: []api.EndpointAddress{{IP: "1.2.3.4", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}}, Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}}, }}, }) - endpointsHandler.ValidateRequest(t, testapi.ResourcePathWithQueryParams("endpoints", "bar", "foo"), "PUT", &data) + endpointsHandler.ValidateRequest(t, testapi.ResourcePathWithQueryParams("endpoints", ns, "foo"), "PUT", &data) } diff --git a/pkg/util/set.go b/pkg/util/set.go index 67e807a0fd..5141cdba10 100644 --- a/pkg/util/set.go +++ b/pkg/util/set.go @@ -91,6 +91,24 @@ func (s StringSet) Difference(s2 StringSet) StringSet { return result } +// Union returns a new set which includes items in either s1 or s2. +// vof objects that are not in s2 +// For example: +// s1 = {1, 2} +// s2 = {3, 4} +// s1.Union(s2) = {1, 2, 3, 4} +// s2.Union(s1) = {1, 2, 3, 4} +func (s1 StringSet) Union(s2 StringSet) StringSet { + result := NewStringSet() + for key := range s1 { + result.Insert(key) + } + for key := range s2 { + result.Insert(key) + } + return result +} + // IsSuperset returns true iff s1 is a superset of s2. func (s1 StringSet) IsSuperset(s2 StringSet) bool { for item := range s2 {