Merge pull request #21982 from justinsb/fix_21952

Auto commit by PR queue bot
k8s-merge-robot 2016-03-04 16:23:45 -08:00
commit 3f8086192b
1 changed file with 48 additions and 16 deletions


@@ -44,13 +44,16 @@ const (
// How long to wait before retrying the processing of a service change.
// If this changes, the sleep in hack/jenkins/e2e.sh before downing a cluster
// should be changed appropriately.
processingRetryInterval = 5 * time.Second
minRetryDelay = 5 * time.Second
maxRetryDelay = 300 * time.Second
clientRetryCount = 5
clientRetryInterval = 5 * time.Second
retryable = true
notRetryable = false
doNotRetry = time.Duration(0)
)
type cachedService struct {
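For context on the constants above: the old boolean pair retryable/notRetryable is replaced by a single time.Duration return value, where doNotRetry (zero) means "give up" and any non-zero value is the backoff to wait before requeuing. A minimal standalone sketch of how a caller might branch on that sentinel (handleResult and the sample errors are hypothetical, not part of the controller):

```go
package main

import (
	"fmt"
	"time"
)

const doNotRetry = time.Duration(0)

// handleResult mirrors the branch in watchServices below: a non-zero delay
// means "retry later", a zero delay with an error means "give up".
func handleResult(err error, retryDelay time.Duration) {
	if retryDelay != doNotRetry {
		fmt.Printf("failed (%v), retrying in %s\n", err, retryDelay)
		return
	}
	if err != nil {
		fmt.Printf("failed (%v), not retrying\n", err)
		return
	}
	fmt.Println("processed successfully")
}

func main() {
	handleResult(fmt.Errorf("API timeout"), 5*time.Second)
	handleResult(fmt.Errorf("object was not a Service"), doNotRetry)
	handleResult(nil, doNotRetry)
}
```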
@@ -61,6 +64,9 @@ type cachedService struct {
// Ensures only one goroutine can operate on this service at any given time.
mu sync.Mutex
// Controls error back-off
lastRetryDelay time.Duration
}
type serviceCache struct {
@@ -184,21 +190,26 @@ func (s *ServiceController) watchServices(serviceQueue *cache.DeltaFIFO) {
glog.Errorf("Received nil delta from watcher queue.")
continue
}
err, shouldRetry := s.processDelta(delta)
if shouldRetry {
err, retryDelay := s.processDelta(delta)
if retryDelay != 0 {
// Add the failed service back to the queue so we'll retry it.
glog.Errorf("Failed to process service delta. Retrying: %v", err)
time.Sleep(processingRetryInterval)
serviceQueue.AddIfNotPresent(deltas)
glog.Errorf("Failed to process service delta. Retrying in %s: %v", retryDelay, err)
go func(deltas cache.Deltas, delay time.Duration) {
time.Sleep(delay)
if err := serviceQueue.AddIfNotPresent(deltas); err != nil {
glog.Errorf("Error requeuing service delta - will not retry: %v", err)
}
}(deltas, retryDelay)
} else if err != nil {
runtime.HandleError(fmt.Errorf("Failed to process service delta. Not retrying: %v", err))
}
}
}
// Returns an error if processing the delta failed, along with a boolean
// indicator of whether the processing should be retried.
func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) {
// Returns an error if processing the delta failed, along with a time.Duration
// indicating whether processing should be retried; zero means do not retry; otherwise
// we should retry after that Duration.
func (s *ServiceController) processDelta(delta *cache.Delta) (error, time.Duration) {
deltaService, ok := delta.Object.(*api.Service)
var namespacedName types.NamespacedName
var cachedService *cachedService
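The requeue itself is now asynchronous: rather than blocking the watch loop with time.Sleep, watchServices spawns a goroutine that waits out the delay and then calls AddIfNotPresent, so an already-pending service is not queued twice. A rough, self-contained sketch of that pattern (toyQueue and requeueAfter are hypothetical stand-ins for cache.DeltaFIFO and the inline goroutine):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// toyQueue stands in for cache.DeltaFIFO: AddIfNotPresent enqueues a key only
// if it is not already pending.
type toyQueue struct {
	mu      sync.Mutex
	pending map[string]bool
}

func (q *toyQueue) AddIfNotPresent(key string) error {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.pending[key] {
		return nil // already queued, nothing to do
	}
	q.pending[key] = true
	fmt.Printf("requeued %q\n", key)
	return nil
}

// requeueAfter mirrors the goroutine added in watchServices: wait out the
// backoff without blocking the watch loop, then re-add the work item.
func requeueAfter(q *toyQueue, key string, delay time.Duration) {
	go func(key string, delay time.Duration) {
		time.Sleep(delay)
		if err := q.AddIfNotPresent(key); err != nil {
			fmt.Printf("error requeuing %q - will not retry: %v\n", key, err)
		}
	}(key, delay)
}

func main() {
	q := &toyQueue{pending: map[string]bool{}}
	requeueAfter(q, "default/my-service", 2*time.Second)
	time.Sleep(3 * time.Second) // give the delayed requeue time to fire
}
```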
@@ -208,11 +219,11 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) {
// cache for deleting.
key, ok := delta.Object.(cache.DeletedFinalStateUnknown)
if !ok {
return fmt.Errorf("Delta contained object that wasn't a service or a deleted key: %+v", delta), notRetryable
return fmt.Errorf("Delta contained object that wasn't a service or a deleted key: %+v", delta), doNotRetry
}
cachedService, ok = s.cache.get(key.Key)
if !ok {
return fmt.Errorf("Service %s not in cache even though the watcher thought it was. Ignoring the deletion.", key), notRetryable
return fmt.Errorf("Service %s not in cache even though the watcher thought it was. Ignoring the deletion.", key), doNotRetry
}
deltaService = cachedService.lastState
delta.Object = deltaService
@@ -236,7 +247,7 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) {
service, err := s.kubeClient.Core().Services(namespacedName.Namespace).Get(namespacedName.Name)
if err != nil && !errors.IsNotFound(err) {
glog.Warningf("Failed to get most recent state of service %v from API (will retry): %v", namespacedName, err)
return err, retryable
return err, cachedService.nextRetryDelay()
} else if errors.IsNotFound(err) {
glog.V(2).Infof("Service %v not found, ensuring load balancer is deleted", namespacedName)
s.eventRecorder.Event(service, api.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
@@ -244,11 +255,13 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) {
if err != nil {
message := "Error deleting load balancer (will retry): " + err.Error()
s.eventRecorder.Event(deltaService, api.EventTypeWarning, "DeletingLoadBalancerFailed", message)
return err, retryable
return err, cachedService.nextRetryDelay()
}
s.eventRecorder.Event(deltaService, api.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer")
s.cache.delete(namespacedName.String())
return nil, notRetryable
cachedService.resetRetryDelay()
return nil, doNotRetry
}
// Update the cached service (used above for populating synthetic deletes)
@@ -265,7 +278,7 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) {
message += err.Error()
s.eventRecorder.Event(service, api.EventTypeWarning, "CreatingLoadBalancerFailed", message)
return err, retry
return err, cachedService.nextRetryDelay()
}
// Always update the cache upon success.
// NOTE: Since we update the cached service if and only if we successfully
@@ -274,7 +287,8 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) {
cachedService.appliedState = service
s.cache.set(namespacedName.String(), cachedService)
return nil, notRetryable
cachedService.resetRetryDelay()
return nil, doNotRetry
}
// Returns whatever error occurred along with a boolean indicator of whether it
@@ -738,3 +752,21 @@ func wantsLoadBalancer(service *api.Service) bool {
func loadBalancerIPsAreEqual(oldService, newService *api.Service) bool {
return oldService.Spec.LoadBalancerIP == newService.Spec.LoadBalancerIP
}
// Computes the next retry delay, using exponential backoff.
// mutex must be held.
func (s *cachedService) nextRetryDelay() time.Duration {
s.lastRetryDelay = s.lastRetryDelay * 2
if s.lastRetryDelay < minRetryDelay {
s.lastRetryDelay = minRetryDelay
}
if s.lastRetryDelay > maxRetryDelay {
s.lastRetryDelay = maxRetryDelay
}
return s.lastRetryDelay
}
// Resets the retry exponential backoff. mutex must be held.
func (s *cachedService) resetRetryDelay() {
s.lastRetryDelay = time.Duration(0)
}
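Putting the pieces together: on every failure processDelta asks the cached service for its next delay, and on success it resets the backoff; per the comments above, both helpers expect the service's mutex to be held. A trimmed-down, self-contained sketch of that call pattern (backoffState and process are hypothetical stand-ins for cachedService and processDelta):

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"time"
)

const (
	minRetryDelay = 5 * time.Second
	maxRetryDelay = 300 * time.Second
	doNotRetry    = time.Duration(0)
)

// backoffState keeps just the two fields this change adds to cachedService.
type backoffState struct {
	mu             sync.Mutex
	lastRetryDelay time.Duration
}

// nextRetryDelay doubles the previous delay and clamps it to
// [minRetryDelay, maxRetryDelay]; mu must be held by the caller.
func (s *backoffState) nextRetryDelay() time.Duration {
	s.lastRetryDelay *= 2
	if s.lastRetryDelay < minRetryDelay {
		s.lastRetryDelay = minRetryDelay
	}
	if s.lastRetryDelay > maxRetryDelay {
		s.lastRetryDelay = maxRetryDelay
	}
	return s.lastRetryDelay
}

// resetRetryDelay clears the backoff after a success; mu must be held.
func (s *backoffState) resetRetryDelay() {
	s.lastRetryDelay = doNotRetry
}

// process is a stand-in for processDelta's error handling: failures grow the
// delay (5s, 10s, 20s, ... capped at 300s), a success resets it to zero.
func process(s *backoffState, work func() error) (error, time.Duration) {
	s.mu.Lock()
	defer s.mu.Unlock()
	if err := work(); err != nil {
		return err, s.nextRetryDelay()
	}
	s.resetRetryDelay()
	return nil, doNotRetry
}

func main() {
	s := &backoffState{}
	fail := func() error { return errors.New("cloud provider unavailable") }
	for i := 0; i < 4; i++ {
		_, delay := process(s, fail)
		fmt.Println("would retry after", delay) // 5s, 10s, 20s, 40s
	}
	_, delay := process(s, func() error { return nil })
	fmt.Println("success, next delay:", delay) // 0s
}
```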