mirror of https://github.com/k3s-io/k3s
Merge pull request #45432 from deads2k/agg-30-status
Automatic merge from submit-queue (batch tested with PRs 44798, 45537, 45448, 45432) use apiservice.status to break apart controller and handling concerns Still needs tests. This starts breaking the handler and controller aspects of the aggregator by making use of status and conditions instead of actually running a specific check on demand. @kubernetes/sig-api-machinery-pr-reviews @luxas since you've been askingpull/6/head
commit
97889d4ff9
|
@ -66,3 +66,37 @@ func APIServiceNameToGroupVersion(apiServiceName string) schema.GroupVersion {
|
|||
tokens := strings.SplitN(apiServiceName, ".", 2)
|
||||
return schema.GroupVersion{Group: tokens[1], Version: tokens[0]}
|
||||
}
|
||||
|
||||
// SetAPIServiceCondition sets the status condition. It either overwrites the existing one or
|
||||
// creates a new one
|
||||
func SetAPIServiceCondition(apiService *APIService, newCondition APIServiceCondition) {
|
||||
var existingCondition *APIServiceCondition
|
||||
for i := range apiService.Status.Conditions {
|
||||
if apiService.Status.Conditions[i].Type == newCondition.Type {
|
||||
existingCondition = &apiService.Status.Conditions[i]
|
||||
break
|
||||
}
|
||||
}
|
||||
if existingCondition == nil {
|
||||
apiService.Status.Conditions = append(apiService.Status.Conditions, newCondition)
|
||||
return
|
||||
}
|
||||
|
||||
if existingCondition.Status != newCondition.Status {
|
||||
existingCondition.Status = newCondition.Status
|
||||
existingCondition.LastTransitionTime = newCondition.LastTransitionTime
|
||||
}
|
||||
|
||||
existingCondition.Reason = newCondition.Reason
|
||||
existingCondition.Message = newCondition.Message
|
||||
}
|
||||
|
||||
// IsAPIServiceConditionTrue indicates if the condition is present and strictly true
|
||||
func IsAPIServiceConditionTrue(apiService *APIService, conditionType APIServiceConditionType) bool {
|
||||
for _, condition := range apiService.Status.Conditions {
|
||||
if condition.Type == conditionType && condition.Status == ConditionTrue {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
|
|
@ -24,10 +24,10 @@ import (
|
|||
utilvalidation "k8s.io/apimachinery/pkg/util/validation"
|
||||
"k8s.io/apimachinery/pkg/util/validation/field"
|
||||
|
||||
discoveryapi "k8s.io/kube-aggregator/pkg/apis/apiregistration"
|
||||
"k8s.io/kube-aggregator/pkg/apis/apiregistration"
|
||||
)
|
||||
|
||||
func ValidateAPIService(apiService *discoveryapi.APIService) field.ErrorList {
|
||||
func ValidateAPIService(apiService *apiregistration.APIService) field.ErrorList {
|
||||
requiredName := apiService.Spec.Version + "." + apiService.Spec.Group
|
||||
|
||||
allErrs := validation.ValidateObjectMeta(&apiService.ObjectMeta, false,
|
||||
|
@ -86,9 +86,30 @@ func ValidateAPIService(apiService *discoveryapi.APIService) field.ErrorList {
|
|||
return allErrs
|
||||
}
|
||||
|
||||
func ValidateAPIServiceUpdate(newAPIService *discoveryapi.APIService, oldAPIService *discoveryapi.APIService) field.ErrorList {
|
||||
func ValidateAPIServiceUpdate(newAPIService *apiregistration.APIService, oldAPIService *apiregistration.APIService) field.ErrorList {
|
||||
allErrs := validation.ValidateObjectMetaUpdate(&newAPIService.ObjectMeta, &oldAPIService.ObjectMeta, field.NewPath("metadata"))
|
||||
allErrs = append(allErrs, ValidateAPIService(newAPIService)...)
|
||||
|
||||
return allErrs
|
||||
}
|
||||
|
||||
func ValidateAPIServiceStatus(status *apiregistration.APIServiceStatus, fldPath *field.Path) field.ErrorList {
|
||||
allErrs := field.ErrorList{}
|
||||
|
||||
for i, condition := range status.Conditions {
|
||||
if condition.Status != apiregistration.ConditionTrue &&
|
||||
condition.Status != apiregistration.ConditionFalse &&
|
||||
condition.Status != apiregistration.ConditionUnknown {
|
||||
allErrs = append(allErrs, field.NotSupported(fldPath.Child("conditions").Index(i).Child("status"), condition.Status, []string{
|
||||
string(apiregistration.ConditionTrue), string(apiregistration.ConditionFalse), string(apiregistration.ConditionUnknown)}))
|
||||
}
|
||||
}
|
||||
|
||||
return allErrs
|
||||
}
|
||||
|
||||
func ValidateAPIServiceStatusUpdate(newAPIService *apiregistration.APIService, oldAPIService *apiregistration.APIService) field.ErrorList {
|
||||
allErrs := validation.ValidateObjectMetaUpdate(&newAPIService.ObjectMeta, &oldAPIService.ObjectMeta, field.NewPath("metadata"))
|
||||
allErrs = append(allErrs, ValidateAPIServiceStatus(&newAPIService.Status, field.NewPath("status"))...)
|
||||
return allErrs
|
||||
}
|
||||
|
|
|
@ -81,6 +81,7 @@ go_library(
|
|||
"//vendor/k8s.io/kube-aggregator/pkg/client/informers/internalversion/apiregistration/internalversion:go_default_library",
|
||||
"//vendor/k8s.io/kube-aggregator/pkg/client/listers/apiregistration/internalversion:go_default_library",
|
||||
"//vendor/k8s.io/kube-aggregator/pkg/controllers:go_default_library",
|
||||
"//vendor/k8s.io/kube-aggregator/pkg/controllers/status:go_default_library",
|
||||
"//vendor/k8s.io/kube-aggregator/pkg/registry/apiservice/etcd:go_default_library",
|
||||
],
|
||||
)
|
||||
|
|
|
@ -32,7 +32,6 @@ import (
|
|||
genericapiserver "k8s.io/apiserver/pkg/server"
|
||||
kubeinformers "k8s.io/client-go/informers"
|
||||
kubeclientset "k8s.io/client-go/kubernetes"
|
||||
v1listers "k8s.io/client-go/listers/core/v1"
|
||||
"k8s.io/client-go/pkg/version"
|
||||
|
||||
"k8s.io/kube-aggregator/pkg/apis/apiregistration"
|
||||
|
@ -41,6 +40,7 @@ import (
|
|||
"k8s.io/kube-aggregator/pkg/client/clientset_generated/internalclientset"
|
||||
informers "k8s.io/kube-aggregator/pkg/client/informers/internalversion"
|
||||
listers "k8s.io/kube-aggregator/pkg/client/listers/apiregistration/internalversion"
|
||||
statuscontrollers "k8s.io/kube-aggregator/pkg/controllers/status"
|
||||
apiservicestorage "k8s.io/kube-aggregator/pkg/registry/apiservice/etcd"
|
||||
)
|
||||
|
||||
|
@ -102,11 +102,6 @@ type APIAggregator struct {
|
|||
// controller state
|
||||
lister listers.APIServiceLister
|
||||
|
||||
// serviceLister is used by the aggregator handler to determine whether or not to try to expose the group
|
||||
serviceLister v1listers.ServiceLister
|
||||
// endpointsLister is used by the aggregator handler to determine whether or not to try to expose the group
|
||||
endpointsLister v1listers.EndpointsLister
|
||||
|
||||
// provided for easier embedding
|
||||
APIRegistrationInformers informers.SharedInformerFactory
|
||||
}
|
||||
|
@ -140,30 +135,34 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
|
|||
return nil, err
|
||||
}
|
||||
|
||||
apiregistrationClient, err := internalclientset.NewForConfig(c.Config.GenericConfig.LoopbackClientConfig)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
informerFactory := informers.NewSharedInformerFactory(
|
||||
internalclientset.NewForConfigOrDie(c.Config.GenericConfig.LoopbackClientConfig),
|
||||
apiregistrationClient,
|
||||
5*time.Minute, // this is effectively used as a refresh interval right now. Might want to do something nicer later on.
|
||||
)
|
||||
kubeInformers := kubeinformers.NewSharedInformerFactory(c.CoreAPIServerClient, 5*time.Minute)
|
||||
|
||||
s := &APIAggregator{
|
||||
GenericAPIServer: genericServer,
|
||||
delegateHandler: delegationTarget.UnprotectedHandler(),
|
||||
contextMapper: c.GenericConfig.RequestContextMapper,
|
||||
proxyClientCert: c.ProxyClientCert,
|
||||
proxyClientKey: c.ProxyClientKey,
|
||||
proxyHandlers: map[string]*proxyHandler{},
|
||||
handledGroups: sets.String{},
|
||||
lister: informerFactory.Apiregistration().InternalVersion().APIServices().Lister(),
|
||||
serviceLister: kubeInformers.Core().V1().Services().Lister(),
|
||||
endpointsLister: kubeInformers.Core().V1().Endpoints().Lister(),
|
||||
GenericAPIServer: genericServer,
|
||||
delegateHandler: delegationTarget.UnprotectedHandler(),
|
||||
contextMapper: c.GenericConfig.RequestContextMapper,
|
||||
proxyClientCert: c.ProxyClientCert,
|
||||
proxyClientKey: c.ProxyClientKey,
|
||||
proxyHandlers: map[string]*proxyHandler{},
|
||||
handledGroups: sets.String{},
|
||||
lister: informerFactory.Apiregistration().InternalVersion().APIServices().Lister(),
|
||||
APIRegistrationInformers: informerFactory,
|
||||
}
|
||||
|
||||
apiGroupInfo := genericapiserver.NewDefaultAPIGroupInfo(apiregistration.GroupName, registry, Scheme, metav1.ParameterCodec, Codecs)
|
||||
apiGroupInfo.GroupMeta.GroupVersion = v1alpha1.SchemeGroupVersion
|
||||
v1alpha1storage := map[string]rest.Storage{}
|
||||
v1alpha1storage["apiservices"] = apiservicestorage.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)
|
||||
apiServiceREST := apiservicestorage.NewREST(Scheme, c.GenericConfig.RESTOptionsGetter)
|
||||
v1alpha1storage["apiservices"] = apiServiceREST
|
||||
v1alpha1storage["apiservices/status"] = apiservicestorage.NewStatusREST(Scheme, apiServiceREST)
|
||||
apiGroupInfo.VersionedResourcesStorageMap["v1alpha1"] = v1alpha1storage
|
||||
|
||||
if err := s.GenericAPIServer.InstallAPIGroup(&apiGroupInfo); err != nil {
|
||||
|
@ -171,15 +170,19 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
|
|||
}
|
||||
|
||||
apisHandler := &apisHandler{
|
||||
codecs: Codecs,
|
||||
lister: s.lister,
|
||||
serviceLister: s.serviceLister,
|
||||
endpointsLister: s.endpointsLister,
|
||||
codecs: Codecs,
|
||||
lister: s.lister,
|
||||
}
|
||||
s.GenericAPIServer.Handler.PostGoRestfulMux.Handle("/apis", apisHandler)
|
||||
s.GenericAPIServer.Handler.PostGoRestfulMux.UnlistedHandle("/apis/", apisHandler)
|
||||
|
||||
apiserviceRegistrationController := NewAPIServiceRegistrationController(informerFactory.Apiregistration().InternalVersion().APIServices(), kubeInformers.Core().V1().Services(), s)
|
||||
availableController := statuscontrollers.NewAvailableConditionController(
|
||||
informerFactory.Apiregistration().InternalVersion().APIServices(),
|
||||
kubeInformers.Core().V1().Services(),
|
||||
kubeInformers.Core().V1().Endpoints(),
|
||||
apiregistrationClient.Apiregistration(),
|
||||
)
|
||||
|
||||
s.GenericAPIServer.AddPostStartHook("start-kube-aggregator-informers", func(context genericapiserver.PostStartHookContext) error {
|
||||
informerFactory.Start(stopCh)
|
||||
|
@ -190,6 +193,10 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
|
|||
go apiserviceRegistrationController.Run(stopCh)
|
||||
return nil
|
||||
})
|
||||
s.GenericAPIServer.AddPostStartHook("apiservice-status-available-controller", func(context genericapiserver.PostStartHookContext) error {
|
||||
go availableController.Run(stopCh)
|
||||
return nil
|
||||
})
|
||||
|
||||
return s, nil
|
||||
}
|
||||
|
@ -235,12 +242,10 @@ func (s *APIAggregator) AddAPIService(apiService *apiregistration.APIService, de
|
|||
// it's time to register the group aggregation endpoint
|
||||
groupPath := "/apis/" + apiService.Spec.Group
|
||||
groupDiscoveryHandler := &apiGroupHandler{
|
||||
codecs: Codecs,
|
||||
groupName: apiService.Spec.Group,
|
||||
lister: s.lister,
|
||||
serviceLister: s.serviceLister,
|
||||
endpointsLister: s.endpointsLister,
|
||||
delegate: s.delegateHandler,
|
||||
codecs: Codecs,
|
||||
groupName: apiService.Spec.Group,
|
||||
lister: s.lister,
|
||||
delegate: s.delegateHandler,
|
||||
}
|
||||
// aggregation is protected
|
||||
s.GenericAPIServer.Handler.PostGoRestfulMux.Handle(groupPath, groupDiscoveryHandler)
|
||||
|
|
|
@ -96,6 +96,13 @@ func (c *APIServiceRegistrationController) sync(key string) error {
|
|||
return err
|
||||
}
|
||||
|
||||
// remove registration handling for APIServices which are not available
|
||||
if !apiregistration.IsAPIServiceConditionTrue(apiService, apiregistration.Available) {
|
||||
c.apiHandlerManager.RemoveAPIService(key)
|
||||
return nil
|
||||
}
|
||||
|
||||
// TODO move the destination host to status so that you can see where its going
|
||||
c.apiHandlerManager.AddAPIService(apiService, c.getDestinationHost(apiService))
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -25,7 +25,6 @@ import (
|
|||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
"k8s.io/apimachinery/pkg/runtime/serializer"
|
||||
"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
|
||||
v1listers "k8s.io/client-go/listers/core/v1"
|
||||
|
||||
apiregistrationapi "k8s.io/kube-aggregator/pkg/apis/apiregistration"
|
||||
apiregistrationv1alpha1api "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1alpha1"
|
||||
|
@ -37,9 +36,6 @@ import (
|
|||
type apisHandler struct {
|
||||
codecs serializer.CodecFactory
|
||||
lister listers.APIServiceLister
|
||||
|
||||
serviceLister v1listers.ServiceLister
|
||||
endpointsLister v1listers.EndpointsLister
|
||||
}
|
||||
|
||||
var discoveryGroup = metav1.APIGroup{
|
||||
|
@ -74,7 +70,7 @@ func (r *apisHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
|||
if len(apiGroupServers[0].Spec.Group) == 0 {
|
||||
continue
|
||||
}
|
||||
discoveryGroup := convertToDiscoveryAPIGroup(apiGroupServers, r.serviceLister, r.endpointsLister)
|
||||
discoveryGroup := convertToDiscoveryAPIGroup(apiGroupServers)
|
||||
if discoveryGroup != nil {
|
||||
discoveryGroupList.Groups = append(discoveryGroupList.Groups, *discoveryGroup)
|
||||
}
|
||||
|
@ -85,33 +81,14 @@ func (r *apisHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
|||
|
||||
// convertToDiscoveryAPIGroup takes apiservices in a single group and returns a discovery compatible object.
|
||||
// if none of the services are available, it will return nil.
|
||||
func convertToDiscoveryAPIGroup(apiServices []*apiregistrationapi.APIService, serviceLister v1listers.ServiceLister, endpointsLister v1listers.EndpointsLister) *metav1.APIGroup {
|
||||
func convertToDiscoveryAPIGroup(apiServices []*apiregistrationapi.APIService) *metav1.APIGroup {
|
||||
apiServicesByGroup := apiregistrationapi.SortedByGroup(apiServices)[0]
|
||||
|
||||
var discoveryGroup *metav1.APIGroup
|
||||
|
||||
for _, apiService := range apiServicesByGroup {
|
||||
if apiService.Spec.Service != nil {
|
||||
// skip any API services without actual services
|
||||
if _, err := serviceLister.Services(apiService.Spec.Service.Namespace).Get(apiService.Spec.Service.Name); err != nil {
|
||||
continue
|
||||
}
|
||||
|
||||
hasActiveEndpoints := false
|
||||
endpoints, err := endpointsLister.Endpoints(apiService.Spec.Service.Namespace).Get(apiService.Spec.Service.Name)
|
||||
// skip any API services without endpoints
|
||||
if err != nil {
|
||||
continue
|
||||
}
|
||||
for _, subset := range endpoints.Subsets {
|
||||
if len(subset.Addresses) > 0 {
|
||||
hasActiveEndpoints = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !hasActiveEndpoints {
|
||||
continue
|
||||
}
|
||||
if !apiregistrationapi.IsAPIServiceConditionTrue(apiService, apiregistrationapi.Available) {
|
||||
continue
|
||||
}
|
||||
|
||||
// the first APIService which is valid becomes the default
|
||||
|
@ -143,9 +120,6 @@ type apiGroupHandler struct {
|
|||
|
||||
lister listers.APIServiceLister
|
||||
|
||||
serviceLister v1listers.ServiceLister
|
||||
endpointsLister v1listers.EndpointsLister
|
||||
|
||||
delegate http.Handler
|
||||
}
|
||||
|
||||
|
@ -172,7 +146,7 @@ func (r *apiGroupHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
|||
return
|
||||
}
|
||||
|
||||
discoveryGroup := convertToDiscoveryAPIGroup(apiServicesForGroup, r.serviceLister, r.endpointsLister)
|
||||
discoveryGroup := convertToDiscoveryAPIGroup(apiServicesForGroup)
|
||||
if discoveryGroup == nil {
|
||||
http.Error(w, "", http.StatusNotFound)
|
||||
return
|
||||
|
|
|
@ -27,8 +27,6 @@ import (
|
|||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/diff"
|
||||
v1listers "k8s.io/client-go/listers/core/v1"
|
||||
corev1 "k8s.io/client-go/pkg/api/v1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
|
||||
"k8s.io/kube-aggregator/pkg/apis/apiregistration"
|
||||
|
@ -65,6 +63,11 @@ func TestAPIs(t *testing.T) {
|
|||
Version: "v1",
|
||||
Priority: 10,
|
||||
},
|
||||
Status: apiregistration.APIServiceStatus{
|
||||
Conditions: []apiregistration.APIServiceCondition{
|
||||
{Type: apiregistration.Available, Status: apiregistration.ConditionTrue},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "v1.bar"},
|
||||
|
@ -77,6 +80,11 @@ func TestAPIs(t *testing.T) {
|
|||
Version: "v1",
|
||||
Priority: 11,
|
||||
},
|
||||
Status: apiregistration.APIServiceStatus{
|
||||
Conditions: []apiregistration.APIServiceCondition{
|
||||
{Type: apiregistration.Available, Status: apiregistration.ConditionTrue},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expected: &metav1.APIGroupList{
|
||||
|
@ -126,6 +134,11 @@ func TestAPIs(t *testing.T) {
|
|||
Version: "v1",
|
||||
Priority: 20,
|
||||
},
|
||||
Status: apiregistration.APIServiceStatus{
|
||||
Conditions: []apiregistration.APIServiceCondition{
|
||||
{Type: apiregistration.Available, Status: apiregistration.ConditionTrue},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "v2.bar"},
|
||||
|
@ -138,6 +151,11 @@ func TestAPIs(t *testing.T) {
|
|||
Version: "v2",
|
||||
Priority: 11,
|
||||
},
|
||||
Status: apiregistration.APIServiceStatus{
|
||||
Conditions: []apiregistration.APIServiceCondition{
|
||||
{Type: apiregistration.Available, Status: apiregistration.ConditionTrue},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "v2.foo"},
|
||||
|
@ -150,6 +168,11 @@ func TestAPIs(t *testing.T) {
|
|||
Version: "v2",
|
||||
Priority: 1,
|
||||
},
|
||||
Status: apiregistration.APIServiceStatus{
|
||||
Conditions: []apiregistration.APIServiceCondition{
|
||||
{Type: apiregistration.Available, Status: apiregistration.ConditionTrue},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "v1.bar"},
|
||||
|
@ -162,6 +185,11 @@ func TestAPIs(t *testing.T) {
|
|||
Version: "v1",
|
||||
Priority: 11,
|
||||
},
|
||||
Status: apiregistration.APIServiceStatus{
|
||||
Conditions: []apiregistration.APIServiceCondition{
|
||||
{Type: apiregistration.Available, Status: apiregistration.ConditionTrue},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expected: &metav1.APIGroupList{
|
||||
|
@ -209,25 +237,13 @@ func TestAPIs(t *testing.T) {
|
|||
|
||||
for _, tc := range tests {
|
||||
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
|
||||
serviceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
|
||||
endpointsIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
|
||||
handler := &apisHandler{
|
||||
codecs: Codecs,
|
||||
serviceLister: v1listers.NewServiceLister(serviceIndexer),
|
||||
endpointsLister: v1listers.NewEndpointsLister(endpointsIndexer),
|
||||
lister: listers.NewAPIServiceLister(indexer),
|
||||
codecs: Codecs,
|
||||
lister: listers.NewAPIServiceLister(indexer),
|
||||
}
|
||||
for _, o := range tc.apiservices {
|
||||
indexer.Add(o)
|
||||
}
|
||||
serviceIndexer.Add(&corev1.Service{ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "api"}})
|
||||
endpointsIndexer.Add(&corev1.Endpoints{
|
||||
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "api"},
|
||||
Subsets: []corev1.EndpointSubset{
|
||||
{Addresses: []corev1.EndpointAddress{{}}},
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
server := httptest.NewServer(handler)
|
||||
defer server.Close()
|
||||
|
@ -319,6 +335,11 @@ func TestAPIGroup(t *testing.T) {
|
|||
Version: "v1",
|
||||
Priority: 20,
|
||||
},
|
||||
Status: apiregistration.APIServiceStatus{
|
||||
Conditions: []apiregistration.APIServiceCondition{
|
||||
{Type: apiregistration.Available, Status: apiregistration.ConditionTrue},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "v2.bar"},
|
||||
|
@ -331,6 +352,11 @@ func TestAPIGroup(t *testing.T) {
|
|||
Version: "v2",
|
||||
Priority: 11,
|
||||
},
|
||||
Status: apiregistration.APIServiceStatus{
|
||||
Conditions: []apiregistration.APIServiceCondition{
|
||||
{Type: apiregistration.Available, Status: apiregistration.ConditionTrue},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "v2.foo"},
|
||||
|
@ -343,6 +369,11 @@ func TestAPIGroup(t *testing.T) {
|
|||
Version: "v2",
|
||||
Priority: 1,
|
||||
},
|
||||
Status: apiregistration.APIServiceStatus{
|
||||
Conditions: []apiregistration.APIServiceCondition{
|
||||
{Type: apiregistration.Available, Status: apiregistration.ConditionTrue},
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: "v1.bar"},
|
||||
|
@ -355,6 +386,11 @@ func TestAPIGroup(t *testing.T) {
|
|||
Version: "v1",
|
||||
Priority: 11,
|
||||
},
|
||||
Status: apiregistration.APIServiceStatus{
|
||||
Conditions: []apiregistration.APIServiceCondition{
|
||||
{Type: apiregistration.Available, Status: apiregistration.ConditionTrue},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
expected: &metav1.APIGroup{
|
||||
|
@ -380,26 +416,14 @@ func TestAPIGroup(t *testing.T) {
|
|||
|
||||
for _, tc := range tests {
|
||||
indexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
|
||||
serviceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
|
||||
endpointsIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
|
||||
handler := &apiGroupHandler{
|
||||
codecs: Codecs,
|
||||
lister: listers.NewAPIServiceLister(indexer),
|
||||
serviceLister: v1listers.NewServiceLister(serviceIndexer),
|
||||
endpointsLister: v1listers.NewEndpointsLister(endpointsIndexer),
|
||||
groupName: "foo",
|
||||
codecs: Codecs,
|
||||
lister: listers.NewAPIServiceLister(indexer),
|
||||
groupName: "foo",
|
||||
}
|
||||
for _, o := range tc.apiservices {
|
||||
indexer.Add(o)
|
||||
}
|
||||
serviceIndexer.Add(&corev1.Service{ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "api"}})
|
||||
endpointsIndexer.Add(&corev1.Endpoints{
|
||||
ObjectMeta: metav1.ObjectMeta{Namespace: "ns", Name: "api"},
|
||||
Subsets: []corev1.EndpointSubset{
|
||||
{Addresses: []corev1.EndpointAddress{{}}},
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
server := httptest.NewServer(handler)
|
||||
defer server.Close()
|
||||
|
|
|
@ -0,0 +1,53 @@
|
|||
package(default_visibility = ["//visibility:public"])
|
||||
|
||||
licenses(["notice"])
|
||||
|
||||
load(
|
||||
"@io_bazel_rules_go//go:def.bzl",
|
||||
"go_library",
|
||||
"go_test",
|
||||
)
|
||||
|
||||
go_library(
|
||||
name = "go_default_library",
|
||||
srcs = ["available_controller.go"],
|
||||
tags = ["automanaged"],
|
||||
deps = [
|
||||
"//vendor/github.com/golang/glog:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/api/meta:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/conversion:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
|
||||
"//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library",
|
||||
"//vendor/k8s.io/client-go/informers/core/v1:go_default_library",
|
||||
"//vendor/k8s.io/client-go/listers/core/v1:go_default_library",
|
||||
"//vendor/k8s.io/client-go/pkg/api/v1:go_default_library",
|
||||
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
|
||||
"//vendor/k8s.io/client-go/util/workqueue:go_default_library",
|
||||
"//vendor/k8s.io/kube-aggregator/pkg/apis/apiregistration:go_default_library",
|
||||
"//vendor/k8s.io/kube-aggregator/pkg/client/clientset_generated/internalclientset/typed/apiregistration/internalversion:go_default_library",
|
||||
"//vendor/k8s.io/kube-aggregator/pkg/client/informers/internalversion/apiregistration/internalversion:go_default_library",
|
||||
"//vendor/k8s.io/kube-aggregator/pkg/client/listers/apiregistration/internalversion:go_default_library",
|
||||
"//vendor/k8s.io/kube-aggregator/pkg/controllers:go_default_library",
|
||||
],
|
||||
)
|
||||
|
||||
go_test(
|
||||
name = "go_default_test",
|
||||
srcs = ["available_controller_test.go"],
|
||||
library = ":go_default_library",
|
||||
tags = ["automanaged"],
|
||||
deps = [
|
||||
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
|
||||
"//vendor/k8s.io/client-go/listers/core/v1:go_default_library",
|
||||
"//vendor/k8s.io/client-go/pkg/api/v1:go_default_library",
|
||||
"//vendor/k8s.io/client-go/testing:go_default_library",
|
||||
"//vendor/k8s.io/client-go/tools/cache:go_default_library",
|
||||
"//vendor/k8s.io/kube-aggregator/pkg/apis/apiregistration:go_default_library",
|
||||
"//vendor/k8s.io/kube-aggregator/pkg/client/clientset_generated/internalclientset/fake:go_default_library",
|
||||
"//vendor/k8s.io/kube-aggregator/pkg/client/listers/apiregistration/internalversion:go_default_library",
|
||||
],
|
||||
)
|
|
@ -0,0 +1,364 @@
|
|||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package apiserver
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/conversion"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
v1informers "k8s.io/client-go/informers/core/v1"
|
||||
v1listers "k8s.io/client-go/listers/core/v1"
|
||||
"k8s.io/client-go/pkg/api/v1"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
|
||||
"k8s.io/kube-aggregator/pkg/apis/apiregistration"
|
||||
apiregistrationclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/internalclientset/typed/apiregistration/internalversion"
|
||||
informers "k8s.io/kube-aggregator/pkg/client/informers/internalversion/apiregistration/internalversion"
|
||||
listers "k8s.io/kube-aggregator/pkg/client/listers/apiregistration/internalversion"
|
||||
"k8s.io/kube-aggregator/pkg/controllers"
|
||||
)
|
||||
|
||||
var cloner = conversion.NewCloner()
|
||||
|
||||
type AvailableConditionController struct {
|
||||
apiServiceClient apiregistrationclient.APIServicesGetter
|
||||
|
||||
apiServiceLister listers.APIServiceLister
|
||||
apiServiceSynced cache.InformerSynced
|
||||
|
||||
// serviceLister is used to get the IP to create the transport for
|
||||
serviceLister v1listers.ServiceLister
|
||||
servicesSynced cache.InformerSynced
|
||||
|
||||
endpointsLister v1listers.EndpointsLister
|
||||
endpointsSynced cache.InformerSynced
|
||||
|
||||
// To allow injection for testing.
|
||||
syncFn func(key string) error
|
||||
|
||||
queue workqueue.RateLimitingInterface
|
||||
}
|
||||
|
||||
func NewAvailableConditionController(
|
||||
apiServiceInformer informers.APIServiceInformer,
|
||||
serviceInformer v1informers.ServiceInformer,
|
||||
endpointsInformer v1informers.EndpointsInformer,
|
||||
apiServiceClient apiregistrationclient.APIServicesGetter,
|
||||
) *AvailableConditionController {
|
||||
c := &AvailableConditionController{
|
||||
apiServiceClient: apiServiceClient,
|
||||
apiServiceLister: apiServiceInformer.Lister(),
|
||||
apiServiceSynced: apiServiceInformer.Informer().HasSynced,
|
||||
serviceLister: serviceInformer.Lister(),
|
||||
servicesSynced: serviceInformer.Informer().HasSynced,
|
||||
endpointsLister: endpointsInformer.Lister(),
|
||||
endpointsSynced: endpointsInformer.Informer().HasSynced,
|
||||
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "AvailableConditionController"),
|
||||
}
|
||||
|
||||
apiServiceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: c.addAPIService,
|
||||
UpdateFunc: c.updateAPIService,
|
||||
DeleteFunc: c.deleteAPIService,
|
||||
})
|
||||
|
||||
serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: c.addService,
|
||||
UpdateFunc: c.updateService,
|
||||
DeleteFunc: c.deleteService,
|
||||
})
|
||||
|
||||
endpointsInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: c.addEndpoints,
|
||||
UpdateFunc: c.updateEndpoints,
|
||||
DeleteFunc: c.deleteEndpoints,
|
||||
})
|
||||
|
||||
c.syncFn = c.sync
|
||||
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *AvailableConditionController) sync(key string) error {
|
||||
inAPIService, err := c.apiServiceLister.Get(key)
|
||||
if apierrors.IsNotFound(err) {
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
apiService := &apiregistration.APIService{}
|
||||
if err := apiregistration.DeepCopy_apiregistration_APIService(inAPIService, apiService, cloner); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
availableCondition := apiregistration.APIServiceCondition{
|
||||
Type: apiregistration.Available,
|
||||
Status: apiregistration.ConditionTrue,
|
||||
LastTransitionTime: metav1.Now(),
|
||||
}
|
||||
|
||||
// local API services are always considered available
|
||||
if apiService.Spec.Service == nil {
|
||||
availableCondition.Status = apiregistration.ConditionTrue
|
||||
availableCondition.Reason = "Local"
|
||||
availableCondition.Message = "Local APIServices are always available"
|
||||
apiregistration.SetAPIServiceCondition(apiService, availableCondition)
|
||||
_, err := c.apiServiceClient.APIServices().UpdateStatus(apiService)
|
||||
return err
|
||||
}
|
||||
|
||||
service, err := c.serviceLister.Services(apiService.Spec.Service.Namespace).Get(apiService.Spec.Service.Name)
|
||||
if apierrors.IsNotFound(err) {
|
||||
availableCondition.Status = apiregistration.ConditionFalse
|
||||
availableCondition.Reason = "ServiceNotFound"
|
||||
availableCondition.Message = fmt.Sprintf("service/%s in %q is not present", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace)
|
||||
apiregistration.SetAPIServiceCondition(apiService, availableCondition)
|
||||
_, err := c.apiServiceClient.APIServices().UpdateStatus(apiService)
|
||||
return err
|
||||
} else if err != nil {
|
||||
availableCondition.Status = apiregistration.ConditionUnknown
|
||||
availableCondition.Reason = "ServiceAccessError"
|
||||
availableCondition.Message = fmt.Sprintf("service/%s in %q cannot be checked due to: %v", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace, err)
|
||||
apiregistration.SetAPIServiceCondition(apiService, availableCondition)
|
||||
_, err := c.apiServiceClient.APIServices().UpdateStatus(apiService)
|
||||
return err
|
||||
}
|
||||
|
||||
if service.Spec.Type == v1.ServiceTypeClusterIP {
|
||||
endpoints, err := c.endpointsLister.Endpoints(apiService.Spec.Service.Namespace).Get(apiService.Spec.Service.Name)
|
||||
if apierrors.IsNotFound(err) {
|
||||
availableCondition.Status = apiregistration.ConditionFalse
|
||||
availableCondition.Reason = "EndpointsNotFound"
|
||||
availableCondition.Message = fmt.Sprintf("cannot find endpoints for service/%s in %q", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace)
|
||||
apiregistration.SetAPIServiceCondition(apiService, availableCondition)
|
||||
_, err := c.apiServiceClient.APIServices().UpdateStatus(apiService)
|
||||
return err
|
||||
} else if err != nil {
|
||||
availableCondition.Status = apiregistration.ConditionUnknown
|
||||
availableCondition.Reason = "EndpointsAccessError"
|
||||
availableCondition.Message = fmt.Sprintf("service/%s in %q cannot be checked due to: %v", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace, err)
|
||||
apiregistration.SetAPIServiceCondition(apiService, availableCondition)
|
||||
_, err := c.apiServiceClient.APIServices().UpdateStatus(apiService)
|
||||
return err
|
||||
}
|
||||
hasActiveEndpoints := false
|
||||
for _, subset := range endpoints.Subsets {
|
||||
if len(subset.Addresses) > 0 {
|
||||
hasActiveEndpoints = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if !hasActiveEndpoints {
|
||||
availableCondition.Status = apiregistration.ConditionFalse
|
||||
availableCondition.Reason = "MissingEndpoints"
|
||||
availableCondition.Message = fmt.Sprintf("endpoints for service/%s in %q have no addresses", apiService.Spec.Service.Name, apiService.Spec.Service.Namespace)
|
||||
apiregistration.SetAPIServiceCondition(apiService, availableCondition)
|
||||
_, err := c.apiServiceClient.APIServices().UpdateStatus(apiService)
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// TODO actually try to hit the discovery endpoint
|
||||
|
||||
availableCondition.Reason = "Passed"
|
||||
availableCondition.Message = "all checks passed"
|
||||
apiregistration.SetAPIServiceCondition(apiService, availableCondition)
|
||||
_, err = c.apiServiceClient.APIServices().UpdateStatus(apiService)
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *AvailableConditionController) Run(stopCh <-chan struct{}) {
|
||||
defer utilruntime.HandleCrash()
|
||||
defer c.queue.ShutDown()
|
||||
|
||||
glog.Infof("Starting AvailableConditionController")
|
||||
defer glog.Infof("Shutting down AvailableConditionController")
|
||||
|
||||
if !controllers.WaitForCacheSync("AvailableConditionController", stopCh, c.apiServiceSynced, c.servicesSynced, c.endpointsSynced) {
|
||||
return
|
||||
}
|
||||
|
||||
// only start one worker thread since its a slow moving API and the aggregation server adding bits
|
||||
// aren't threadsafe
|
||||
go wait.Until(c.runWorker, time.Second, stopCh)
|
||||
|
||||
<-stopCh
|
||||
}
|
||||
|
||||
func (c *AvailableConditionController) runWorker() {
|
||||
for c.processNextWorkItem() {
|
||||
}
|
||||
}
|
||||
|
||||
// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit.
|
||||
func (c *AvailableConditionController) processNextWorkItem() bool {
|
||||
key, quit := c.queue.Get()
|
||||
if quit {
|
||||
return false
|
||||
}
|
||||
defer c.queue.Done(key)
|
||||
|
||||
err := c.syncFn(key.(string))
|
||||
if err == nil {
|
||||
c.queue.Forget(key)
|
||||
return true
|
||||
}
|
||||
|
||||
utilruntime.HandleError(fmt.Errorf("%v failed with: %v", key, err))
|
||||
c.queue.AddRateLimited(key)
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (c *AvailableConditionController) enqueue(obj *apiregistration.APIService) {
|
||||
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
|
||||
if err != nil {
|
||||
glog.Errorf("Couldn't get key for object %#v: %v", obj, err)
|
||||
return
|
||||
}
|
||||
|
||||
c.queue.Add(key)
|
||||
}
|
||||
|
||||
func (c *AvailableConditionController) addAPIService(obj interface{}) {
|
||||
castObj := obj.(*apiregistration.APIService)
|
||||
glog.V(4).Infof("Adding %s", castObj.Name)
|
||||
c.enqueue(castObj)
|
||||
}
|
||||
|
||||
func (c *AvailableConditionController) updateAPIService(obj, _ interface{}) {
|
||||
castObj := obj.(*apiregistration.APIService)
|
||||
glog.V(4).Infof("Updating %s", castObj.Name)
|
||||
c.enqueue(castObj)
|
||||
}
|
||||
|
||||
func (c *AvailableConditionController) deleteAPIService(obj interface{}) {
|
||||
castObj, ok := obj.(*apiregistration.APIService)
|
||||
if !ok {
|
||||
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
||||
if !ok {
|
||||
glog.Errorf("Couldn't get object from tombstone %#v", obj)
|
||||
return
|
||||
}
|
||||
castObj, ok = tombstone.Obj.(*apiregistration.APIService)
|
||||
if !ok {
|
||||
glog.Errorf("Tombstone contained object that is not expected %#v", obj)
|
||||
return
|
||||
}
|
||||
}
|
||||
glog.V(4).Infof("Deleting %q", castObj.Name)
|
||||
c.enqueue(castObj)
|
||||
}
|
||||
|
||||
// there aren't very many apiservices, just check them all.
|
||||
func (c *AvailableConditionController) getAPIServicesFor(obj runtime.Object) []*apiregistration.APIService {
|
||||
metadata, err := meta.Accessor(obj)
|
||||
if err != nil {
|
||||
utilruntime.HandleError(err)
|
||||
return nil
|
||||
}
|
||||
|
||||
var ret []*apiregistration.APIService
|
||||
apiServiceList, _ := c.apiServiceLister.List(labels.Everything())
|
||||
for _, apiService := range apiServiceList {
|
||||
if apiService.Spec.Service == nil {
|
||||
continue
|
||||
}
|
||||
if apiService.Spec.Service.Namespace == metadata.GetNamespace() && apiService.Spec.Service.Name == metadata.GetName() {
|
||||
ret = append(ret, apiService)
|
||||
}
|
||||
}
|
||||
|
||||
return ret
|
||||
}
|
||||
|
||||
// TODO, think of a way to avoid checking on every service manipulation
|
||||
|
||||
func (c *AvailableConditionController) addService(obj interface{}) {
|
||||
for _, apiService := range c.getAPIServicesFor(obj.(*v1.Service)) {
|
||||
c.enqueue(apiService)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *AvailableConditionController) updateService(obj, _ interface{}) {
|
||||
for _, apiService := range c.getAPIServicesFor(obj.(*v1.Service)) {
|
||||
c.enqueue(apiService)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *AvailableConditionController) deleteService(obj interface{}) {
|
||||
castObj, ok := obj.(*v1.Service)
|
||||
if !ok {
|
||||
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
||||
if !ok {
|
||||
glog.Errorf("Couldn't get object from tombstone %#v", obj)
|
||||
return
|
||||
}
|
||||
castObj, ok = tombstone.Obj.(*v1.Service)
|
||||
if !ok {
|
||||
glog.Errorf("Tombstone contained object that is not expected %#v", obj)
|
||||
return
|
||||
}
|
||||
}
|
||||
for _, apiService := range c.getAPIServicesFor(castObj) {
|
||||
c.enqueue(apiService)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *AvailableConditionController) addEndpoints(obj interface{}) {
|
||||
for _, apiService := range c.getAPIServicesFor(obj.(*v1.Endpoints)) {
|
||||
c.enqueue(apiService)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *AvailableConditionController) updateEndpoints(obj, _ interface{}) {
|
||||
for _, apiService := range c.getAPIServicesFor(obj.(*v1.Endpoints)) {
|
||||
c.enqueue(apiService)
|
||||
}
|
||||
}
|
||||
|
||||
func (c *AvailableConditionController) deleteEndpoints(obj interface{}) {
|
||||
castObj, ok := obj.(*v1.Endpoints)
|
||||
if !ok {
|
||||
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
||||
if !ok {
|
||||
glog.Errorf("Couldn't get object from tombstone %#v", obj)
|
||||
return
|
||||
}
|
||||
castObj, ok = tombstone.Obj.(*v1.Endpoints)
|
||||
if !ok {
|
||||
glog.Errorf("Tombstone contained object that is not expected %#v", obj)
|
||||
return
|
||||
}
|
||||
}
|
||||
for _, apiService := range c.getAPIServicesFor(castObj) {
|
||||
c.enqueue(apiService)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,206 @@
|
|||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package apiserver
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
v1listers "k8s.io/client-go/listers/core/v1"
|
||||
"k8s.io/client-go/pkg/api/v1"
|
||||
clienttesting "k8s.io/client-go/testing"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/kube-aggregator/pkg/apis/apiregistration"
|
||||
"k8s.io/kube-aggregator/pkg/client/clientset_generated/internalclientset/fake"
|
||||
listers "k8s.io/kube-aggregator/pkg/client/listers/apiregistration/internalversion"
|
||||
)
|
||||
|
||||
func newEndpoints(namespace, name string) *v1.Endpoints {
|
||||
return &v1.Endpoints{
|
||||
ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: name},
|
||||
}
|
||||
}
|
||||
|
||||
func newEndpointsWithAddress(namespace, name string) *v1.Endpoints {
|
||||
return &v1.Endpoints{
|
||||
ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: name},
|
||||
Subsets: []v1.EndpointSubset{
|
||||
{
|
||||
Addresses: []v1.EndpointAddress{
|
||||
{
|
||||
IP: "val",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func newService(namespace, name string) *v1.Service {
|
||||
return &v1.Service{
|
||||
ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: name},
|
||||
Spec: v1.ServiceSpec{
|
||||
Type: v1.ServiceTypeClusterIP,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func newLocalAPIService(name string) *apiregistration.APIService {
|
||||
return &apiregistration.APIService{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: name},
|
||||
}
|
||||
}
|
||||
|
||||
func newRemoteAPIService(name string) *apiregistration.APIService {
|
||||
return &apiregistration.APIService{
|
||||
ObjectMeta: metav1.ObjectMeta{Name: name},
|
||||
Spec: apiregistration.APIServiceSpec{
|
||||
Service: &apiregistration.ServiceReference{
|
||||
Namespace: "foo",
|
||||
Name: "bar",
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func TestSync(t *testing.T) {
|
||||
tests := []struct {
|
||||
name string
|
||||
|
||||
apiServiceName string
|
||||
apiServices []*apiregistration.APIService
|
||||
services []*v1.Service
|
||||
endpoints []*v1.Endpoints
|
||||
expectedAvailability apiregistration.APIServiceCondition
|
||||
}{
|
||||
{
|
||||
name: "local",
|
||||
apiServiceName: "local.group",
|
||||
apiServices: []*apiregistration.APIService{newLocalAPIService("local.group")},
|
||||
expectedAvailability: apiregistration.APIServiceCondition{
|
||||
Type: apiregistration.Available,
|
||||
Status: apiregistration.ConditionTrue,
|
||||
Reason: "Local",
|
||||
Message: "Local APIServices are always available",
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "no service",
|
||||
apiServiceName: "remote.group",
|
||||
apiServices: []*apiregistration.APIService{newRemoteAPIService("remote.group")},
|
||||
services: []*v1.Service{newService("foo", "not-bar")},
|
||||
expectedAvailability: apiregistration.APIServiceCondition{
|
||||
Type: apiregistration.Available,
|
||||
Status: apiregistration.ConditionFalse,
|
||||
Reason: "ServiceNotFound",
|
||||
Message: `service/bar in "foo" is not present`,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "no endpoints",
|
||||
apiServiceName: "remote.group",
|
||||
apiServices: []*apiregistration.APIService{newRemoteAPIService("remote.group")},
|
||||
services: []*v1.Service{newService("foo", "bar")},
|
||||
expectedAvailability: apiregistration.APIServiceCondition{
|
||||
Type: apiregistration.Available,
|
||||
Status: apiregistration.ConditionFalse,
|
||||
Reason: "EndpointsNotFound",
|
||||
Message: `cannot find endpoints for service/bar in "foo"`,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "missing endpoints",
|
||||
apiServiceName: "remote.group",
|
||||
apiServices: []*apiregistration.APIService{newRemoteAPIService("remote.group")},
|
||||
services: []*v1.Service{newService("foo", "bar")},
|
||||
endpoints: []*v1.Endpoints{newEndpoints("foo", "bar")},
|
||||
expectedAvailability: apiregistration.APIServiceCondition{
|
||||
Type: apiregistration.Available,
|
||||
Status: apiregistration.ConditionFalse,
|
||||
Reason: "MissingEndpoints",
|
||||
Message: `endpoints for service/bar in "foo" have no addresses`,
|
||||
},
|
||||
},
|
||||
{
|
||||
name: "remote",
|
||||
apiServiceName: "remote.group",
|
||||
apiServices: []*apiregistration.APIService{newRemoteAPIService("remote.group")},
|
||||
services: []*v1.Service{newService("foo", "bar")},
|
||||
endpoints: []*v1.Endpoints{newEndpointsWithAddress("foo", "bar")},
|
||||
expectedAvailability: apiregistration.APIServiceCondition{
|
||||
Type: apiregistration.Available,
|
||||
Status: apiregistration.ConditionTrue,
|
||||
Reason: "Passed",
|
||||
Message: `all checks passed`,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
fakeClient := fake.NewSimpleClientset()
|
||||
apiServiceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
|
||||
serviceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
|
||||
endpointsIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
|
||||
for _, obj := range tc.apiServices {
|
||||
apiServiceIndexer.Add(obj)
|
||||
}
|
||||
for _, obj := range tc.services {
|
||||
serviceIndexer.Add(obj)
|
||||
}
|
||||
for _, obj := range tc.endpoints {
|
||||
endpointsIndexer.Add(obj)
|
||||
}
|
||||
|
||||
c := AvailableConditionController{
|
||||
apiServiceClient: fakeClient.Apiregistration(),
|
||||
apiServiceLister: listers.NewAPIServiceLister(apiServiceIndexer),
|
||||
serviceLister: v1listers.NewServiceLister(serviceIndexer),
|
||||
endpointsLister: v1listers.NewEndpointsLister(endpointsIndexer),
|
||||
}
|
||||
c.sync(tc.apiServiceName)
|
||||
|
||||
// ought to have one action writing status
|
||||
if e, a := 1, len(fakeClient.Actions()); e != a {
|
||||
t.Errorf("%v expected %v, got %v", tc.name, e, fakeClient.Actions())
|
||||
continue
|
||||
}
|
||||
|
||||
action, ok := fakeClient.Actions()[0].(clienttesting.UpdateAction)
|
||||
if !ok {
|
||||
t.Errorf("%v got %v", tc.name, ok)
|
||||
continue
|
||||
}
|
||||
|
||||
if e, a := 1, len(action.GetObject().(*apiregistration.APIService).Status.Conditions); e != a {
|
||||
t.Errorf("%v expected %v, got %v", tc.name, e, action.GetObject())
|
||||
continue
|
||||
}
|
||||
condition := action.GetObject().(*apiregistration.APIService).Status.Conditions[0]
|
||||
if e, a := tc.expectedAvailability.Type, condition.Type; e != a {
|
||||
t.Errorf("%v expected %v, got %#v", tc.name, e, condition)
|
||||
}
|
||||
if e, a := tc.expectedAvailability.Status, condition.Status; e != a {
|
||||
t.Errorf("%v expected %v, got %#v", tc.name, e, condition)
|
||||
}
|
||||
if e, a := tc.expectedAvailability.Reason, condition.Reason; e != a {
|
||||
t.Errorf("%v expected %v, got %#v", tc.name, e, condition)
|
||||
}
|
||||
if e, a := tc.expectedAvailability.Message, condition.Message; e != a {
|
||||
t.Errorf("%v expected %v, got %#v", tc.name, e, condition)
|
||||
}
|
||||
}
|
||||
}
|
|
@ -13,8 +13,10 @@ go_library(
|
|||
tags = ["automanaged"],
|
||||
deps = [
|
||||
"//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/registry/generic:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/registry/generic/registry:go_default_library",
|
||||
"//vendor/k8s.io/apiserver/pkg/registry/rest:go_default_library",
|
||||
"//vendor/k8s.io/kube-aggregator/pkg/apis/apiregistration:go_default_library",
|
||||
"//vendor/k8s.io/kube-aggregator/pkg/registry/apiservice:go_default_library",
|
||||
],
|
||||
|
|
|
@ -18,8 +18,10 @@ package etcd
|
|||
|
||||
import (
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
genericapirequest "k8s.io/apiserver/pkg/endpoints/request"
|
||||
"k8s.io/apiserver/pkg/registry/generic"
|
||||
genericregistry "k8s.io/apiserver/pkg/registry/generic/registry"
|
||||
"k8s.io/apiserver/pkg/registry/rest"
|
||||
"k8s.io/kube-aggregator/pkg/apis/apiregistration"
|
||||
"k8s.io/kube-aggregator/pkg/registry/apiservice"
|
||||
)
|
||||
|
@ -49,3 +51,28 @@ func NewREST(scheme *runtime.Scheme, optsGetter generic.RESTOptionsGetter) *REST
|
|||
}
|
||||
return &REST{store}
|
||||
}
|
||||
|
||||
// NewStatusREST makes a RESTStorage for status that has more limited options.
|
||||
// It is based on the original REST so that we can share the same underlying store
|
||||
func NewStatusREST(scheme *runtime.Scheme, rest *REST) *StatusREST {
|
||||
statusStore := *rest.Store
|
||||
statusStore.CreateStrategy = nil
|
||||
statusStore.DeleteStrategy = nil
|
||||
statusStore.UpdateStrategy = apiservice.NewStatusStrategy(scheme)
|
||||
return &StatusREST{store: &statusStore}
|
||||
}
|
||||
|
||||
type StatusREST struct {
|
||||
store *genericregistry.Store
|
||||
}
|
||||
|
||||
var _ = rest.Updater(&StatusREST{})
|
||||
|
||||
func (r *StatusREST) New() runtime.Object {
|
||||
return &apiregistration.APIService{}
|
||||
}
|
||||
|
||||
// Update alters the status subset of an object.
|
||||
func (r *StatusREST) Update(ctx genericapirequest.Context, name string, objInfo rest.UpdatedObjectInfo) (runtime.Object, bool, error) {
|
||||
return r.store.Update(ctx, name, objInfo)
|
||||
}
|
||||
|
|
|
@ -46,7 +46,8 @@ func (apiServerStrategy) NamespaceScoped() bool {
|
|||
}
|
||||
|
||||
func (apiServerStrategy) PrepareForCreate(ctx genericapirequest.Context, obj runtime.Object) {
|
||||
_ = obj.(*apiregistration.APIService)
|
||||
apiservice := obj.(*apiregistration.APIService)
|
||||
apiservice.Status = apiregistration.APIServiceStatus{}
|
||||
}
|
||||
|
||||
func (apiServerStrategy) PrepareForUpdate(ctx genericapirequest.Context, obj, old runtime.Object) {
|
||||
|
@ -74,6 +75,44 @@ func (apiServerStrategy) ValidateUpdate(ctx genericapirequest.Context, obj, old
|
|||
return validation.ValidateAPIServiceUpdate(obj.(*apiregistration.APIService), old.(*apiregistration.APIService))
|
||||
}
|
||||
|
||||
type apiServerStatusStrategy struct {
|
||||
runtime.ObjectTyper
|
||||
names.NameGenerator
|
||||
}
|
||||
|
||||
func NewStatusStrategy(typer runtime.ObjectTyper) apiServerStatusStrategy {
|
||||
return apiServerStatusStrategy{typer, names.SimpleNameGenerator}
|
||||
}
|
||||
|
||||
func (apiServerStatusStrategy) NamespaceScoped() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (apiServerStatusStrategy) PrepareForUpdate(ctx genericapirequest.Context, obj, old runtime.Object) {
|
||||
newAPIService := obj.(*apiregistration.APIService)
|
||||
oldAPIService := old.(*apiregistration.APIService)
|
||||
newAPIService.Spec = oldAPIService.Spec
|
||||
newAPIService.Labels = oldAPIService.Labels
|
||||
newAPIService.Annotations = oldAPIService.Annotations
|
||||
newAPIService.Finalizers = oldAPIService.Finalizers
|
||||
newAPIService.OwnerReferences = oldAPIService.OwnerReferences
|
||||
}
|
||||
|
||||
func (apiServerStatusStrategy) AllowCreateOnUpdate() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (apiServerStatusStrategy) AllowUnconditionalUpdate() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (apiServerStatusStrategy) Canonicalize(obj runtime.Object) {
|
||||
}
|
||||
|
||||
func (apiServerStatusStrategy) ValidateUpdate(ctx genericapirequest.Context, obj, old runtime.Object) field.ErrorList {
|
||||
return validation.ValidateAPIServiceStatusUpdate(obj.(*apiregistration.APIService), old.(*apiregistration.APIService))
|
||||
}
|
||||
|
||||
func GetAttrs(obj runtime.Object) (labels.Set, fields.Set, error) {
|
||||
apiserver, ok := obj.(*apiregistration.APIService)
|
||||
if !ok {
|
||||
|
|
Loading…
Reference in New Issue