diff --git a/pkg/proxy/healthcheck/api.go b/pkg/proxy/healthcheck/api.go new file mode 100644 index 0000000000..5e66ce7b50 --- /dev/null +++ b/pkg/proxy/healthcheck/api.go @@ -0,0 +1,65 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package healthcheck + +import ( + "k8s.io/kubernetes/pkg/types" + "k8s.io/kubernetes/pkg/util/sets" +) + +// All public API Methods for this package + +// UpdateEndpoints Update the set of local endpoints for a service +func UpdateEndpoints(serviceName types.NamespacedName, endpointUids sets.String) { + // Deepcopy the endpoints set with the latest + endpoints := sets.NewString() + for _, e := range endpointUids.List() { + endpoints.Insert(e) + } + req := &proxyMutationRequest{ + serviceName: serviceName, + endpointUids: &endpoints, + } + healthchecker.mutationRequestChannel <- req +} + +func updateServiceListener(serviceName types.NamespacedName, listenPort int, addOrDelete bool) bool { + responseChannel := make(chan bool) + req := &proxyListenerRequest{ + serviceName: serviceName, + listenPort: uint16(listenPort), + add: addOrDelete, + responseChannel: responseChannel, + } + healthchecker.listenerRequestChannel <- req + return <-responseChannel +} + +// AddServiceListener Request addition of a listener for a service's health check +func AddServiceListener(serviceName types.NamespacedName, listenPort int) bool { + return updateServiceListener(serviceName, listenPort, true) +} + +// DeleteServiceListener Request addition of a listener for a service's health check +func DeleteServiceListener(serviceName types.NamespacedName, listenPort int) bool { + return updateServiceListener(serviceName, listenPort, false) +} + +// Run Start the healthchecker main loop +func Run() { + once.Do(run) +} diff --git a/pkg/proxy/healthcheck/doc.go b/pkg/proxy/healthcheck/doc.go new file mode 100644 index 0000000000..56ecc11e9a --- /dev/null +++ b/pkg/proxy/healthcheck/doc.go @@ -0,0 +1,18 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package healthcheck LoadBalancer Healthcheck responder library for kubernetes network proxies +package healthcheck // import "k8s.io/kubernetes/pkg/proxy/healthcheck" diff --git a/pkg/proxy/healthcheck/healthcheck.go b/pkg/proxy/healthcheck/healthcheck.go new file mode 100644 index 0000000000..afce7547a7 --- /dev/null +++ b/pkg/proxy/healthcheck/healthcheck.go @@ -0,0 +1,127 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package healthcheck + +import ( + "fmt" + "net" + "net/http" + + "github.com/golang/glog" + "k8s.io/kubernetes/pkg/client/cache" + "k8s.io/kubernetes/pkg/types" + "k8s.io/kubernetes/pkg/util/sets" +) + +// proxyMutationRequest: Message to request addition/deletion of endpoints for a service +type proxyMutationRequest struct { + serviceName types.NamespacedName + endpointUids *sets.String +} + +// proxyListenerRequest: Message to request addition/deletion of a service responder on a listening port +type proxyListenerRequest struct { + serviceName types.NamespacedName + listenPort uint16 + add bool + responseChannel chan bool +} + +// serviceEndpointsList: A list of endpoints for a service +type serviceEndpointsList struct { + serviceName types.NamespacedName + endpoints *sets.String +} + +// serviceResponder: Contains net/http datastructures necessary for responding to each Service's health check on its aux nodePort +type serviceResponder struct { + serviceName types.NamespacedName + listenPort uint16 + listener *net.Listener + server *http.Server +} + +// proxyHC: Handler structure for health check, endpoint add/delete and service listener add/delete requests +type proxyHC struct { + serviceEndpointsMap cache.ThreadSafeStore + serviceResponderMap map[types.NamespacedName]serviceResponder + mutationRequestChannel chan *proxyMutationRequest + listenerRequestChannel chan *proxyListenerRequest +} + +// handleHealthCheckRequest - received a health check request - lookup and respond to HC. +func (h *proxyHC) handleHealthCheckRequest(rw http.ResponseWriter, serviceName string) { + s, ok := h.serviceEndpointsMap.Get(serviceName) + if !ok { + glog.V(4).Infof("Service %s not found or has no local endpoints", serviceName) + sendHealthCheckResponse(rw, http.StatusServiceUnavailable, "No Service Endpoints Not Found") + return + } + numEndpoints := len(*s.(*serviceEndpointsList).endpoints) + if numEndpoints > 0 { + sendHealthCheckResponse(rw, http.StatusOK, fmt.Sprintf("%d Service Endpoints found", numEndpoints)) + return + } + sendHealthCheckResponse(rw, http.StatusServiceUnavailable, "0 local Endpoints are alive") +} + +// handleMutationRequest - receive requests to mutate the table entry for a service +func (h *proxyHC) handleMutationRequest(req *proxyMutationRequest) { + numEndpoints := len(*req.endpointUids) + glog.V(4).Infof("LB service health check mutation request Service: %s - %d Endpoints %v", + req.serviceName, numEndpoints, (*req.endpointUids).List()) + if numEndpoints == 0 { + if _, ok := h.serviceEndpointsMap.Get(req.serviceName.String()); ok { + glog.V(4).Infof("Deleting endpoints map for service %s, all local endpoints gone", req.serviceName.String()) + h.serviceEndpointsMap.Delete(req.serviceName.String()) + } + return + } + var entry *serviceEndpointsList + e, exists := h.serviceEndpointsMap.Get(req.serviceName.String()) + if exists { + entry = e.(*serviceEndpointsList) + if entry.endpoints.Equal(*req.endpointUids) { + return + } + // Compute differences just for printing logs about additions and removals + deletedEndpoints := entry.endpoints.Difference(*req.endpointUids) + newEndpoints := req.endpointUids.Difference(*entry.endpoints) + for _, e := range newEndpoints.List() { + glog.V(4).Infof("Adding local endpoint %s to LB health check for service %s", + e, req.serviceName.String()) + } + for _, d := range deletedEndpoints.List() { + glog.V(4).Infof("Deleted endpoint %s from service %s LB health check (%d endpoints left)", + d, req.serviceName.String(), len(*entry.endpoints)) + } + } + entry = &serviceEndpointsList{serviceName: req.serviceName, endpoints: req.endpointUids} + h.serviceEndpointsMap.Add(req.serviceName.String(), entry) +} + +// proxyHealthCheckRequest - Factory method to instantiate the health check handler +func proxyHealthCheckFactory() *proxyHC { + glog.V(2).Infof("Initializing kube-proxy health checker") + phc := &proxyHC{ + serviceEndpointsMap: cache.NewThreadSafeStore(cache.Indexers{}, cache.Indices{}), + serviceResponderMap: make(map[types.NamespacedName]serviceResponder), + mutationRequestChannel: make(chan *proxyMutationRequest, 1024), + listenerRequestChannel: make(chan *proxyListenerRequest, 1024), + } + return phc +} diff --git a/pkg/proxy/healthcheck/healthcheck_test.go b/pkg/proxy/healthcheck/healthcheck_test.go new file mode 100644 index 0000000000..d8518adec2 --- /dev/null +++ b/pkg/proxy/healthcheck/healthcheck_test.go @@ -0,0 +1,156 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package healthcheck + +import ( + "fmt" + "io/ioutil" + "math/rand" + "net/http" + "testing" + "time" + + "k8s.io/kubernetes/pkg/types" + "k8s.io/kubernetes/pkg/util/sets" +) + +var ( + start int + end int + choices []byte +) + +type TestCaseData struct { + nodePorts int + numEndpoints int + nodePortList []int + svcNames []types.NamespacedName +} + +func init() { + start = 20000 + end = 40000 + choices = []byte("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") +} + +func generateRandomString(n int) string { + b := make([]byte, n) + l := len(choices) + for i := range b { + b[i] = choices[rand.Intn(l)] + } + return string(b) +} + +func chooseServiceName(tc int, hint int) types.NamespacedName { + var svc types.NamespacedName + svc.Namespace = fmt.Sprintf("ns_%d", tc) + svc.Name = fmt.Sprintf("name_%d", hint) + return svc +} + +func generateEndpointSet(max int) sets.String { + s := sets.NewString() + for i := 0; i < max; i++ { + s.Insert(fmt.Sprintf("%d%s", i, generateRandomString(8))) + } + return s +} + +func verifyHealthChecks(tc *TestCaseData, t *testing.T) bool { + var success = true + time.Sleep(100 * time.Millisecond) + for i := 0; i < tc.nodePorts; i++ { + t.Logf("Validating HealthCheck works for svc %s nodePort %d\n", tc.svcNames[i], tc.nodePortList[i]) + res, err := http.Get(fmt.Sprintf("http://127.0.0.1:%d/", tc.nodePortList[i])) + if err != nil { + t.Logf("ERROR: Failed to connect to listening port") + success = false + continue + } + robots, err := ioutil.ReadAll(res.Body) + if res.StatusCode == http.StatusServiceUnavailable { + t.Logf("ERROR: HealthCheck returned %s: %s", res.Status, string(robots)) + success = false + continue + } + res.Body.Close() + if err != nil { + t.Logf("Error: reading body of response (%s)", err) + success = false + continue + } + } + if success { + t.Logf("Success: All nodePorts found active") + } + return success +} + +func TestHealthChecker(t *testing.T) { + testcases := []TestCaseData{ + { + nodePorts: 1, + numEndpoints: 2, + }, + { + nodePorts: 10, + numEndpoints: 6, + }, + { + nodePorts: 100, + numEndpoints: 1, + }, + } + + Run() + + ports := start + for n, tc := range testcases { + tc.nodePortList = make([]int, tc.nodePorts) + tc.svcNames = make([]types.NamespacedName, tc.nodePorts) + for i := 0; i < tc.nodePorts; i++ { + tc.svcNames[i] = chooseServiceName(n, i) + t.Logf("Updating endpoints map for %s %d", tc.svcNames[i], tc.numEndpoints) + for { + UpdateEndpoints(tc.svcNames[i], generateEndpointSet(tc.numEndpoints)) + tc.nodePortList[i] = ports + ports++ + if AddServiceListener(tc.svcNames[i], tc.nodePortList[i]) { + break + } + DeleteServiceListener(tc.svcNames[i], tc.nodePortList[i]) + // Keep searching for a port that works + t.Logf("Failed to bind/listen on port %d...trying next port", ports-1) + } + } + t.Logf("Validating if all nodePorts for tc %d work", n) + if !verifyHealthChecks(&tc, t) { + t.Errorf("Healthcheck validation failed") + } + + for i := 0; i < tc.nodePorts; i++ { + DeleteServiceListener(tc.svcNames[i], tc.nodePortList[i]) + UpdateEndpoints(tc.svcNames[i], sets.NewString()) + } + + // Ensure that all listeners have been shutdown + if verifyHealthChecks(&tc, t) { + t.Errorf("Healthcheck validation failed") + } + } +} diff --git a/pkg/proxy/healthcheck/http.go b/pkg/proxy/healthcheck/http.go new file mode 100644 index 0000000000..d37153a3e0 --- /dev/null +++ b/pkg/proxy/healthcheck/http.go @@ -0,0 +1,42 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package healthcheck + +import ( + "fmt" + "net/http" + + "github.com/golang/glog" +) + +// Satisfies the http.Handler interface needed for each service's http.Server listening port. +type healthCheckHandler struct { + svcName string +} + +// HTTP Utility function to send the required statusCode and error text to a http.ResponseWriter object +func sendHealthCheckResponse(rw http.ResponseWriter, statusCode int, error string) { + rw.Header().Set("Content-Type", "text/plain") + rw.WriteHeader(statusCode) + fmt.Fprint(rw, error) +} + +// ServeHTTP: Interface callback method for net.Listener Handlers +func (h healthCheckHandler) ServeHTTP(response http.ResponseWriter, req *http.Request) { + glog.V(4).Infof("Received HC Request Service %s from Cloud Load Balancer", h.svcName) + healthchecker.handleHealthCheckRequest(response, h.svcName) +} diff --git a/pkg/proxy/healthcheck/listener.go b/pkg/proxy/healthcheck/listener.go new file mode 100644 index 0000000000..607b44c58a --- /dev/null +++ b/pkg/proxy/healthcheck/listener.go @@ -0,0 +1,77 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package healthcheck + +// Create/Delete dynamic listeners on the required nodePorts + +import ( + "fmt" + "net" + "net/http" + + "github.com/golang/glog" +) + +// handleServiceListenerRequest: receive requests to add/remove service health check listening ports +func (h *proxyHC) handleServiceListenerRequest(req *proxyListenerRequest) bool { + sr, serviceFound := h.serviceResponderMap[req.serviceName] + if !req.add { + if !serviceFound { + return false + } + glog.Infof("Deleting HealthCheckListenPort for service %s port %d", + req.serviceName, req.listenPort) + delete(h.serviceResponderMap, req.serviceName) + (*sr.listener).Close() + return true + } else if serviceFound { + if req.listenPort == sr.listenPort { + // Addition requested but responder for service already exists and port is unchanged + return true + } + // Addition requested but responder for service already exists but the listen port has changed + glog.Infof("HealthCheckListenPort for service %s changed from %d to %d - closing old listening port", + req.serviceName, sr.listenPort, req.listenPort) + delete(h.serviceResponderMap, req.serviceName) + (*sr.listener).Close() + } + // Create a service responder object and start listening and serving on the provided port + glog.V(2).Infof("Adding health check listener for service %s on nodePort %d", req.serviceName, req.listenPort) + server := http.Server{ + Addr: fmt.Sprintf(":%d", req.listenPort), + Handler: healthCheckHandler{svcName: req.serviceName.String()}, + } + listener, err := net.Listen("tcp", server.Addr) + if err != nil { + glog.Warningf("FAILED to listen on address %s (%s)\n", server.Addr, err) + return false + } + h.serviceResponderMap[req.serviceName] = serviceResponder{serviceName: req.serviceName, + listenPort: req.listenPort, + listener: &listener, + server: &server} + go func() { + // Anonymous goroutine to block on Serve for this listen port - Serve will exit when the listener is closed + glog.V(3).Infof("Goroutine blocking on serving health checks for %s on port %d", req.serviceName, req.listenPort) + if err := server.Serve(listener); err != nil { + glog.V(3).Infof("Proxy HealthCheck listen socket %d for service %s closed with error %s\n", req.listenPort, req.serviceName, err) + return + } + glog.V(3).Infof("Proxy HealthCheck listen socket %d for service %s closed\n", req.listenPort, req.serviceName) + }() + return true +} diff --git a/pkg/proxy/healthcheck/worker.go b/pkg/proxy/healthcheck/worker.go new file mode 100644 index 0000000000..09a75db649 --- /dev/null +++ b/pkg/proxy/healthcheck/worker.go @@ -0,0 +1,70 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package healthcheck LoadBalancer Healthcheck responder library for kubernetes network proxies +package healthcheck // import "k8s.io/kubernetes/pkg/proxy/healthcheck" + +import ( + "sync" + "time" + + "k8s.io/kubernetes/pkg/util/wait" + + "github.com/golang/glog" +) + +var once sync.Once +var healthchecker *proxyHC + +func run() { + allocatedCh := make(chan bool) + go func() { + healthchecker = proxyHealthCheckFactory() + allocatedCh <- true + wait.Forever(func() { + healthchecker.handlerLoop() + }, 0) + }() + // Return only after factory function allocation + <-allocatedCh +} + +// handlerLoop Serializes all requests to prevent concurrent access to the maps +func (h *proxyHC) handlerLoop() { + ticker := time.NewTicker(1 * time.Minute) + defer ticker.Stop() + for { + select { + case req := <-h.mutationRequestChannel: + h.handleMutationRequest(req) + case req := <-h.listenerRequestChannel: + req.responseChannel <- h.handleServiceListenerRequest(req) + case <-ticker.C: + h.sync() + } + } +} + +func (h *proxyHC) sync() { + glog.V(2).Infof("%d Health Check Listeners", len(h.serviceResponderMap)) + glog.V(2).Infof("%d Services registered for health checking", len(h.serviceEndpointsMap.List())) + for _, svc := range h.serviceEndpointsMap.ListKeys() { + if e, ok := h.serviceEndpointsMap.Get(svc); ok { + endpointList := e.(*serviceEndpointsList) + glog.V(2).Infof("Service %s has %d local endpoints", svc, endpointList.endpoints.Len()) + } + } +} diff --git a/pkg/util/healthcheckparser/doc.go b/pkg/util/healthcheckparser/doc.go new file mode 100644 index 0000000000..39d8e06206 --- /dev/null +++ b/pkg/util/healthcheckparser/doc.go @@ -0,0 +1,18 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package healthcheckparser provides healthcheck URL generators and parsers +package healthcheckparser // import "k8s.io/kubernetes/pkg/util/healthcheckparser" diff --git a/pkg/util/healthcheckparser/util.go b/pkg/util/healthcheckparser/util.go new file mode 100644 index 0000000000..16cd46ba9e --- /dev/null +++ b/pkg/util/healthcheckparser/util.go @@ -0,0 +1,37 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package healthcheck provides healthcheck URL generators and parsers +package healthcheckparser // import "k8s.io/kubernetes/pkg/util/healthcheckparser" + +import ( + "errors" + "fmt" + "strings" +) + +func GenerateURL(namespace, name string) string { + return fmt.Sprintf("/healthchecks/svc/%s/%s", namespace, name) +} + +func ParseURL(url string) (namespace, name string, err error) { + parts := strings.Split(url, "/") + if len(parts) < 5 { + err = errors.New("Failed to parse url into healthcheck components") + return + } + return parts[3], parts[4], nil +}