Load Balancer Health Check responder library for ESIPP

pull/6/head
Girish Kalele 2016-08-12 15:22:45 -07:00
parent db777bb9a3
commit 29188c68d5
9 changed files with 610 additions and 0 deletions

View File

@ -0,0 +1,65 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package healthcheck
import (
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/sets"
)
// All public API Methods for this package
// UpdateEndpoints Update the set of local endpoints for a service
func UpdateEndpoints(serviceName types.NamespacedName, endpointUids sets.String) {
// Deepcopy the endpoints set with the latest
endpoints := sets.NewString()
for _, e := range endpointUids.List() {
endpoints.Insert(e)
}
req := &proxyMutationRequest{
serviceName: serviceName,
endpointUids: &endpoints,
}
healthchecker.mutationRequestChannel <- req
}
func updateServiceListener(serviceName types.NamespacedName, listenPort int, addOrDelete bool) bool {
responseChannel := make(chan bool)
req := &proxyListenerRequest{
serviceName: serviceName,
listenPort: uint16(listenPort),
add: addOrDelete,
responseChannel: responseChannel,
}
healthchecker.listenerRequestChannel <- req
return <-responseChannel
}
// AddServiceListener Request addition of a listener for a service's health check
func AddServiceListener(serviceName types.NamespacedName, listenPort int) bool {
return updateServiceListener(serviceName, listenPort, true)
}
// DeleteServiceListener Request addition of a listener for a service's health check
func DeleteServiceListener(serviceName types.NamespacedName, listenPort int) bool {
return updateServiceListener(serviceName, listenPort, false)
}
// Run Start the healthchecker main loop
func Run() {
once.Do(run)
}

View File

@ -0,0 +1,18 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package healthcheck LoadBalancer Healthcheck responder library for kubernetes network proxies
package healthcheck // import "k8s.io/kubernetes/pkg/proxy/healthcheck"

View File

