Fix service health check node port leaks

pull/6/head
Girish Kalele 2016-09-01 14:13:12 -07:00
parent 943d7aa5aa
commit 02754547fb
1 changed files with 108 additions and 1 deletions

View File

@ -37,6 +37,7 @@ import (
"k8s.io/kubernetes/pkg/runtime"
featuregate "k8s.io/kubernetes/pkg/util/config"
utilnet "k8s.io/kubernetes/pkg/util/net"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/validation/field"
"k8s.io/kubernetes/pkg/watch"
)
@ -157,7 +158,7 @@ func (rs *REST) Create(ctx api.Context, obj runtime.Object) (runtime.Object, err
}
}
if featuregate.DefaultFeatureGate.ExternalTrafficLocalOnly() && shouldCheckOrAssignHealthCheckNodePort(service) {
if shouldCheckOrAssignHealthCheckNodePort(service) {
var healthCheckNodePort int
var err error
if l, ok := service.Annotations[apiservice.AnnotationHealthCheckNodePort]; ok {
@ -234,6 +235,16 @@ func (rs *REST) Delete(ctx api.Context, id string) (runtime.Object, error) {
}
}
if shouldCheckOrAssignHealthCheckNodePort(service) {
nodePort := apiservice.GetServiceHealthCheckNodePort(service)
if nodePort > 0 {
err := rs.serviceNodePorts.Release(int(nodePort))
if err != nil {
// these should be caught by an eventual reconciliation / restart
utilruntime.HandleError(fmt.Errorf("Error releasing service health check %s node port %d: %v", service.Name, nodePort, err))
}
}
}
return &unversioned.Status{Status: unversioned.StatusSuccess}, nil
}
@ -265,6 +276,97 @@ func (*REST) NewList() runtime.Object {
return &api.ServiceList{}
}
func (rs *REST) healthCheckNodePortUpdate(oldService, service *api.Service) (bool, error) {
// Health Check Node Port handling during updates
//
// Case 1. Transition from globalTraffic to OnlyLocal for the ESIPP annotation
//
// Allocate a health check node port or attempt to reserve the user-specified one, if provided.
// Insert health check node port as an annotation into the service's annotations
//
// Case 2. Transition from OnlyLocal to Global for the ESIPP annotation
//
// Free the existing healthCheckNodePort and clear the health check nodePort annotation
//
// Case 3. No change (Global ---stays--> Global) but prevent invalid annotation manipulations
//
// Reject insertion of the "service.alpha.kubernetes.io/healthcheck-nodeport" annotation
//
// Case 4. No change (OnlyLocal ---stays--> OnlyLocal) but prevent invalid annotation manipulations
//
// Reject deletion of the "service.alpha.kubernetes.io/healthcheck-nodeport" annotation
// Reject changing the value of the healthCheckNodePort annotation
//
oldServiceHasHealthCheckNodePort := shouldCheckOrAssignHealthCheckNodePort(oldService)
oldHealthCheckNodePort := apiservice.GetServiceHealthCheckNodePort(oldService)
assignHealthCheckNodePort := shouldCheckOrAssignHealthCheckNodePort(service)
requestedHealthCheckNodePort := apiservice.GetServiceHealthCheckNodePort(service)
switch {
case !oldServiceHasHealthCheckNodePort && assignHealthCheckNodePort:
glog.Infof("Transition from Global LB service to OnlyLocal service")
if requestedHealthCheckNodePort > 0 {
// If the request has a health check nodePort in mind, attempt to reserve it
err := rs.serviceNodePorts.Allocate(int(requestedHealthCheckNodePort))
if err != nil {
errmsg := fmt.Sprintf("Failed to allocate requested HealthCheck nodePort %v:%v",
requestedHealthCheckNodePort, err)
el := field.ErrorList{field.Invalid(field.NewPath("metadata", "annotations"),
apiservice.AnnotationHealthCheckNodePort, errmsg)}
return false, errors.NewInvalid(api.Kind("Service"), service.Name, el)
}
glog.Infof("Reserved user requested nodePort: %d", requestedHealthCheckNodePort)
} else {
// If the request has no health check nodePort specified, allocate any
healthCheckNodePort, err := rs.serviceNodePorts.AllocateNext()
if err != nil {
// TODO: what error should be returned here? It's not a
// field-level validation failure (the field is valid), and it's
// not really an internal error.
return false, errors.NewInternalError(fmt.Errorf("failed to allocate a nodePort: %v", err))
}
// Insert the newly allocated health check port as an annotation (plan of record for Alpha)
service.Annotations[apiservice.AnnotationHealthCheckNodePort] = fmt.Sprintf("%d", healthCheckNodePort)
glog.Infof("Reserved health check nodePort: %d", healthCheckNodePort)
}
case oldServiceHasHealthCheckNodePort && !assignHealthCheckNodePort:
glog.Infof("Transition from OnlyLocal LB service to Global service")
err := rs.serviceNodePorts.Release(int(oldHealthCheckNodePort))
if err != nil {
glog.Warningf("Error releasing service health check %s node port %d: %v", service.Name, oldHealthCheckNodePort, err)
return false, errors.NewInternalError(fmt.Errorf("failed to free health check nodePort: %v", err))
} else {
delete(service.Annotations, apiservice.AnnotationHealthCheckNodePort)
glog.Infof("Freed health check nodePort: %d", oldHealthCheckNodePort)
}
case !oldServiceHasHealthCheckNodePort && !assignHealthCheckNodePort:
if _, ok := service.Annotations[apiservice.AnnotationHealthCheckNodePort]; ok {
glog.Warningf("Attempt to insert health check node port annotation DENIED")
el := field.ErrorList{field.Invalid(field.NewPath("metadata", "annotations"),
apiservice.AnnotationHealthCheckNodePort, "Cannot insert healthcheck nodePort annotation")}
return false, errors.NewInvalid(api.Kind("Service"), service.Name, el)
}
case oldServiceHasHealthCheckNodePort && assignHealthCheckNodePort:
if _, ok := service.Annotations[apiservice.AnnotationHealthCheckNodePort]; !ok {
glog.Warningf("Attempt to delete health check node port annotation DENIED")
el := field.ErrorList{field.Invalid(field.NewPath("metadata", "annotations"),
apiservice.AnnotationHealthCheckNodePort, "Cannot delete healthcheck nodePort annotation")}
return false, errors.NewInvalid(api.Kind("Service"), service.Name, el)
}
if oldHealthCheckNodePort != requestedHealthCheckNodePort {
glog.Warningf("Attempt to change value of health check node port annotation DENIED")
el := field.ErrorList{field.Invalid(field.NewPath("metadata", "annotations"),
apiservice.AnnotationHealthCheckNodePort, "Cannot change healthcheck nodePort during update")}
return false, errors.NewInvalid(api.Kind("Service"), service.Name, el)
}
}
return true, nil
}
func (rs *REST) Update(ctx api.Context, name string, objInfo rest.UpdatedObjectInfo) (runtime.Object, bool, error) {
oldService, err := rs.registry.GetService(ctx, name)
if err != nil {
@ -342,6 +444,11 @@ func (rs *REST) Update(ctx api.Context, name string, objInfo rest.UpdatedObjectI
service.Status.LoadBalancer = api.LoadBalancerStatus{}
}
success, err := rs.healthCheckNodePortUpdate(oldService, service)
if !success {
return nil, false, err
}
out, err := rs.registry.UpdateService(ctx, service)
if err == nil {