mirror of https://github.com/k3s-io/k3s
Merge pull request #31828 from girishkalele/esipp_toggle
Automatic merge from submit-queue Service behaviour not correct when ESIPP annotation is toggled or deleted Fixes #31649 GCE controller: Correctly add/remove http health checks based on whether the annotation was added or removed kube-proxy: Create/Delete XLB local-endpoints balancer chain and change jump actions from KUBE-FW- chain based on the annotation.pull/6/head
commit
65f3fa9caf
|
@ -719,6 +719,37 @@ func (gce *GCECloud) EnsureLoadBalancer(clusterName string, apiService *api.Serv
|
||||||
glog.Infof("Target pool %v for Service %v/%v doesn't exist", loadBalancerName, apiService.Namespace, apiService.Name)
|
glog.Infof("Target pool %v for Service %v/%v doesn't exist", loadBalancerName, apiService.Namespace, apiService.Name)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Ensure health checks are created for this target pool to pass to createTargetPool for health check links
|
||||||
|
// Alternately, if the annotation on the service was removed, we need to recreate the target pool without
|
||||||
|
// health checks. This needs to be prior to the forwarding rule deletion below otherwise it is not possible
|
||||||
|
// to delete just the target pool or http health checks later.
|
||||||
|
var hcToCreate *compute.HttpHealthCheck
|
||||||
|
hcExisting, err := gce.GetHttpHealthCheck(loadBalancerName)
|
||||||
|
if err != nil && !isHTTPErrorCode(err, http.StatusNotFound) {
|
||||||
|
return nil, fmt.Errorf("Error checking HTTP health check %s: %v", loadBalancerName, err)
|
||||||
|
}
|
||||||
|
if path, healthCheckNodePort := apiservice.GetServiceHealthCheckPathPort(apiService); path != "" {
|
||||||
|
glog.V(4).Infof("service %v needs health checks on :%d/%s)", apiService.Name, healthCheckNodePort, path)
|
||||||
|
if err != nil {
|
||||||
|
// This logic exists to detect a transition for a pre-existing service and turn on
|
||||||
|
// the tpNeedsUpdate flag to delete/recreate fwdrule/tpool adding the health check
|
||||||
|
// to the target pool.
|
||||||
|
glog.V(2).Infof("Annotation %s=%s added to new or pre-existing service",
|
||||||
|
apiservice.AnnotationExternalTraffic,
|
||||||
|
apiservice.AnnotationValueExternalTrafficLocal)
|
||||||
|
tpNeedsUpdate = true
|
||||||
|
}
|
||||||
|
hcToCreate, err = gce.ensureHttpHealthCheck(loadBalancerName, path, healthCheckNodePort)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("Failed to ensure health check for localized service %v on node port %v: %v", loadBalancerName, healthCheckNodePort, err)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
glog.V(4).Infof("service %v does not need health checks", apiService.Name)
|
||||||
|
if err == nil {
|
||||||
|
glog.V(2).Infof("Deleting stale health checks for service %v LB %v", apiService.Name, loadBalancerName)
|
||||||
|
tpNeedsUpdate = true
|
||||||
|
}
|
||||||
|
}
|
||||||
// Now we get to some slightly more interesting logic.
|
// Now we get to some slightly more interesting logic.
|
||||||
// First, neither target pools nor forwarding rules can be updated in place -
|
// First, neither target pools nor forwarding rules can be updated in place -
|
||||||
// they have to be deleted and recreated.
|
// they have to be deleted and recreated.
|
||||||
|
@ -738,17 +769,16 @@ func (gce *GCECloud) EnsureLoadBalancer(clusterName string, apiService *api.Serv
|
||||||
}
|
}
|
||||||
if tpExists && tpNeedsUpdate {
|
if tpExists && tpNeedsUpdate {
|
||||||
// Generate the list of health checks for this target pool to pass to deleteTargetPool
|
// Generate the list of health checks for this target pool to pass to deleteTargetPool
|
||||||
var hc *compute.HttpHealthCheck
|
|
||||||
if path, _ := apiservice.GetServiceHealthCheckPathPort(apiService); path != "" {
|
if path, _ := apiservice.GetServiceHealthCheckPathPort(apiService); path != "" {
|
||||||
var err error
|
var err error
|
||||||
hc, err = gce.GetHttpHealthCheck(loadBalancerName)
|
hcExisting, err = gce.GetHttpHealthCheck(loadBalancerName)
|
||||||
if err != nil && !isHTTPErrorCode(err, http.StatusNotFound) {
|
if err != nil && !isHTTPErrorCode(err, http.StatusNotFound) {
|
||||||
glog.Infof("Failed to retrieve health check %v:%v", loadBalancerName, err)
|
glog.Infof("Failed to retrieve health check %v:%v", loadBalancerName, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pass healthchecks to deleteTargetPool to cleanup health checks prior to cleaning up the target pool itself.
|
// Pass healthchecks to deleteTargetPool to cleanup health checks prior to cleaning up the target pool itself.
|
||||||
if err := gce.deleteTargetPool(loadBalancerName, gce.region, hc); err != nil {
|
if err := gce.deleteTargetPool(loadBalancerName, gce.region, hcExisting); err != nil {
|
||||||
return nil, fmt.Errorf("failed to delete existing target pool %s for load balancer update: %v", loadBalancerName, err)
|
return nil, fmt.Errorf("failed to delete existing target pool %s for load balancer update: %v", loadBalancerName, err)
|
||||||
}
|
}
|
||||||
glog.Infof("EnsureLoadBalancer(%v(%v)): deleted target pool", loadBalancerName, serviceName)
|
glog.Infof("EnsureLoadBalancer(%v(%v)): deleted target pool", loadBalancerName, serviceName)
|
||||||
|
@ -761,21 +791,13 @@ func (gce *GCECloud) EnsureLoadBalancer(clusterName string, apiService *api.Serv
|
||||||
if len(hosts) > maxTargetPoolCreateInstances {
|
if len(hosts) > maxTargetPoolCreateInstances {
|
||||||
createInstances = createInstances[:maxTargetPoolCreateInstances]
|
createInstances = createInstances[:maxTargetPoolCreateInstances]
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create health checks for this target pool to pass to createTargetPool for health check links
|
|
||||||
var hc *compute.HttpHealthCheck
|
|
||||||
if path, healthCheckNodePort := apiservice.GetServiceHealthCheckPathPort(apiService); path != "" {
|
|
||||||
glog.Infof("service %v needs health checks on :%d/%s)", apiService.Name, healthCheckNodePort, path)
|
|
||||||
var err error
|
|
||||||
hc, err = gce.ensureHttpHealthCheck(loadBalancerName, path, healthCheckNodePort)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("Failed to create health check for localized service %v on node port %v: %v", loadBalancerName, healthCheckNodePort, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// Pass healthchecks to createTargetPool which needs them as health check links in the target pool
|
// Pass healthchecks to createTargetPool which needs them as health check links in the target pool
|
||||||
if err := gce.createTargetPool(loadBalancerName, serviceName.String(), gce.region, createInstances, affinityType, hc); err != nil {
|
if err := gce.createTargetPool(loadBalancerName, serviceName.String(), gce.region, createInstances, affinityType, hcToCreate); err != nil {
|
||||||
return nil, fmt.Errorf("failed to create target pool %s: %v", loadBalancerName, err)
|
return nil, fmt.Errorf("failed to create target pool %s: %v", loadBalancerName, err)
|
||||||
}
|
}
|
||||||
|
if hcToCreate != nil {
|
||||||
|
glog.Infof("EnsureLoadBalancer(%v(%v)): created health checks for target pool", loadBalancerName, serviceName)
|
||||||
|
}
|
||||||
if len(hosts) <= maxTargetPoolCreateInstances {
|
if len(hosts) <= maxTargetPoolCreateInstances {
|
||||||
glog.Infof("EnsureLoadBalancer(%v(%v)): created target pool", loadBalancerName, serviceName)
|
glog.Infof("EnsureLoadBalancer(%v(%v)): created target pool", loadBalancerName, serviceName)
|
||||||
} else {
|
} else {
|
||||||
|
@ -838,17 +860,21 @@ func (gce *GCECloud) ensureHttpHealthCheck(name, path string, port int32) (hc *c
|
||||||
glog.Errorf("Failed to get http health check %v", err)
|
glog.Errorf("Failed to get http health check %v", err)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
glog.Infof("Created HTTP health check %v healthCheckNodePort: %d", name, port)
|
||||||
return hc, nil
|
return hc, nil
|
||||||
}
|
}
|
||||||
// Validate health check fields
|
// Validate health check fields
|
||||||
|
glog.V(4).Infof("Checking http health check params %s", name)
|
||||||
drift := hc.Port != int64(port) || hc.RequestPath != path || hc.Description != makeHealthCheckDescription(name)
|
drift := hc.Port != int64(port) || hc.RequestPath != path || hc.Description != makeHealthCheckDescription(name)
|
||||||
drift = drift || hc.CheckIntervalSec != gceHcCheckIntervalSeconds || hc.TimeoutSec != gceHcTimeoutSeconds
|
drift = drift || hc.CheckIntervalSec != gceHcCheckIntervalSeconds || hc.TimeoutSec != gceHcTimeoutSeconds
|
||||||
drift = drift || hc.UnhealthyThreshold != gceHcUnhealthyThreshold || hc.HealthyThreshold != gceHcHealthyThreshold
|
drift = drift || hc.UnhealthyThreshold != gceHcUnhealthyThreshold || hc.HealthyThreshold != gceHcHealthyThreshold
|
||||||
if drift {
|
if drift {
|
||||||
glog.Infof("Health check %v exists but parameters have drifted - updating", name)
|
glog.Warningf("Health check %v exists but parameters have drifted - updating...", name)
|
||||||
if err := gce.UpdateHttpHealthCheck(newHC); err != nil {
|
if err := gce.UpdateHttpHealthCheck(newHC); err != nil {
|
||||||
|
glog.Warningf("Failed to reconcile http health check %v parameters", name)
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
glog.V(4).Infof("Corrected health check %v parameters successful", name)
|
||||||
}
|
}
|
||||||
return hc, nil
|
return hc, nil
|
||||||
}
|
}
|
||||||
|
@ -1421,13 +1447,6 @@ func (gce *GCECloud) deleteForwardingRule(name, region string) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
func (gce *GCECloud) deleteTargetPool(name, region string, hc *compute.HttpHealthCheck) error {
|
func (gce *GCECloud) deleteTargetPool(name, region string, hc *compute.HttpHealthCheck) error {
|
||||||
if hc != nil {
|
|
||||||
glog.Infof("Deleting health check %v", hc.Name)
|
|
||||||
if err := gce.DeleteHttpHealthCheck(hc.Name); err != nil {
|
|
||||||
glog.Warningf("Failed to delete health check %v: %v", hc, err)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
op, err := gce.service.TargetPools.Delete(gce.projectID, region, name).Do()
|
op, err := gce.service.TargetPools.Delete(gce.projectID, region, name).Do()
|
||||||
if err != nil && isHTTPErrorCode(err, http.StatusNotFound) {
|
if err != nil && isHTTPErrorCode(err, http.StatusNotFound) {
|
||||||
glog.Infof("Target pool %s already deleted. Continuing to delete other resources.", name)
|
glog.Infof("Target pool %s already deleted. Continuing to delete other resources.", name)
|
||||||
|
@ -1440,6 +1459,23 @@ func (gce *GCECloud) deleteTargetPool(name, region string, hc *compute.HttpHealt
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// Deletion of health checks is allowed only after the TargetPool reference is deleted
|
||||||
|
if hc != nil {
|
||||||
|
glog.Infof("Deleting health check %v", hc.Name)
|
||||||
|
if err := gce.DeleteHttpHealthCheck(hc.Name); err != nil {
|
||||||
|
glog.Warningf("Failed to delete health check %v: %v", hc, err)
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// This is a HC cleanup attempt to prevent stale HCs when errors are encountered
|
||||||
|
// during HC deletion in a prior pass through EnsureLoadBalancer.
|
||||||
|
// The HC name matches the load balancer name - normally this is expected to fail.
|
||||||
|
if err := gce.DeleteHttpHealthCheck(name); err == nil {
|
||||||
|
// We only print a warning if this deletion actually succeeded (which
|
||||||
|
// means there was indeed a stale health check with the LB name.
|
||||||
|
glog.Warningf("Deleted stale http health check for LB: %s", name)
|
||||||
|
}
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -346,6 +346,10 @@ func (proxier *Proxier) sameConfig(info *serviceInfo, service *api.Service, port
|
||||||
if info.sessionAffinityType != service.Spec.SessionAffinity {
|
if info.sessionAffinityType != service.Spec.SessionAffinity {
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
onlyNodeLocalEndpoints := apiservice.NeedsHealthCheck(service) && featuregate.DefaultFeatureGate.ExternalTrafficLocalOnly()
|
||||||
|
if info.onlyNodeLocalEndpoints != onlyNodeLocalEndpoints {
|
||||||
|
return false
|
||||||
|
}
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -446,6 +450,9 @@ func (proxier *Proxier) OnServiceUpdate(allServices []api.Service) {
|
||||||
// Turn on healthcheck responder to listen on the health check nodePort
|
// Turn on healthcheck responder to listen on the health check nodePort
|
||||||
healthcheck.AddServiceListener(serviceName.NamespacedName, info.healthCheckNodePort)
|
healthcheck.AddServiceListener(serviceName.NamespacedName, info.healthCheckNodePort)
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
// Delete healthcheck responders, if any, previously listening for this service
|
||||||
|
healthcheck.DeleteServiceListener(serviceName.NamespacedName, 0)
|
||||||
}
|
}
|
||||||
proxier.serviceMap[serviceName] = info
|
proxier.serviceMap[serviceName] = info
|
||||||
|
|
||||||
|
@ -895,6 +902,9 @@ func (proxier *Proxier) syncProxyRules() {
|
||||||
writeLine(natChains, utiliptables.MakeChainLine(svcXlbChain))
|
writeLine(natChains, utiliptables.MakeChainLine(svcXlbChain))
|
||||||
}
|
}
|
||||||
activeNATChains[svcXlbChain] = true
|
activeNATChains[svcXlbChain] = true
|
||||||
|
} else if activeNATChains[svcXlbChain] {
|
||||||
|
// Cleanup the previously created XLB chain for this service
|
||||||
|
delete(activeNATChains, svcXlbChain)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Capture the clusterIP.
|
// Capture the clusterIP.
|
||||||
|
|
Loading…
Reference in New Issue