diff --git a/pkg/proxy/healthcheck/BUILD b/pkg/proxy/healthcheck/BUILD index 5eea7679c2..c7503ac898 100644 --- a/pkg/proxy/healthcheck/BUILD +++ b/pkg/proxy/healthcheck/BUILD @@ -11,20 +11,17 @@ load( go_library( name = "go_default_library", srcs = [ - "api.go", "doc.go", "healthcheck.go", - "http.go", - "listener.go", - "worker.go", ], tags = ["automanaged"], deps = [ "//vendor:github.com/golang/glog", + "//vendor:github.com/renstrom/dedent", "//vendor:k8s.io/apimachinery/pkg/types", - "//vendor:k8s.io/apimachinery/pkg/util/sets", - "//vendor:k8s.io/apimachinery/pkg/util/wait", - "//vendor:k8s.io/client-go/tools/cache", + "//vendor:k8s.io/client-go/pkg/api", + "//vendor:k8s.io/client-go/pkg/api/v1", + "//vendor:k8s.io/client-go/tools/record", ], ) @@ -34,6 +31,7 @@ go_test( library = ":go_default_library", tags = ["automanaged"], deps = [ + "//vendor:github.com/davecgh/go-spew/spew", "//vendor:k8s.io/apimachinery/pkg/types", "//vendor:k8s.io/apimachinery/pkg/util/sets", ], diff --git a/pkg/proxy/healthcheck/api.go b/pkg/proxy/healthcheck/api.go deleted file mode 100644 index 91aa3bd725..0000000000 --- a/pkg/proxy/healthcheck/api.go +++ /dev/null @@ -1,65 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package healthcheck - -import ( - "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/apimachinery/pkg/util/wait" -) - -// All public API Methods for this package - -// UpdateEndpoints Update the set of local endpoints for a service -func UpdateEndpoints(serviceName types.NamespacedName, endpointUids sets.String) { - req := &proxyMutationRequest{ - serviceName: serviceName, - endpointUids: &endpointUids, - } - healthchecker.mutationRequestChannel <- req -} - -func updateServiceListener(serviceName types.NamespacedName, listenPort int, add bool) bool { - responseChannel := make(chan bool) - req := &proxyListenerRequest{ - serviceName: serviceName, - listenPort: uint16(listenPort), - add: add, - responseChannel: responseChannel, - } - healthchecker.listenerRequestChannel <- req - return <-responseChannel -} - -// AddServiceListener Request addition of a listener for a service's health check -func AddServiceListener(serviceName types.NamespacedName, listenPort int) bool { - return updateServiceListener(serviceName, listenPort, true) -} - -// DeleteServiceListener Request deletion of a listener for a service's health check -func DeleteServiceListener(serviceName types.NamespacedName, listenPort int) bool { - return updateServiceListener(serviceName, listenPort, false) -} - -// Run Start the healthchecker main loop -func Run() { - healthchecker = proxyHealthCheckFactory() - // Wrap with a wait.Forever to handle panics. 
- go wait.Forever(func() { - healthchecker.handlerLoop() - }, 0) -} diff --git a/pkg/proxy/healthcheck/doc.go b/pkg/proxy/healthcheck/doc.go index 56ecc11e9a..0a9ea0944b 100644 --- a/pkg/proxy/healthcheck/doc.go +++ b/pkg/proxy/healthcheck/doc.go @@ -14,5 +14,5 @@ See the License for the specific language governing permissions and limitations under the License. */ -// Package healthcheck LoadBalancer Healthcheck responder library for kubernetes network proxies +// Package healthcheck provides tools for serving kube-proxy healthchecks. package healthcheck // import "k8s.io/kubernetes/pkg/proxy/healthcheck" diff --git a/pkg/proxy/healthcheck/healthcheck.go b/pkg/proxy/healthcheck/healthcheck.go index e9dfe86e75..faaf001fad 100644 --- a/pkg/proxy/healthcheck/healthcheck.go +++ b/pkg/proxy/healthcheck/healthcheck.go @@ -20,108 +20,210 @@ import ( "fmt" "net" "net/http" + "strings" + "sync" "github.com/golang/glog" + "github.com/renstrom/dedent" + "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/sets" - "k8s.io/client-go/tools/cache" + "k8s.io/client-go/pkg/api" + clientv1 "k8s.io/client-go/pkg/api/v1" + "k8s.io/client-go/tools/record" ) -// proxyMutationRequest: Message to request addition/deletion of endpoints for a service -type proxyMutationRequest struct { - serviceName types.NamespacedName - endpointUids *sets.String +// Server serves HTTP endpoints for each service name, with results +// based on the endpoints. If there are 0 endpoints for a service, it returns a +// 503 "Service Unavailable" error (telling LBs not to use this node). If there +// are 1 or more endpoints, it returns a 200 "OK". +type Server interface { + // Make the new set of services be active. Services that were open before + // will be closed. Services that are new will be opened. Service that + // existed and are in the new set will be left alone. The value of the map + // is the healthcheck-port to listen on. + SyncServices(newServices map[types.NamespacedName]uint16) error + // Make the new set of endpoints be active. Endpoints for services that do + // not exist will be dropped. The value of the map is the number of + // endpoints the service has on this node. + SyncEndpoints(newEndpoints map[types.NamespacedName]int) error } -// proxyListenerRequest: Message to request addition/deletion of a service responder on a listening port -type proxyListenerRequest struct { - serviceName types.NamespacedName - listenPort uint16 - add bool - responseChannel chan bool +// Listener allows for testing of Server. If the Listener argument +// to NewServer() is nil, the real net.Listen function will be used. +type Listener interface { + // Listen is very much like net.Listen, except the first arg (network) is + // fixed to be "tcp". + Listen(addr string) (net.Listener, error) } -// serviceEndpointsList: A list of endpoints for a service -type serviceEndpointsList struct { - serviceName types.NamespacedName - endpoints *sets.String +// HTTPServerFactory allows for testing of Server. If the +// HTTPServerFactory argument to NewServer() is nil, the real +// http.Server type will be used. +type HTTPServerFactory interface { + // New creates an instance of a type satisfying HTTPServer. This is + // designed to include http.Server. 
+ New(addr string, handler http.Handler) HTTPServer } -// serviceResponder: Contains net/http datastructures necessary for responding to each Service's health check on its aux nodePort -type serviceResponder struct { - serviceName types.NamespacedName - listenPort uint16 - listener *net.Listener - server *http.Server +// HTTPServer allows for testing of Server. +type HTTPServer interface { + // Server is designed so that http.Server satisfies this interface. + Serve(listener net.Listener) error } -// proxyHC: Handler structure for health check, endpoint add/delete and service listener add/delete requests -type proxyHC struct { - serviceEndpointsMap cache.ThreadSafeStore - serviceResponderMap map[types.NamespacedName]serviceResponder - mutationRequestChannel chan *proxyMutationRequest - listenerRequestChannel chan *proxyListenerRequest -} - -// handleHealthCheckRequest - received a health check request - lookup and respond to HC. -func (h *proxyHC) handleHealthCheckRequest(rw http.ResponseWriter, serviceName string) { - s, ok := h.serviceEndpointsMap.Get(serviceName) - if !ok { - glog.V(4).Infof("Service %s not found or has no local endpoints", serviceName) - sendHealthCheckResponse(rw, http.StatusServiceUnavailable, "No Service Endpoints Found") - return +// NewServer allocates a new healthcheck server manager. If either +// of the injected arguments is nil, defaults will be used. +func NewServer(hostname string, recorder record.EventRecorder, listener Listener, httpServerFactory HTTPServerFactory) Server { + if listener == nil { + listener = stdNetListener{} } - numEndpoints := len(*s.(*serviceEndpointsList).endpoints) - if numEndpoints > 0 { - sendHealthCheckResponse(rw, http.StatusOK, fmt.Sprintf("%d Service Endpoints found", numEndpoints)) - return + if httpServerFactory == nil { + httpServerFactory = stdHTTPServerFactory{} + } + return &server{ + hostname: hostname, + recorder: recorder, + listener: listener, + httpFactory: httpServerFactory, + services: map[types.NamespacedName]*hcInstance{}, } - sendHealthCheckResponse(rw, http.StatusServiceUnavailable, "0 local Endpoints are alive") } -// handleMutationRequest - receive requests to mutate the table entry for a service -func (h *proxyHC) handleMutationRequest(req *proxyMutationRequest) { - numEndpoints := len(*req.endpointUids) - glog.V(4).Infof("LB service health check mutation request Service: %s - %d Endpoints %v", - req.serviceName, numEndpoints, (*req.endpointUids).List()) - if numEndpoints == 0 { - if _, ok := h.serviceEndpointsMap.Get(req.serviceName.String()); ok { - glog.V(4).Infof("Deleting endpoints map for service %s, all local endpoints gone", req.serviceName.String()) - h.serviceEndpointsMap.Delete(req.serviceName.String()) - } - return +// Implement Listener in terms of net.Listen. +type stdNetListener struct{} + +func (stdNetListener) Listen(addr string) (net.Listener, error) { + return net.Listen("tcp", addr) +} + +var _ Listener = stdNetListener{} + +// Implement HTTPServerFactory in terms of http.Server.
+type stdHTTPServerFactory struct{} + +func (stdHTTPServerFactory) New(addr string, handler http.Handler) HTTPServer { + return &http.Server{ + Addr: addr, + Handler: handler, } - var entry *serviceEndpointsList - e, exists := h.serviceEndpointsMap.Get(req.serviceName.String()) - if exists { - entry = e.(*serviceEndpointsList) - if entry.endpoints.Equal(*req.endpointUids) { - return - } - // Compute differences just for printing logs about additions and removals - deletedEndpoints := entry.endpoints.Difference(*req.endpointUids) - newEndpoints := req.endpointUids.Difference(*entry.endpoints) - for _, e := range newEndpoints.List() { - glog.V(4).Infof("Adding local endpoint %s to LB health check for service %s", - e, req.serviceName.String()) - } - for _, d := range deletedEndpoints.List() { - glog.V(4).Infof("Deleted endpoint %s from service %s LB health check (%d endpoints left)", - d, req.serviceName.String(), len(*entry.endpoints)) +} + +var _ HTTPServerFactory = stdHTTPServerFactory{} + +type server struct { + hostname string + recorder record.EventRecorder // can be nil + listener Listener + httpFactory HTTPServerFactory + + lock sync.Mutex + services map[types.NamespacedName]*hcInstance +} + +func (hcs *server) SyncServices(newServices map[types.NamespacedName]uint16) error { + hcs.lock.Lock() + defer hcs.lock.Unlock() + + // Remove any that are not needed any more. + for nsn, svc := range hcs.services { + if port, found := newServices[nsn]; !found || port != svc.port { + glog.V(2).Infof("Closing healthcheck %q on port %d", nsn.String(), svc.port) + if err := svc.listener.Close(); err != nil { + glog.Errorf("Close(%v): %v", svc.listener.Addr(), err) + } + delete(hcs.services, nsn) } } - entry = &serviceEndpointsList{serviceName: req.serviceName, endpoints: req.endpointUids} - h.serviceEndpointsMap.Add(req.serviceName.String(), entry) + + // Add any that are needed. + for nsn, port := range newServices { + if hcs.services[nsn] != nil { + glog.V(3).Infof("Existing healthcheck %q on port %d", nsn.String(), port) + continue + } + + glog.V(2).Infof("Opening healthcheck %q on port %d", nsn.String(), port) + svc := &hcInstance{port: port} + addr := fmt.Sprintf(":%d", port) + svc.server = hcs.httpFactory.New(addr, hcHandler{name: nsn, hcs: hcs}) + var err error + svc.listener, err = hcs.listener.Listen(addr) + if err != nil { + msg := fmt.Sprintf("node %s failed to start healthcheck %q on port %d: %v", hcs.hostname, nsn.String(), port, err) + + if hcs.recorder != nil { + hcs.recorder.Eventf( + &clientv1.ObjectReference{ + Kind: "Service", + Namespace: nsn.Namespace, + Name: nsn.Name, + UID: types.UID(nsn.String()), + }, api.EventTypeWarning, "FailedToStartHealthcheck", msg) + } + glog.Error(msg) + continue + } + hcs.services[nsn] = svc + + go func(nsn types.NamespacedName, svc *hcInstance) { + // Serve() will exit when the listener is closed. 
+ glog.V(3).Infof("Starting goroutine for healthcheck %q on port %d", nsn.String(), svc.port) + if err := svc.server.Serve(svc.listener); err != nil { + glog.V(3).Infof("Healthcheck %q closed: %v", nsn.String(), err) + return + } + glog.V(3).Infof("Healthcheck %q closed", nsn.String()) + }(nsn, svc) + } + return nil } -// proxyHealthCheckRequest - Factory method to instantiate the health check handler -func proxyHealthCheckFactory() *proxyHC { - glog.V(2).Infof("Initializing kube-proxy health checker") - phc := &proxyHC{ - serviceEndpointsMap: cache.NewThreadSafeStore(cache.Indexers{}, cache.Indices{}), - serviceResponderMap: make(map[types.NamespacedName]serviceResponder), - mutationRequestChannel: make(chan *proxyMutationRequest, 1024), - listenerRequestChannel: make(chan *proxyListenerRequest, 1024), - } - return phc +type hcInstance struct { + port uint16 + listener net.Listener + server HTTPServer + endpoints int // number of local endpoints for a service +} + +type hcHandler struct { + name types.NamespacedName + hcs *server +} + +var _ http.Handler = hcHandler{} + +func (h hcHandler) ServeHTTP(resp http.ResponseWriter, req *http.Request) { + h.hcs.lock.Lock() + count := h.hcs.services[h.name].endpoints + h.hcs.lock.Unlock() + resp.Header().Set("Content-Type", "application/json") + if count == 0 { + resp.WriteHeader(http.StatusServiceUnavailable) + } else { + resp.WriteHeader(http.StatusOK) + } + fmt.Fprintf(resp, strings.Trim(dedent.Dedent(fmt.Sprintf(` + { + "service": { + "namespace": %q, + "name": %q + }, + "localEndpoints": %d + } + `, h.name.Namespace, h.name.Name, count)), "\n")) +} + +func (hcs *server) SyncEndpoints(newEndpoints map[types.NamespacedName]int) error { + hcs.lock.Lock() + defer hcs.lock.Unlock() + + for nsn, count := range newEndpoints { + if hcs.services[nsn] == nil { + glog.V(3).Infof("Not saving endpoints for unknown healthcheck %q", nsn.String()) + continue + } + glog.V(3).Infof("Reporting %d endpoints for healthcheck %q", count, nsn.String()) + hcs.services[nsn].endpoints = count + } + return nil } diff --git a/pkg/proxy/healthcheck/healthcheck_test.go b/pkg/proxy/healthcheck/healthcheck_test.go index 7261558979..208ffb18ad 100644 --- a/pkg/proxy/healthcheck/healthcheck_test.go +++ b/pkg/proxy/healthcheck/healthcheck_test.go @@ -17,142 +17,310 @@ limitations under the License. 
package healthcheck import ( - "fmt" - "io/ioutil" - "math/rand" + "encoding/json" + "net" "net/http" + "net/http/httptest" "testing" - "time" + + "github.com/davecgh/go-spew/spew" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/sets" ) -type TestCaseData struct { - nodePorts int - numEndpoints int - nodePortList []int - svcNames []types.NamespacedName +type fakeListener struct { + openPorts sets.String } -const ( - startPort = 20000 - endPort = 40000 -) - -var ( - choices = []byte("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") -) - -func generateRandomString(n int) string { - - b := make([]byte, n) - l := len(choices) - for i := range b { - b[i] = choices[rand.Intn(l)] - } - return string(b) -} - -func chooseServiceName(tc int, hint int) types.NamespacedName { - var svc types.NamespacedName - svc.Namespace = fmt.Sprintf("ns_%d", tc) - svc.Name = fmt.Sprintf("name_%d", hint) - return svc -} - -func generateEndpointSet(max int) sets.String { - s := sets.NewString() - for i := 0; i < max; i++ { - s.Insert(fmt.Sprintf("%d%s", i, generateRandomString(8))) - } - return s -} - -func verifyHealthChecks(tc *TestCaseData, t *testing.T) bool { - var success = true - time.Sleep(100 * time.Millisecond) - for i := 0; i < tc.nodePorts; i++ { - t.Logf("Validating HealthCheck works for svc %s nodePort %d\n", tc.svcNames[i], tc.nodePortList[i]) - res, err := http.Get(fmt.Sprintf("http://127.0.0.1:%d/", tc.nodePortList[i])) - if err != nil { - t.Logf("ERROR: Failed to connect to listening port") - success = false - continue - } - robots, err := ioutil.ReadAll(res.Body) - if res.StatusCode == http.StatusServiceUnavailable { - t.Logf("ERROR: HealthCheck returned %s: %s", res.Status, string(robots)) - success = false - continue - } - res.Body.Close() - if err != nil { - t.Logf("Error: reading body of response (%s)", err) - success = false - continue - } - } - if success { - t.Logf("Success: All nodePorts found active") - } - return success -} - -func TestHealthChecker(t *testing.T) { - testcases := []TestCaseData{ - { - nodePorts: 1, - numEndpoints: 2, - }, - { - nodePorts: 10, - numEndpoints: 6, - }, - { - nodePorts: 100, - numEndpoints: 1, - }, - } - - Run() - - ports := startPort - for n, tc := range testcases { - tc.nodePortList = make([]int, tc.nodePorts) - tc.svcNames = make([]types.NamespacedName, tc.nodePorts) - for i := 0; i < tc.nodePorts; i++ { - tc.svcNames[i] = chooseServiceName(n, i) - t.Logf("Updating endpoints map for %s %d", tc.svcNames[i], tc.numEndpoints) - for { - UpdateEndpoints(tc.svcNames[i], generateEndpointSet(tc.numEndpoints)) - tc.nodePortList[i] = ports - ports++ - if AddServiceListener(tc.svcNames[i], tc.nodePortList[i]) { - break - } - DeleteServiceListener(tc.svcNames[i], tc.nodePortList[i]) - // Keep searching for a port that works - t.Logf("Failed to bind/listen on port %d...trying next port", ports-1) - if ports > endPort { - t.Errorf("Exhausted range of ports available for tests") - return - } - } - } - t.Logf("Validating if all nodePorts for tc %d work", n) - if !verifyHealthChecks(&tc, t) { - t.Errorf("Healthcheck validation failed") - } - - for i := 0; i < tc.nodePorts; i++ { - DeleteServiceListener(tc.svcNames[i], tc.nodePortList[i]) - UpdateEndpoints(tc.svcNames[i], sets.NewString()) - } - - // Ensure that all listeners have been shutdown - if verifyHealthChecks(&tc, t) { - t.Errorf("Healthcheck validation failed") - } +func newFakeListener() *fakeListener { + return &fakeListener{ + openPorts: sets.String{}, + } +} + +func (fake *fakeListener) 
hasPort(addr string) bool { + return fake.openPorts.Has(addr) +} + +func (fake *fakeListener) Listen(addr string) (net.Listener, error) { + fake.openPorts.Insert(addr) + return &fakeNetListener{ + parent: fake, + addr: addr, + }, nil +} + +type fakeNetListener struct { + parent *fakeListener + addr string +} + +func (fake *fakeNetListener) Accept() (net.Conn, error) { + // Not implemented + return nil, nil +} + +func (fake *fakeNetListener) Close() error { + fake.parent.openPorts.Delete(fake.addr) + return nil +} + +func (fake *fakeNetListener) Addr() net.Addr { + // Not implemented + return nil +} + +type fakeHTTPServerFactory struct{} + +func newFakeHTTPServerFactory() *fakeHTTPServerFactory { + return &fakeHTTPServerFactory{} +} + +func (fake *fakeHTTPServerFactory) New(addr string, handler http.Handler) HTTPServer { + return &fakeHTTPServer{ + addr: addr, + handler: handler, + } +} + +type fakeHTTPServer struct { + addr string + handler http.Handler +} + +func (fake *fakeHTTPServer) Serve(listener net.Listener) error { + return nil // Cause the goroutine to return +} + +func mknsn(ns, name string) types.NamespacedName { + return types.NamespacedName{ + Namespace: ns, + Name: name, + } +} + +type hcPayload struct { + Service struct { + Namespace string + Name string + } + LocalEndpoints int +} + +func TestServer(t *testing.T) { + listener := newFakeListener() + httpFactory := newFakeHTTPServerFactory() + + hcsi := NewServer("hostname", nil, listener, httpFactory) + hcs := hcsi.(*server) + if len(hcs.services) != 0 { + t.Errorf("expected 0 services, got %d", len(hcs.services)) + } + + // sync nothing + hcs.SyncServices(nil) + if len(hcs.services) != 0 { + t.Errorf("expected 0 services, got %d", len(hcs.services)) + } + hcs.SyncEndpoints(nil) + if len(hcs.services) != 0 { + t.Errorf("expected 0 services, got %d", len(hcs.services)) + } + + // sync unknown endpoints, should be dropped + hcs.SyncEndpoints(map[types.NamespacedName]int{mknsn("a", "b"): 93}) + if len(hcs.services) != 0 { + t.Errorf("expected 0 services, got %d", len(hcs.services)) + } + + // sync a real service + nsn := mknsn("a", "b") + hcs.SyncServices(map[types.NamespacedName]uint16{nsn: 9376}) + if len(hcs.services) != 1 { + t.Errorf("expected 1 service, got %d", len(hcs.services)) + } + if hcs.services[nsn].endpoints != 0 { + t.Errorf("expected 0 endpoints, got %d", hcs.services[nsn].endpoints) + } + if len(listener.openPorts) != 1 { + t.Errorf("expected 1 open port, got %d\n%s", len(listener.openPorts), spew.Sdump(listener.openPorts)) + } + if !listener.hasPort(":9376") { + t.Errorf("expected port :9376 to be open\n%s", spew.Sdump(listener.openPorts)) + } + // test the handler + testHandler(hcs, nsn, http.StatusServiceUnavailable, 0, t) + + // sync an endpoint + hcs.SyncEndpoints(map[types.NamespacedName]int{nsn: 18}) + if len(hcs.services) != 1 { + t.Errorf("expected 1 service, got %d", len(hcs.services)) + } + if hcs.services[nsn].endpoints != 18 { + t.Errorf("expected 18 endpoints, got %d", hcs.services[nsn].endpoints) + } + // test the handler + testHandler(hcs, nsn, http.StatusOK, 18, t) + + // sync zero endpoints + hcs.SyncEndpoints(map[types.NamespacedName]int{nsn: 0}) + if len(hcs.services) != 1 { + t.Errorf("expected 1 service, got %d", len(hcs.services)) + } + if hcs.services[nsn].endpoints != 0 { + t.Errorf("expected 0 endpoints, got %d", hcs.services[nsn].endpoints) + } + // test the handler + testHandler(hcs, nsn, http.StatusServiceUnavailable, 0, t) + + // sync nil endpoints + hcs.SyncEndpoints(nil) + if 
len(hcs.services) != 1 { + t.Errorf("expected 1 service, got %d", len(hcs.services)) + } + if hcs.services[nsn].endpoints != 0 { + t.Errorf("expected 0 endpoints, got %d", hcs.services[nsn].endpoints) + } + // test the handler + testHandler(hcs, nsn, http.StatusServiceUnavailable, 0, t) + + // put the endpoint back + hcs.SyncEndpoints(map[types.NamespacedName]int{nsn: 18}) + if len(hcs.services) != 1 { + t.Errorf("expected 1 service, got %d", len(hcs.services)) + } + if hcs.services[nsn].endpoints != 18 { + t.Errorf("expected 18 endpoints, got %d", hcs.services[nsn].endpoints) + } + // delete the service + hcs.SyncServices(nil) + if len(hcs.services) != 0 { + t.Errorf("expected 0 services, got %d", len(hcs.services)) + } + + // sync multiple services + nsn1 := mknsn("a", "b") + nsn2 := mknsn("c", "d") + nsn3 := mknsn("e", "f") + nsn4 := mknsn("g", "h") + hcs.SyncServices(map[types.NamespacedName]uint16{ + nsn1: 9376, + nsn2: 12909, + nsn3: 11113, + }) + if len(hcs.services) != 3 { + t.Errorf("expected 3 service, got %d", len(hcs.services)) + } + if hcs.services[nsn1].endpoints != 0 { + t.Errorf("expected 0 endpoints, got %d", hcs.services[nsn1].endpoints) + } + if hcs.services[nsn2].endpoints != 0 { + t.Errorf("expected 0 endpoints, got %d", hcs.services[nsn2].endpoints) + } + if hcs.services[nsn3].endpoints != 0 { + t.Errorf("expected 0 endpoints, got %d", hcs.services[nsn3].endpoints) + } + if len(listener.openPorts) != 3 { + t.Errorf("expected 3 open ports, got %d\n%s", len(listener.openPorts), spew.Sdump(listener.openPorts)) + } + // test the handlers + testHandler(hcs, nsn1, http.StatusServiceUnavailable, 0, t) + testHandler(hcs, nsn2, http.StatusServiceUnavailable, 0, t) + + // sync endpoints + hcs.SyncEndpoints(map[types.NamespacedName]int{ + nsn1: 9, + nsn2: 3, + nsn3: 7, + }) + if len(hcs.services) != 3 { + t.Errorf("expected 3 services, got %d", len(hcs.services)) + } + if hcs.services[nsn1].endpoints != 9 { + t.Errorf("expected 9 endpoints, got %d", hcs.services[nsn1].endpoints) + } + if hcs.services[nsn2].endpoints != 3 { + t.Errorf("expected 3 endpoints, got %d", hcs.services[nsn2].endpoints) + } + if hcs.services[nsn3].endpoints != 7 { + t.Errorf("expected 7 endpoints, got %d", hcs.services[nsn3].endpoints) + } + // test the handlers + testHandler(hcs, nsn1, http.StatusOK, 9, t) + testHandler(hcs, nsn2, http.StatusOK, 3, t) + testHandler(hcs, nsn3, http.StatusOK, 7, t) + + // sync new services + hcs.SyncServices(map[types.NamespacedName]uint16{ + //nsn1: 9376, // remove it + nsn2: 12909, // leave it + nsn3: 11114, // change it + nsn4: 11878, // add it + }) + if len(hcs.services) != 3 { + t.Errorf("expected 3 service, got %d", len(hcs.services)) + } + if hcs.services[nsn2].endpoints != 3 { + t.Errorf("expected 3 endpoints, got %d", hcs.services[nsn2].endpoints) + } + if hcs.services[nsn3].endpoints != 0 { + t.Errorf("expected 0 endpoints, got %d", hcs.services[nsn3].endpoints) + } + if hcs.services[nsn4].endpoints != 0 { + t.Errorf("expected 0 endpoints, got %d", hcs.services[nsn4].endpoints) + } + // test the handlers + testHandler(hcs, nsn2, http.StatusOK, 3, t) + testHandler(hcs, nsn3, http.StatusServiceUnavailable, 0, t) + testHandler(hcs, nsn4, http.StatusServiceUnavailable, 0, t) + + // sync endpoints + hcs.SyncEndpoints(map[types.NamespacedName]int{ + nsn1: 9, + nsn2: 3, + nsn3: 7, + nsn4: 6, + }) + if len(hcs.services) != 3 { + t.Errorf("expected 3 services, got %d", len(hcs.services)) + } + if hcs.services[nsn2].endpoints != 3 { + t.Errorf("expected 3 endpoints, got 
%d", hcs.services[nsn2].endpoints) + } + if hcs.services[nsn3].endpoints != 7 { + t.Errorf("expected 7 endpoints, got %d", hcs.services[nsn3].endpoints) + } + if hcs.services[nsn4].endpoints != 6 { + t.Errorf("expected 6 endpoints, got %d", hcs.services[nsn4].endpoints) + } + // test the handlers + testHandler(hcs, nsn2, http.StatusOK, 3, t) + testHandler(hcs, nsn3, http.StatusOK, 7, t) + testHandler(hcs, nsn4, http.StatusOK, 6, t) +} + +func testHandler(hcs *server, nsn types.NamespacedName, status int, endpoints int, t *testing.T) { + handler := hcs.services[nsn].server.(*fakeHTTPServer).handler + req, err := http.NewRequest("GET", "/healthz", nil) + if err != nil { + t.Fatal(err) + } + resp := httptest.NewRecorder() + + handler.ServeHTTP(resp, req) + + if resp.Code != status { + t.Errorf("expected status code %v, got %v", status, resp.Code) + } + var payload hcPayload + if err := json.Unmarshal(resp.Body.Bytes(), &payload); err != nil { + t.Fatal(err) + } + if payload.Service.Name != nsn.Name || payload.Service.Namespace != nsn.Namespace { + t.Errorf("expected payload name %q, got %v", nsn.String(), payload.Service) + } + if payload.LocalEndpoints != endpoints { + t.Errorf("expected %d endpoints, got %d", endpoints, payload.LocalEndpoints) } } diff --git a/pkg/proxy/healthcheck/http.go b/pkg/proxy/healthcheck/http.go deleted file mode 100644 index dd3dcf3a75..0000000000 --- a/pkg/proxy/healthcheck/http.go +++ /dev/null @@ -1,46 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package healthcheck - -import ( - "fmt" - "net/http" - - "github.com/golang/glog" -) - -// A healthCheckHandler serves http requests on /healthz on the service health check node port, -// and responds to every request with either: -// 200 OK and the count of endpoints for the given service that are local to this node. -// or -// 503 Service Unavailable If the count is zero or the service does not exist -type healthCheckHandler struct { - svcNsName string -} - -// HTTP Utility function to send the required statusCode and error text to a http.ResponseWriter object -func sendHealthCheckResponse(rw http.ResponseWriter, statusCode int, error string) { - rw.Header().Set("Content-Type", "text/plain") - rw.WriteHeader(statusCode) - fmt.Fprint(rw, error) -} - -// ServeHTTP: Interface callback method for net.Listener Handlers -func (h healthCheckHandler) ServeHTTP(response http.ResponseWriter, req *http.Request) { - glog.V(4).Infof("Received HC Request Service %s from Cloud Load Balancer", h.svcNsName) - healthchecker.handleHealthCheckRequest(response, h.svcNsName) -} diff --git a/pkg/proxy/healthcheck/listener.go b/pkg/proxy/healthcheck/listener.go deleted file mode 100644 index d61e741cc7..0000000000 --- a/pkg/proxy/healthcheck/listener.go +++ /dev/null @@ -1,77 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. 
-You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package healthcheck - -// Create/Delete dynamic listeners on the required nodePorts - -import ( - "fmt" - "net" - "net/http" - - "github.com/golang/glog" -) - -// handleServiceListenerRequest: receive requests to add/remove service health check listening ports -func (h *proxyHC) handleServiceListenerRequest(req *proxyListenerRequest) bool { - sr, serviceFound := h.serviceResponderMap[req.serviceName] - if !req.add { - if !serviceFound { - return false - } - glog.Infof("Deleting HealthCheckListenPort for service %s port %d", - req.serviceName, req.listenPort) - delete(h.serviceResponderMap, req.serviceName) - (*sr.listener).Close() - return true - } else if serviceFound { - if req.listenPort == sr.listenPort { - // Addition requested but responder for service already exists and port is unchanged - return true - } - // Addition requested but responder for service already exists but the listen port has changed - glog.Infof("HealthCheckListenPort for service %s changed from %d to %d - closing old listening port", - req.serviceName, sr.listenPort, req.listenPort) - delete(h.serviceResponderMap, req.serviceName) - (*sr.listener).Close() - } - // Create a service responder object and start listening and serving on the provided port - glog.V(2).Infof("Adding health check listener for service %s on nodePort %d", req.serviceName, req.listenPort) - server := http.Server{ - Addr: fmt.Sprintf(":%d", req.listenPort), - Handler: healthCheckHandler{svcNsName: req.serviceName.String()}, - } - listener, err := net.Listen("tcp", server.Addr) - if err != nil { - glog.Warningf("FAILED to listen on address %s (%s)\n", server.Addr, err) - return false - } - h.serviceResponderMap[req.serviceName] = serviceResponder{serviceName: req.serviceName, - listenPort: req.listenPort, - listener: &listener, - server: &server} - go func() { - // Anonymous goroutine to block on Serve for this listen port - Serve will exit when the listener is closed - glog.V(3).Infof("Goroutine blocking on serving health checks for %s on port %d", req.serviceName, req.listenPort) - if err := server.Serve(listener); err != nil { - glog.V(3).Infof("Proxy HealthCheck listen socket %d for service %s closed with error %s\n", req.listenPort, req.serviceName, err) - return - } - glog.V(3).Infof("Proxy HealthCheck listen socket %d for service %s closed\n", req.listenPort, req.serviceName) - }() - return true -} diff --git a/pkg/proxy/healthcheck/worker.go b/pkg/proxy/healthcheck/worker.go deleted file mode 100644 index 1c1d60a09e..0000000000 --- a/pkg/proxy/healthcheck/worker.go +++ /dev/null @@ -1,53 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
-See the License for the specific language governing permissions and -limitations under the License. -*/ - -// Package healthcheck LoadBalancer Healthcheck responder library for kubernetes network proxies -package healthcheck // import "k8s.io/kubernetes/pkg/proxy/healthcheck" - -import ( - "time" - - "github.com/golang/glog" -) - -var healthchecker *proxyHC - -// handlerLoop Serializes all requests to prevent concurrent access to the maps -func (h *proxyHC) handlerLoop() { - ticker := time.NewTicker(1 * time.Minute) - defer ticker.Stop() - for { - select { - case req := <-h.mutationRequestChannel: - h.handleMutationRequest(req) - case req := <-h.listenerRequestChannel: - req.responseChannel <- h.handleServiceListenerRequest(req) - case <-ticker.C: - go h.sync() - } - } -} - -func (h *proxyHC) sync() { - glog.V(4).Infof("%d Health Check Listeners", len(h.serviceResponderMap)) - glog.V(4).Infof("%d Services registered for health checking", len(h.serviceEndpointsMap.List())) - for _, svc := range h.serviceEndpointsMap.ListKeys() { - if e, ok := h.serviceEndpointsMap.Get(svc); ok { - endpointList := e.(*serviceEndpointsList) - glog.V(4).Infof("Service %s has %d local endpoints", svc, endpointList.endpoints.Len()) - } - } -} diff --git a/pkg/proxy/iptables/BUILD b/pkg/proxy/iptables/BUILD index 4b64995719..e31f28b86b 100644 --- a/pkg/proxy/iptables/BUILD +++ b/pkg/proxy/iptables/BUILD @@ -50,7 +50,6 @@ go_test( "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", "//vendor:k8s.io/apimachinery/pkg/types", "//vendor:k8s.io/apimachinery/pkg/util/intstr", - "//vendor:k8s.io/apimachinery/pkg/util/sets", ], ) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index c22d5369de..5a9ba66c91 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -219,7 +219,7 @@ type Proxier struct { nodeIP net.IP portMapper portOpener recorder record.EventRecorder - healthChecker healthChecker + healthChecker healthcheck.Server } type localPort struct { @@ -251,17 +251,6 @@ func (l *listenPortOpener) OpenLocalPort(lp *localPort) (closeable, error) { return openLocalPort(lp) } -type healthChecker interface { - UpdateEndpoints(serviceName types.NamespacedName, endpointUIDs sets.String) -} - -// TODO: the healthcheck pkg should offer a type -type globalHealthChecker struct{} - -func (globalHealthChecker) UpdateEndpoints(serviceName types.NamespacedName, endpointUIDs sets.String) { - healthcheck.UpdateEndpoints(serviceName, endpointUIDs) -} - // Proxier implements ProxyProvider var _ proxy.ProxyProvider = &Proxier{} @@ -315,8 +304,7 @@ func NewProxier(ipt utiliptables.Interface, glog.Warningf("clusterCIDR not specified, unable to distinguish between internal and external traffic") } - healthChecker := globalHealthChecker{} - go healthcheck.Run() + healthChecker := healthcheck.NewServer(hostname, recorder, nil, nil) // use default implementations of deps var throttle flowcontrol.RateLimiter // Defaulting back to not limit sync rate when minSyncPeriod is 0. @@ -450,18 +438,12 @@ func (proxier *Proxier) SyncLoop() { } } -type healthCheckPort struct { - namespace types.NamespacedName - nodeport int -} - // Accepts a list of Services and the existing service map. Returns the new -// service map, a list of healthcheck ports to add to or remove from the health -// checking listener service, and a set of stale UDP services. 
-func buildNewServiceMap(allServices []*api.Service, oldServiceMap proxyServiceMap) (proxyServiceMap, []healthCheckPort, []healthCheckPort, sets.String) { +// service map, a map of healthcheck ports, and a set of stale UDP +// services. +func buildNewServiceMap(allServices []*api.Service, oldServiceMap proxyServiceMap) (proxyServiceMap, map[types.NamespacedName]uint16, sets.String) { newServiceMap := make(proxyServiceMap) - healthCheckAdd := make([]healthCheckPort, 0) - healthCheckDel := make([]healthCheckPort, 0) + hcPorts := make(map[types.NamespacedName]uint16) for _, service := range allServices { svcName := types.NamespacedName{ @@ -497,12 +479,8 @@ func buildNewServiceMap(allServices []*api.Service, oldServiceMap proxyServiceMa glog.V(1).Infof("Updating existing service %q at %s:%d/%s", serviceName, info.clusterIP, servicePort.Port, servicePort.Protocol) } - if !exists || !equal { - if info.onlyNodeLocalEndpoints && info.healthCheckNodePort > 0 { - healthCheckAdd = append(healthCheckAdd, healthCheckPort{serviceName.NamespacedName, info.healthCheckNodePort}) - } else { - healthCheckDel = append(healthCheckDel, healthCheckPort{serviceName.NamespacedName, 0}) - } + if info.onlyNodeLocalEndpoints { + hcPorts[svcName] = uint16(info.healthCheckNodePort) } newServiceMap[serviceName] = info @@ -510,6 +488,13 @@ func buildNewServiceMap(allServices []*api.Service, oldServiceMap proxyServiceMa } } + for nsn, port := range hcPorts { + if port == 0 { + glog.Errorf("Service %q has no healthcheck nodeport", nsn) + delete(hcPorts, nsn) + } + } + staleUDPServices := sets.NewString() // Remove serviceports missing from the update. for name, info := range oldServiceMap { @@ -518,13 +503,10 @@ func buildNewServiceMap(allServices []*api.Service, oldServiceMap proxyServiceMa if info.protocol == api.ProtocolUDP { staleUDPServices.Insert(info.clusterIP.String()) } - if info.onlyNodeLocalEndpoints && info.healthCheckNodePort > 0 { - healthCheckDel = append(healthCheckDel, healthCheckPort{name.NamespacedName, info.healthCheckNodePort}) - } } } - return newServiceMap, healthCheckAdd, healthCheckDel, staleUDPServices + return newServiceMap, hcPorts, staleUDPServices } // OnServiceUpdate tracks the active set of service proxies. 
@@ -537,19 +519,11 @@ func (proxier *Proxier) OnServiceUpdate(allServices []*api.Service) { } proxier.allServices = allServices - newServiceMap, hcAdd, hcDel, staleUDPServices := buildNewServiceMap(allServices, proxier.serviceMap) - for _, hc := range hcAdd { - glog.V(4).Infof("Adding health check for %+v, port %v", hc.namespace, hc.nodeport) - // Turn on healthcheck responder to listen on the health check nodePort - // FIXME: handle failures from adding the service - healthcheck.AddServiceListener(hc.namespace, hc.nodeport) - } - for _, hc := range hcDel { - // Remove ServiceListener health check nodePorts from the health checker - // TODO - Stats - glog.V(4).Infof("Deleting health check for %+v, port %v", hc.namespace, hc.nodeport) - // FIXME: handle failures from deleting the service - healthcheck.DeleteServiceListener(hc.namespace, hc.nodeport) + newServiceMap, hcPorts, staleUDPServices := buildNewServiceMap(allServices, proxier.serviceMap) + + // update healthcheck ports + if err := proxier.healthChecker.SyncServices(hcPorts); err != nil { + glog.Errorf("Error syncing healthcheck ports: %v", err) } if len(newServiceMap) != len(proxier.serviceMap) || !reflect.DeepEqual(newServiceMap, proxier.serviceMap) { @@ -585,7 +559,7 @@ func (proxier *Proxier) OnEndpointsUpdate(allEndpoints []*api.Endpoints) { // Convert a slice of api.Endpoints objects into a map of service-port -> endpoints. func buildNewEndpointsMap(allEndpoints []*api.Endpoints, curMap proxyEndpointMap, hostname string, - healthChecker healthChecker) (newMap proxyEndpointMap, staleSet map[endpointServicePair]bool) { + healthChecker healthcheck.Server) (newMap proxyEndpointMap, staleSet map[endpointServicePair]bool) { // return values newMap = make(proxyEndpointMap) @@ -618,41 +592,28 @@ func buildNewEndpointsMap(allEndpoints []*api.Endpoints, curMap proxyEndpointMap return } - // Update service health check. We include entries from the current map, - // with zero-length value, to trigger the healthchecker to stop reporting - // health for that service. - // - // This whole mechanism may be over-designed. It builds a list of endpoints - // per service, filters for local endpoints, builds a string that is the - // same as the name, and then passes each (name, list) pair over a channel. - // - // I am pretty sure that there's no way there can be more than one entry in - // the final list, and passing an empty list as a delete signal is weird. - // It could probably be simplified to a synchronous function call of a set - // of NamespacedNames. I am not making that simplification at this time. - // - // ServicePortName includes the port name, which doesn't matter for - // healthchecks. It's possible that a single update both added and removed - // ports on the same IP, so we need to make sure that removals are counted, - // with additions overriding them. Track all endpoints so we can find local - // ones. - epsBySvcName := map[types.NamespacedName][]*endpointsInfo{} - for svcPort := range curMap { - epsBySvcName[svcPort.NamespacedName] = nil - } + // accumulate local IPs per service, ignoring ports + localIPs := map[types.NamespacedName]sets.String{} for svcPort := range newMap { - epsBySvcName[svcPort.NamespacedName] = append(epsBySvcName[svcPort.NamespacedName], newMap[svcPort]...)
- } - for nsn, eps := range epsBySvcName { - // Use a set instead of a slice to provide deduplication - epSet := sets.NewString() - for _, ep := range eps { + for _, ep := range newMap[svcPort] { if ep.isLocal { - // kube-proxy health check only needs local endpoints - epSet.Insert(fmt.Sprintf("%s/%s", nsn.Namespace, nsn.Name)) + nsn := svcPort.NamespacedName + if localIPs[nsn] == nil { + localIPs[nsn] = sets.NewString() + } + ip := strings.Split(ep.endpoint, ":")[0] // just the IP part + localIPs[nsn].Insert(ip) } } - healthChecker.UpdateEndpoints(nsn, epSet) + } + // produce a count per service + localEndpointCounts := map[types.NamespacedName]int{} + for nsn, ips := range localIPs { + localEndpointCounts[nsn] = len(ips) + } + // update healthcheck endpoints + if err := healthChecker.SyncEndpoints(localEndpointCounts); err != nil { + glog.Errorf("Error syncing healthcheck endpoints: %v", err) } return newMap, staleSet diff --git a/pkg/proxy/iptables/proxier_test.go b/pkg/proxy/iptables/proxier_test.go index a8c7721035..c938a2c6d3 100644 --- a/pkg/proxy/iptables/proxier_test.go +++ b/pkg/proxy/iptables/proxier_test.go @@ -30,7 +30,6 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" - "k8s.io/apimachinery/pkg/util/sets" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/service" "k8s.io/kubernetes/pkg/proxy" @@ -357,21 +356,25 @@ func (f *fakePortOpener) OpenLocalPort(lp *localPort) (closeable, error) { } type fakeHealthChecker struct { - endpoints map[types.NamespacedName]sets.String + services map[types.NamespacedName]uint16 + endpoints map[types.NamespacedName]int } func newFakeHealthChecker() *fakeHealthChecker { return &fakeHealthChecker{ - endpoints: map[types.NamespacedName]sets.String{}, + services: map[types.NamespacedName]uint16{}, + endpoints: map[types.NamespacedName]int{}, } } -func (fake *fakeHealthChecker) UpdateEndpoints(serviceName types.NamespacedName, endpointUIDs sets.String) { - if len(endpointUIDs) == 0 { - delete(fake.endpoints, serviceName) - } else { - fake.endpoints[serviceName] = endpointUIDs - } +func (fake *fakeHealthChecker) SyncServices(newServices map[types.NamespacedName]uint16) error { + fake.services = newServices + return nil +} + +func (fake *fakeHealthChecker) SyncEndpoints(newEndpoints map[types.NamespacedName]int) error { + fake.endpoints = newEndpoints + return nil } const testHostname = "test-hostname" @@ -941,30 +944,18 @@ func TestBuildServiceMapAddRemove(t *testing.T) { }), } - serviceMap, hcAdd, hcDel, staleUDPServices := buildNewServiceMap(services, make(proxyServiceMap)) + serviceMap, hcPorts, staleUDPServices := buildNewServiceMap(services, make(proxyServiceMap)) if len(serviceMap) != 8 { t.Errorf("expected service map length 8, got %v", serviceMap) } // The only-local-loadbalancer ones get added - if len(hcAdd) != 2 { - t.Errorf("expected healthcheck add length 2, got %v", hcAdd) + if len(hcPorts) != 1 { + t.Errorf("expected 1 healthcheck port, got %v", hcPorts) } else { - for _, hc := range hcAdd { - if hc.namespace.Namespace != "somewhere" || hc.namespace.Name != "only-local-load-balancer" { - t.Errorf("unexpected healthcheck listener added: %v", hc) - } - } - } - - // All the rest get deleted - if len(hcDel) != 6 { - t.Errorf("expected healthcheck del length 6, got %v", hcDel) - } else { - for _, hc := range hcDel { - if hc.namespace.Namespace == "somewhere" && hc.namespace.Name == "only-local-load-balancer" { - t.Errorf("unexpected healthcheck listener
deleted: %v", hc) - } + nsn := makeNSN("somewhere", "only-local-load-balancer") + if port, found := hcPorts[nsn]; !found || port != 345 { + t.Errorf("expected healthcheck port [%q]=345: got %v", nsn, hcPorts) } } @@ -976,27 +967,13 @@ func TestBuildServiceMapAddRemove(t *testing.T) { // Remove some stuff services = []*api.Service{services[0]} services[0].Spec.Ports = []api.ServicePort{services[0].Spec.Ports[1]} - serviceMap, hcAdd, hcDel, staleUDPServices = buildNewServiceMap(services, serviceMap) + serviceMap, hcPorts, staleUDPServices = buildNewServiceMap(services, serviceMap) if len(serviceMap) != 1 { t.Errorf("expected service map length 1, got %v", serviceMap) } - if len(hcAdd) != 0 { - t.Errorf("expected healthcheck add length 1, got %v", hcAdd) - } - - // The only OnlyLocal annotation was removed above, so we expect a delete now. - // FIXME: Since the BetaAnnotationHealthCheckNodePort is the same for all - // ServicePorts, we'll get one delete per ServicePort, even though they all - // contain the same information - if len(hcDel) != 2 { - t.Errorf("expected healthcheck del length 2, got %v", hcDel) - } else { - for _, hc := range hcDel { - if hc.namespace.Namespace != "somewhere" || hc.namespace.Name != "only-local-load-balancer" { - t.Errorf("unexpected healthcheck listener deleted: %v", hc) - } - } + if len(hcPorts) != 0 { + t.Errorf("expected healthcheck ports length 0, got %v", hcPorts) } // All services but one were deleted. While you'd expect only the ClusterIPs @@ -1023,17 +1000,14 @@ func TestBuildServiceMapServiceHeadless(t *testing.T) { } // Headless service should be ignored - serviceMap, hcAdd, hcDel, staleUDPServices := buildNewServiceMap(services, make(proxyServiceMap)) + serviceMap, hcPorts, staleUDPServices := buildNewServiceMap(services, make(proxyServiceMap)) if len(serviceMap) != 0 { t.Errorf("expected service map length 0, got %d", len(serviceMap)) } // No proxied services, so no healthchecks - if len(hcAdd) != 0 { - t.Errorf("expected healthcheck add length 0, got %d", len(hcAdd)) - } - if len(hcDel) != 0 { - t.Errorf("expected healthcheck del length 0, got %d", len(hcDel)) + if len(hcPorts) != 0 { + t.Errorf("expected healthcheck ports length 0, got %d", len(hcPorts)) } if len(staleUDPServices) != 0 { @@ -1051,16 +1025,13 @@ func TestBuildServiceMapServiceTypeExternalName(t *testing.T) { }), } - serviceMap, hcAdd, hcDel, staleUDPServices := buildNewServiceMap(services, make(proxyServiceMap)) + serviceMap, hcPorts, staleUDPServices := buildNewServiceMap(services, make(proxyServiceMap)) if len(serviceMap) != 0 { t.Errorf("expected service map length 0, got %v", serviceMap) } // No proxied services, so no healthchecks - if len(hcAdd) != 0 { - t.Errorf("expected healthcheck add length 0, got %v", hcAdd) - } - if len(hcDel) != 0 { - t.Errorf("expected healthcheck del length 0, got %v", hcDel) + if len(hcPorts) != 0 { + t.Errorf("expected healthcheck ports length 0, got %v", hcPorts) } if len(staleUDPServices) != 0 { t.Errorf("expected stale UDP services length 0, got %v", staleUDPServices) @@ -1096,15 +1067,12 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { }), } - serviceMap, hcAdd, hcDel, staleUDPServices := buildNewServiceMap(first, make(proxyServiceMap)) + serviceMap, hcPorts, staleUDPServices := buildNewServiceMap(first, make(proxyServiceMap)) if len(serviceMap) != 2 { t.Errorf("expected service map length 2, got %v", serviceMap) } - if len(hcAdd) != 0 { - t.Errorf("expected healthcheck add length 0, got %v", hcAdd) - } - if len(hcDel) != 2 { -
t.Errorf("expected healthcheck del length 2, got %v", hcDel) + if len(hcPorts) != 0 { + t.Errorf("expected healthcheck ports length 0, got %v", hcPorts) } if len(staleUDPServices) != 0 { // Services only added, so nothing stale yet @@ -1112,15 +1080,12 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { } // Change service to load-balancer - serviceMap, hcAdd, hcDel, staleUDPServices = buildNewServiceMap(second, serviceMap) + serviceMap, hcPorts, staleUDPServices = buildNewServiceMap(second, serviceMap) if len(serviceMap) != 2 { t.Errorf("expected service map length 2, got %v", serviceMap) } - if len(hcAdd) != 2 { - t.Errorf("expected healthcheck add length 2, got %v", hcAdd) - } - if len(hcDel) != 0 { - t.Errorf("expected healthcheck add length 2, got %v", hcDel) + if len(hcPorts) != 1 { + t.Errorf("expected healthcheck ports length 1, got %v", hcPorts) } if len(staleUDPServices) != 0 { t.Errorf("expected stale UDP services length 0, got %v", staleUDPServices.List()) @@ -1128,30 +1093,24 @@ func TestBuildServiceMapServiceUpdate(t *testing.T) { // No change; make sure the service map stays the same and there are // no health-check changes - serviceMap, hcAdd, hcDel, staleUDPServices = buildNewServiceMap(second, serviceMap) + serviceMap, hcPorts, staleUDPServices = buildNewServiceMap(second, serviceMap) if len(serviceMap) != 2 { t.Errorf("expected service map length 2, got %v", serviceMap) } - if len(hcAdd) != 0 { - t.Errorf("expected healthcheck add length 0, got %v", hcAdd) - } - if len(hcDel) != 0 { - t.Errorf("expected healthcheck add length 2, got %v", hcDel) + if len(hcPorts) != 1 { + t.Errorf("expected healthcheck ports length 1, got %v", hcPorts) } if len(staleUDPServices) != 0 { t.Errorf("expected stale UDP services length 0, got %v", staleUDPServices.List()) } // And back to ClusterIP - serviceMap, hcAdd, hcDel, staleUDPServices = buildNewServiceMap(first, serviceMap) + serviceMap, hcPorts, staleUDPServices = buildNewServiceMap(first, serviceMap) if len(serviceMap) != 2 { t.Errorf("expected service map length 2, got %v", serviceMap) } - if len(hcAdd) != 0 { - t.Errorf("expected healthcheck add length 0, got %v", hcAdd) - } - if len(hcDel) != 2 { - t.Errorf("expected healthcheck del length 2, got %v", hcDel) + if len(hcPorts) != 0 { + t.Errorf("expected healthcheck ports length 0, got %v", hcPorts) } if len(staleUDPServices) != 0 { // Services only added, so nothing stale yet @@ -1401,13 +1360,14 @@ func makeTestEndpoints(namespace, name string, eptFunc func(*api.Endpoints)) *ap return ept } +func makeNSN(namespace, name string) types.NamespacedName { + return types.NamespacedName{Namespace: namespace, Name: name} +} + func makeServicePortName(ns, name, port string) proxy.ServicePortName { return proxy.ServicePortName{ - NamespacedName: types.NamespacedName{ - Namespace: ns, - Name: name, - }, - Port: port, + NamespacedName: makeNSN(ns, name), + Port: port, } } @@ -1419,14 +1379,14 @@ func Test_buildNewEndpointsMap(t *testing.T) { oldEndpoints map[proxy.ServicePortName][]*endpointsInfo expectedResult map[proxy.ServicePortName][]*endpointsInfo expectedStale []endpointServicePair - expectedHealthchecks map[types.NamespacedName]sets.String + expectedHealthchecks map[types.NamespacedName]int }{{ // Case[0]: nothing newEndpoints: []*api.Endpoints{}, oldEndpoints: map[proxy.ServicePortName][]*endpointsInfo{}, expectedResult: map[proxy.ServicePortName][]*endpointsInfo{}, expectedStale: []endpointServicePair{}, - expectedHealthchecks: map[types.NamespacedName]sets.String{}, + 
expectedHealthchecks: map[types.NamespacedName]int{}, }, { // Case[1]: no change, unnamed port newEndpoints: []*api.Endpoints{ @@ -1452,7 +1412,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { }, }, expectedStale: []endpointServicePair{}, - expectedHealthchecks: map[types.NamespacedName]sets.String{}, + expectedHealthchecks: map[types.NamespacedName]int{}, }, { // Case[2]: no change, named port, local newEndpoints: []*api.Endpoints{ @@ -1480,8 +1440,8 @@ func Test_buildNewEndpointsMap(t *testing.T) { }, }, expectedStale: []endpointServicePair{}, - expectedHealthchecks: map[types.NamespacedName]sets.String{ - types.NamespacedName{Namespace: "ns1", Name: "ep1"}: sets.NewString("ns1/ep1"), + expectedHealthchecks: map[types.NamespacedName]int{ + makeNSN("ns1", "ep1"): 1, }, }, { // Case[3]: no change, multiple subsets @@ -1523,7 +1483,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { }, }, expectedStale: []endpointServicePair{}, - expectedHealthchecks: map[types.NamespacedName]sets.String{}, + expectedHealthchecks: map[types.NamespacedName]int{}, }, { // Case[4]: no change, multiple subsets, multiple ports, local newEndpoints: []*api.Endpoints{ @@ -1574,8 +1534,8 @@ func Test_buildNewEndpointsMap(t *testing.T) { }, }, expectedStale: []endpointServicePair{}, - expectedHealthchecks: map[types.NamespacedName]sets.String{ - types.NamespacedName{Namespace: "ns1", Name: "ep1"}: sets.NewString("ns1/ep1"), + expectedHealthchecks: map[types.NamespacedName]int{ + makeNSN("ns1", "ep1"): 1, }, }, { // Case[5]: no change, multiple endpoints, subsets, IPs, and ports @@ -1682,9 +1642,9 @@ func Test_buildNewEndpointsMap(t *testing.T) { }, }, expectedStale: []endpointServicePair{}, - expectedHealthchecks: map[types.NamespacedName]sets.String{ - types.NamespacedName{Namespace: "ns1", Name: "ep1"}: sets.NewString("ns1/ep1"), - types.NamespacedName{Namespace: "ns2", Name: "ep2"}: sets.NewString("ns2/ep2"), + expectedHealthchecks: map[types.NamespacedName]int{ + makeNSN("ns1", "ep1"): 2, + makeNSN("ns2", "ep2"): 1, }, }, { // Case[6]: add an Endpoints @@ -1708,8 +1668,8 @@ func Test_buildNewEndpointsMap(t *testing.T) { }, }, expectedStale: []endpointServicePair{}, - expectedHealthchecks: map[types.NamespacedName]sets.String{ - types.NamespacedName{Namespace: "ns1", Name: "ep1"}: sets.NewString("ns1/ep1"), + expectedHealthchecks: map[types.NamespacedName]int{ + makeNSN("ns1", "ep1"): 1, }, }, { // Case[7]: remove an Endpoints @@ -1724,7 +1684,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { endpoint: "1.1.1.1:11", servicePortName: makeServicePortName("ns1", "ep1", ""), }}, - expectedHealthchecks: map[types.NamespacedName]sets.String{}, + expectedHealthchecks: map[types.NamespacedName]int{}, }, { // Case[8]: add an IP and port newEndpoints: []*api.Endpoints{ @@ -1762,8 +1722,8 @@ func Test_buildNewEndpointsMap(t *testing.T) { }, }, expectedStale: []endpointServicePair{}, - expectedHealthchecks: map[types.NamespacedName]sets.String{ - types.NamespacedName{Namespace: "ns1", Name: "ep1"}: sets.NewString("ns1/ep1"), + expectedHealthchecks: map[types.NamespacedName]int{ + makeNSN("ns1", "ep1"): 1, }, }, { // Case[9]: remove an IP and port @@ -1805,7 +1765,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { endpoint: "1.1.1.2:12", servicePortName: makeServicePortName("ns1", "ep1", "p12"), }}, - expectedHealthchecks: map[types.NamespacedName]sets.String{}, + expectedHealthchecks: map[types.NamespacedName]int{}, }, { // Case[10]: add a subset newEndpoints: []*api.Endpoints{ @@ -1844,8 +1804,8 @@ func 
Test_buildNewEndpointsMap(t *testing.T) { }, }, expectedStale: []endpointServicePair{}, - expectedHealthchecks: map[types.NamespacedName]sets.String{ - types.NamespacedName{Namespace: "ns1", Name: "ep1"}: sets.NewString("ns1/ep1"), + expectedHealthchecks: map[types.NamespacedName]int{ + makeNSN("ns1", "ep1"): 1, }, }, { // Case[11]: remove a subset @@ -1879,7 +1839,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { endpoint: "2.2.2.2:22", servicePortName: makeServicePortName("ns1", "ep1", "p22"), }}, - expectedHealthchecks: map[types.NamespacedName]sets.String{}, + expectedHealthchecks: map[types.NamespacedName]int{}, }, { // Case[12]: rename a port newEndpoints: []*api.Endpoints{ @@ -1909,7 +1869,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { endpoint: "1.1.1.1:11", servicePortName: makeServicePortName("ns1", "ep1", "p11"), }}, - expectedHealthchecks: map[types.NamespacedName]sets.String{}, + expectedHealthchecks: map[types.NamespacedName]int{}, }, { // Case[13]: renumber a port newEndpoints: []*api.Endpoints{ @@ -1939,7 +1899,7 @@ func Test_buildNewEndpointsMap(t *testing.T) { endpoint: "1.1.1.1:11", servicePortName: makeServicePortName("ns1", "ep1", "p11"), }}, - expectedHealthchecks: map[types.NamespacedName]sets.String{}, + expectedHealthchecks: map[types.NamespacedName]int{}, }, { // Case[14]: complex add and remove newEndpoints: []*api.Endpoints{ @@ -2044,8 +2004,8 @@ func Test_buildNewEndpointsMap(t *testing.T) { endpoint: "4.4.4.6:45", servicePortName: makeServicePortName("ns4", "ep4", "p45"), }}, - expectedHealthchecks: map[types.NamespacedName]sets.String{ - types.NamespacedName{Namespace: "ns4", Name: "ep4"}: sets.NewString("ns4/ep4"), + expectedHealthchecks: map[types.NamespacedName]int{ + makeNSN("ns4", "ep4"): 1, }, }}
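
The usage sketch below is illustrative only and is not part of the patch: it shows how a caller such as the iptables proxier is expected to drive the new healthcheck.Server introduced above. The node name, service name, and nodePort are hypothetical placeholders.

// Illustrative sketch (not part of the patch): driving healthcheck.Server.
// "my-node", "ns"/"svc", and port 30303 are made-up values.
package main

import (
	"github.com/golang/glog"

	"k8s.io/apimachinery/pkg/types"
	"k8s.io/kubernetes/pkg/proxy/healthcheck"
)

func main() {
	// nil Listener and HTTPServerFactory select the real net.Listen and
	// http.Server; a nil recorder simply disables event recording.
	hcs := healthcheck.NewServer("my-node", nil, nil, nil)

	nsn := types.NamespacedName{Namespace: "ns", Name: "svc"}

	// Open a listener on the service's healthCheckNodePort.
	if err := hcs.SyncServices(map[types.NamespacedName]uint16{nsn: 30303}); err != nil {
		glog.Errorf("SyncServices: %v", err)
	}

	// Report the local endpoint count; 0 makes the handler answer 503,
	// anything greater answers 200.
	if err := hcs.SyncEndpoints(map[types.NamespacedName]int{nsn: 2}); err != nil {
		glog.Errorf("SyncEndpoints: %v", err)
	}
}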