mirror of https://github.com/k3s-io/k3s
Limit APIService healthz check to startup
parent
6a314ce3a9
commit
9a8b111c9c
|
@ -50,7 +50,6 @@ go_library(
|
|||
"//vendor/k8s.io/apiextensions-apiserver/pkg/client/informers/internalversion:go_default_library",
|
||||
"//vendor/k8s.io/apiextensions-apiserver/pkg/cmd/server:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library",
|
||||
|
@ -68,10 +67,12 @@ go_library(
|
|||
"//vendor/k8s.io/apiserver/pkg/storage/etcd3/preflight:go_default_library",
|
||||
"//vendor/k8s.io/client-go/informers:go_default_library",
|
||||
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
|
||||
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
|
||||
"//vendor/k8s.io/kube-aggregator/pkg/apis/apiregistration:go_default_library",
|
||||
"//vendor/k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1:go_default_library",
|
||||
"//vendor/k8s.io/kube-aggregator/pkg/apiserver:go_default_library",
|
||||
"//vendor/k8s.io/kube-aggregator/pkg/client/clientset_generated/internalclientset/typed/apiregistration/internalversion:go_default_library",
|
||||
"//vendor/k8s.io/kube-aggregator/pkg/client/informers/internalversion/apiregistration/internalversion:go_default_library",
|
||||
"//vendor/k8s.io/kube-aggregator/pkg/controllers/autoregister:go_default_library",
|
||||
"//vendor/k8s.io/kube-openapi/pkg/common:go_default_library",
|
||||
],
|
||||
|
|
|
@ -24,21 +24,24 @@ import (
|
|||
"io/ioutil"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"github.com/golang/glog"
|
||||
|
||||
apiextensionsinformers "k8s.io/apiextensions-apiserver/pkg/client/informers/internalversion"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
genericapiserver "k8s.io/apiserver/pkg/server"
|
||||
"k8s.io/apiserver/pkg/server/healthz"
|
||||
genericoptions "k8s.io/apiserver/pkg/server/options"
|
||||
kubeexternalinformers "k8s.io/client-go/informers"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/kube-aggregator/pkg/apis/apiregistration"
|
||||
"k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1"
|
||||
aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver"
|
||||
apiregistrationclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/internalclientset/typed/apiregistration/internalversion"
|
||||
informers "k8s.io/kube-aggregator/pkg/client/informers/internalversion/apiregistration/internalversion"
|
||||
"k8s.io/kube-aggregator/pkg/controllers/autoregister"
|
||||
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
|
||||
"k8s.io/kubernetes/pkg/master/controller/crdregistration"
|
||||
|
@ -106,35 +109,14 @@ func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delega
|
|||
go crdRegistrationController.Run(5, context.StopCh)
|
||||
return nil
|
||||
})
|
||||
aggregatorServer.GenericAPIServer.AddHealthzChecks(healthz.NamedCheck("autoregister-completion", func(r *http.Request) error {
|
||||
items, err := aggregatorServer.APIRegistrationInformers.Apiregistration().InternalVersion().APIServices().Lister().List(labels.Everything())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
missing := []apiregistration.APIService{}
|
||||
for _, apiService := range apiServices {
|
||||
found := false
|
||||
for _, item := range items {
|
||||
if item.Name != apiService.Name {
|
||||
continue
|
||||
}
|
||||
if apiregistration.IsAPIServiceConditionTrue(item, apiregistration.Available) {
|
||||
found = true
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if !found {
|
||||
missing = append(missing, *apiService)
|
||||
}
|
||||
}
|
||||
|
||||
if len(missing) > 0 {
|
||||
return fmt.Errorf("missing APIService: %v", missing)
|
||||
}
|
||||
return nil
|
||||
}))
|
||||
aggregatorServer.GenericAPIServer.AddHealthzChecks(
|
||||
makeAPIServiceAvailableHealthzCheck(
|
||||
"autoregister-completion",
|
||||
apiServices,
|
||||
aggregatorServer.APIRegistrationInformers.Apiregistration().InternalVersion().APIServices(),
|
||||
),
|
||||
)
|
||||
|
||||
return aggregatorServer, nil
|
||||
}
|
||||
|
@ -158,6 +140,45 @@ func makeAPIService(gv schema.GroupVersion) *apiregistration.APIService {
|
|||
}
|
||||
}
|
||||
|
||||
// makeAPIServiceAvailableHealthzCheck returns a healthz check that returns healthy
|
||||
// once all of the specified services have been observed to be available at least once.
|
||||
func makeAPIServiceAvailableHealthzCheck(name string, apiServices []*apiregistration.APIService, apiServiceInformer informers.APIServiceInformer) healthz.HealthzChecker {
|
||||
// Track the auto-registered API services that have not been observed to be available yet
|
||||
pendingServiceNamesLock := &sync.RWMutex{}
|
||||
pendingServiceNames := sets.NewString()
|
||||
for _, service := range apiServices {
|
||||
pendingServiceNames.Insert(service.Name)
|
||||
}
|
||||
|
||||
// When an APIService in the list is seen as available, remove it from the pending list
|
||||
handleAPIServiceChange := func(service *apiregistration.APIService) {
|
||||
pendingServiceNamesLock.Lock()
|
||||
defer pendingServiceNamesLock.Unlock()
|
||||
if !pendingServiceNames.Has(service.Name) {
|
||||
return
|
||||
}
|
||||
if apiregistration.IsAPIServiceConditionTrue(service, apiregistration.Available) {
|
||||
pendingServiceNames.Delete(service.Name)
|
||||
}
|
||||
}
|
||||
|
||||
// Watch add/update events for APIServices
|
||||
apiServiceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) { handleAPIServiceChange(obj.(*apiregistration.APIService)) },
|
||||
UpdateFunc: func(old, new interface{}) { handleAPIServiceChange(new.(*apiregistration.APIService)) },
|
||||
})
|
||||
|
||||
// Don't return healthy until the pending list is empty
|
||||
return healthz.NamedCheck(name, func(r *http.Request) error {
|
||||
pendingServiceNamesLock.RLock()
|
||||
defer pendingServiceNamesLock.RUnlock()
|
||||
if pendingServiceNames.Len() > 0 {
|
||||
return fmt.Errorf("missing APIService: %v", pendingServiceNames.List())
|
||||
}
|
||||
return nil
|
||||
})
|
||||
}
|
||||
|
||||
type priority struct {
|
||||
group int32
|
||||
version int32
|
||||
|
|
Loading…
Reference in New Issue