Merge pull request #75217 from deads2k/agg-parallel

for aggregated apiserver availability, try multiple endpoints in parallel
k3s-v1.15.3
Kubernetes Prow Robot 2019-04-09 22:00:13 -07:00 committed by GitHub
commit aa74064600
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed file with 44 additions and 21 deletions

View File

@ -242,33 +242,56 @@ func (c *AvailableConditionController) sync(key string) error {
}
// actually try to hit the discovery endpoint when it isn't local and when we're routing as a service.
if apiService.Spec.Service != nil && c.serviceResolver != nil {
discoveryURL, err := c.serviceResolver.ResolveEndpoint(apiService.Spec.Service.Namespace, apiService.Spec.Service.Name, apiService.Spec.Service.Port)
if err != nil {
return err
attempts := 5
results := make(chan error, attempts)
for i := 0; i < attempts; i++ {
go func() {
discoveryURL, err := c.serviceResolver.ResolveEndpoint(apiService.Spec.Service.Namespace, apiService.Spec.Service.Name, apiService.Spec.Service.Port)
if err != nil {
results <- err
return
}
errCh := make(chan error)
go func() {
resp, err := c.discoveryClient.Get(discoveryURL.String())
if resp != nil {
resp.Body.Close()
}
errCh <- err
}()
select {
case err = <-errCh:
if err != nil {
results <- fmt.Errorf("no response from %v: %v", discoveryURL, err)
return
}
// we had trouble with slow dial and DNS responses causing us to wait too long.
// we added this as insurance
case <-time.After(6 * time.Second):
results <- fmt.Errorf("timed out waiting for %v", discoveryURL)
return
}
results <- nil
}()
}
errCh := make(chan error)
go func() {
resp, err := c.discoveryClient.Get(discoveryURL.String())
if resp != nil {
resp.Body.Close()
var lastError error
for i := 0; i < attempts; i++ {
lastError = <-results
// if we had at least one success, we are successful overall and we can return now
if lastError == nil {
break
}
errCh <- err
}()
select {
case err = <-errCh:
// we had trouble with slow dial and DNS responses causing us to wait too long.
// we added this as insurance
case <-time.After(6 * time.Second):
err = fmt.Errorf("timed out waiting for %v", discoveryURL)
}
if err != nil {
if lastError != nil {
availableCondition.Status = apiregistration.ConditionFalse
availableCondition.Reason = "FailedDiscoveryCheck"
availableCondition.Message = fmt.Sprintf("no response from %v: %v", discoveryURL, err)
availableCondition.Message = lastError.Error()
apiregistration.SetAPIServiceCondition(apiService, availableCondition)
_, updateErr := updateAPIServiceStatus(c.apiServiceClient, originalAPIService, apiService)
if updateErr != nil {
@ -276,7 +299,7 @@ func (c *AvailableConditionController) sync(key string) error {
}
// force a requeue to make it very obvious that this will be retried at some point in the future
// along with other requeues done via service change, endpoint change, and resync
return err
return lastError
}
}