kube-aggregator: periodically resync local specs

Co-authored-by: Dr. Stefan Schimanski <stefan.schimanski@gmail.com>
pull/564/head
Haowei Cai 2019-03-06 20:35:38 -08:00
parent 0ba4050ce0
commit 3079798d03
2 changed files with 43 additions and 5 deletions

View File

@ -19,6 +19,7 @@ package aggregator
import (
"fmt"
"net/http"
"strings"
"sync"
"time"
@ -40,17 +41,35 @@ type SpecAggregator interface {
UpdateAPIServiceSpec(apiServiceName string, spec *spec.Swagger, etag string) error
RemoveAPIServiceSpec(apiServiceName string) error
GetAPIServiceInfo(apiServiceName string) (handler http.Handler, etag string, exists bool)
GetAPIServiceNames() []string
}
const (
aggregatorUser = "system:aggregator"
specDownloadTimeout = 60 * time.Second
localDelegateChainNamePattern = "k8s_internal_local_delegation_chain_%010d"
localDelegateChainNamePrefix = "k8s_internal_local_delegation_chain_"
localDelegateChainNamePattern = localDelegateChainNamePrefix + "%010d"
// A randomly generated UUID to differentiate local and remote eTags.
locallyGeneratedEtagPrefix = "\"6E8F849B434D4B98A569B9D7718876E9-"
)
// IsLocalAPIService returns true for local specs from delegates.
func IsLocalAPIService(apiServiceName string) bool {
return strings.HasPrefix(apiServiceName, localDelegateChainNamePrefix)
}
// GetAPIServicesName returns the names of APIServices recorded in specAggregator.openAPISpecs.
// We use this function to pass the names of local APIServices to the controller in this package,
// so that the controller can periodically sync the OpenAPI spec from delegation API servers.
func (s *specAggregator) GetAPIServiceNames() []string {
names := make([]string, len(s.openAPISpecs))
for key := range s.openAPISpecs {
names = append(names, key)
}
return names
}
// BuildAndRegisterAggregator registered OpenAPI aggregator handler. This function is not thread safe as it only being called on startup.
func BuildAndRegisterAggregator(downloader *Downloader, delegationTarget server.DelegationTarget, webServices []*restful.WebService,
config *common.Config, pathHandler common.PathHandler) (SpecAggregator, error) {
@ -161,6 +180,7 @@ func (s *specAggregator) buildOpenAPISpec() (specToReturn *spec.Swagger, err err
return nil, err
}
}
return specToReturn, nil
}
@ -179,7 +199,14 @@ func (s *specAggregator) updateOpenAPISpec() error {
// tryUpdatingServiceSpecs tries updating openAPISpecs map with specified specInfo, and keeps the map intact
// if the update fails.
func (s *specAggregator) tryUpdatingServiceSpecs(specInfo *openAPISpecInfo) error {
if specInfo == nil {
return fmt.Errorf("invalid input: specInfo must be non-nil")
}
orgSpecInfo, exists := s.openAPISpecs[specInfo.apiService.Name]
// Skip aggregation if OpenAPI spec didn't change
if exists && orgSpecInfo != nil && orgSpecInfo.etag == specInfo.etag {
return nil
}
s.openAPISpecs[specInfo.apiService.Name] = specInfo
if err := s.updateOpenAPISpec(); err != nil {
if exists {

View File

@ -30,8 +30,9 @@ import (
)
const (
successfulUpdateDelay = time.Minute
failedUpdateMaxExpDelay = time.Hour
successfulUpdateDelay = time.Minute
successfulUpdateDelayLocal = time.Second
failedUpdateMaxExpDelay = time.Hour
)
type syncAction int
@ -64,6 +65,11 @@ func NewAggregationController(downloader *aggregator.Downloader, openAPIAggregat
c.syncHandler = c.sync
// update each service at least once, also those which are not coming from APIServices, namely local services
for _, name := range openAPIAggregationManager.GetAPIServiceNames() {
c.queue.AddAfter(name, time.Second)
}
return c
}
@ -104,8 +110,13 @@ func (c *AggregationController) processNextWorkItem() bool {
switch action {
case syncRequeue:
klog.Infof("OpenAPI AggregationController: action for item %s: Requeue.", key)
c.queue.AddAfter(key, successfulUpdateDelay)
if aggregator.IsLocalAPIService(key.(string)) {
klog.V(7).Infof("OpenAPI AggregationController: action for local item %s: Requeue after %s.", key, successfulUpdateDelayLocal)
c.queue.AddAfter(key, successfulUpdateDelayLocal)
} else {
klog.V(7).Infof("OpenAPI AggregationController: action for item %s: Requeue.", key)
c.queue.AddAfter(key, successfulUpdateDelay)
}
case syncRequeueRateLimited:
klog.Infof("OpenAPI AggregationController: action for item %s: Rate Limited Requeue.", key)
c.queue.AddRateLimited(key)