mirror of https://github.com/k3s-io/k3s
Some minor corrections in service controller
parent
f006dcc9e1
commit
80ceb5b3d6
|
@ -44,6 +44,7 @@ import (
|
|||
fedutil "k8s.io/kubernetes/federation/pkg/federation-controller/util"
|
||||
"k8s.io/kubernetes/federation/pkg/federation-controller/util/clusterselector"
|
||||
"k8s.io/kubernetes/federation/pkg/federation-controller/util/deletionhelper"
|
||||
"k8s.io/kubernetes/federation/pkg/federation-controller/util/eventsink"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/v1"
|
||||
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
|
||||
|
@ -95,12 +96,11 @@ type ServiceController struct {
|
|||
flowcontrolBackoff *flowcontrol.Backoff
|
||||
}
|
||||
|
||||
// New returns a new service controller to keep DNS provider service resources
|
||||
// (like Kubernetes Services and DNS server records for service discovery) in sync with the registry.
|
||||
// New returns a new service controller to keep service objects between
|
||||
// the federation and member clusters in sync.
|
||||
func New(federationClient fedclientset.Interface) *ServiceController {
|
||||
broadcaster := record.NewBroadcaster()
|
||||
// federationClient event is not supported yet
|
||||
// broadcaster.StartRecordingToSink(&unversioned_core.EventSinkImpl{Interface: kubeClient.Core().Events("")})
|
||||
broadcaster.StartRecordingToSink(eventsink.NewFederatedEventSink(federationClient))
|
||||
recorder := broadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: UserAgentName})
|
||||
|
||||
s := &ServiceController{
|
||||
|
@ -179,10 +179,6 @@ func New(federationClient fedclientset.Interface) *ServiceController {
|
|||
svc := obj.(*v1.Service)
|
||||
orphanDependents := false
|
||||
err := client.Core().Services(svc.Namespace).Delete(svc.Name, &metav1.DeleteOptions{OrphanDependents: &orphanDependents})
|
||||
// IsNotFound error is fine since that means the object is deleted already.
|
||||
if errors.IsNotFound(err) {
|
||||
return nil
|
||||
}
|
||||
return err
|
||||
})
|
||||
|
||||
|
@ -235,44 +231,40 @@ func (s *ServiceController) updateService(obj pkgruntime.Object) (pkgruntime.Obj
|
|||
return s.federationClient.Core().Services(service.Namespace).Update(service)
|
||||
}
|
||||
|
||||
// Run starts a background goroutine that watches for changes to federation services
|
||||
// and ensures that they have Kubernetes services created, updated or deleted appropriately.
|
||||
// federationSyncPeriod controls how often we check the federation's services to
|
||||
// ensure that the correct Kubernetes services (and associated DNS entries) exist.
|
||||
// This is only necessary to fudge over failed watches.
|
||||
// clusterSyncPeriod controls how often we check the federation's underlying clusters and
|
||||
// their Kubernetes services to ensure that matching services created independently of the Federation
|
||||
// (e.g. directly via the underlying cluster's API) are correctly accounted for.
|
||||
|
||||
// It's an error to call Run() more than once for a given ServiceController
|
||||
// object.
|
||||
// Run starts informers, delay deliverers and workers. Workers continuously watch for events which could
|
||||
// be from federation or federated clusters and tries to reconcile the service objects from federation to
|
||||
// federated clusters.
|
||||
func (s *ServiceController) Run(workers int, stopCh <-chan struct{}) {
|
||||
glog.Infof("Starting federation service controller")
|
||||
|
||||
defer runtime.HandleCrash()
|
||||
defer s.queue.ShutDown()
|
||||
|
||||
s.federatedInformer.Start()
|
||||
defer s.federatedInformer.Stop()
|
||||
|
||||
s.endpointFederatedInformer.Start()
|
||||
defer s.endpointFederatedInformer.Stop()
|
||||
|
||||
s.objectDeliverer.StartWithHandler(func(item *fedutil.DelayingDelivererItem) {
|
||||
s.queue.Add(item.Value.(string))
|
||||
})
|
||||
defer s.objectDeliverer.Stop()
|
||||
|
||||
s.clusterDeliverer.StartWithHandler(func(_ *fedutil.DelayingDelivererItem) {
|
||||
s.deliverServicesOnClusterChange()
|
||||
})
|
||||
defer s.clusterDeliverer.Stop()
|
||||
|
||||
fedutil.StartBackoffGC(s.flowcontrolBackoff, stopCh)
|
||||
go s.serviceController.Run(stopCh)
|
||||
|
||||
for i := 0; i < workers; i++ {
|
||||
go wait.Until(s.fedServiceWorker, time.Second, stopCh)
|
||||
}
|
||||
go func() {
|
||||
<-stopCh
|
||||
glog.Infof("Shutting down Federation Service Controller")
|
||||
s.queue.ShutDown()
|
||||
s.federatedInformer.Stop()
|
||||
s.endpointFederatedInformer.Stop()
|
||||
s.objectDeliverer.Stop()
|
||||
s.clusterDeliverer.Stop()
|
||||
}()
|
||||
|
||||
<-stopCh
|
||||
glog.Infof("Shutting down federation service controller")
|
||||
}
|
||||
|
||||
type reconciliationStatus string
|
||||
|
|
Loading…
Reference in New Issue