From 80ceb5b3d6a1009b23e075e2127443486d7266d0 Mon Sep 17 00:00:00 2001 From: shashidharatd Date: Thu, 18 May 2017 18:46:33 +0530 Subject: [PATCH] Some minor corrections in service controller --- .../service/servicecontroller.go | 48 ++++++++----------- 1 file changed, 20 insertions(+), 28 deletions(-) diff --git a/federation/pkg/federation-controller/service/servicecontroller.go b/federation/pkg/federation-controller/service/servicecontroller.go index 10b6c2a2b8..870e449cd5 100644 --- a/federation/pkg/federation-controller/service/servicecontroller.go +++ b/federation/pkg/federation-controller/service/servicecontroller.go @@ -44,6 +44,7 @@ import ( fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util" "k8s.io/kubernetes/federation/pkg/federation-controller/util/clusterselector" "k8s.io/kubernetes/federation/pkg/federation-controller/util/deletionhelper" + "k8s.io/kubernetes/federation/pkg/federation-controller/util/eventsink" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/v1" kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset" @@ -95,12 +96,11 @@ type ServiceController struct { flowcontrolBackoff *flowcontrol.Backoff } -// New returns a new service controller to keep DNS provider service resources -// (like Kubernetes Services and DNS server records for service discovery) in sync with the registry. +// New returns a new service controller to keep service objects between +// the federation and member clusters in sync. func New(federationClient fedclientset.Interface) *ServiceController { broadcaster := record.NewBroadcaster() - // federationClient event is not supported yet - // broadcaster.StartRecordingToSink(&unversioned_core.EventSinkImpl{Interface: kubeClient.Core().Events("")}) + broadcaster.StartRecordingToSink(eventsink.NewFederatedEventSink(federationClient)) recorder := broadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: UserAgentName}) s := &ServiceController{ @@ -179,10 +179,6 @@ func New(federationClient fedclientset.Interface) *ServiceController { svc := obj.(*v1.Service) orphanDependents := false err := client.Core().Services(svc.Namespace).Delete(svc.Name, &metav1.DeleteOptions{OrphanDependents: &orphanDependents}) - // IsNotFound error is fine since that means the object is deleted already. - if errors.IsNotFound(err) { - return nil - } return err }) @@ -235,44 +231,40 @@ func (s *ServiceController) updateService(obj pkgruntime.Object) (pkgruntime.Obj return s.federationClient.Core().Services(service.Namespace).Update(service) } -// Run starts a background goroutine that watches for changes to federation services -// and ensures that they have Kubernetes services created, updated or deleted appropriately. -// federationSyncPeriod controls how often we check the federation's services to -// ensure that the correct Kubernetes services (and associated DNS entries) exist. -// This is only necessary to fudge over failed watches. -// clusterSyncPeriod controls how often we check the federation's underlying clusters and -// their Kubernetes services to ensure that matching services created independently of the Federation -// (e.g. directly via the underlying cluster's API) are correctly accounted for. - -// It's an error to call Run() more than once for a given ServiceController -// object. +// Run starts informers, delay deliverers and workers. Workers continuously watch for events which could +// be from federation or federated clusters and tries to reconcile the service objects from federation to +// federated clusters. func (s *ServiceController) Run(workers int, stopCh <-chan struct{}) { glog.Infof("Starting federation service controller") defer runtime.HandleCrash() + defer s.queue.ShutDown() + s.federatedInformer.Start() + defer s.federatedInformer.Stop() + s.endpointFederatedInformer.Start() + defer s.endpointFederatedInformer.Stop() + s.objectDeliverer.StartWithHandler(func(item *fedutil.DelayingDelivererItem) { s.queue.Add(item.Value.(string)) }) + defer s.objectDeliverer.Stop() + s.clusterDeliverer.StartWithHandler(func(_ *fedutil.DelayingDelivererItem) { s.deliverServicesOnClusterChange() }) + defer s.clusterDeliverer.Stop() + fedutil.StartBackoffGC(s.flowcontrolBackoff, stopCh) go s.serviceController.Run(stopCh) for i := 0; i < workers; i++ { go wait.Until(s.fedServiceWorker, time.Second, stopCh) } - go func() { - <-stopCh - glog.Infof("Shutting down Federation Service Controller") - s.queue.ShutDown() - s.federatedInformer.Stop() - s.endpointFederatedInformer.Stop() - s.objectDeliverer.Stop() - s.clusterDeliverer.Stop() - }() + + <-stopCh + glog.Infof("Shutting down federation service controller") } type reconciliationStatus string