Switch service controller to shared informers

pull/6/head
Andy Goldstein 2017-02-13 14:28:12 -05:00
parent 5802799e56
commit 726f18524b
5 changed files with 136 additions and 107 deletions

View File

@@ -216,11 +216,17 @@ func StartControllers(s *options.CloudControllerManagerServer, kubeconfig *restc
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
// Start the service controller
-serviceController, err := servicecontroller.New(cloud, client("service-controller"), s.ClusterName)
+serviceController, err := servicecontroller.New(
+cloud,
+client("service-controller"),
+newSharedInformers.Core().V1().Services(),
+newSharedInformers.Core().V1().Nodes(),
+s.ClusterName,
+)
if err != nil {
glog.Errorf("Failed to start service controller: %v", err)
} else {
-serviceController.Run(int(s.ConcurrentServiceSyncs))
+go serviceController.Run(stop, int(s.ConcurrentServiceSyncs))
}
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
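The constructor change above is the heart of this commit: instead of each controller opening its own LIST/WATCH against the API server, controllers now accept informers from a factory shared by the whole controller manager, so each resource type is watched once per process. A minimal sketch of that wiring, using today's k8s.io/client-go/informers package rather than the generated informers_generated paths in this tree; the kubeconfig path and resync period are placeholders:

package main

import (
	"time"

	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Placeholder path; an in-cluster deployment would use rest.InClusterConfig().
	cfg, err := clientcmd.BuildConfigFromFlags("", "/path/to/kubeconfig")
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	// One factory, shared by every controller in the process.
	factory := informers.NewSharedInformerFactory(client, 30*time.Second)

	// Handles onto the same shared caches; giving these to two different
	// controllers does not open a second watch.
	serviceInformer := factory.Core().V1().Services()
	nodeInformer := factory.Core().V1().Nodes()
	_, _ = serviceInformer, nodeInformer

	stop := make(chan struct{})
	defer close(stop)
	factory.Start(stop) // one reflector per resource type, started once
}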

View File

@@ -450,11 +450,17 @@ func StartControllers(controllers map[string]InitFunc, s *options.CMServer, root
nodeController.Run()
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
-serviceController, err := servicecontroller.New(cloud, clientBuilder.ClientOrDie("service-controller"), s.ClusterName)
+serviceController, err := servicecontroller.New(
+cloud,
+clientBuilder.ClientOrDie("service-controller"),
+newSharedInformers.Core().V1().Services(),
+newSharedInformers.Core().V1().Nodes(),
+s.ClusterName,
+)
if err != nil {
glog.Errorf("Failed to start service controller: %v", err)
} else {
-serviceController.Run(int(s.ConcurrentServiceSyncs))
+go serviceController.Run(stop, int(s.ConcurrentServiceSyncs))
}
time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter))
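The kube-controller-manager gets the identical treatment, and both call sites now launch Run on a goroutine with a stop channel instead of letting it block. That channel is the shared shutdown signal for every controller in the process. A runnable sketch of the contract implied by go serviceController.Run(stop, workers); the stand-in controller type and messages are illustrative:

package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

// controller stands in for any controller whose Run blocks until stop
// closes, mirroring the new Run(stop, workers) signature.
type controller struct{}

func (c *controller) Run(stop <-chan struct{}, workers int) {
	fmt.Printf("starting %d workers\n", workers)
	<-stop // block here; worker goroutines would also be bound to stop
	fmt.Println("stopping")
}

func main() {
	stop := make(chan struct{})
	go (&controller{}).Run(stop, 2)

	// Closing stop shuts down every controller sharing the channel.
	sig := make(chan os.Signal, 1)
	signal.Notify(sig, syscall.SIGTERM, os.Interrupt)
	<-sig
	close(stop)
}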

View File

@@ -19,18 +19,16 @@ go_library(
"//pkg/api:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
-"//pkg/client/legacylisters:go_default_library",
+"//pkg/client/informers/informers_generated/externalversions/core/v1:go_default_library",
+"//pkg/client/listers/core/v1:go_default_library",
"//pkg/cloudprovider:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/util/metrics:go_default_library",
"//vendor:github.com/golang/glog",
"//vendor:k8s.io/apimachinery/pkg/api/errors",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
-"//vendor:k8s.io/apimachinery/pkg/fields",
-"//vendor:k8s.io/apimachinery/pkg/runtime",
+"//vendor:k8s.io/apimachinery/pkg/labels",
"//vendor:k8s.io/apimachinery/pkg/util/runtime",
"//vendor:k8s.io/apimachinery/pkg/util/wait",
-"//vendor:k8s.io/apimachinery/pkg/watch",
"//vendor:k8s.io/client-go/kubernetes/typed/core/v1",
"//vendor:k8s.io/client-go/pkg/api/v1",
"//vendor:k8s.io/client-go/tools/cache",
@@ -48,7 +46,9 @@ go_test(
"//pkg/api/testapi:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/client/clientset_generated/clientset/fake:go_default_library",
+"//pkg/client/informers/informers_generated/externalversions:go_default_library",
"//pkg/cloudprovider/providers/fake:go_default_library",
+"//pkg/controller:go_default_library",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/apimachinery/pkg/types",
"//vendor:k8s.io/client-go/tools/record",

View File

@@ -26,12 +26,9 @@ import (
"github.com/golang/glog"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
-"k8s.io/apimachinery/pkg/fields"
-pkgruntime "k8s.io/apimachinery/pkg/runtime"
+"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
-"k8s.io/apimachinery/pkg/watch"
v1core "k8s.io/client-go/kubernetes/typed/core/v1"
clientv1 "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/tools/cache"
@@ -40,7 +37,8 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
-"k8s.io/kubernetes/pkg/client/legacylisters"
+coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions/core/v1"
+corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/util/metrics"
@@ -80,28 +78,33 @@ type serviceCache struct {
}
type ServiceController struct {
-cloud cloudprovider.Interface
-knownHosts []*v1.Node
-servicesToUpdate []*v1.Service
-kubeClient clientset.Interface
-clusterName string
-balancer cloudprovider.LoadBalancer
-zone cloudprovider.Zone
-cache *serviceCache
-// A store of services, populated by the serviceController
-serviceStore listers.StoreToServiceLister
-// Watches changes to all services
-serviceController cache.Controller
-eventBroadcaster record.EventBroadcaster
-eventRecorder record.EventRecorder
-nodeLister listers.StoreToNodeLister
+cloud cloudprovider.Interface
+knownHosts []*v1.Node
+servicesToUpdate []*v1.Service
+kubeClient clientset.Interface
+clusterName string
+balancer cloudprovider.LoadBalancer
+zone cloudprovider.Zone
+cache *serviceCache
+serviceLister corelisters.ServiceLister
+serviceListerSynced cache.InformerSynced
+eventBroadcaster record.EventBroadcaster
+eventRecorder record.EventRecorder
+nodeLister corelisters.NodeLister
+nodeListerSynced cache.InformerSynced
// services that need to be synced
workingQueue workqueue.DelayingInterface
}
// New returns a new service controller to keep cloud provider service resources
// (like load balancers) in sync with the registry.
-func New(cloud cloudprovider.Interface, kubeClient clientset.Interface, clusterName string) (*ServiceController, error) {
+func New(
+cloud cloudprovider.Interface,
+kubeClient clientset.Interface,
+serviceInformer coreinformers.ServiceInformer,
+nodeInformer coreinformers.NodeInformer,
+clusterName string,
+) (*ServiceController, error) {
broadcaster := record.NewBroadcaster()
broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.Core().RESTClient()).Events("")})
recorder := broadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "service-controller"})
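In the struct rewrite above, the controller-owned store (a StoreToServiceLister plus its cache.Controller) gives way to read-only listers and cache.InformerSynced functions. A lister is a typed view over the shared informer cache; InformerSynced reports whether that cache has completed its initial LIST. A compilable sketch of the pattern using modern client-go types; the field names are illustrative:

package controller

import (
	corelisters "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
)

// Controller holds read-only handles onto shared caches instead of
// owning a store and a reflector of its own.
type Controller struct {
	serviceLister corelisters.ServiceLister // reads from the shared cache
	serviceSynced cache.InformerSynced      // true once the initial LIST landed
	nodeLister    corelisters.NodeLister
	nodeSynced    cache.InformerSynced
}

Keeping the InformerSynced functions on the struct is what lets Run gate its workers on cache.WaitForCacheSync in a later hunk.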
@@ -118,22 +121,12 @@ func New(cloud cloudprovider.Interface, kubeClient clientset.Interface, clusterN
cache: &serviceCache{serviceMap: make(map[string]*cachedService)},
eventBroadcaster: broadcaster,
eventRecorder: recorder,
-nodeLister: listers.StoreToNodeLister{
-Store: cache.NewStore(cache.MetaNamespaceKeyFunc),
-},
-workingQueue: workqueue.NewDelayingQueue(),
+nodeLister: nodeInformer.Lister(),
+nodeListerSynced: nodeInformer.Informer().HasSynced,
+workingQueue: workqueue.NewNamedDelayingQueue("service"),
}
-s.serviceStore.Indexer, s.serviceController = cache.NewIndexerInformer(
-&cache.ListWatch{
-ListFunc: func(options metav1.ListOptions) (pkgruntime.Object, error) {
-return s.kubeClient.Core().Services(metav1.NamespaceAll).List(options)
-},
-WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
-return s.kubeClient.Core().Services(metav1.NamespaceAll).Watch(options)
-},
-},
-&v1.Service{},
-serviceSyncPeriod,
+serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
cache.ResourceEventHandlerFuncs{
AddFunc: s.enqueueService,
UpdateFunc: func(old, cur interface{}) {
@@ -145,8 +138,11 @@ func New(cloud cloudprovider.Interface, kubeClient clientset.Interface, clusterN
},
DeleteFunc: s.enqueueService,
},
-cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
+serviceSyncPeriod,
)
+s.serviceLister = serviceInformer.Lister()
+s.serviceListerSynced = serviceInformer.Informer().HasSynced
if err := s.init(); err != nil {
return nil, err
}
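AddEventHandlerWithResyncPeriod is how several controllers share one informer: each registers its own handlers and the informer fans every event out to all of them. Handlers should stay cheap, which is why, as above, they only compute a key and enqueue it for the workers. A sketch with modern client-go paths; note that cache.MetaNamespaceKeyFunc does not unwrap deletion tombstones, which real handlers would account for:

package controller

import (
	"k8s.io/client-go/informers"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
)

// registerHandlers wires cheap enqueue-only callbacks onto a shared
// service informer; the real work happens in the queue's consumers.
func registerHandlers(factory informers.SharedInformerFactory, queue workqueue.Interface) {
	enqueue := func(obj interface{}) {
		key, err := cache.MetaNamespaceKeyFunc(obj)
		if err != nil {
			return // tombstone handling would go here in production code
		}
		queue.Add(key)
	}
	factory.Core().V1().Services().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    enqueue,
		UpdateFunc: func(old, cur interface{}) { enqueue(cur) },
		DeleteFunc: enqueue,
	})
}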
@@ -173,15 +169,24 @@ func (s *ServiceController) enqueueService(obj interface{}) {
//
// It's an error to call Run() more than once for a given ServiceController
// object.
-func (s *ServiceController) Run(workers int) {
+func (s *ServiceController) Run(stopCh <-chan struct{}, workers int) {
defer runtime.HandleCrash()
-go s.serviceController.Run(wait.NeverStop)
-for i := 0; i < workers; i++ {
-go wait.Until(s.worker, time.Second, wait.NeverStop)
+defer s.workingQueue.ShutDown()
+glog.Info("Starting service controller")
+if !cache.WaitForCacheSync(stopCh, s.serviceListerSynced, s.nodeListerSynced) {
+runtime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
}
-nodeLW := cache.NewListWatchFromClient(s.kubeClient.Core().RESTClient(), "nodes", metav1.NamespaceAll, fields.Everything())
-cache.NewReflector(nodeLW, &v1.Node{}, s.nodeLister.Store, 0).Run()
-go wait.Until(s.nodeSyncLoop, nodeSyncPeriod, wait.NeverStop)
+for i := 0; i < workers; i++ {
+go wait.Until(s.worker, time.Second, stopCh)
+}
+go wait.Until(s.nodeSyncLoop, nodeSyncPeriod, stopCh)
+<-stopCh
+glog.Info("Stopping service controller")
}
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
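The new Run is the canonical shared-informer controller loop: crash handling, deferred queue shutdown, a cache-sync gate, N workers driven by wait.Until, then a block on the stop channel. One subtlety in the hunk above: when WaitForCacheSync fails, the code logs through runtime.HandleError but does not return, so workers still start against possibly unsynced caches. A generic skeleton of the same shape, with an early return added; all names are illustrative:

package controller

import (
	"fmt"
	"time"

	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/util/workqueue"
)

type loop struct {
	queue  workqueue.Interface
	synced []cache.InformerSynced
}

func (l *loop) worker() { /* drain l.queue here */ }

// Run mirrors the shape of the new ServiceController.Run: gate on cache
// sync, start workers bound to stopCh, then block until shutdown.
func (l *loop) Run(stopCh <-chan struct{}, workers int) {
	defer utilruntime.HandleCrash()
	defer l.queue.ShutDown()

	if !cache.WaitForCacheSync(stopCh, l.synced...) {
		utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
		return // avoid running workers against unsynced caches
	}
	for i := 0; i < workers; i++ {
		go wait.Until(l.worker, time.Second, stopCh)
	}
	<-stopCh
}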
@@ -258,12 +263,13 @@ func (s *ServiceController) processServiceUpdate(cachedService *cachedService, s
// Returns whatever error occurred along with a boolean indicator of whether it
// should be retried.
func (s *ServiceController) createLoadBalancerIfNeeded(key string, service *v1.Service) (error, bool) {
// Note: It is safe to just call EnsureLoadBalancer. But, on some clouds that requires a delete & create,
// which may involve service interruption. Also, we would like user-friendly events.
// Save the state so we can avoid a write if it doesn't change
previousState := v1.LoadBalancerStatusDeepCopy(&service.Status.LoadBalancer)
+var newState *v1.LoadBalancerStatus
+var err error
if !wantsLoadBalancer(service) {
needDelete := true
@@ -284,7 +290,7 @@ func (s *ServiceController) createLoadBalancerIfNeeded(key string, service *v1.S
s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer")
}
-service.Status.LoadBalancer = v1.LoadBalancerStatus{}
+newState = &v1.LoadBalancerStatus{}
} else {
glog.V(2).Infof("Ensuring LB for service %s", key)
@@ -292,7 +298,7 @@ func (s *ServiceController) createLoadBalancerIfNeeded(key string, service *v1.S
// The load balancer doesn't exist yet, so create it.
s.eventRecorder.Event(service, v1.EventTypeNormal, "CreatingLoadBalancer", "Creating load balancer")
-err := s.createLoadBalancer(service)
+newState, err = s.createLoadBalancer(service)
if err != nil {
return fmt.Errorf("Failed to create load balancer for service %s: %v", key, err), retryable
}
@@ -301,7 +307,17 @@ func (s *ServiceController) createLoadBalancerIfNeeded(key string, service *v1.S
// Write the state if changed
// TODO: Be careful here ... what if there were other changes to the service?
-if !v1.LoadBalancerStatusEqual(previousState, &service.Status.LoadBalancer) {
+if !v1.LoadBalancerStatusEqual(previousState, newState) {
+// Make a copy so we don't mutate the shared informer cache
+copy, err := api.Scheme.DeepCopy(service)
+if err != nil {
+return err, retryable
+}
+service = copy.(*v1.Service)
+// Update the status on the copy
+service.Status.LoadBalancer = *newState
if err := s.persistUpdate(service); err != nil {
return fmt.Errorf("Failed to persist updated status to apiserver, even after retries. Giving up: %v", err), notRetryable
}
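The DeepCopy block above exists because service now ultimately comes from the shared informer cache that every controller in the process reads; mutating it in place would corrupt their view of the world. The discipline is: treat anything a lister returns as read-only, and copy before writing. A sketch of the same rule using the generated DeepCopy method that later Kubernetes releases adopted in place of api.Scheme.DeepCopy:

package controller

import v1 "k8s.io/api/core/v1"

// updateStatus shows the copy-before-write discipline for cache objects.
// svc is assumed to come from a lister, so it is shared and read-only.
func updateStatus(svc *v1.Service, newState *v1.LoadBalancerStatus) *v1.Service {
	copied := svc.DeepCopy()               // generated deep copy; never mutate svc itself
	copied.Status.LoadBalancer = *newState // write only to the private copy
	return copied                          // hand the copy to the API server update call
}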
@@ -340,30 +356,23 @@ func (s *ServiceController) persistUpdate(service *v1.Service) error {
return err
}
-func (s *ServiceController) createLoadBalancer(service *v1.Service) error {
-nodes, err := s.nodeLister.List()
+func (s *ServiceController) createLoadBalancer(service *v1.Service) (*v1.LoadBalancerStatus, error) {
+nodes, err := s.nodeLister.List(labels.Everything())
if err != nil {
-return err
+return nil, err
}
lbNodes := []*v1.Node{}
-for ix := range nodes.Items {
-if includeNodeFromNodeList(&nodes.Items[ix]) {
-lbNodes = append(lbNodes, &nodes.Items[ix])
+for ix := range nodes {
+if includeNodeFromNodeList(nodes[ix]) {
+lbNodes = append(lbNodes, nodes[ix])
}
}
// - Only one protocol supported per service
// - Not all cloud providers support all protocols and the next step is expected to return
// an error for unsupported protocols
-status, err := s.balancer.EnsureLoadBalancer(s.clusterName, service, lbNodes)
-if err != nil {
-return err
-} else {
-service.Status.LoadBalancer = *status
-}
-return nil
+return s.balancer.EnsureLoadBalancer(s.clusterName, service, lbNodes)
}
// ListKeys implements the interface required by DeltaFIFO to list the keys we
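The createLoadBalancer rewrite shows the lister surface area: List takes a label selector and returns a slice of pointers directly into the shared cache (hence []*v1.Node and no .Items), and the function now returns the status rather than writing it onto the shared service object. A small sketch of reading nodes through a lister; modern import paths, and the filter logic is illustrative:

package controller

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/labels"
	corelisters "k8s.io/client-go/listers/core/v1"
)

// schedulableNodes filters lister output; the returned pointers alias the
// shared cache, so callers must not modify the nodes.
func schedulableNodes(lister corelisters.NodeLister) ([]*v1.Node, error) {
	nodes, err := lister.List(labels.Everything()) // one cache read, no API call
	if err != nil {
		return nil, err
	}
	out := make([]*v1.Node, 0, len(nodes))
	for _, n := range nodes {
		if !n.Spec.Unschedulable {
			out = append(out, n)
		}
	}
	return out, nil
}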
@@ -604,7 +613,7 @@ func includeNodeFromNodeList(node *v1.Node) bool {
return !node.Spec.Unschedulable
}
-func getNodeConditionPredicate() listers.NodeConditionPredicate {
+func getNodeConditionPredicate() corelisters.NodeConditionPredicate {
return func(node *v1.Node) bool {
// We add the master to the node list, but its unschedulable. So we use this to filter
// the master.
@@ -631,7 +640,7 @@ func getNodeConditionPredicate() listers.NodeConditionPredicate {
// nodeSyncLoop handles updating the hosts pointed to by all load
// balancers whenever the set of nodes in the cluster changes.
func (s *ServiceController) nodeSyncLoop() {
-newHosts, err := s.nodeLister.NodeCondition(getNodeConditionPredicate()).List()
+newHosts, err := s.nodeLister.ListWithPredicate(getNodeConditionPredicate())
if err != nil {
glog.Errorf("Failed to retrieve current set of nodes from node lister: %v", err)
return
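ListWithPredicate and NodeConditionPredicate are conveniences on the Kubernetes-internal node lister (pkg/client/listers/core/v1), not part of plain client-go, so out-of-tree callers usually list everything and filter by hand. A sketch of an equivalent predicate, assuming the usual Ready-condition convention that getNodeConditionPredicate's body (partially shown above) follows:

package controller

import v1 "k8s.io/api/core/v1"

// nodeReady mimics the kind of check getNodeConditionPredicate performs:
// skip unschedulable nodes and require a Ready condition of True.
func nodeReady(node *v1.Node) bool {
	if node.Spec.Unschedulable {
		return false
	}
	for _, cond := range node.Status.Conditions {
		if cond.Type == v1.NodeReady {
			return cond.Status == v1.ConditionTrue
		}
	}
	return false
}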
@@ -736,30 +745,26 @@ func (s *ServiceController) syncService(key string) error {
defer func() {
glog.V(4).Infof("Finished syncing service %q (%v)", key, time.Now().Sub(startTime))
}()
-// obj holds the latest service info from apiserver
-obj, exists, err := s.serviceStore.Indexer.GetByKey(key)
+namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
-glog.Infof("Unable to retrieve service %v from store: %v", key, err)
-s.workingQueue.Add(key)
return err
}
-if !exists {
+// service holds the latest service info from apiserver
+service, err := s.serviceLister.Services(namespace).Get(name)
+switch {
+case errors.IsNotFound(err):
// service absence in store means watcher caught the deletion, ensure LB info is cleaned
glog.Infof("Service has been deleted %v", key)
err, retryDelay = s.processServiceDeletion(key)
-} else {
-service, ok := obj.(*v1.Service)
-if ok {
-cachedService = s.cache.getOrCreate(key)
-err, retryDelay = s.processServiceUpdate(cachedService, service, key)
-} else {
-tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
-if !ok {
-return fmt.Errorf("object contained wasn't a service or a deleted key: %#v", obj)
-}
-glog.Infof("Found tombstone for %v", key)
-err, retryDelay = s.processServiceDeletion(tombstone.Key)
-}
+case err != nil:
+glog.Infof("Unable to retrieve service %v from store: %v", key, err)
+s.workingQueue.Add(key)
+return err
+default:
+cachedService = s.cache.getOrCreate(key)
+err, retryDelay = s.processServiceUpdate(cachedService, service, key)
}
if retryDelay != 0 {
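The rewritten syncService is the standard lister-backed sync handler: split the queue key, Get from the cache, and branch on IsNotFound. Deletion is now observed as absence from the cache, which is why the DeletedFinalStateUnknown tombstone handling could be deleted outright. A generic skeleton of the same shape; modern import paths, and the cleanup/reconcile bodies are placeholders:

package controller

import (
	v1 "k8s.io/api/core/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	corelisters "k8s.io/client-go/listers/core/v1"
	"k8s.io/client-go/tools/cache"
)

type syncer struct {
	serviceLister corelisters.ServiceLister
}

// sync is the lister-backed shape of syncService: key -> cache lookup ->
// not-found means "ensure cleaned up", found means "reconcile".
func (s *syncer) sync(key string) error {
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}
	svc, err := s.serviceLister.Services(namespace).Get(name)
	switch {
	case apierrors.IsNotFound(err):
		return s.cleanup(key) // object gone: release cloud resources
	case err != nil:
		return err // cache read failed: let the queue retry
	default:
		return s.reconcile(svc) // drive the world toward svc's spec
	}
}

func (s *syncer) cleanup(key string) error        { return nil } // placeholder
func (s *syncer) reconcile(svc *v1.Service) error { return nil } // placeholder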

View File

@@ -26,7 +26,9 @@ import (
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
+informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/externalversions"
fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
+"k8s.io/kubernetes/pkg/controller"
)
const region = "us-central"
@@ -35,6 +37,30 @@ func newService(name string, uid types.UID, serviceType v1.ServiceType) *v1.Serv
return &v1.Service{ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: "namespace", UID: uid, SelfLink: testapi.Default.SelfLink("services", name)}, Spec: v1.ServiceSpec{Type: serviceType}}
}
+func alwaysReady() bool { return true }
+func newController() (*ServiceController, *fakecloud.FakeCloud, *fake.Clientset) {
+cloud := &fakecloud.FakeCloud{}
+cloud.Region = region
+client := fake.NewSimpleClientset()
+informerFactory := informers.NewSharedInformerFactory(client, controller.NoResyncPeriodFunc())
+serviceInformer := informerFactory.Core().V1().Services()
+nodeInformer := informerFactory.Core().V1().Nodes()
+controller, _ := New(cloud, client, serviceInformer, nodeInformer, "test-cluster")
+controller.nodeListerSynced = alwaysReady
+controller.serviceListerSynced = alwaysReady
+controller.eventRecorder = record.NewFakeRecorder(100)
+controller.init()
+cloud.Calls = nil // ignore any cloud calls made in init()
+client.ClearActions() // ignore any client calls made in init()
+return controller, cloud, client
+}
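This newController helper is the standard recipe for informer-based controller tests: a fake clientset feeds a real informer factory, and the two ListerSynced fields are overridden with alwaysReady so nothing ever blocks in WaitForCacheSync. The factory is never started, so a test that wants objects visible to the listers must seed the informer store itself. A sketch of that seeding step, which the hunk does not show; it assumes a hypothetical test that keeps references to client, serviceInformer, and t:

// Hypothetical test body, after building client/informers as newController does.
svc := newService("external-balancer", types.UID("123"), v1.ServiceTypeLoadBalancer)

// Seed the fake API server so subsequent client actions see the object...
if _, err := client.Core().Services("namespace").Create(svc); err != nil {
	t.Fatal(err)
}

// ...and seed the informer cache directly, since informerFactory.Start is never called.
if err := serviceInformer.Informer().GetStore().Add(svc); err != nil {
	t.Fatal(err)
}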
func TestCreateExternalLoadBalancer(t *testing.T) {
table := []struct {
service *v1.Service
@@ -93,14 +119,7 @@ func TestCreateExternalLoadBalancer(t *testing.T) {
}
for _, item := range table {
-cloud := &fakecloud.FakeCloud{}
-cloud.Region = region
-client := &fake.Clientset{}
-controller, _ := New(cloud, client, "test-cluster")
-controller.eventRecorder = record.NewFakeRecorder(100)
-controller.init()
-cloud.Calls = nil // ignore any cloud calls made in init()
-client.ClearActions() // ignore any client calls made in init()
+controller, cloud, client := newController()
err, _ := controller.createLoadBalancerIfNeeded("foo/bar", item.service)
if !item.expectErr && err != nil {
t.Errorf("unexpected error: %v", err)
@@ -217,14 +236,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
},
}
for _, item := range table {
-cloud := &fakecloud.FakeCloud{}
-cloud.Region = region
-client := &fake.Clientset{}
-controller, _ := New(cloud, client, "test-cluster2")
-controller.eventRecorder = record.NewFakeRecorder(100)
-controller.init()
-cloud.Calls = nil // ignore any cloud calls made in init()
+controller, cloud, _ := newController()
var services []*v1.Service
for _, service := range item.services {