Code review changes

pull/6/head
Girish Kalele 2016-08-18 10:39:52 -07:00
parent 29188c68d5
commit 282880f549
8 changed files with 34 additions and 99 deletions

View File

@ -109,6 +109,7 @@ pkg/kubelet/volumemanager/reconciler
pkg/kubelet/volume/populator pkg/kubelet/volume/populator
pkg/kubelet/volume/reconciler pkg/kubelet/volume/reconciler
pkg/proxy/config pkg/proxy/config
pkg/proxy/healthcheck
pkg/quota/install pkg/quota/install
pkg/registry pkg/registry
pkg/registry/authorization/util pkg/registry/authorization/util

View File

@ -19,20 +19,16 @@ package healthcheck
import ( import (
"k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/util/wait"
) )
// All public API Methods for this package // All public API Methods for this package
// UpdateEndpoints Update the set of local endpoints for a service // UpdateEndpoints Update the set of local endpoints for a service
func UpdateEndpoints(serviceName types.NamespacedName, endpointUids sets.String) { func UpdateEndpoints(serviceName types.NamespacedName, endpointUids sets.String) {
// Deepcopy the endpoints set with the latest
endpoints := sets.NewString()
for _, e := range endpointUids.List() {
endpoints.Insert(e)
}
req := &proxyMutationRequest{ req := &proxyMutationRequest{
serviceName: serviceName, serviceName: serviceName,
endpointUids: &endpoints, endpointUids: &endpointUids,
} }
healthchecker.mutationRequestChannel <- req healthchecker.mutationRequestChannel <- req
} }
@ -61,5 +57,9 @@ func DeleteServiceListener(serviceName types.NamespacedName, listenPort int) boo
// Run Start the healthchecker main loop // Run Start the healthchecker main loop
func Run() { func Run() {
once.Do(run) healthchecker = proxyHealthCheckFactory()
// Wrap with a wait.Forever to handle panics.
go wait.Forever(func() {
healthchecker.handlerLoop()
}, 0)
} }

View File

