aggregation availability should ensure that discovery responds non-failing

k3s-v1.15.3
David Eads 2019-03-07 12:55:41 -05:00
parent 3ec638d3cd
commit f9e162086f
4 changed files with 141 additions and 73 deletions

View File

@ -182,14 +182,19 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
s.GenericAPIServer.Handler.NonGoRestfulMux.UnlistedHandle("/apis/", apisHandler)
apiserviceRegistrationController := NewAPIServiceRegistrationController(informerFactory.Apiregistration().InternalVersion().APIServices(), s)
availableController := statuscontrollers.NewAvailableConditionController(
availableController, err := statuscontrollers.NewAvailableConditionController(
informerFactory.Apiregistration().InternalVersion().APIServices(),
c.GenericConfig.SharedInformerFactory.Core().V1().Services(),
c.GenericConfig.SharedInformerFactory.Core().V1().Endpoints(),
apiregistrationClient.Apiregistration(),
c.ExtraConfig.ProxyTransport,
c.ExtraConfig.ProxyClientCert,
c.ExtraConfig.ProxyClientKey,
s.serviceResolver,
)
if err != nil {
return nil, err
}
s.GenericAPIServer.AddPostStartHookOrDie("start-kube-aggregator-informers", func(context genericapiserver.PostStartHookContext) error {
informerFactory.Start(context.StopCh)

View File

@ -26,7 +26,9 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/informers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/rest:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//staging/src/k8s.io/client-go/transport:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//staging/src/k8s.io/kube-aggregator/pkg/apis/apiregistration:go_default_library",
"//staging/src/k8s.io/kube-aggregator/pkg/client/clientset_generated/internalclientset/typed/apiregistration/internalversion:go_default_library",

View File

@ -17,15 +17,12 @@ limitations under the License.
package apiserver
import (
"crypto/tls"
"fmt"
"net/http"
"net/url"
"time"
"k8s.io/klog"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
@ -36,9 +33,11 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
v1informers "k8s.io/client-go/informers/core/v1"
v1listers "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/transport"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
"k8s.io/kube-aggregator/pkg/apis/apiregistration"
apiregistrationclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/internalclientset/typed/apiregistration/internalversion"
informers "k8s.io/kube-aggregator/pkg/client/informers/internalversion/apiregistration/internalversion"
@ -81,8 +80,10 @@ func NewAvailableConditionController(
endpointsInformer v1informers.EndpointsInformer,
apiServiceClient apiregistrationclient.APIServicesGetter,
proxyTransport *http.Transport,
proxyClientCert []byte,
proxyClientKey []byte,
serviceResolver ServiceResolver,
) *AvailableConditionController {
) (*AvailableConditionController, error) {
c := &AvailableConditionController{
apiServiceClient: apiServiceClient,
apiServiceLister: apiServiceInformer.Lister(),
@ -100,19 +101,28 @@ func NewAvailableConditionController(
"AvailableConditionController"),
}
// if a particular transport was specified, use that otherwise build one
// construct an http client that will ignore TLS verification (if someone owns the network and messes with your status
// that's not so bad) and sets a very short timeout.
discoveryClient := &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
// that's not so bad) and sets a very short timeout. This is a best effort GET that provides no additional information
restConfig := &rest.Config{
TLSClientConfig: rest.TLSClientConfig{
Insecure: true,
CertData: proxyClientCert,
KeyData: proxyClientKey,
},
}
if proxyTransport != nil && proxyTransport.DialContext != nil {
restConfig.Dial = proxyTransport.DialContext
}
transport, err := rest.TransportFor(restConfig)
if err != nil {
return nil, err
}
c.discoveryClient = &http.Client{
Transport: transport,
// the request should happen quickly.
Timeout: 5 * time.Second,
}
if proxyTransport != nil {
discoveryClient.Transport = proxyTransport
}
c.discoveryClient = discoveryClient
// resync on this one because it is low cardinality and rechecking the actual discovery
// allows us to detect health in a more timely fashion when network connectivity to
@ -140,7 +150,7 @@ func NewAvailableConditionController(
c.syncFn = c.sync
return c
return c, nil
}
func (c *AvailableConditionController) sync(key string) error {
@ -254,17 +264,31 @@ func (c *AvailableConditionController) sync(key string) error {
errCh := make(chan error)
go func() {
resp, err := c.discoveryClient.Get(discoveryURL.String())
newReq, err := http.NewRequest("GET", discoveryURL.String(), nil)
if err != nil {
errCh <- err
return
}
// setting the system-masters identity ensures that we will always have access rights
transport.SetAuthProxyHeaders(newReq, "system:kube-aggregator", []string{"system:masters"}, nil)
resp, err := c.discoveryClient.Do(newReq)
if resp != nil {
resp.Body.Close()
// we should always been in the 200s or 300s
if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
errCh <- fmt.Errorf("bad status from %v: %v", discoveryURL, resp.StatusCode)
return
}
}
errCh <- err
}()
select {
case err = <-errCh:
if err != nil {
results <- fmt.Errorf("no response from %v: %v", discoveryURL, err)
results <- fmt.Errorf("failing or missing response from %v: %v", discoveryURL, err)
return
}

View File

@ -18,11 +18,15 @@ package apiserver
import (
"fmt"
"net/http"
"net/http/httptest"
"net/url"
"strings"
"testing"
"github.com/davecgh/go-spew/spew"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1listers "k8s.io/client-go/listers/core/v1"
clienttesting "k8s.io/client-go/testing"
@ -103,6 +107,8 @@ func TestSync(t *testing.T) {
apiServices []*apiregistration.APIService
services []*v1.Service
endpoints []*v1.Endpoints
forceDiscoveryFail bool
expectedAvailability apiregistration.APIServiceCondition
}{
{
@ -200,9 +206,24 @@ func TestSync(t *testing.T) {
Message: `all checks passed`,
},
},
{
name: "remote-bad-return",
apiServiceName: "remote.group",
apiServices: []*apiregistration.APIService{newRemoteAPIService("remote.group")},
services: []*v1.Service{newService("foo", "bar", testServicePort, testServicePortName)},
endpoints: []*v1.Endpoints{newEndpointsWithAddress("foo", "bar", testServicePort, testServicePortName)},
forceDiscoveryFail: true,
expectedAvailability: apiregistration.APIServiceCondition{
Type: apiregistration.Available,
Status: apiregistration.ConditionFalse,
Reason: "FailedDiscoveryCheck",
Message: `failing or missing response from`,
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
fakeClient := fake.NewSimpleClientset()
apiServiceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
serviceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
@ -217,29 +238,36 @@ func TestSync(t *testing.T) {
endpointsIndexer.Add(obj)
}
testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if !tc.forceDiscoveryFail {
w.WriteHeader(http.StatusOK)
}
w.WriteHeader(http.StatusForbidden)
}))
defer testServer.Close()
c := AvailableConditionController{
apiServiceClient: fakeClient.Apiregistration(),
apiServiceLister: listers.NewAPIServiceLister(apiServiceIndexer),
serviceLister: v1listers.NewServiceLister(serviceIndexer),
endpointsLister: v1listers.NewEndpointsLister(endpointsIndexer),
discoveryClient: testServer.Client(),
serviceResolver: &fakeServiceResolver{url: testServer.URL},
}
c.sync(tc.apiServiceName)
// ought to have one action writing status
if e, a := 1, len(fakeClient.Actions()); e != a {
t.Errorf("%v expected %v, got %v", tc.name, e, fakeClient.Actions())
continue
t.Fatalf("%v expected %v, got %v", tc.name, e, fakeClient.Actions())
}
action, ok := fakeClient.Actions()[0].(clienttesting.UpdateAction)
if !ok {
t.Errorf("%v got %v", tc.name, ok)
continue
t.Fatalf("%v got %v", tc.name, ok)
}
if e, a := 1, len(action.GetObject().(*apiregistration.APIService).Status.Conditions); e != a {
t.Errorf("%v expected %v, got %v", tc.name, e, action.GetObject())
continue
t.Fatalf("%v expected %v, got %v", tc.name, e, action.GetObject())
}
condition := action.GetObject().(*apiregistration.APIService).Status.Conditions[0]
if e, a := tc.expectedAvailability.Type, condition.Type; e != a {
@ -251,15 +279,24 @@ func TestSync(t *testing.T) {
if e, a := tc.expectedAvailability.Reason, condition.Reason; e != a {
t.Errorf("%v expected %v, got %#v", tc.name, e, condition)
}
if e, a := tc.expectedAvailability.Message, condition.Message; e != a {
if e, a := tc.expectedAvailability.Message, condition.Message; !strings.HasPrefix(a, e) {
t.Errorf("%v expected %v, got %#v", tc.name, e, condition)
}
if condition.LastTransitionTime.IsZero() {
t.Error("expected lastTransitionTime to be non-zero")
}
})
}
}
type fakeServiceResolver struct {
url string
}
func (f *fakeServiceResolver) ResolveEndpoint(namespace, name string, port int32) (*url.URL, error) {
return url.Parse(f.url)
}
func TestUpdateAPIServiceStatus(t *testing.T) {
foo := &apiregistration.APIService{Status: apiregistration.APIServiceStatus{Conditions: []apiregistration.APIServiceCondition{{Type: "foo"}}}}
bar := &apiregistration.APIService{Status: apiregistration.APIServiceStatus{Conditions: []apiregistration.APIServiceCondition{{Type: "bar"}}}}