@ -0,0 +1,127 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package healthcheck
import (
"fmt"
"net"
"net/http"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/sets"
)
// proxyMutationRequest: Message to request addition/deletion of endpoints for a service
type proxyMutationRequest struct {
serviceName types.NamespacedName
endpointUids *sets.String
}
// proxyListenerRequest: Message to request addition/deletion of a service responder on a listening port
type proxyListenerRequest struct {
serviceName types.NamespacedName
listenPort uint16
add bool
responseChannel chan bool
}
// serviceEndpointsList: A list of endpoints for a service
type serviceEndpointsList struct {
serviceName types.NamespacedName
endpoints *sets.String
}
// serviceResponder: Contains net/http datastructures necessary for responding to each Service's health check on its aux nodePort
type serviceResponder struct {
serviceName types.NamespacedName
listenPort uint16
listener *net.Listener
server *http.Server
}
// proxyHC: Handler structure for health check, endpoint add/delete and service listener add/delete requests
type proxyHC struct {
serviceEndpointsMap cache.ThreadSafeStore
serviceResponderMap map[types.NamespacedName]serviceResponder
mutationRequestChannel chan *proxyMutationRequest
listenerRequestChannel chan *proxyListenerRequest
}
// handleHealthCheckRequest - received a health check request - lookup and respond to HC.
func (h *proxyHC) handleHealthCheckRequest(rw http.ResponseWriter, serviceName string) {
s, ok := h.serviceEndpointsMap.Get(serviceName)
if !ok {
glog.V(4).Infof("Service %s not found or has no local endpoints", serviceName)
sendHealthCheckResponse(rw, http.StatusServiceUnavailable, "No Service Endpoints Not Found")
return
}
numEndpoints := len(*s.(*serviceEndpointsList).endpoints)
if numEndpoints > 0 {
sendHealthCheckResponse(rw, http.StatusOK, fmt.Sprintf("%d Service Endpoints found", numEndpoints))
return
}
sendHealthCheckResponse(rw, http.StatusServiceUnavailable, "0 local Endpoints are alive")
}
// handleMutationRequest - receive requests to mutate the table entry for a service
func (h *proxyHC) handleMutationRequest(req *proxyMutationRequest) {
numEndpoints := len(*req.endpointUids)
glog.V(4).Infof("LB service health check mutation request Service: %s - %d Endpoints %v",
req.serviceName, numEndpoints, (*req.endpointUids).List())
if numEndpoints == 0 {
if _, ok := h.serviceEndpointsMap.Get(req.serviceName.String()); ok {
glog.V(4).Infof("Deleting endpoints map for service %s, all local endpoints gone", req.serviceName.String())
h.serviceEndpointsMap.Delete(req.serviceName.String())
}
return
}
var entry *serviceEndpointsList
e, exists := h.serviceEndpointsMap.Get(req.serviceName.String())
if exists {
entry = e.(*serviceEndpointsList)
if entry.endpoints.Equal(*req.endpointUids) {
return
}
// Compute differences just for printing logs about additions and removals
deletedEndpoints := entry.endpoints.Difference(*req.endpointUids)
newEndpoints := req.endpointUids.Difference(*entry.endpoints)
for _, e := range newEndpoints.List() {
glog.V(4).Infof("Adding local endpoint %s to LB health check for service %s",
e, req.serviceName.String())
}
for _, d := range deletedEndpoints.List() {
glog.V(4).Infof("Deleted endpoint %s from service %s LB health check (%d endpoints left)",
d, req.serviceName.String(), len(*entry.endpoints))
}
}
entry = &serviceEndpointsList{serviceName: req.serviceName, endpoints: req.endpointUids}
h.serviceEndpointsMap.Add(req.serviceName.String(), entry)
}
// proxyHealthCheckRequest - Factory method to instantiate the health check handler
func proxyHealthCheckFactory() *proxyHC {
glog.V(2).Infof("Initializing kube-proxy health checker")
phc := &proxyHC{
serviceEndpointsMap: cache.NewThreadSafeStore(cache.Indexers{}, cache.Indices{}),
serviceResponderMap: make(map[types.NamespacedName]serviceResponder),
mutationRequestChannel: make(chan *proxyMutationRequest, 1024),
listenerRequestChannel: make(chan *proxyListenerRequest, 1024),
}
return phc
}

View File

