Merge pull request #6943 from lavalamp/fix4

Improve endpoint controller
pull/6/head
Prashanth B 2015-04-17 16:30:31 -07:00
commit f2f40b06aa
14 changed files with 866 additions and 361 deletions

View File

@ -208,7 +208,7 @@ func startComponents(firstManifestURL, secondManifestURL, apiVersion string) (st
endpoints := service.NewEndpointController(cl) endpoints := service.NewEndpointController(cl)
// ensure the service endpoints are sync'd several times within the window that the integration tests wait // ensure the service endpoints are sync'd several times within the window that the integration tests wait
go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*4) go endpoints.Run(3, util.NeverStop)
controllerManager := replicationControllerPkg.NewReplicationManager(cl) controllerManager := replicationControllerPkg.NewReplicationManager(cl)
@ -285,7 +285,7 @@ func endpointsSet(c *client.Client, serviceNamespace, serviceID string, endpoint
return func() (bool, error) { return func() (bool, error) {
endpoints, err := c.Endpoints(serviceNamespace).Get(serviceID) endpoints, err := c.Endpoints(serviceNamespace).Get(serviceID)
if err != nil { if err != nil {
glog.Infof("Error on creating endpoints: %v", err) glog.Infof("Error getting endpoints: %v", err)
return false, nil return false, nil
} }
count := 0 count := 0

View File

@ -51,6 +51,7 @@ type CMServer struct {
Address util.IP Address util.IP
CloudProvider string CloudProvider string
CloudConfigFile string CloudConfigFile string
ConcurrentEndpointSyncs int
MinionRegexp string MinionRegexp string
NodeSyncPeriod time.Duration NodeSyncPeriod time.Duration
ResourceQuotaSyncPeriod time.Duration ResourceQuotaSyncPeriod time.Duration
@ -83,6 +84,7 @@ func NewCMServer() *CMServer {
s := CMServer{ s := CMServer{
Port: ports.ControllerManagerPort, Port: ports.ControllerManagerPort,
Address: util.IP(net.ParseIP("127.0.0.1")), Address: util.IP(net.ParseIP("127.0.0.1")),
ConcurrentEndpointSyncs: 5,
NodeSyncPeriod: 10 * time.Second, NodeSyncPeriod: 10 * time.Second,
ResourceQuotaSyncPeriod: 10 * time.Second, ResourceQuotaSyncPeriod: 10 * time.Second,
NamespaceSyncPeriod: 5 * time.Minute, NamespaceSyncPeriod: 5 * time.Minute,
@ -102,6 +104,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
fs.Var(&s.Address, "address", "The IP address to serve on (set to 0.0.0.0 for all interfaces)") fs.Var(&s.Address, "address", "The IP address to serve on (set to 0.0.0.0 for all interfaces)")
fs.StringVar(&s.CloudProvider, "cloud_provider", s.CloudProvider, "The provider for cloud services. Empty string for no provider.") fs.StringVar(&s.CloudProvider, "cloud_provider", s.CloudProvider, "The provider for cloud services. Empty string for no provider.")
fs.StringVar(&s.CloudConfigFile, "cloud_config", s.CloudConfigFile, "The path to the cloud provider configuration file. Empty string for no configuration file.") fs.StringVar(&s.CloudConfigFile, "cloud_config", s.CloudConfigFile, "The path to the cloud provider configuration file. Empty string for no configuration file.")
fs.IntVar(&s.ConcurrentEndpointSyncs, "concurrent_endpoint_syncs", s.ConcurrentEndpointSyncs, "The number of endpoint syncing operations that will be done concurrently. Larger number = faster endpoint updating, but more CPU (and network) load")
fs.StringVar(&s.MinionRegexp, "minion_regexp", s.MinionRegexp, "If non empty, and --cloud_provider is specified, a regular expression for matching minion VMs.") fs.StringVar(&s.MinionRegexp, "minion_regexp", s.MinionRegexp, "If non empty, and --cloud_provider is specified, a regular expression for matching minion VMs.")
fs.DurationVar(&s.NodeSyncPeriod, "node_sync_period", s.NodeSyncPeriod, ""+ fs.DurationVar(&s.NodeSyncPeriod, "node_sync_period", s.NodeSyncPeriod, ""+
"The period for syncing nodes from cloudprovider. Longer periods will result in "+ "The period for syncing nodes from cloudprovider. Longer periods will result in "+
@ -186,7 +189,7 @@ func (s *CMServer) Run(_ []string) error {
}() }()
endpoints := service.NewEndpointController(kubeClient) endpoints := service.NewEndpointController(kubeClient)
go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10) go endpoints.Run(s.ConcurrentEndpointSyncs, util.NeverStop)
controllerManager := replicationControllerPkg.NewReplicationManager(kubeClient) controllerManager := replicationControllerPkg.NewReplicationManager(kubeClient)
controllerManager.Run(replicationControllerPkg.DefaultSyncPeriod) controllerManager.Run(replicationControllerPkg.DefaultSyncPeriod)

View File

@ -139,7 +139,7 @@ func runControllerManager(machineList []string, cl *client.Client, nodeMilliCPU,
} }
endpoints := service.NewEndpointController(cl) endpoints := service.NewEndpointController(cl)
go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10) go endpoints.Run(5, util.NeverStop)
controllerManager := controller.NewReplicationManager(cl) controllerManager := controller.NewReplicationManager(cl)
controllerManager.Run(controller.DefaultSyncPeriod) controllerManager.Run(controller.DefaultSyncPeriod)

View File

@ -39,10 +39,16 @@ type StoreToPodLister struct {
Store Store
} }
// TODO Get rid of the selector because that is confusing because the user might not realize that there has already been // Please note that selector is filtering among the pods that have gotten into
// some selection at the caching stage. Also, consistency will facilitate code generation. However, the pkg/client // the store; there may have been some filtering that already happened before
// is inconsistent too. // that.
//
// TODO: converge on the interface in pkg/client.
func (s *StoreToPodLister) List(selector labels.Selector) (pods []*api.Pod, err error) { func (s *StoreToPodLister) List(selector labels.Selector) (pods []*api.Pod, err error) {
// TODO: it'd be great to just call
// s.Pods(api.NamespaceAll).List(selector), however then we'd have to
// remake the list.Items as a []*api.Pod. So leave this separate for
// now.
for _, m := range s.Store.List() { for _, m := range s.Store.List() {
pod := m.(*api.Pod) pod := m.(*api.Pod)
if selector.Matches(labels.Set(pod.Labels)) { if selector.Matches(labels.Set(pod.Labels)) {
@ -52,6 +58,32 @@ func (s *StoreToPodLister) List(selector labels.Selector) (pods []*api.Pod, err
return pods, nil return pods, nil
} }
// Pods is taking baby steps to be more like the api in pkg/client
func (s *StoreToPodLister) Pods(namespace string) storePodsNamespacer {
return storePodsNamespacer{s.Store, namespace}
}
type storePodsNamespacer struct {
store Store
namespace string
}
// Please note that selector is filtering among the pods that have gotten into
// the store; there may have been some filtering that already happened before
// that.
func (s storePodsNamespacer) List(selector labels.Selector) (pods api.PodList, err error) {
list := api.PodList{}
for _, m := range s.store.List() {
pod := m.(*api.Pod)
if s.namespace == api.NamespaceAll || s.namespace == pod.Namespace {
if selector.Matches(labels.Set(pod.Labels)) {
list.Items = append(list.Items, *pod)
}
}
}
return list, nil
}
// Exists returns true if a pod matching the namespace/name of the given pod exists in the store. // Exists returns true if a pod matching the namespace/name of the given pod exists in the store.
func (s *StoreToPodLister) Exists(pod *api.Pod) (bool, error) { func (s *StoreToPodLister) Exists(pod *api.Pod) (bool, error) {
_, exists, err := s.Store.Get(pod) _, exists, err := s.Store.Get(pod)
@ -116,6 +148,10 @@ func (s *StoreToServiceLister) GetPodServices(pod *api.Pod) (services []api.Serv
if service.Namespace != pod.Namespace { if service.Namespace != pod.Namespace {
continue continue
} }
if service.Spec.Selector == nil {
// services with nil selectors match nothing, not everything.
continue
}
selector = labels.Set(service.Spec.Selector).AsSelector() selector = labels.Set(service.Spec.Selector).AsSelector()
if selector.Matches(labels.Set(pod.Labels)) { if selector.Matches(labels.Set(pod.Labels)) {
services = append(services, service) services = append(services, service)

View File

@ -90,3 +90,34 @@ func TestStoreToPodLister(t *testing.T) {
t.Errorf("Unexpected pod exists") t.Errorf("Unexpected pod exists")
} }
} }
func TestStoreToServiceLister(t *testing.T) {
store := NewStore(MetaNamespaceKeyFunc)
store.Add(&api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Spec: api.ServiceSpec{
Selector: map[string]string{},
},
})
store.Add(&api.Service{ObjectMeta: api.ObjectMeta{Name: "bar"}})
ssl := StoreToServiceLister{store}
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "foopod",
Labels: map[string]string{"role": "foo"},
},
}
services, err := ssl.GetPodServices(pod)
if err != nil {
t.Errorf("Unexpected error: %v", err)
}
if len(services) != 1 {
t.Fatalf("Expected 1 service, got %v", len(services))
}
if e, a := "foo", services[0].Name; e != a {
t.Errorf("Expected service %q, got %q", e, a)
}
}

View File

@ -18,6 +18,8 @@ package cache
import ( import (
"fmt" "fmt"
"strings"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/meta" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/meta"
) )
@ -67,6 +69,9 @@ type ExplicitKey string
// keys for API objects which implement meta.Interface. // keys for API objects which implement meta.Interface.
// The key uses the format <namespace>/<name> unless <namespace> is empty, then // The key uses the format <namespace>/<name> unless <namespace> is empty, then
// it's just <name>. // it's just <name>.
//
// TODO: replace key-as-string with a key-as-struct so that this
// packing/unpacking won't be necessary.
func MetaNamespaceKeyFunc(obj interface{}) (string, error) { func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
if key, ok := obj.(ExplicitKey); ok { if key, ok := obj.(ExplicitKey); ok {
return string(key), nil return string(key), nil
@ -81,6 +86,25 @@ func MetaNamespaceKeyFunc(obj interface{}) (string, error) {
return meta.Name(), nil return meta.Name(), nil
} }
// SplitMetaNamespaceKey returns the namespace and name that
// MetaNamespaceKeyFunc encoded into key.
//
// TODO: replace key-as-string with a key-as-struct so that this
// packing/unpacking won't be necessary.
func SplitMetaNamespaceKey(key string) (namespace, name string, err error) {
parts := strings.Split(key, "/")
switch len(parts) {
case 1:
// name only, no namespace
return "", parts[0], nil
case 2:
// name and namespace
return parts[0], parts[1], nil
}
return "", "", fmt.Errorf("unexpected key format: %q", key)
}
// cache responsibilities are limited to: // cache responsibilities are limited to:
// 1. Computing keys for objects via keyFunc // 1. Computing keys for objects via keyFunc
// 2. Invoking methods of a ThreadSafeStorage interface // 2. Invoking methods of a ThreadSafeStorage interface

View File

@ -35,6 +35,7 @@ type EndpointsInterface interface {
Create(endpoints *api.Endpoints) (*api.Endpoints, error) Create(endpoints *api.Endpoints) (*api.Endpoints, error)
List(selector labels.Selector) (*api.EndpointsList, error) List(selector labels.Selector) (*api.EndpointsList, error)
Get(name string) (*api.Endpoints, error) Get(name string) (*api.Endpoints, error)
Delete(name string) error
Update(endpoints *api.Endpoints) (*api.Endpoints, error) Update(endpoints *api.Endpoints) (*api.Endpoints, error)
Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error)
} }
@ -76,6 +77,11 @@ func (c *endpoints) Get(name string) (result *api.Endpoints, err error) {
return return
} }
// Delete takes the name of the endpoint, and returns an error if one occurs
func (c *endpoints) Delete(name string) error {
return c.r.Delete().Namespace(c.ns).Resource("endpoints").Name(name).Do().Error()
}
// Watch returns a watch.Interface that watches the requested endpoints for a service. // Watch returns a watch.Interface that watches the requested endpoints for a service.
func (c *endpoints) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { func (c *endpoints) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) {
return c.r.Get(). return c.r.Get().

View File

@ -45,6 +45,11 @@ func (c *FakeEndpoints) Get(name string) (*api.Endpoints, error) {
return obj.(*api.Endpoints), err return obj.(*api.Endpoints), err
} }
func (c *FakeEndpoints) Delete(name string) error {
_, err := c.Fake.Invokes(FakeAction{Action: "delete-endpoints", Value: name}, &api.Endpoints{})
return err
}
func (c *FakeEndpoints) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) { func (c *FakeEndpoints) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) {
c.Fake.Actions = append(c.Fake.Actions, FakeAction{Action: "watch-endpoints", Value: resourceVersion}) c.Fake.Actions = append(c.Fake.Actions, FakeAction{Action: "watch-endpoints", Value: resourceVersion})
return c.Fake.Watch, c.Fake.Err return c.Fake.Watch, c.Fake.Err

View File

@ -19,6 +19,7 @@ package service
import ( import (
"fmt" "fmt"
"reflect" "reflect"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/endpoints" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/endpoints"
@ -26,135 +27,354 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta2" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta2"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/controller/framework"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/workqueue"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
"github.com/golang/glog" "github.com/golang/glog"
) )
// EndpointController manages selector-based service endpoints. const (
type EndpointController struct { // We'll attempt to recompute EVERY service's endpoints at least this
client *client.Client // often. Higher numbers = lower CPU/network load; lower numbers =
} // shorter amount of time before a mistaken endpoint is corrected.
FullServiceResyncPeriod = 30 * time.Second
// We'll keep pod watches open up to this long. In the unlikely case
// that a watch misdelivers info about a pod, it'll take this long for
// that mistake to be rectified.
PodRelistPeriod = 5 * time.Minute
)
var (
keyFunc = framework.DeletionHandlingMetaNamespaceKeyFunc
)
// NewEndpointController returns a new *EndpointController. // NewEndpointController returns a new *EndpointController.
func NewEndpointController(client *client.Client) *EndpointController { func NewEndpointController(client *client.Client) *EndpointController {
return &EndpointController{ e := &EndpointController{
client: client, client: client,
queue: workqueue.New(),
}
e.serviceStore.Store, e.serviceController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func() (runtime.Object, error) {
return e.client.Services(api.NamespaceAll).List(labels.Everything())
},
WatchFunc: func(rv string) (watch.Interface, error) {
return e.client.Services(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv)
},
},
&api.Service{},
FullServiceResyncPeriod,
framework.ResourceEventHandlerFuncs{
AddFunc: e.enqueueService,
UpdateFunc: func(old, cur interface{}) {
e.enqueueService(cur)
},
DeleteFunc: e.enqueueService,
},
)
e.podStore.Store, e.podController = framework.NewInformer(
&cache.ListWatch{
ListFunc: func() (runtime.Object, error) {
return e.client.Pods(api.NamespaceAll).List(labels.Everything())
},
WatchFunc: func(rv string) (watch.Interface, error) {
return e.client.Pods(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv)
},
},
&api.Pod{},
PodRelistPeriod,
framework.ResourceEventHandlerFuncs{
AddFunc: e.addPod,
UpdateFunc: e.updatePod,
DeleteFunc: e.deletePod,
},
)
return e
}
// EndpointController manages selector-based service endpoints.
type EndpointController struct {
client *client.Client
serviceStore cache.StoreToServiceLister
podStore cache.StoreToPodLister
// Services that need to be updated. A channel is inappropriate here,
// because it allows services with lots of pods to be serviced much
// more often than services with few pods; it also would cause a
// service that's inserted multiple times to be processed more than
// necessary.
queue *workqueue.Type
// Since we join two objects, we'll watch both of them with
// controllers.
serviceController *framework.Controller
podController *framework.Controller
}
// Runs e; will not return until stopCh is closed. workers determines how many
// endpoints will be handled in parallel.
func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) {
defer util.HandleCrash()
go e.serviceController.Run(stopCh)
go e.podController.Run(stopCh)
for i := 0; i < workers; i++ {
go util.Until(e.worker, time.Second, stopCh)
}
<-stopCh
e.queue.ShutDown()
}
func (e *EndpointController) getPodServiceMemberships(pod *api.Pod) (util.StringSet, error) {
set := util.StringSet{}
services, err := e.serviceStore.GetPodServices(pod)
if err != nil {
// don't log this error because this function makes pointless
// errors when no services match.
return set, nil
}
for i := range services {
key, err := keyFunc(&services[i])
if err != nil {
return nil, err
}
set.Insert(key)
}
return set, nil
}
// When a pod is added, figure out what services it will be a member of and
// enqueue them. obj must have *api.Pod type.
func (e *EndpointController) addPod(obj interface{}) {
pod := obj.(*api.Pod)
services, err := e.getPodServiceMemberships(pod)
if err != nil {
glog.Errorf("Unable to get pod %v/%v's service memberships: %v", pod.Namespace, pod.Name, err)
return
}
for key := range services {
e.queue.Add(key)
} }
} }
// SyncServiceEndpoints syncs endpoints for services with selectors. // When a pod is updated, figure out what services it used to be a member of
func (e *EndpointController) SyncServiceEndpoints() error { // and what services it will be a member of, and enqueue the union of these.
services, err := e.client.Services(api.NamespaceAll).List(labels.Everything()) // old and cur must be *api.Pod types.
if err != nil { func (e *EndpointController) updatePod(old, cur interface{}) {
glog.Errorf("Failed to list services: %v", err) if api.Semantic.DeepEqual(old, cur) {
return err return
}
newPod := old.(*api.Pod)
services, err := e.getPodServiceMemberships(newPod)
if err != nil {
glog.Errorf("Unable to get pod %v/%v's service memberships: %v", newPod.Namespace, newPod.Name, err)
return
} }
var resultErr error
for i := range services.Items {
service := &services.Items[i]
if service.Spec.Selector == nil { oldPod := cur.(*api.Pod)
// services without a selector receive no endpoints from this controller; // Only need to get the old services if the labels changed.
// these services will receive the endpoints that are created out-of-band via the REST API. if !reflect.DeepEqual(newPod.Labels, oldPod.Labels) {
continue oldServices, err := e.getPodServiceMemberships(oldPod)
}
glog.V(5).Infof("About to update endpoints for service %s/%s", service.Namespace, service.Name)
pods, err := e.client.Pods(service.Namespace).List(labels.Set(service.Spec.Selector).AsSelector())
if err != nil { if err != nil {
glog.Errorf("Error syncing service: %s/%s, skipping", service.Namespace, service.Name) glog.Errorf("Unable to get pod %v/%v's service memberships: %v", oldPod.Namespace, oldPod.Name, err)
resultErr = err return
continue
} }
services = services.Union(oldServices)
}
for key := range services {
e.queue.Add(key)
}
}
subsets := []api.EndpointSubset{} // When a pod is deleted, enqueue the services the pod used to be a member of.
for i := range pods.Items { // obj could be an *api.Pod, or a DeletionFinalStateUnknown marker item.
pod := &pods.Items[i] func (e *EndpointController) deletePod(obj interface{}) {
if _, ok := obj.(*api.Pod); ok {
// Enqueue all the services that the pod used to be a member
// of. This happens to be exactly the same thing we do when a
// pod is added.
e.addPod(obj)
return
}
podKey, err := keyFunc(obj)
if err != nil {
glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
}
glog.Infof("Pod %q was deleted but we don't have a record of its final state, so it will take up to %v before it will be removed from all endpoint records.", podKey, FullServiceResyncPeriod)
for i := range service.Spec.Ports { // TODO: keep a map of pods to services to handle this condition.
servicePort := &service.Spec.Ports[i] }
// TODO: Once v1beta1 and v1beta2 are EOL'ed, // obj could be an *api.Service, or a DeletionFinalStateUnknown marker item.
// this can safely assume that TargetPort is func (e *EndpointController) enqueueService(obj interface{}) {
// populated, and findPort() can be removed. key, err := keyFunc(obj)
_ = v1beta1.Dependency if err != nil {
_ = v1beta2.Dependency glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
}
portName := servicePort.Name e.queue.Add(key)
portProto := servicePort.Protocol }
portNum, err := findPort(pod, servicePort)
if err != nil {
glog.Errorf("Failed to find port for service %s/%s: %v", service.Namespace, service.Name, err)
continue
}
if len(pod.Status.PodIP) == 0 {
glog.Errorf("Failed to find an IP for pod %s/%s", pod.Namespace, pod.Name)
continue
}
inService := false // worker runs a worker thread that just dequeues items, processes them, and
for _, c := range pod.Status.Conditions { // marks them done. You may run as many of these in parallel as you wish; the
if c.Type == api.PodReady && c.Status == api.ConditionTrue { // workqueue guarantees that they will not end up processing the same service
inService = true // at the same time.
break func (e *EndpointController) worker() {
} for {
} func() {
if !inService { key, quit := e.queue.Get()
glog.V(5).Infof("Pod is out of service: %v/%v", pod.Namespace, pod.Name) if quit {
continue return
}
epp := api.EndpointPort{Name: portName, Port: portNum, Protocol: portProto}
epa := api.EndpointAddress{IP: pod.Status.PodIP, TargetRef: &api.ObjectReference{
Kind: "Pod",
Namespace: pod.ObjectMeta.Namespace,
Name: pod.ObjectMeta.Name,
UID: pod.ObjectMeta.UID,
ResourceVersion: pod.ObjectMeta.ResourceVersion,
}}
subsets = append(subsets, api.EndpointSubset{Addresses: []api.EndpointAddress{epa}, Ports: []api.EndpointPort{epp}})
} }
} // Use defer: in the unlikely event that there's a
subsets = endpoints.RepackSubsets(subsets) // panic, we'd still like this to get marked done--
// otherwise the controller will not be able to sync
// this service again until it is restarted.
defer e.queue.Done(key)
e.syncService(key.(string))
}()
}
}
// See if there's actually an update here. func (e *EndpointController) syncService(key string) {
currentEndpoints, err := e.client.Endpoints(service.Namespace).Get(service.Name) startTime := time.Now()
defer func() {
glog.V(4).Infof("Finished syncing service %q endpoints. (%v)", key, time.Now().Sub(startTime))
}()
obj, exists, err := e.serviceStore.Store.GetByKey(key)
if err != nil || !exists {
// Delete the corresponding endpoint, as the service has been deleted.
// TODO: Please note that this will delete an endpoint when a
// service is deleted. However, if we're down at the time when
// the service is deleted, we will miss that deletion, so this
// doesn't completely solve the problem. See #6877.
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil { if err != nil {
if errors.IsNotFound(err) { glog.Errorf("Need to delete endpoint with key %q, but couldn't understand the key: %v", key, err)
currentEndpoints = &api.Endpoints{ // Don't retry, as the key isn't going to magically become understandable.
ObjectMeta: api.ObjectMeta{ return
Name: service.Name, }
Labels: service.Labels, err = e.client.Endpoints(namespace).Delete(name)
}, if err != nil && !errors.IsNotFound(err) {
} glog.Errorf("Error deleting endpoint %q: %v", key, err)
} else { e.queue.Add(key) // Retry
glog.Errorf("Error getting endpoints: %v", err) }
return
}
service := obj.(*api.Service)
if service.Spec.Selector == nil {
// services without a selector receive no endpoints from this controller;
// these services will receive the endpoints that are created out-of-band via the REST API.
return
}
glog.V(5).Infof("About to update endpoints for service %q", key)
pods, err := e.podStore.Pods(service.Namespace).List(labels.Set(service.Spec.Selector).AsSelector())
if err != nil {
// Since we're getting stuff from a local cache, it is
// basically impossible to get this error.
glog.Errorf("Error syncing service %q: %v", key, err)
e.queue.Add(key) // Retry
return
}
subsets := []api.EndpointSubset{}
for i := range pods.Items {
pod := &pods.Items[i]
for i := range service.Spec.Ports {
servicePort := &service.Spec.Ports[i]
// TODO: Once v1beta1 and v1beta2 are EOL'ed,
// this can safely assume that TargetPort is
// populated, and findPort() can be removed.
_ = v1beta1.Dependency
_ = v1beta2.Dependency
portName := servicePort.Name
portProto := servicePort.Protocol
portNum, err := findPort(pod, servicePort)
if err != nil {
glog.Errorf("Failed to find port for service %s/%s: %v", service.Namespace, service.Name, err)
continue
}
if len(pod.Status.PodIP) == 0 {
glog.Errorf("Failed to find an IP for pod %s/%s", pod.Namespace, pod.Name)
continue continue
} }
}
if reflect.DeepEqual(currentEndpoints.Subsets, subsets) && reflect.DeepEqual(currentEndpoints.Labels, service.Labels) {
glog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name)
continue
}
newEndpoints := currentEndpoints
newEndpoints.Subsets = subsets
newEndpoints.Labels = service.Labels
if len(currentEndpoints.ResourceVersion) == 0 { inService := false
// No previous endpoints, create them for _, c := range pod.Status.Conditions {
_, err = e.client.Endpoints(service.Namespace).Create(newEndpoints) if c.Type == api.PodReady && c.Status == api.ConditionTrue {
} else { inService = true
// Pre-existing break
_, err = e.client.Endpoints(service.Namespace).Update(newEndpoints) }
} }
if err != nil { if !inService {
glog.Errorf("Error updating endpoints: %v", err) glog.V(5).Infof("Pod is out of service: %v/%v", pod.Namespace, pod.Name)
continue continue
}
epp := api.EndpointPort{Name: portName, Port: portNum, Protocol: portProto}
epa := api.EndpointAddress{IP: pod.Status.PodIP, TargetRef: &api.ObjectReference{
Kind: "Pod",
Namespace: pod.ObjectMeta.Namespace,
Name: pod.ObjectMeta.Name,
UID: pod.ObjectMeta.UID,
ResourceVersion: pod.ObjectMeta.ResourceVersion,
}}
subsets = append(subsets, api.EndpointSubset{Addresses: []api.EndpointAddress{epa}, Ports: []api.EndpointPort{epp}})
} }
} }
return resultErr subsets = endpoints.RepackSubsets(subsets)
// See if there's actually an update here.
currentEndpoints, err := e.client.Endpoints(service.Namespace).Get(service.Name)
if err != nil {
if errors.IsNotFound(err) {
currentEndpoints = &api.Endpoints{
ObjectMeta: api.ObjectMeta{
Name: service.Name,
Labels: service.Labels,
},
}
} else {
glog.Errorf("Error getting endpoints: %v", err)
e.queue.Add(key) // Retry
return
}
}
if reflect.DeepEqual(currentEndpoints.Subsets, subsets) && reflect.DeepEqual(currentEndpoints.Labels, service.Labels) {
glog.V(5).Infof("endpoints are equal for %s/%s, skipping update", service.Namespace, service.Name)
return
}
newEndpoints := currentEndpoints
newEndpoints.Subsets = subsets
newEndpoints.Labels = service.Labels
if len(currentEndpoints.ResourceVersion) == 0 {
// No previous endpoints, create them
_, err = e.client.Endpoints(service.Namespace).Create(newEndpoints)
} else {
// Pre-existing
_, err = e.client.Endpoints(service.Namespace).Update(newEndpoints)
}
if err != nil {
glog.Errorf("Error updating endpoints: %v", err)
e.queue.Add(key) // Retry
}
} }
func findDefaultPort(pod *api.Pod, servicePort int, proto api.Protocol) int { func findDefaultPort(pod *api.Pod, servicePort int, proto api.Protocol) int {

View File

@ -27,16 +27,20 @@ import (
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest" _ "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/util"
) )
func newPodList(nPods int, nPorts int) *api.PodList { func addPods(store cache.Store, namespace string, nPods int, nPorts int) {
pods := []api.Pod{}
for i := 0; i < nPods; i++ { for i := 0; i < nPods; i++ {
p := api.Pod{ p := &api.Pod{
TypeMeta: api.TypeMeta{APIVersion: testapi.Version()}, TypeMeta: api.TypeMeta{APIVersion: testapi.Version()},
ObjectMeta: api.ObjectMeta{Name: fmt.Sprintf("pod%d", i)}, ObjectMeta: api.ObjectMeta{
Namespace: namespace,
Name: fmt.Sprintf("pod%d", i),
Labels: map[string]string{"foo": "bar"},
},
Spec: api.PodSpec{ Spec: api.PodSpec{
Containers: []api.Container{{Ports: []api.ContainerPort{}}}, Containers: []api.Container{{Ports: []api.ContainerPort{}}},
}, },
@ -54,11 +58,7 @@ func newPodList(nPods int, nPorts int) *api.PodList {
p.Spec.Containers[0].Ports = append(p.Spec.Containers[0].Ports, p.Spec.Containers[0].Ports = append(p.Spec.Containers[0].Ports,
api.ContainerPort{Name: fmt.Sprintf("port%d", i), ContainerPort: 8080 + j}) api.ContainerPort{Name: fmt.Sprintf("port%d", i), ContainerPort: 8080 + j})
} }
pods = append(pods, p) store.Add(p)
}
return &api.PodList{
TypeMeta: api.TypeMeta{APIVersion: testapi.Version(), Kind: "PodList"},
Items: pods,
} }
} }
@ -222,22 +222,12 @@ type serverResponse struct {
obj interface{} obj interface{}
} }
func makeTestServer(t *testing.T, namespace string, podResponse, serviceResponse, endpointsResponse serverResponse) (*httptest.Server, *util.FakeHandler) { func makeTestServer(t *testing.T, namespace string, endpointsResponse serverResponse) (*httptest.Server, *util.FakeHandler) {
fakePodHandler := util.FakeHandler{
StatusCode: podResponse.statusCode,
ResponseBody: runtime.EncodeOrDie(testapi.Codec(), podResponse.obj.(runtime.Object)),
}
fakeServiceHandler := util.FakeHandler{
StatusCode: serviceResponse.statusCode,
ResponseBody: runtime.EncodeOrDie(testapi.Codec(), serviceResponse.obj.(runtime.Object)),
}
fakeEndpointsHandler := util.FakeHandler{ fakeEndpointsHandler := util.FakeHandler{
StatusCode: endpointsResponse.statusCode, StatusCode: endpointsResponse.statusCode,
ResponseBody: runtime.EncodeOrDie(testapi.Codec(), endpointsResponse.obj.(runtime.Object)), ResponseBody: runtime.EncodeOrDie(testapi.Codec(), endpointsResponse.obj.(runtime.Object)),
} }
mux := http.NewServeMux() mux := http.NewServeMux()
mux.Handle(testapi.ResourcePath("pods", namespace, ""), &fakePodHandler)
mux.Handle(testapi.ResourcePath("services", "", ""), &fakeServiceHandler)
mux.Handle(testapi.ResourcePath("endpoints", namespace, ""), &fakeEndpointsHandler) mux.Handle(testapi.ResourcePath("endpoints", namespace, ""), &fakeEndpointsHandler)
mux.Handle(testapi.ResourcePath("endpoints/", namespace, ""), &fakeEndpointsHandler) mux.Handle(testapi.ResourcePath("endpoints/", namespace, ""), &fakeEndpointsHandler)
mux.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) { mux.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) {
@ -247,47 +237,13 @@ func makeTestServer(t *testing.T, namespace string, podResponse, serviceResponse
return httptest.NewServer(mux), &fakeEndpointsHandler return httptest.NewServer(mux), &fakeEndpointsHandler
} }
func TestSyncEndpointsEmpty(t *testing.T) {
testServer, _ := makeTestServer(t, api.NamespaceDefault,
serverResponse{http.StatusOK, newPodList(0, 0)},
serverResponse{http.StatusOK, &api.ServiceList{}},
serverResponse{http.StatusOK, &api.Endpoints{}})
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
endpoints := NewEndpointController(client)
if err := endpoints.SyncServiceEndpoints(); err != nil {
t.Errorf("unexpected error: %v", err)
}
}
func TestSyncEndpointsError(t *testing.T) {
testServer, _ := makeTestServer(t, api.NamespaceDefault,
serverResponse{http.StatusOK, newPodList(0, 0)},
serverResponse{http.StatusInternalServerError, &api.ServiceList{}},
serverResponse{http.StatusOK, &api.Endpoints{}})
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
endpoints := NewEndpointController(client)
if err := endpoints.SyncServiceEndpoints(); err == nil {
t.Errorf("unexpected non-error")
}
}
func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) { func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) {
serviceList := api.ServiceList{ ns := api.NamespaceDefault
Items: []api.Service{ testServer, endpointsHandler := makeTestServer(t, ns,
{
ObjectMeta: api.ObjectMeta{Name: "foo"},
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Port: 80}}},
},
},
}
testServer, endpointsHandler := makeTestServer(t, api.NamespaceDefault,
serverResponse{http.StatusOK, newPodList(0, 0)},
serverResponse{http.StatusOK, &serviceList},
serverResponse{http.StatusOK, &api.Endpoints{ serverResponse{http.StatusOK, &api.Endpoints{
ObjectMeta: api.ObjectMeta{ ObjectMeta: api.ObjectMeta{
Name: "foo", Name: "foo",
Namespace: ns,
ResourceVersion: "1", ResourceVersion: "1",
}, },
Subsets: []api.EndpointSubset{{ Subsets: []api.EndpointSubset{{
@ -298,30 +254,21 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) {
defer testServer.Close() defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
endpoints := NewEndpointController(client) endpoints := NewEndpointController(client)
if err := endpoints.SyncServiceEndpoints(); err != nil { endpoints.serviceStore.Store.Add(&api.Service{
t.Errorf("unexpected error: %v", err) ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
} Spec: api.ServiceSpec{Ports: []api.ServicePort{{Port: 80}}},
})
endpoints.syncService(ns + "/foo")
endpointsHandler.ValidateRequestCount(t, 0) endpointsHandler.ValidateRequestCount(t, 0)
} }
func TestSyncEndpointsProtocolTCP(t *testing.T) { func TestSyncEndpointsProtocolTCP(t *testing.T) {
serviceList := api.ServiceList{ ns := "other"
Items: []api.Service{ testServer, endpointsHandler := makeTestServer(t, ns,
{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "other"},
Spec: api.ServiceSpec{
Selector: map[string]string{},
Ports: []api.ServicePort{{Port: 80}},
},
},
},
}
testServer, endpointsHandler := makeTestServer(t, "other",
serverResponse{http.StatusOK, newPodList(0, 0)},
serverResponse{http.StatusOK, &serviceList},
serverResponse{http.StatusOK, &api.Endpoints{ serverResponse{http.StatusOK, &api.Endpoints{
ObjectMeta: api.ObjectMeta{ ObjectMeta: api.ObjectMeta{
Name: "foo", Name: "foo",
Namespace: ns,
ResourceVersion: "1", ResourceVersion: "1",
}, },
Subsets: []api.EndpointSubset{{ Subsets: []api.EndpointSubset{{
@ -332,30 +279,24 @@ func TestSyncEndpointsProtocolTCP(t *testing.T) {
defer testServer.Close() defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
endpoints := NewEndpointController(client) endpoints := NewEndpointController(client)
if err := endpoints.SyncServiceEndpoints(); err != nil { endpoints.serviceStore.Store.Add(&api.Service{
t.Errorf("unexpected error: %v", err) ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
} Spec: api.ServiceSpec{
Selector: map[string]string{},
Ports: []api.ServicePort{{Port: 80}},
},
})
endpoints.syncService(ns + "/foo")
endpointsHandler.ValidateRequestCount(t, 0) endpointsHandler.ValidateRequestCount(t, 0)
} }
func TestSyncEndpointsProtocolUDP(t *testing.T) { func TestSyncEndpointsProtocolUDP(t *testing.T) {
serviceList := api.ServiceList{ ns := "other"
Items: []api.Service{ testServer, endpointsHandler := makeTestServer(t, ns,
{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "other"},
Spec: api.ServiceSpec{
Selector: map[string]string{},
Ports: []api.ServicePort{{Port: 80}},
},
},
},
}
testServer, endpointsHandler := makeTestServer(t, "other",
serverResponse{http.StatusOK, newPodList(0, 0)},
serverResponse{http.StatusOK, &serviceList},
serverResponse{http.StatusOK, &api.Endpoints{ serverResponse{http.StatusOK, &api.Endpoints{
ObjectMeta: api.ObjectMeta{ ObjectMeta: api.ObjectMeta{
Name: "foo", Name: "foo",
Namespace: ns,
ResourceVersion: "1", ResourceVersion: "1",
}, },
Subsets: []api.EndpointSubset{{ Subsets: []api.EndpointSubset{{
@ -366,30 +307,24 @@ func TestSyncEndpointsProtocolUDP(t *testing.T) {
defer testServer.Close() defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
endpoints := NewEndpointController(client) endpoints := NewEndpointController(client)
if err := endpoints.SyncServiceEndpoints(); err != nil { endpoints.serviceStore.Store.Add(&api.Service{
t.Errorf("unexpected error: %v", err) ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
} Spec: api.ServiceSpec{
Selector: map[string]string{},
Ports: []api.ServicePort{{Port: 80}},
},
})
endpoints.syncService(ns + "/foo")
endpointsHandler.ValidateRequestCount(t, 0) endpointsHandler.ValidateRequestCount(t, 0)
} }
func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) { func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) {
serviceList := api.ServiceList{ ns := "other"
Items: []api.Service{ testServer, endpointsHandler := makeTestServer(t, ns,
{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "other"},
Spec: api.ServiceSpec{
Selector: map[string]string{},
Ports: []api.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}},
},
},
},
}
testServer, endpointsHandler := makeTestServer(t, "other",
serverResponse{http.StatusOK, newPodList(1, 1)},
serverResponse{http.StatusOK, &serviceList},
serverResponse{http.StatusOK, &api.Endpoints{ serverResponse{http.StatusOK, &api.Endpoints{
ObjectMeta: api.ObjectMeta{ ObjectMeta: api.ObjectMeta{
Name: "foo", Name: "foo",
Namespace: ns,
ResourceVersion: "1", ResourceVersion: "1",
}, },
Subsets: []api.EndpointSubset{}, Subsets: []api.EndpointSubset{},
@ -397,40 +332,36 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) {
defer testServer.Close() defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
endpoints := NewEndpointController(client) endpoints := NewEndpointController(client)
if err := endpoints.SyncServiceEndpoints(); err != nil { addPods(endpoints.podStore.Store, ns, 1, 1)
t.Errorf("unexpected error: %v", err) endpoints.serviceStore.Store.Add(&api.Service{
} ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
Spec: api.ServiceSpec{
Selector: map[string]string{},
Ports: []api.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}},
},
})
endpoints.syncService(ns + "/foo")
data := runtime.EncodeOrDie(testapi.Codec(), &api.Endpoints{ data := runtime.EncodeOrDie(testapi.Codec(), &api.Endpoints{
ObjectMeta: api.ObjectMeta{ ObjectMeta: api.ObjectMeta{
Name: "foo", Name: "foo",
Namespace: ns,
ResourceVersion: "1", ResourceVersion: "1",
}, },
Subsets: []api.EndpointSubset{{ Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.2.3.4", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0"}}}, Addresses: []api.EndpointAddress{{IP: "1.2.3.4", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}}, Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}},
}}, }},
}) })
endpointsHandler.ValidateRequest(t, testapi.ResourcePathWithQueryParams("endpoints", "other", "foo"), "PUT", &data) endpointsHandler.ValidateRequest(t, testapi.ResourcePathWithQueryParams("endpoints", ns, "foo"), "PUT", &data)
} }
func TestSyncEndpointsItemsPreexisting(t *testing.T) { func TestSyncEndpointsItemsPreexisting(t *testing.T) {
serviceList := api.ServiceList{ ns := "bar"
Items: []api.Service{ testServer, endpointsHandler := makeTestServer(t, ns,
{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "bar"},
Spec: api.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
Ports: []api.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}},
},
},
},
}
testServer, endpointsHandler := makeTestServer(t, "bar",
serverResponse{http.StatusOK, newPodList(1, 1)},
serverResponse{http.StatusOK, &serviceList},
serverResponse{http.StatusOK, &api.Endpoints{ serverResponse{http.StatusOK, &api.Endpoints{
ObjectMeta: api.ObjectMeta{ ObjectMeta: api.ObjectMeta{
Name: "foo", Name: "foo",
Namespace: ns,
ResourceVersion: "1", ResourceVersion: "1",
}, },
Subsets: []api.EndpointSubset{{ Subsets: []api.EndpointSubset{{
@ -441,85 +372,83 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) {
defer testServer.Close() defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
endpoints := NewEndpointController(client) endpoints := NewEndpointController(client)
if err := endpoints.SyncServiceEndpoints(); err != nil { addPods(endpoints.podStore.Store, ns, 1, 1)
t.Errorf("unexpected error: %v", err) endpoints.serviceStore.Store.Add(&api.Service{
} ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
Spec: api.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
Ports: []api.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}},
},
})
endpoints.syncService(ns + "/foo")
data := runtime.EncodeOrDie(testapi.Codec(), &api.Endpoints{ data := runtime.EncodeOrDie(testapi.Codec(), &api.Endpoints{
ObjectMeta: api.ObjectMeta{ ObjectMeta: api.ObjectMeta{
Name: "foo", Name: "foo",
Namespace: ns,
ResourceVersion: "1", ResourceVersion: "1",
}, },
Subsets: []api.EndpointSubset{{ Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.2.3.4", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0"}}}, Addresses: []api.EndpointAddress{{IP: "1.2.3.4", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}}, Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}},
}}, }},
}) })
endpointsHandler.ValidateRequest(t, testapi.ResourcePathWithQueryParams("endpoints", "bar", "foo"), "PUT", &data) endpointsHandler.ValidateRequest(t, testapi.ResourcePathWithQueryParams("endpoints", ns, "foo"), "PUT", &data)
} }
func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) { func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) {
serviceList := api.ServiceList{ ns := api.NamespaceDefault
Items: []api.Service{
{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault},
Spec: api.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
Ports: []api.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}},
},
},
},
}
testServer, endpointsHandler := makeTestServer(t, api.NamespaceDefault, testServer, endpointsHandler := makeTestServer(t, api.NamespaceDefault,
serverResponse{http.StatusOK, newPodList(1, 1)},
serverResponse{http.StatusOK, &serviceList},
serverResponse{http.StatusOK, &api.Endpoints{ serverResponse{http.StatusOK, &api.Endpoints{
ObjectMeta: api.ObjectMeta{ ObjectMeta: api.ObjectMeta{
ResourceVersion: "1", ResourceVersion: "1",
Name: "foo",
Namespace: ns,
}, },
Subsets: []api.EndpointSubset{{ Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.2.3.4", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0"}}}, Addresses: []api.EndpointAddress{{IP: "1.2.3.4", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}}, Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}},
}}, }},
}}) }})
defer testServer.Close() defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
endpoints := NewEndpointController(client) endpoints := NewEndpointController(client)
if err := endpoints.SyncServiceEndpoints(); err != nil { addPods(endpoints.podStore.Store, api.NamespaceDefault, 1, 1)
t.Errorf("unexpected error: %v", err) endpoints.serviceStore.Store.Add(&api.Service{
} ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault},
Spec: api.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
Ports: []api.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}},
},
})
endpoints.syncService(ns + "/foo")
endpointsHandler.ValidateRequest(t, testapi.ResourcePathWithQueryParams("endpoints", api.NamespaceDefault, "foo"), "GET", nil) endpointsHandler.ValidateRequest(t, testapi.ResourcePathWithQueryParams("endpoints", api.NamespaceDefault, "foo"), "GET", nil)
} }
func TestSyncEndpointsItems(t *testing.T) { func TestSyncEndpointsItems(t *testing.T) {
serviceList := api.ServiceList{ ns := "other"
Items: []api.Service{ testServer, endpointsHandler := makeTestServer(t, ns,
{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: "other"},
Spec: api.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
Ports: []api.ServicePort{
{Name: "port0", Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)},
{Name: "port1", Port: 88, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8088)},
},
},
},
},
}
testServer, endpointsHandler := makeTestServer(t, "other",
serverResponse{http.StatusOK, newPodList(3, 2)},
serverResponse{http.StatusOK, &serviceList},
serverResponse{http.StatusOK, &api.Endpoints{}}) serverResponse{http.StatusOK, &api.Endpoints{}})
defer testServer.Close() defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
endpoints := NewEndpointController(client) endpoints := NewEndpointController(client)
if err := endpoints.SyncServiceEndpoints(); err != nil { addPods(endpoints.podStore.Store, ns, 3, 2)
t.Errorf("unexpected error: %v", err) addPods(endpoints.podStore.Store, "blah", 5, 2) // make sure these aren't found!
} endpoints.serviceStore.Store.Add(&api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
Spec: api.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
Ports: []api.ServicePort{
{Name: "port0", Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)},
{Name: "port1", Port: 88, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8088)},
},
},
})
endpoints.syncService("other/foo")
expectedSubsets := []api.EndpointSubset{{ expectedSubsets := []api.EndpointSubset{{
Addresses: []api.EndpointAddress{ Addresses: []api.EndpointAddress{
{IP: "1.2.3.4", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0"}}, {IP: "1.2.3.4", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}},
{IP: "1.2.3.5", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod1"}}, {IP: "1.2.3.5", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod1", Namespace: ns}},
{IP: "1.2.3.6", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod2"}}, {IP: "1.2.3.6", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod2", Namespace: ns}},
}, },
Ports: []api.EndpointPort{ Ports: []api.EndpointPort{
{Name: "port0", Port: 8080, Protocol: "TCP"}, {Name: "port0", Port: 8080, Protocol: "TCP"},
@ -534,69 +463,38 @@ func TestSyncEndpointsItems(t *testing.T) {
}) })
// endpointsHandler should get 2 requests - one for "GET" and the next for "POST". // endpointsHandler should get 2 requests - one for "GET" and the next for "POST".
endpointsHandler.ValidateRequestCount(t, 2) endpointsHandler.ValidateRequestCount(t, 2)
endpointsHandler.ValidateRequest(t, testapi.ResourcePathWithQueryParams("endpoints", "other", ""), "POST", &data) endpointsHandler.ValidateRequest(t, testapi.ResourcePathWithQueryParams("endpoints", ns, ""), "POST", &data)
}
func TestSyncEndpointsPodError(t *testing.T) {
serviceList := api.ServiceList{
Items: []api.Service{
{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault},
Spec: api.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
Ports: []api.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}},
},
},
},
}
testServer, _ := makeTestServer(t, api.NamespaceDefault,
serverResponse{http.StatusInternalServerError, &api.PodList{}},
serverResponse{http.StatusOK, &serviceList},
serverResponse{http.StatusOK, &api.Endpoints{}})
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
endpoints := NewEndpointController(client)
if err := endpoints.SyncServiceEndpoints(); err == nil {
t.Error("Unexpected non-error")
}
} }
func TestSyncEndpointsItemsWithLabels(t *testing.T) { func TestSyncEndpointsItemsWithLabels(t *testing.T) {
serviceList := api.ServiceList{ ns := "other"
Items: []api.Service{ testServer, endpointsHandler := makeTestServer(t, ns,
{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Namespace: "other",
Labels: map[string]string{
"foo": "bar",
},
},
Spec: api.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
Ports: []api.ServicePort{
{Name: "port0", Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)},
{Name: "port1", Port: 88, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8088)},
},
},
},
},
}
testServer, endpointsHandler := makeTestServer(t, "other",
serverResponse{http.StatusOK, newPodList(3, 2)},
serverResponse{http.StatusOK, &serviceList},
serverResponse{http.StatusOK, &api.Endpoints{}}) serverResponse{http.StatusOK, &api.Endpoints{}})
defer testServer.Close() defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
endpoints := NewEndpointController(client) endpoints := NewEndpointController(client)
if err := endpoints.SyncServiceEndpoints(); err != nil { addPods(endpoints.podStore.Store, ns, 3, 2)
t.Errorf("unexpected error: %v", err) serviceLabels := map[string]string{"foo": "bar"}
} endpoints.serviceStore.Store.Add(&api.Service{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Namespace: ns,
Labels: serviceLabels,
},
Spec: api.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
Ports: []api.ServicePort{
{Name: "port0", Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)},
{Name: "port1", Port: 88, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8088)},
},
},
})
endpoints.syncService(ns + "/foo")
expectedSubsets := []api.EndpointSubset{{ expectedSubsets := []api.EndpointSubset{{
Addresses: []api.EndpointAddress{ Addresses: []api.EndpointAddress{
{IP: "1.2.3.4", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0"}}, {IP: "1.2.3.4", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}},
{IP: "1.2.3.5", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod1"}}, {IP: "1.2.3.5", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod1", Namespace: ns}},
{IP: "1.2.3.6", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod2"}}, {IP: "1.2.3.6", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod2", Namespace: ns}},
}, },
Ports: []api.EndpointPort{ Ports: []api.EndpointPort{
{Name: "port0", Port: 8080, Protocol: "TCP"}, {Name: "port0", Port: 8080, Protocol: "TCP"},
@ -606,39 +504,22 @@ func TestSyncEndpointsItemsWithLabels(t *testing.T) {
data := runtime.EncodeOrDie(testapi.Codec(), &api.Endpoints{ data := runtime.EncodeOrDie(testapi.Codec(), &api.Endpoints{
ObjectMeta: api.ObjectMeta{ ObjectMeta: api.ObjectMeta{
ResourceVersion: "", ResourceVersion: "",
Labels: serviceList.Items[0].Labels, Labels: serviceLabels,
}, },
Subsets: endptspkg.SortSubsets(expectedSubsets), Subsets: endptspkg.SortSubsets(expectedSubsets),
}) })
// endpointsHandler should get 2 requests - one for "GET" and the next for "POST". // endpointsHandler should get 2 requests - one for "GET" and the next for "POST".
endpointsHandler.ValidateRequestCount(t, 2) endpointsHandler.ValidateRequestCount(t, 2)
endpointsHandler.ValidateRequest(t, testapi.ResourcePathWithQueryParams("endpoints", "other", ""), "POST", &data) endpointsHandler.ValidateRequest(t, testapi.ResourcePathWithQueryParams("endpoints", ns, ""), "POST", &data)
} }
func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) { func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) {
serviceList := api.ServiceList{ ns := "bar"
Items: []api.Service{ testServer, endpointsHandler := makeTestServer(t, ns,
{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Namespace: "bar",
Labels: map[string]string{
"baz": "blah",
},
},
Spec: api.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
Ports: []api.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}},
},
},
},
}
testServer, endpointsHandler := makeTestServer(t, "bar",
serverResponse{http.StatusOK, newPodList(1, 1)},
serverResponse{http.StatusOK, &serviceList},
serverResponse{http.StatusOK, &api.Endpoints{ serverResponse{http.StatusOK, &api.Endpoints{
ObjectMeta: api.ObjectMeta{ ObjectMeta: api.ObjectMeta{
Name: "foo", Name: "foo",
Namespace: ns,
ResourceVersion: "1", ResourceVersion: "1",
Labels: map[string]string{ Labels: map[string]string{
"foo": "bar", "foo": "bar",
@ -652,19 +533,31 @@ func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) {
defer testServer.Close() defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()}) client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Version()})
endpoints := NewEndpointController(client) endpoints := NewEndpointController(client)
if err := endpoints.SyncServiceEndpoints(); err != nil { addPods(endpoints.podStore.Store, ns, 1, 1)
t.Errorf("unexpected error: %v", err) serviceLabels := map[string]string{"baz": "blah"}
} endpoints.serviceStore.Store.Add(&api.Service{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Namespace: ns,
Labels: serviceLabels,
},
Spec: api.ServiceSpec{
Selector: map[string]string{"foo": "bar"},
Ports: []api.ServicePort{{Port: 80, Protocol: "TCP", TargetPort: util.NewIntOrStringFromInt(8080)}},
},
})
endpoints.syncService(ns + "/foo")
data := runtime.EncodeOrDie(testapi.Codec(), &api.Endpoints{ data := runtime.EncodeOrDie(testapi.Codec(), &api.Endpoints{
ObjectMeta: api.ObjectMeta{ ObjectMeta: api.ObjectMeta{
Name: "foo", Name: "foo",
Namespace: ns,
ResourceVersion: "1", ResourceVersion: "1",
Labels: serviceList.Items[0].Labels, Labels: serviceLabels,
}, },
Subsets: []api.EndpointSubset{{ Subsets: []api.EndpointSubset{{
Addresses: []api.EndpointAddress{{IP: "1.2.3.4", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0"}}}, Addresses: []api.EndpointAddress{{IP: "1.2.3.4", TargetRef: &api.ObjectReference{Kind: "Pod", Name: "pod0", Namespace: ns}}},
Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}}, Ports: []api.EndpointPort{{Port: 8080, Protocol: "TCP"}},
}}, }},
}) })
endpointsHandler.ValidateRequest(t, testapi.ResourcePathWithQueryParams("endpoints", "bar", "foo"), "PUT", &data) endpointsHandler.ValidateRequest(t, testapi.ResourcePathWithQueryParams("endpoints", ns, "foo"), "PUT", &data)
} }

View File

@ -91,6 +91,24 @@ func (s StringSet) Difference(s2 StringSet) StringSet {
return result return result
} }
// Union returns a new set which includes items in either s1 or s2.
// vof objects that are not in s2
// For example:
// s1 = {1, 2}
// s2 = {3, 4}
// s1.Union(s2) = {1, 2, 3, 4}
// s2.Union(s1) = {1, 2, 3, 4}
func (s1 StringSet) Union(s2 StringSet) StringSet {
result := NewStringSet()
for key := range s1 {
result.Insert(key)
}
for key := range s2 {
result.Insert(key)
}
return result
}
// IsSuperset returns true iff s1 is a superset of s2. // IsSuperset returns true iff s1 is a superset of s2.
func (s1 StringSet) IsSuperset(s2 StringSet) bool { func (s1 StringSet) IsSuperset(s2 StringSet) bool {
for item := range s2 { for item := range s2 {

26
pkg/util/workqueue/doc.go Normal file
View File

@ -0,0 +1,26 @@
/*
Copyright 2014 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package workqueue provides a simple queue that supports the following
// features:
// * Fair: items processed in the order in which they are added.
// * Stingy: a single item will not be processed multiple times concurrently,
// and if an item is added multiple times before it can be processed, it
// will only be processed once.
// * Multiple consumers and producers. In particular, it is allowed for an
// item to be reenqueued while it is being processed.
// * Shutdown notifications.
package workqueue

128
pkg/util/workqueue/queue.go Normal file
View File

@ -0,0 +1,128 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workqueue
import (
"sync"
)
// New constructs a new workqueue (see the package comment).
func New() *Type {
return &Type{
dirty: set{},
processing: set{},
cond: sync.NewCond(&sync.Mutex{}),
}
}
// Type is a work queue (see the package comment).
type Type struct {
// queue defines the order in which we will work on items. Every
// element of queue should be in the dirty set and not in the
// processing set.
queue []t
// dirty defines all of the items that need to be processed.
dirty set
// Things that are currently being processed are in the processing set.
// These things may be simultaneously in the dirty set. When we finish
// processing something and remove it from this set, we'll check if
// it's in the dirty set, and if so, add it to the queue.
processing set
cond *sync.Cond
shuttingDown bool
}
type empty struct{}
type t interface{}
type set map[t]empty
func (s set) has(item t) bool {
_, exists := s[item]
return exists
}
func (s set) insert(item t) {
s[item] = empty{}
}
func (s set) delete(item t) {
delete(s, item)
}
// Add marks item as needing processing.
func (q *Type) Add(item interface{}) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
if q.shuttingDown {
return
}
if q.dirty.has(item) {
return
}
q.dirty.insert(item)
if q.processing.has(item) {
return
}
q.queue = append(q.queue, item)
q.cond.Signal()
}
// Get blocks until it can return an item to be processed. If shutdown = true,
// the caller should end their goroutine. You must call Done with item when you
// have finished processing it.
func (q *Type) Get() (item interface{}, shutdown bool) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
for len(q.queue) == 0 && !q.shuttingDown {
q.cond.Wait()
}
if len(q.queue) == 0 {
// We must be shutting down.
return nil, true
}
item, q.queue = q.queue[0], q.queue[1:]
q.processing.insert(item)
q.dirty.delete(item)
return item, false
}
// Done marks item as done processing, and if it has been marked as dirty again
// while it was being processed, it will be re-added to the queue for
// re-processing.
func (q *Type) Done(item interface{}) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
q.processing.delete(item)
if q.dirty.has(item) {
q.queue = append(q.queue, item)
q.cond.Signal()
}
}
// Shutdown will cause q to ignore all new items added to it. As soon as the
// worker goroutines have drained the existing items in the queue, they will be
// instructed to exit.
func (q *Type) ShutDown() {
q.cond.L.Lock()
defer q.cond.L.Unlock()
q.shuttingDown = true
q.cond.Broadcast()
}

View File

@ -0,0 +1,115 @@
/*
Copyright 2015 Google Inc. All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package workqueue_test
import (
"sync"
"testing"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/workqueue"
)
func TestBasic(t *testing.T) {
// If something is seriously wrong this test will never complete.
q := workqueue.New()
// Start producers
const producers = 50
producerWG := sync.WaitGroup{}
producerWG.Add(producers)
for i := 0; i < producers; i++ {
go func(i int) {
defer producerWG.Done()
for j := 0; j < 50; j++ {
q.Add(i)
time.Sleep(time.Millisecond)
}
}(i)
}
// Start consumers
const consumers = 10
consumerWG := sync.WaitGroup{}
consumerWG.Add(consumers)
for i := 0; i < consumers; i++ {
go func(i int) {
defer consumerWG.Done()
for {
item, quit := q.Get()
if item == "added after shutdown!" {
t.Errorf("Got an item added after shutdown.")
}
if quit {
return
}
t.Logf("Worker %v: begin processing %v", i, item)
time.Sleep(3 * time.Millisecond)
t.Logf("Worker %v: done processing %v", i, item)
q.Done(item)
}
}(i)
}
producerWG.Wait()
q.ShutDown()
q.Add("added after shutdown!")
consumerWG.Wait()
}
func TestAddWhileProcessing(t *testing.T) {
q := workqueue.New()
// Start producers
const producers = 50
producerWG := sync.WaitGroup{}
producerWG.Add(producers)
for i := 0; i < producers; i++ {
go func(i int) {
defer producerWG.Done()
q.Add(i)
}(i)
}
// Start consumers
const consumers = 10
consumerWG := sync.WaitGroup{}
consumerWG.Add(consumers)
for i := 0; i < consumers; i++ {
go func(i int) {
defer consumerWG.Done()
// Every worker will re-add every item up to two times.
// This tests the dirty-while-processing case.
counters := map[interface{}]int{}
for {
item, quit := q.Get()
if quit {
return
}
counters[item]++
if counters[item] < 2 {
q.Add(item)
}
q.Done(item)
}
}(i)
}
producerWG.Wait()
q.ShutDown()
consumerWG.Wait()
}