Fix deletion logic in service controller.

This is a regression from 1.5 exposed by cascading deltions. In order to apply updates, the service controller locks access to a cached service and spawns go routines without waiting for them. When updates and deletions arrive in quick succession, previous goroutines remain active and race with the deletion logic. Coupled with this, the service_helper was not re-evaluating the value of the DeletionTimestamp.

Without this patch, federation will sometimes leak resources at destruction time.
pull/6/head
Christian Bell 2017-03-16 10:17:45 -07:00
parent b4455b5960
commit 3769435a45
2 changed files with 14 additions and 2 deletions

View File

@ -131,9 +131,13 @@ func (cc *clusterClientCache) syncService(key, clusterName string, clusterCache
if isDeletion {
// cachedService is not reliable here as
// deleting cache is the last step of federation service deletion
_, err := fedClient.Core().Services(cachedService.lastState.Namespace).Get(cachedService.lastState.Name, metav1.GetOptions{})
service, err := fedClient.Core().Services(cachedService.lastState.Namespace).Get(cachedService.lastState.Name, metav1.GetOptions{})
// rebuild service if federation service still exists
if err == nil || !errors.IsNotFound(err) {
if err == nil && service.DeletionTimestamp != nil {
glog.V(4).Infof("Skipping sync of service %v in underlying clusters as it has already been marked for deletion", name)
return nil
}
return sc.ensureClusterService(cachedService, clusterName, cachedService.appliedState, clusterCache.clientset)
}
}

View File

@ -487,6 +487,10 @@ func wantsDNSRecords(service *v1.Service) bool {
// update DNS records and update the service info with DNS entries to federation apiserver.
// the function returns any error caught
func (s *ServiceController) processServiceForCluster(cachedService *cachedService, clusterName string, service *v1.Service, client *kubeclientset.Clientset) error {
if service.DeletionTimestamp != nil {
glog.V(4).Infof("Service has already been marked for deletion %v", service.Name)
return nil
}
glog.V(4).Infof("Process service %s/%s for cluster %s", service.Namespace, service.Name, clusterName)
// Create or Update k8s Service
err := s.ensureClusterService(cachedService, clusterName, service, client)
@ -508,15 +512,19 @@ func (s *ServiceController) updateFederationService(key string, cachedService *c
}
// handle available clusters one by one
var hasErr bool
hasErr := false
var wg sync.WaitGroup
for clusterName, cache := range s.clusterCache.clientMap {
wg.Add(1)
go func(cache *clusterCache, clusterName string) {
defer wg.Done()
err := s.processServiceForCluster(cachedService, clusterName, desiredService, cache.clientset)
if err != nil {
hasErr = true
}
}(cache, clusterName)
}
wg.Wait()
if hasErr {
// detail error has been dumped inside the loop
return fmt.Errorf("Service %s/%s was not successfully updated to all clusters", desiredService.Namespace, desiredService.Name), retryable