@ -0,0 +1,156 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package healthcheck
import (
"fmt"
"io/ioutil"
"math/rand"
"net/http"
"testing"
"time"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/sets"
)
var (
start int
end int
choices []byte
)
type TestCaseData struct {
nodePorts int
numEndpoints int
nodePortList []int
svcNames []types.NamespacedName
}
func init() {
start = 20000
end = 40000
choices = []byte("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
}
func generateRandomString(n int) string {
b := make([]byte, n)
l := len(choices)
for i := range b {
b[i] = choices[rand.Intn(l)]
}
return string(b)
}
func chooseServiceName(tc int, hint int) types.NamespacedName {
var svc types.NamespacedName
svc.Namespace = fmt.Sprintf("ns_%d", tc)
svc.Name = fmt.Sprintf("name_%d", hint)
return svc
}
func generateEndpointSet(max int) sets.String {
s := sets.NewString()
for i := 0; i < max; i++ {
s.Insert(fmt.Sprintf("%d%s", i, generateRandomString(8)))
}
return s
}
func verifyHealthChecks(tc *TestCaseData, t *testing.T) bool {
var success = true
time.Sleep(100 * time.Millisecond)
for i := 0; i < tc.nodePorts; i++ {
t.Logf("Validating HealthCheck works for svc %s nodePort %d\n", tc.svcNames[i], tc.nodePortList[i])
res, err := http.Get(fmt.Sprintf("http://127.0.0.1:%d/", tc.nodePortList[i]))
if err != nil {
t.Logf("ERROR: Failed to connect to listening port")
success = false
continue
}
robots, err := ioutil.ReadAll(res.Body)
if res.StatusCode == http.StatusServiceUnavailable {
t.Logf("ERROR: HealthCheck returned %s: %s", res.Status, string(robots))
success = false
continue
}
res.Body.Close()
if err != nil {
t.Logf("Error: reading body of response (%s)", err)
success = false
continue
}
}
if success {
t.Logf("Success: All nodePorts found active")
}
return success
}
func TestHealthChecker(t *testing.T) {
testcases := []TestCaseData{
{
nodePorts: 1,
numEndpoints: 2,
},
{
nodePorts: 10,
numEndpoints: 6,
},
{
nodePorts: 100,
numEndpoints: 1,
},
}
Run()
ports := start
for n, tc := range testcases {
tc.nodePortList = make([]int, tc.nodePorts)
tc.svcNames = make([]types.NamespacedName, tc.nodePorts)
for i := 0; i < tc.nodePorts; i++ {
tc.svcNames[i] = chooseServiceName(n, i)
t.Logf("Updating endpoints map for %s %d", tc.svcNames[i], tc.numEndpoints)
for {
UpdateEndpoints(tc.svcNames[i], generateEndpointSet(tc.numEndpoints))
tc.nodePortList[i] = ports
ports++
if AddServiceListener(tc.svcNames[i], tc.nodePortList[i]) {
break
}
DeleteServiceListener(tc.svcNames[i], tc.nodePortList[i])
// Keep searching for a port that works
t.Logf("Failed to bind/listen on port %d...trying next port", ports-1)
}
}
t.Logf("Validating if all nodePorts for tc %d work", n)
if !verifyHealthChecks(&tc, t) {
t.Errorf("Healthcheck validation failed")
}
for i := 0; i < tc.nodePorts; i++ {
DeleteServiceListener(tc.svcNames[i], tc.nodePortList[i])
UpdateEndpoints(tc.svcNames[i], sets.NewString())
}
// Ensure that all listeners have been shutdown
if verifyHealthChecks(&tc, t) {
t.Errorf("Healthcheck validation failed")
}
}
}

View File

@ -0,0 +1,42 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package healthcheck
import (
"fmt"
"net/http"
"github.com/golang/glog"
)
// Satisfies the http.Handler interface needed for each service's http.Server listening port.
type healthCheckHandler struct {
svcName string
}
// HTTP Utility function to send the required statusCode and error text to a http.ResponseWriter object
func sendHealthCheckResponse(rw http.ResponseWriter, statusCode int, error string) {
rw.Header().Set("Content-Type", "text/plain")
rw.WriteHeader(statusCode)
fmt.Fprint(rw, error)
}
// ServeHTTP: Interface callback method for net.Listener Handlers
func (h healthCheckHandler) ServeHTTP(response http.ResponseWriter, req *http.Request) {
glog.V(4).Infof("Received HC Request Service %s from Cloud Load Balancer", h.svcName)
healthchecker.handleHealthCheckRequest(response, h.svcName)
}

View File

@ -0,0 +1,77 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package healthcheck
// Create/Delete dynamic listeners on the required nodePorts
import (
"fmt"
"net"
"net/http"
"github.com/golang/glog"
)
// handleServiceListenerRequest: receive requests to add/remove service health check listening ports
func (h *proxyHC) handleServiceListenerRequest(req *proxyListenerRequest) bool {
sr, serviceFound := h.serviceResponderMap[req.serviceName]
if !req.add {
if !serviceFound {
return false
}
glog.Infof("Deleting HealthCheckListenPort for service %s port %d",
req.serviceName, req.listenPort)
delete(h.serviceResponderMap, req.serviceName)
(*sr.listener).Close()
return true
} else if serviceFound {
if req.listenPort == sr.listenPort {
// Addition requested but responder for service already exists and port is unchanged
return true
}
// Addition requested but responder for service already exists but the listen port has changed
glog.Infof("HealthCheckListenPort for service %s changed from %d to %d - closing old listening port",
req.serviceName, sr.listenPort, req.listenPort)
delete(h.serviceResponderMap, req.serviceName)
(*sr.listener).Close()
}
// Create a service responder object and start listening and serving on the provided port
glog.V(2).Infof("Adding health check listener for service %s on nodePort %d", req.serviceName, req.listenPort)
server := http.Server{
Addr: fmt.Sprintf(":%d", req.listenPort),
Handler: healthCheckHandler{svcName: req.serviceName.String()},
}
listener, err := net.Listen("tcp", server.Addr)
if err != nil {
glog.Warningf("FAILED to listen on address %s (%s)\n", server.Addr, err)
return false
}
h.serviceResponderMap[req.serviceName] = serviceResponder{serviceName: req.serviceName,
listenPort: req.listenPort,
listener: &listener,
server: &server}
go func() {
// Anonymous goroutine to block on Serve for this listen port - Serve will exit when the listener is closed
glog.V(3).Infof("Goroutine blocking on serving health checks for %s on port %d", req.serviceName, req.listenPort)
if err := server.Serve(listener); err != nil {
glog.V(3).Infof("Proxy HealthCheck listen socket %d for service %s closed with error %s\n", req.listenPort, req.serviceName, err)
return
}
glog.V(3).Infof("Proxy HealthCheck listen socket %d for service %s closed\n", req.listenPort, req.serviceName)
}()
return true
}

View File

@ -0,0 +1,70 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package healthcheck LoadBalancer Healthcheck responder library for kubernetes network proxies
package healthcheck // import "k8s.io/kubernetes/pkg/proxy/healthcheck"
import (
"sync"
"time"
"k8s.io/kubernetes/pkg/util/wait"
"github.com/golang/glog"
)
var once sync.Once
var healthchecker *proxyHC
func run() {
allocatedCh := make(chan bool)
go func() {
healthchecker = proxyHealthCheckFactory()
allocatedCh <- true
wait.Forever(func() {
healthchecker.handlerLoop()
}, 0)
}()
// Return only after factory function allocation
<-allocatedCh
}
// handlerLoop Serializes all requests to prevent concurrent access to the maps
func (h *proxyHC) handlerLoop() {
ticker := time.NewTicker(1 * time.Minute)
defer ticker.Stop()
for {
select {
case req := <-h.mutationRequestChannel:
h.handleMutationRequest(req)
case req := <-h.listenerRequestChannel:
req.responseChannel <- h.handleServiceListenerRequest(req)
case <-ticker.C:
h.sync()
}
}
}
func (h *proxyHC) sync() {
glog.V(2).Infof("%d Health Check Listeners", len(h.serviceResponderMap))
glog.V(2).Infof("%d Services registered for health checking", len(h.serviceEndpointsMap.List()))
for _, svc := range h.serviceEndpointsMap.ListKeys() {
if e, ok := h.serviceEndpointsMap.Get(svc); ok {
endpointList := e.(*serviceEndpointsList)
glog.V(2).Infof("Service %s has %d local endpoints", svc, endpointList.endpoints.Len())
}
}
}

View File

@ -0,0 +1,18 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package healthcheckparser provides healthcheck URL generators and parsers
package healthcheckparser // import "k8s.io/kubernetes/pkg/util/healthcheckparser"

View File

@ -0,0 +1,37 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package healthcheck provides healthcheck URL generators and parsers
package healthcheckparser // import "k8s.io/kubernetes/pkg/util/healthcheckparser"
import (
"errors"
"fmt"
"strings"
)
func GenerateURL(namespace, name string) string {
return fmt.Sprintf("/healthchecks/svc/%s/%s", namespace, name)
}
func ParseURL(url string) (namespace, name string, err error) {
parts := strings.Split(url, "/")
if len(parts) < 5 {
err = errors.New("Failed to parse url into healthcheck components")
return
}
return parts[3], parts[4], nil
}