From 282880f549cf74faa9182d087411bf43d902d956 Mon Sep 17 00:00:00 2001 From: Girish Kalele Date: Thu, 18 Aug 2016 10:39:52 -0700 Subject: [PATCH] Code review changes --- hack/.linted_packages | 1 + pkg/proxy/healthcheck/api.go | 14 ++++----- pkg/proxy/healthcheck/healthcheck_test.go | 24 ++++++++------- pkg/proxy/healthcheck/http.go | 12 +++++--- pkg/proxy/healthcheck/listener.go | 2 +- pkg/proxy/healthcheck/worker.go | 25 +++------------ pkg/util/healthcheckparser/doc.go | 18 ----------- pkg/util/healthcheckparser/util.go | 37 ----------------------- 8 files changed, 34 insertions(+), 99 deletions(-) delete mode 100644 pkg/util/healthcheckparser/doc.go delete mode 100644 pkg/util/healthcheckparser/util.go diff --git a/hack/.linted_packages b/hack/.linted_packages index 283832a208..f439e74984 100644 --- a/hack/.linted_packages +++ b/hack/.linted_packages @@ -109,6 +109,7 @@ pkg/kubelet/volumemanager/reconciler pkg/kubelet/volume/populator pkg/kubelet/volume/reconciler pkg/proxy/config +pkg/proxy/healthcheck pkg/quota/install pkg/registry pkg/registry/authorization/util diff --git a/pkg/proxy/healthcheck/api.go b/pkg/proxy/healthcheck/api.go index 5e66ce7b50..bec40dae52 100644 --- a/pkg/proxy/healthcheck/api.go +++ b/pkg/proxy/healthcheck/api.go @@ -19,20 +19,16 @@ package healthcheck import ( "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util/sets" + "k8s.io/kubernetes/pkg/util/wait" ) // All public API Methods for this package // UpdateEndpoints Update the set of local endpoints for a service func UpdateEndpoints(serviceName types.NamespacedName, endpointUids sets.String) { - // Deepcopy the endpoints set with the latest - endpoints := sets.NewString() - for _, e := range endpointUids.List() { - endpoints.Insert(e) - } req := &proxyMutationRequest{ serviceName: serviceName, - endpointUids: &endpoints, + endpointUids: &endpointUids, } healthchecker.mutationRequestChannel <- req } @@ -61,5 +57,9 @@ func DeleteServiceListener(serviceName types.NamespacedName, listenPort int) boo // Run Start the healthchecker main loop func Run() { - once.Do(run) + healthchecker = proxyHealthCheckFactory() + // Wrap with a wait.Forever to handle panics. + go wait.Forever(func() { + healthchecker.handlerLoop() + }, 0) } diff --git a/pkg/proxy/healthcheck/healthcheck_test.go b/pkg/proxy/healthcheck/healthcheck_test.go index d8518adec2..6ee018a7b0 100644 --- a/pkg/proxy/healthcheck/healthcheck_test.go +++ b/pkg/proxy/healthcheck/healthcheck_test.go @@ -28,12 +28,6 @@ import ( "k8s.io/kubernetes/pkg/util/sets" ) -var ( - start int - end int - choices []byte -) - type TestCaseData struct { nodePorts int numEndpoints int @@ -41,13 +35,17 @@ type TestCaseData struct { svcNames []types.NamespacedName } -func init() { - start = 20000 - end = 40000 +const ( + startPort = 20000 + endPort = 40000 +) + +var ( choices = []byte("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") -} +) func generateRandomString(n int) string { + b := make([]byte, n) l := len(choices) for i := range b { @@ -119,7 +117,7 @@ func TestHealthChecker(t *testing.T) { Run() - ports := start + ports := startPort for n, tc := range testcases { tc.nodePortList = make([]int, tc.nodePorts) tc.svcNames = make([]types.NamespacedName, tc.nodePorts) @@ -136,6 +134,10 @@ func TestHealthChecker(t *testing.T) { DeleteServiceListener(tc.svcNames[i], tc.nodePortList[i]) // Keep searching for a port that works t.Logf("Failed to bind/listen on port %d...trying next port", ports-1) + if ports > endPort { + t.Errorf("Exhausted range of ports available for tests") + return + } } } t.Logf("Validating if all nodePorts for tc %d work", n) diff --git a/pkg/proxy/healthcheck/http.go b/pkg/proxy/healthcheck/http.go index d37153a3e0..dd3dcf3a75 100644 --- a/pkg/proxy/healthcheck/http.go +++ b/pkg/proxy/healthcheck/http.go @@ -23,9 +23,13 @@ import ( "github.com/golang/glog" ) -// Satisfies the http.Handler interface needed for each service's http.Server listening port. +// A healthCheckHandler serves http requests on /healthz on the service health check node port, +// and responds to every request with either: +// 200 OK and the count of endpoints for the given service that are local to this node. +// or +// 503 Service Unavailable If the count is zero or the service does not exist type healthCheckHandler struct { - svcName string + svcNsName string } // HTTP Utility function to send the required statusCode and error text to a http.ResponseWriter object @@ -37,6 +41,6 @@ func sendHealthCheckResponse(rw http.ResponseWriter, statusCode int, error strin // ServeHTTP: Interface callback method for net.Listener Handlers func (h healthCheckHandler) ServeHTTP(response http.ResponseWriter, req *http.Request) { - glog.V(4).Infof("Received HC Request Service %s from Cloud Load Balancer", h.svcName) - healthchecker.handleHealthCheckRequest(response, h.svcName) + glog.V(4).Infof("Received HC Request Service %s from Cloud Load Balancer", h.svcNsName) + healthchecker.handleHealthCheckRequest(response, h.svcNsName) } diff --git a/pkg/proxy/healthcheck/listener.go b/pkg/proxy/healthcheck/listener.go index 607b44c58a..d61e741cc7 100644 --- a/pkg/proxy/healthcheck/listener.go +++ b/pkg/proxy/healthcheck/listener.go @@ -53,7 +53,7 @@ func (h *proxyHC) handleServiceListenerRequest(req *proxyListenerRequest) bool { glog.V(2).Infof("Adding health check listener for service %s on nodePort %d", req.serviceName, req.listenPort) server := http.Server{ Addr: fmt.Sprintf(":%d", req.listenPort), - Handler: healthCheckHandler{svcName: req.serviceName.String()}, + Handler: healthCheckHandler{svcNsName: req.serviceName.String()}, } listener, err := net.Listen("tcp", server.Addr) if err != nil { diff --git a/pkg/proxy/healthcheck/worker.go b/pkg/proxy/healthcheck/worker.go index 09a75db649..1c1d60a09e 100644 --- a/pkg/proxy/healthcheck/worker.go +++ b/pkg/proxy/healthcheck/worker.go @@ -18,30 +18,13 @@ limitations under the License. package healthcheck // import "k8s.io/kubernetes/pkg/proxy/healthcheck" import ( - "sync" "time" - "k8s.io/kubernetes/pkg/util/wait" - "github.com/golang/glog" ) -var once sync.Once var healthchecker *proxyHC -func run() { - allocatedCh := make(chan bool) - go func() { - healthchecker = proxyHealthCheckFactory() - allocatedCh <- true - wait.Forever(func() { - healthchecker.handlerLoop() - }, 0) - }() - // Return only after factory function allocation - <-allocatedCh -} - // handlerLoop Serializes all requests to prevent concurrent access to the maps func (h *proxyHC) handlerLoop() { ticker := time.NewTicker(1 * time.Minute) @@ -53,18 +36,18 @@ func (h *proxyHC) handlerLoop() { case req := <-h.listenerRequestChannel: req.responseChannel <- h.handleServiceListenerRequest(req) case <-ticker.C: - h.sync() + go h.sync() } } } func (h *proxyHC) sync() { - glog.V(2).Infof("%d Health Check Listeners", len(h.serviceResponderMap)) - glog.V(2).Infof("%d Services registered for health checking", len(h.serviceEndpointsMap.List())) + glog.V(4).Infof("%d Health Check Listeners", len(h.serviceResponderMap)) + glog.V(4).Infof("%d Services registered for health checking", len(h.serviceEndpointsMap.List())) for _, svc := range h.serviceEndpointsMap.ListKeys() { if e, ok := h.serviceEndpointsMap.Get(svc); ok { endpointList := e.(*serviceEndpointsList) - glog.V(2).Infof("Service %s has %d local endpoints", svc, endpointList.endpoints.Len()) + glog.V(4).Infof("Service %s has %d local endpoints", svc, endpointList.endpoints.Len()) } } } diff --git a/pkg/util/healthcheckparser/doc.go b/pkg/util/healthcheckparser/doc.go deleted file mode 100644 index 39d8e06206..0000000000 --- a/pkg/util/healthcheckparser/doc.go +++ /dev/null @@ -1,18 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// Package healthcheckparser provides healthcheck URL generators and parsers -package healthcheckparser // import "k8s.io/kubernetes/pkg/util/healthcheckparser" diff --git a/pkg/util/healthcheckparser/util.go b/pkg/util/healthcheckparser/util.go deleted file mode 100644 index 16cd46ba9e..0000000000 --- a/pkg/util/healthcheckparser/util.go +++ /dev/null @@ -1,37 +0,0 @@ -/* -Copyright 2016 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -// Package healthcheck provides healthcheck URL generators and parsers -package healthcheckparser // import "k8s.io/kubernetes/pkg/util/healthcheckparser" - -import ( - "errors" - "fmt" - "strings" -) - -func GenerateURL(namespace, name string) string { - return fmt.Sprintf("/healthchecks/svc/%s/%s", namespace, name) -} - -func ParseURL(url string) (namespace, name string, err error) { - parts := strings.Split(url, "/") - if len(parts) < 5 { - err = errors.New("Failed to parse url into healthcheck components") - return - } - return parts[3], parts[4], nil -}