@ -28,12 +28,6 @@ import (
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/sets"
) )
var (
start int
end int
choices []byte
)
type TestCaseData struct { type TestCaseData struct {
nodePorts int nodePorts int
numEndpoints int numEndpoints int
@ -41,13 +35,17 @@ type TestCaseData struct {
svcNames []types.NamespacedName svcNames []types.NamespacedName
} }
func init() { const (
start = 20000 startPort = 20000
end = 40000 endPort = 40000
)
var (
choices = []byte("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") choices = []byte("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")
} )
func generateRandomString(n int) string { func generateRandomString(n int) string {
b := make([]byte, n) b := make([]byte, n)
l := len(choices) l := len(choices)
for i := range b { for i := range b {
@ -119,7 +117,7 @@ func TestHealthChecker(t *testing.T) {
Run() Run()
ports := start ports := startPort
for n, tc := range testcases { for n, tc := range testcases {
tc.nodePortList = make([]int, tc.nodePorts) tc.nodePortList = make([]int, tc.nodePorts)
tc.svcNames = make([]types.NamespacedName, tc.nodePorts) tc.svcNames = make([]types.NamespacedName, tc.nodePorts)
@ -136,6 +134,10 @@ func TestHealthChecker(t *testing.T) {
DeleteServiceListener(tc.svcNames[i], tc.nodePortList[i]) DeleteServiceListener(tc.svcNames[i], tc.nodePortList[i])
// Keep searching for a port that works // Keep searching for a port that works
t.Logf("Failed to bind/listen on port %d...trying next port", ports-1) t.Logf("Failed to bind/listen on port %d...trying next port", ports-1)
if ports > endPort {
t.Errorf("Exhausted range of ports available for tests")
return
}
} }
} }
t.Logf("Validating if all nodePorts for tc %d work", n) t.Logf("Validating if all nodePorts for tc %d work", n)

View File

@ -23,9 +23,13 @@ import (
"github.com/golang/glog" "github.com/golang/glog"
) )
// Satisfies the http.Handler interface needed for each service's http.Server listening port. // A healthCheckHandler serves http requests on /healthz on the service health check node port,
// and responds to every request with either:
// 200 OK and the count of endpoints for the given service that are local to this node.
// or
// 503 Service Unavailable If the count is zero or the service does not exist
type healthCheckHandler struct { type healthCheckHandler struct {
svcName string svcNsName string
} }
// HTTP Utility function to send the required statusCode and error text to a http.ResponseWriter object // HTTP Utility function to send the required statusCode and error text to a http.ResponseWriter object
@ -37,6 +41,6 @@ func sendHealthCheckResponse(rw http.ResponseWriter, statusCode int, error strin
// ServeHTTP: Interface callback method for net.Listener Handlers // ServeHTTP: Interface callback method for net.Listener Handlers
func (h healthCheckHandler) ServeHTTP(response http.ResponseWriter, req *http.Request) { func (h healthCheckHandler) ServeHTTP(response http.ResponseWriter, req *http.Request) {
glog.V(4).Infof("Received HC Request Service %s from Cloud Load Balancer", h.svcName) glog.V(4).Infof("Received HC Request Service %s from Cloud Load Balancer", h.svcNsName)
healthchecker.handleHealthCheckRequest(response, h.svcName) healthchecker.handleHealthCheckRequest(response, h.svcNsName)
} }

View File

@ -53,7 +53,7 @@ func (h *proxyHC) handleServiceListenerRequest(req *proxyListenerRequest) bool {
glog.V(2).Infof("Adding health check listener for service %s on nodePort %d", req.serviceName, req.listenPort) glog.V(2).Infof("Adding health check listener for service %s on nodePort %d", req.serviceName, req.listenPort)
server := http.Server{ server := http.Server{
Addr: fmt.Sprintf(":%d", req.listenPort), Addr: fmt.Sprintf(":%d", req.listenPort),
Handler: healthCheckHandler{svcName: req.serviceName.String()}, Handler: healthCheckHandler{svcNsName: req.serviceName.String()},
} }
listener, err := net.Listen("tcp", server.Addr) listener, err := net.Listen("tcp", server.Addr)
if err != nil { if err != nil {

View File

@ -18,30 +18,13 @@ limitations under the License.
package healthcheck // import "k8s.io/kubernetes/pkg/proxy/healthcheck" package healthcheck // import "k8s.io/kubernetes/pkg/proxy/healthcheck"
import ( import (
"sync"
"time" "time"
"k8s.io/kubernetes/pkg/util/wait"
"github.com/golang/glog" "github.com/golang/glog"
) )
var once sync.Once
var healthchecker *proxyHC var healthchecker *proxyHC
func run() {
allocatedCh := make(chan bool)
go func() {
healthchecker = proxyHealthCheckFactory()
allocatedCh <- true
wait.Forever(func() {
healthchecker.handlerLoop()
}, 0)
}()
// Return only after factory function allocation
<-allocatedCh
}
// handlerLoop Serializes all requests to prevent concurrent access to the maps // handlerLoop Serializes all requests to prevent concurrent access to the maps
func (h *proxyHC) handlerLoop() { func (h *proxyHC) handlerLoop() {
ticker := time.NewTicker(1 * time.Minute) ticker := time.NewTicker(1 * time.Minute)
@ -53,18 +36,18 @@ func (h *proxyHC) handlerLoop() {
case req := <-h.listenerRequestChannel: case req := <-h.listenerRequestChannel:
req.responseChannel <- h.handleServiceListenerRequest(req) req.responseChannel <- h.handleServiceListenerRequest(req)
case <-ticker.C: case <-ticker.C:
h.sync() go h.sync()
} }
} }
} }
func (h *proxyHC) sync() { func (h *proxyHC) sync() {
glog.V(2).Infof("%d Health Check Listeners", len(h.serviceResponderMap)) glog.V(4).Infof("%d Health Check Listeners", len(h.serviceResponderMap))
glog.V(2).Infof("%d Services registered for health checking", len(h.serviceEndpointsMap.List())) glog.V(4).Infof("%d Services registered for health checking", len(h.serviceEndpointsMap.List()))
for _, svc := range h.serviceEndpointsMap.ListKeys() { for _, svc := range h.serviceEndpointsMap.ListKeys() {
if e, ok := h.serviceEndpointsMap.Get(svc); ok { if e, ok := h.serviceEndpointsMap.Get(svc); ok {
endpointList := e.(*serviceEndpointsList) endpointList := e.(*serviceEndpointsList)
glog.V(2).Infof("Service %s has %d local endpoints", svc, endpointList.endpoints.Len()) glog.V(4).Infof("Service %s has %d local endpoints", svc, endpointList.endpoints.Len())
} }
} }
} }

View File

@ -1,18 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package healthcheckparser provides healthcheck URL generators and parsers
package healthcheckparser // import "k8s.io/kubernetes/pkg/util/healthcheckparser"

View File

@ -1,37 +0,0 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package healthcheck provides healthcheck URL generators and parsers
package healthcheckparser // import "k8s.io/kubernetes/pkg/util/healthcheckparser"
import (
"errors"
"fmt"
"strings"
)
func GenerateURL(namespace, name string) string {
return fmt.Sprintf("/healthchecks/svc/%s/%s", namespace, name)
}
func ParseURL(url string) (namespace, name string, err error) {
parts := strings.Split(url, "/")
if len(parts) < 5 {
err = errors.New("Failed to parse url into healthcheck components")
return
}
return parts[3], parts[4], nil
}