Create nodes health checks for non-OnlyLocal services

pull/6/head
Zihong Zheng 2017-05-25 20:30:37 -07:00
parent 7bc6da0b77
commit b4633b0600
6 changed files with 428 additions and 121 deletions


@@ -22,6 +22,7 @@ import (
"net/http"
"regexp"
"strings"
"sync"
"time"
"cloud.google.com/go/compute/metadata"
@@ -80,7 +81,7 @@ type GCECloud struct {
serviceBeta *computebeta.Service
containerService *container.Service
clientBuilder controller.ControllerClientBuilder
ClusterId ClusterId
ClusterID ClusterID
projectID string
region string
localZone string // The zone in which we are running
@@ -92,6 +93,11 @@ type GCECloud struct {
useMetadataServer bool
operationPollRateLimiter flowcontrol.RateLimiter
manager ServiceManager
// sharedResourceLock is used to serialize GCE operations that may mutate shared state to
// prevent inconsistencies. For example, load balancers manipulation methods will take the
// lock to prevent shared resources from being prematurely deleted while the operation is
// in progress.
sharedResourceLock sync.Mutex
}
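The new sharedResourceLock serializes mutations of shared resources such as the per-cluster nodes health check and its firewall. A minimal sketch of the pattern the load-balancer code below follows; the helper itself is hypothetical and not part of this commit:

func (gce *GCECloud) withSharedResourceLock(do func() error) error {
	// Hold the lock for the whole operation so a concurrent teardown
	// cannot delete the shared resource while it is being attached.
	gce.sharedResourceLock.Lock()
	defer gce.sharedResourceLock.Unlock()
	return do()
}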
type ServiceManager interface {
@@ -270,10 +276,10 @@ func CreateGCECloud(projectID, region, zone string, managedZones []string, netwo
}
// Initialize takes in a clientBuilder and spawns a goroutine for watching the clusterid configmap.
// This must be called before utilizing the funcs of gce.ClusterId
// This must be called before utilizing the funcs of gce.ClusterID
func (gce *GCECloud) Initialize(clientBuilder controller.ControllerClientBuilder) {
gce.clientBuilder = clientBuilder
go gce.watchClusterId()
go gce.watchClusterID()
}
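A hedged sketch of the required call order; setupCloud is a hypothetical wrapper, assuming a controller.ControllerClientBuilder is already available:

func setupCloud(gce *GCECloud, cb controller.ControllerClientBuilder) (string, error) {
	gce.Initialize(cb) // spawns the watchClusterID goroutine
	// Only valid after Initialize; getOrInitialize covers the window
	// before the configmap watch has synced.
	return gce.ClusterID.GetID()
}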
// LoadBalancer returns an implementation of LoadBalancer for Google Compute Engine.


@@ -1,5 +1,5 @@
/*
Copyright 2014 The Kubernetes Authors.
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@@ -49,18 +49,18 @@ const (
updateFuncFrequency = 10 * time.Minute
)
type ClusterId struct {
type ClusterID struct {
idLock sync.RWMutex
client clientset.Interface
cfgMapKey string
store cache.Store
providerId *string
clusterId *string
providerID *string
clusterID *string
}
// Continually watches for changes to the cluser id config map
func (gce *GCECloud) watchClusterId() {
gce.ClusterId = ClusterId{
// Continually watches for changes to the cluster id config map
func (gce *GCECloud) watchClusterID() {
gce.ClusterID = ClusterID{
cfgMapKey: fmt.Sprintf("%v/%v", UIDNamespace, UIDConfigMapName),
client: gce.clientBuilder.ClientOrDie("cloud-provider"),
}
@@ -77,8 +77,8 @@ func (gce *GCECloud) watchClusterId() {
return
}
glog.V(4).Infof("Observed new configmap for clusterid: %v, %v; setting local values", m.Name, m.Data)
gce.ClusterId.setIds(m)
glog.V(4).Infof("Observed new configmap for clusteriD: %v, %v; setting local values", m.Name, m.Data)
gce.ClusterID.update(m)
},
UpdateFunc: func(old, cur interface{}) {
m, ok := cur.(*v1.ConfigMap)
@@ -96,71 +96,71 @@ func (gce *GCECloud) watchClusterId() {
return
}
glog.V(4).Infof("Observed updated configmap for clusterid %v, %v; setting local values", m.Name, m.Data)
gce.ClusterId.setIds(m)
glog.V(4).Infof("Observed updated configmap for clusteriD %v, %v; setting local values", m.Name, m.Data)
gce.ClusterID.update(m)
},
}
listerWatcher := cache.NewListWatchFromClient(gce.ClusterId.client.Core().RESTClient(), "configmaps", UIDNamespace, fields.Everything())
listerWatcher := cache.NewListWatchFromClient(gce.ClusterID.client.Core().RESTClient(), "configmaps", UIDNamespace, fields.Everything())
var controller cache.Controller
gce.ClusterId.store, controller = cache.NewInformer(newSingleObjectListerWatcher(listerWatcher, UIDConfigMapName), &v1.ConfigMap{}, updateFuncFrequency, mapEventHandler)
gce.ClusterID.store, controller = cache.NewInformer(newSingleObjectListerWatcher(listerWatcher, UIDConfigMapName), &v1.ConfigMap{}, updateFuncFrequency, mapEventHandler)
controller.Run(nil)
}
// GetId returns the id which is unique to this cluster
// GetID returns the id which is unique to this cluster
// if federated, return the provider id (unique to the cluster)
// if not federated, return the cluster id
func (ci *ClusterId) GetId() (string, error) {
func (ci *ClusterID) GetID() (string, error) {
if err := ci.getOrInitialize(); err != nil {
return "", err
}
ci.idLock.RLock()
defer ci.idLock.RUnlock()
if ci.clusterId == nil {
if ci.clusterID == nil {
return "", errors.New("Could not retrieve cluster id")
}
// If provider ID is set (Federation is enabled), use this field
if ci.providerId != nil && *ci.providerId != *ci.clusterId {
return *ci.providerId, nil
if ci.providerID != nil {
return *ci.providerID, nil
}
// providerId is not set, use the cluster id
return *ci.clusterId, nil
// providerID is not set, use the cluster id
return *ci.clusterID, nil
}
// GetFederationId returns the id which could represent the entire Federation
// or just the cluster if not federated.
func (ci *ClusterId) GetFederationId() (string, bool, error) {
func (ci *ClusterID) GetFederationId() (string, bool, error) {
if err := ci.getOrInitialize(); err != nil {
return "", false, err
}
ci.idLock.RLock()
defer ci.idLock.RUnlock()
if ci.clusterId == nil {
if ci.clusterID == nil {
return "", false, errors.New("Could not retrieve cluster id")
}
// If provider ID is not set, return false
if ci.providerId == nil || *ci.clusterId == *ci.providerId {
if ci.providerID == nil || *ci.clusterID == *ci.providerID {
return "", false, nil
}
return *ci.clusterId, true, nil
return *ci.clusterID, true, nil
}
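To illustrate the two accessors together, a hypothetical caller (not part of this commit):

func describeCluster(gce *GCECloud) (string, error) {
	id, err := gce.ClusterID.GetID() // provider ID when federated, else cluster ID
	if err != nil {
		return "", err
	}
	fedID, federated, err := gce.ClusterID.GetFederationId()
	if err != nil {
		return "", err
	}
	if federated {
		return fmt.Sprintf("cluster %s in federation %s", id, fedID), nil
	}
	return fmt.Sprintf("standalone cluster %s", id), nil
}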
// getOrInitialize either grabs the configmap's current value or defines the value
// and sets the configmap. This is for the case of the user calling GetClusterId()
// and sets the configmap. This is for the case of the user calling GetClusterID()
// before the watch has begun.
func (ci *ClusterId) getOrInitialize() error {
func (ci *ClusterID) getOrInitialize() error {
if ci.store == nil {
return errors.New("GCECloud.ClusterId is not ready. Call Initialize() before using.")
return errors.New("GCECloud.ClusterID is not ready. Call Initialize() before using.")
}
if ci.clusterId != nil {
if ci.clusterID != nil {
return nil
}
@@ -177,7 +177,7 @@ func (ci *ClusterId) getOrInitialize() error {
return err
}
glog.V(4).Infof("Creating clusterid: %v", newId)
glog.V(4).Infof("Creating clusteriD: %v", newId)
cfg := &v1.ConfigMap{
ObjectMeta: metav1.ObjectMeta{
Name: UIDConfigMapName,
@@ -194,12 +194,12 @@ func (ci *ClusterId) getOrInitialize() error {
return err
}
glog.V(2).Infof("Created a config map containing clusterid: %v", newId)
ci.setIds(cfg)
glog.V(2).Infof("Created a config map containing clusteriD: %v", newId)
ci.update(cfg)
return nil
}
func (ci *ClusterId) getConfigMap() (bool, error) {
func (ci *ClusterID) getConfigMap() (bool, error) {
item, exists, err := ci.store.GetByKey(ci.cfgMapKey)
if err != nil {
return false, err
@@ -214,18 +214,18 @@ func (ci *ClusterId) getConfigMap() (bool, error) {
glog.Error(err)
return false, err
}
ci.setIds(m)
ci.update(m)
return true, nil
}
func (ci *ClusterId) setIds(m *v1.ConfigMap) {
func (ci *ClusterID) update(m *v1.ConfigMap) {
ci.idLock.Lock()
defer ci.idLock.Unlock()
if clusterId, exists := m.Data[UIDCluster]; exists {
ci.clusterId = &clusterId
if clusterID, exists := m.Data[UIDCluster]; exists {
ci.clusterID = &clusterID
}
if provId, exists := m.Data[UIDProvider]; exists {
ci.providerId = &provId
ci.providerID = &provId
}
}


@@ -17,11 +17,23 @@ limitations under the License.
package gce
import (
"fmt"
"time"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/master/ports"
utilversion "k8s.io/kubernetes/pkg/util/version"
"github.com/golang/glog"
compute "google.golang.org/api/compute/v1"
)
const (
minNodesHealthCheckVersion = "1.7.0"
nodesHealthCheckPath = "/healthz"
lbNodesHealthCheckPort = ports.ProxyHealthzPort
)
func newHealthcheckMetricContext(request string) *metricContext {
return &metricContext{
start: time.Now(),
@@ -178,3 +190,59 @@ func (gce *GCECloud) ListHealthChecks() (*compute.HealthCheckList, error) {
v, err := gce.service.HealthChecks.List(gce.projectID).Do()
return v, mc.Observe(err)
}
// GetNodesHealthCheckPort returns the health check port used by the GCE load
// balancers (l4) for performing health checks on nodes.
func GetNodesHealthCheckPort() int32 {
return lbNodesHealthCheckPort
}
// getNodesHealthCheckPath returns the health check path used by the GCE load
// balancers (l4) for performing health checks on nodes.
func getNodesHealthCheckPath() string {
return nodesHealthCheckPath
}
// makeNodesHealthCheckName returns the name of the health check resource used by
// the GCE load balancers (l4) for performing health checks on nodes.
func makeNodesHealthCheckName(clusterID string) string {
return fmt.Sprintf("k8s-%v-node", clusterID)
}
// MakeHealthCheckFirewallName returns the firewall name used by the GCE load
// balancers (l4) for performing health checks.
func MakeHealthCheckFirewallName(clusterID, hcName string, isNodesHealthCheck bool) string {
// TODO: Change below fwName to match the proposed schema: k8s-{clusterID}-{namespace}-{name}-{shortid}-hc.
fwName := "k8s-" + hcName + "-http-hc"
if isNodesHealthCheck {
fwName = makeNodesHealthCheckName(clusterID) + "-http-hc"
}
return fwName
}
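Concretely, for a hypothetical cluster ID "abc123" and a load balancer named "a1b2c3d4e5f6", the naming helpers above yield (values derived directly from the format strings):

func exampleHealthCheckNames() []string {
	return []string{
		makeNodesHealthCheckName("abc123"),                             // "k8s-abc123-node"
		MakeHealthCheckFirewallName("abc123", "k8s-abc123-node", true), // "k8s-abc123-node-http-hc"
		MakeHealthCheckFirewallName("abc123", "a1b2c3d4e5f6", false),   // "k8s-a1b2c3d4e5f6-http-hc"
	}
}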
// isAtLeastMinNodesHealthCheckVersion checks whether a version is at least
// `minNodesHealthCheckVersion`.
func isAtLeastMinNodesHealthCheckVersion(vstring string) bool {
minVersion, err := utilversion.ParseGeneric(minNodesHealthCheckVersion)
if err != nil {
glog.Errorf("MinNodesHealthCheckVersion (%s) is not a valid version string: %v", minNodesHealthCheckVersion, err)
return false
}
version, err := utilversion.ParseGeneric(vstring)
if err != nil {
glog.Errorf("vstring (%s) is not a valid version string: %v", vstring, err)
return false
}
return version.AtLeast(minVersion)
}
// supportsNodesHealthCheck returns false if any of the nodes has a version
// lower than `minNodesHealthCheckVersion`.
func supportsNodesHealthCheck(nodes []*v1.Node) bool {
for _, node := range nodes {
if !isAtLeastMinNodesHealthCheckVersion(node.Status.NodeInfo.KubeProxyVersion) {
return false
}
}
return true
}


@@ -0,0 +1,123 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package gce
import (
"testing"
"k8s.io/kubernetes/pkg/api/v1"
)
func TestIsAtLeastMinNodesHealthCheckVersion(t *testing.T) {
testCases := []struct {
version string
expect bool
}{
{"v1.7.1", true},
{"v1.7.0-alpha.2.597+276d289b90d322", true},
{"v1.6.0-beta.3.472+831q821c907t31a", false},
{"v1.5.2", false},
}
for _, tc := range testCases {
if res := isAtLeastMinNodesHealthCheckVersion(tc.version); res != tc.expect {
t.Errorf("%v: want %v, got %v", tc.version, tc.expect, res)
}
}
}
func TestSupportsNodesHealthCheck(t *testing.T) {
testCases := []struct {
desc string
nodes []*v1.Node
expect bool
}{
{
"All nodes support nodes health check",
[]*v1.Node{
{
Status: v1.NodeStatus{
NodeInfo: v1.NodeSystemInfo{
KubeProxyVersion: "v1.7.1",
},
},
},
{
Status: v1.NodeStatus{
NodeInfo: v1.NodeSystemInfo{
KubeProxyVersion: "v1.7.0-alpha.2.597+276d289b90d322",
},
},
},
},
true,
},
{
"All nodes don't support nodes health check",
[]*v1.Node{
{
Status: v1.NodeStatus{
NodeInfo: v1.NodeSystemInfo{
KubeProxyVersion: "v1.6.0-beta.3.472+831q821c907t31a",
},
},
},
{
Status: v1.NodeStatus{
NodeInfo: v1.NodeSystemInfo{
KubeProxyVersion: "v1.5.2",
},
},
},
},
false,
},
{
"One node doesn't support nodes health check",
[]*v1.Node{
{
Status: v1.NodeStatus{
NodeInfo: v1.NodeSystemInfo{
KubeProxyVersion: "v1.7.1",
},
},
},
{
Status: v1.NodeStatus{
NodeInfo: v1.NodeSystemInfo{
KubeProxyVersion: "v1.7.0-alpha.2.597+276d289b90d322",
},
},
},
{
Status: v1.NodeStatus{
NodeInfo: v1.NodeSystemInfo{
KubeProxyVersion: "v1.5.2",
},
},
},
},
false,
},
}
for _, tc := range testCases {
if res := supportsNodesHealthCheck(tc.nodes); res != tc.expect {
t.Errorf("%v: want %v, got %v", tc.desc, tc.expect, res)
}
}
}


@@ -21,7 +21,6 @@ import (
"fmt"
"net"
"net/http"
"sort"
"strconv"
"strings"
"time"
@@ -140,6 +139,7 @@ func (gce *GCECloud) EnsureLoadBalancer(clusterName string, apiService *v1.Servi
}
hostNames := nodeNames(nodes)
supportsNodesHealthCheck := supportsNodesHealthCheck(nodes)
hosts, err := gce.getInstancesByNames(hostNames)
if err != nil {
return nil, err
@@ -289,13 +289,13 @@ func (gce *GCECloud) EnsureLoadBalancer(clusterName string, apiService *v1.Servi
// without needing to be deleted and recreated.
if firewallExists {
glog.Infof("EnsureLoadBalancer(%v(%v)): updating firewall", loadBalancerName, serviceName)
if err := gce.updateFirewall(loadBalancerName, gce.region, desc, sourceRanges, ports, hosts); err != nil {
if err := gce.updateFirewall(makeFirewallName(loadBalancerName), gce.region, desc, sourceRanges, ports, hosts); err != nil {
return nil, err
}
glog.Infof("EnsureLoadBalancer(%v(%v)): updated firewall", loadBalancerName, serviceName)
} else {
glog.Infof("EnsureLoadBalancer(%v(%v)): creating firewall", loadBalancerName, serviceName)
if err := gce.createFirewall(loadBalancerName, gce.region, desc, sourceRanges, ports, hosts); err != nil {
if err := gce.createFirewall(makeFirewallName(loadBalancerName), gce.region, desc, sourceRanges, ports, hosts); err != nil {
return nil, err
}
glog.Infof("EnsureLoadBalancer(%v(%v)): created firewall", loadBalancerName, serviceName)
@@ -310,34 +310,43 @@ func (gce *GCECloud) EnsureLoadBalancer(clusterName string, apiService *v1.Servi
glog.Infof("Target pool %v for Service %v/%v doesn't exist", loadBalancerName, apiService.Namespace, apiService.Name)
}
// Ensure health checks are created for this target pool to pass to createTargetPool for health check links
// Alternately, if the service has ExternalTrafficPolicy field set from Local to Global, we need to recreate
// the target pool without health checks. This needs to be prior to the forwarding rule deletion below otherwise
// it is not possible to delete just the target pool or http health checks later.
var hcToCreate *compute.HttpHealthCheck
hcExisting, err := gce.GetHttpHealthCheck(loadBalancerName)
clusterID, err := gce.ClusterID.GetID()
if err != nil {
return nil, fmt.Errorf("error getting cluster ID %s: %v", loadBalancerName, err)
}
// Check which health check needs to be created and which needs to be deleted.
// Health check management is coupled with target pool operation to prevent leaking.
var hcToCreate, hcToDelete *compute.HttpHealthCheck
hcLocalTrafficExisting, err := gce.GetHttpHealthCheck(loadBalancerName)
if err != nil && !isHTTPErrorCode(err, http.StatusNotFound) {
return nil, fmt.Errorf("Error checking HTTP health check %s: %v", loadBalancerName, err)
return nil, fmt.Errorf("error checking HTTP health check %s: %v", loadBalancerName, err)
}
if path, healthCheckNodePort := apiservice.GetServiceHealthCheckPathPort(apiService); path != "" {
glog.V(4).Infof("service %v (%v) needs health checks on :%d%s)", apiService.Name, loadBalancerName, healthCheckNodePort, path)
if err != nil {
// This logic exists to detect a transition for a pre-existing service and turn on
// the tpNeedsUpdate flag to delete/recreate fwdrule/tpool adding the health check
// to the target pool.
glog.V(2).Infof("ExternalTrafficPolicy field set to Local on new or pre-existing service")
glog.V(4).Infof("service %v (%v) needs local traffic health checks on: %d%s)", apiService.Name, loadBalancerName, healthCheckNodePort, path)
if hcLocalTrafficExisting == nil {
// This logic exists to detect a transition from a non-OnlyLocal to an OnlyLocal service
// and turn on the tpNeedsUpdate flag to delete/recreate the fwdrule/tpool, updating the
// target pool to use the local traffic health check.
glog.V(2).Infof("Updating from nodes health checks to local traffic health checks for service %v LB %v", apiService.Name, loadBalancerName)
if supportsNodesHealthCheck {
hcToDelete = makeHttpHealthCheck(makeNodesHealthCheckName(clusterID), getNodesHealthCheckPath(), GetNodesHealthCheckPort())
}
tpNeedsUpdate = true
}
hcToCreate, err = gce.ensureHttpHealthCheck(loadBalancerName, path, healthCheckNodePort)
if err != nil {
return nil, fmt.Errorf("Failed to ensure health check for localized service %v on node port %v: %v", loadBalancerName, healthCheckNodePort, err)
}
hcToCreate = makeHttpHealthCheck(loadBalancerName, path, healthCheckNodePort)
} else {
glog.V(4).Infof("service %v does not need health checks", apiService.Name)
if err == nil {
glog.V(2).Infof("Deleting stale health checks for service %v LB %v", apiService.Name, loadBalancerName)
glog.V(4).Infof("Service %v needs nodes health checks.", apiService.Name)
if hcLocalTrafficExisting != nil {
// This logic exists to detect a transition from an OnlyLocal to a non-OnlyLocal service
// and turn on the tpNeedsUpdate flag to delete/recreate the fwdrule/tpool, updating the
// target pool to use the nodes health check.
glog.V(2).Infof("Updating from local traffic health checks to nodes health checks for service %v LB %v", apiService.Name, loadBalancerName)
hcToDelete = hcLocalTrafficExisting
tpNeedsUpdate = true
}
if supportsNodesHealthCheck {
hcToCreate = makeHttpHealthCheck(makeNodesHealthCheckName(clusterID), getNodesHealthCheckPath(), GetNodesHealthCheckPort())
}
}
// Now we get to some slightly more interesting logic.
// First, neither target pools nor forwarding rules can be updated in place -
@@ -357,17 +366,12 @@ func (gce *GCECloud) EnsureLoadBalancer(clusterName string, apiService *v1.Servi
glog.Infof("EnsureLoadBalancer(%v(%v)): deleted forwarding rule", loadBalancerName, serviceName)
}
if tpExists && tpNeedsUpdate {
// Generate the list of health checks for this target pool to pass to deleteTargetPool
if path, _ := apiservice.GetServiceHealthCheckPathPort(apiService); path != "" {
var err error
hcExisting, err = gce.GetHttpHealthCheck(loadBalancerName)
if err != nil && !isHTTPErrorCode(err, http.StatusNotFound) {
glog.Infof("Failed to retrieve health check %v:%v", loadBalancerName, err)
}
// Pass healthchecks to deleteTargetPool to cleanup health checks after cleaning up the target pool itself.
var hcNames []string
if hcToDelete != nil {
hcNames = append(hcNames, hcToDelete.Name)
}
// Pass healthchecks to deleteTargetPool to cleanup health checks prior to cleaning up the target pool itself.
if err := gce.deleteTargetPool(loadBalancerName, gce.region, hcExisting); err != nil {
if err := gce.deleteTargetPool(loadBalancerName, gce.region, hcNames...); err != nil {
return nil, fmt.Errorf("failed to delete existing target pool %s for load balancer update: %v", loadBalancerName, err)
}
glog.Infof("EnsureLoadBalancer(%v(%v)): deleted target pool", loadBalancerName, serviceName)
@@ -381,11 +385,11 @@ func (gce *GCECloud) EnsureLoadBalancer(clusterName string, apiService *v1.Servi
createInstances = createInstances[:maxTargetPoolCreateInstances]
}
// Pass healthchecks to createTargetPool which needs them as health check links in the target pool
if err := gce.createTargetPool(loadBalancerName, serviceName.String(), gce.region, createInstances, affinityType, hcToCreate); err != nil {
if err := gce.createTargetPool(loadBalancerName, serviceName.String(), ipAddress, gce.region, createInstances, affinityType, hcToCreate); err != nil {
return nil, fmt.Errorf("failed to create target pool %s: %v", loadBalancerName, err)
}
if hcToCreate != nil {
glog.Infof("EnsureLoadBalancer(%v(%v)): created health checks for target pool", loadBalancerName, serviceName)
glog.Infof("EnsureLoadBalancer(%v(%v)): created health checks %v for target pool", loadBalancerName, serviceName, hcToCreate.Name)
}
if len(hosts) <= maxTargetPoolCreateInstances {
glog.Infof("EnsureLoadBalancer(%v(%v)): created target pool", loadBalancerName, serviceName)
@@ -447,18 +451,29 @@ func (gce *GCECloud) EnsureLoadBalancerDeleted(clusterName string, service *v1.S
glog.V(2).Infof("EnsureLoadBalancerDeleted(%v, %v, %v, %v, %v)", clusterName, service.Namespace, service.Name, loadBalancerName,
gce.region)
var hc *compute.HttpHealthCheck
var hcNames []string
if path, _ := apiservice.GetServiceHealthCheckPathPort(service); path != "" {
var err error
hc, err = gce.GetHttpHealthCheck(loadBalancerName)
hcToDelete, err := gce.GetHttpHealthCheck(loadBalancerName)
if err != nil && !isHTTPErrorCode(err, http.StatusNotFound) {
glog.Infof("Failed to retrieve health check %v:%v", loadBalancerName, err)
return err
}
if hcToDelete != nil {
hcNames = append(hcNames, hcToDelete.Name)
}
} else {
clusterID, err := gce.ClusterID.GetID()
if err != nil {
return fmt.Errorf("error getting cluster ID %s: %v", loadBalancerName, err)
}
// EnsureLoadBalancerDeleted() could be triggered by changing a service from
// type LoadBalancer to another type. In that case we have no idea whether it was
// using the local traffic health check or the nodes health check. Attempt to delete
// both to prevent leaking.
hcNames = append(hcNames, loadBalancerName)
hcNames = append(hcNames, makeNodesHealthCheckName(clusterID))
}
errs := utilerrors.AggregateGoroutines(
func() error { return gce.deleteFirewall(loadBalancerName, gce.region) },
func() error { return gce.deleteFirewall(makeFirewallName(loadBalancerName), gce.region) },
// Even though we don't hold on to static IPs for load balancers, it's
// possible that EnsureLoadBalancer left one around in a failed
// creation/update attempt, so make sure we clean it up here just in case.
@@ -469,7 +484,7 @@ func (gce *GCECloud) EnsureLoadBalancerDeleted(clusterName string, service *v1.S
if err := gce.deleteForwardingRule(loadBalancerName, gce.region); err != nil {
return err
}
if err := gce.deleteTargetPool(loadBalancerName, gce.region, hc); err != nil {
if err := gce.deleteTargetPool(loadBalancerName, gce.region, hcNames...); err != nil {
return err
}
return nil
@@ -509,15 +524,15 @@ func (gce *GCECloud) deleteForwardingRule(name, region string) error {
}
// DeleteTargetPool deletes the given target pool.
func (gce *GCECloud) DeleteTargetPool(name string, hc *compute.HttpHealthCheck) error {
func (gce *GCECloud) DeleteTargetPool(name string, hcNames ...string) error {
region, err := GetGCERegion(gce.localZone)
if err != nil {
return err
}
return gce.deleteTargetPool(name, region, hc)
return gce.deleteTargetPool(name, region, hcNames...)
}
func (gce *GCECloud) deleteTargetPool(name, region string, hc *compute.HttpHealthCheck) error {
func (gce *GCECloud) deleteTargetPool(name, region string, hcNames ...string) error {
mc := newTargetPoolMetricContext("delete", region)
op, err := gce.service.TargetPools.Delete(gce.projectID, region, name).Do()
@@ -535,41 +550,86 @@ func (gce *GCECloud) deleteTargetPool(name, region string, hc *compute.HttpHealt
}
// Deletion of health checks is allowed only after the TargetPool reference is deleted
if hc != nil {
glog.Infof("Deleting health check %v", hc.Name)
if err := gce.DeleteHttpHealthCheck(hc.Name); err != nil {
glog.Warningf("Failed to delete health check %v: %v", hc, err)
for _, hcName := range hcNames {
if err = func() error {
// Check whether this is the nodes health check, whose name differs from the load balancer's.
isNodesHealthCheck := hcName != name
if isNodesHealthCheck {
// Lock to prevent the shared nodes health check from being deleted before it gets
// attached to a target pool.
gce.sharedResourceLock.Lock()
defer gce.sharedResourceLock.Unlock()
}
glog.Infof("Deleting health check %v", hcName)
if err := gce.DeleteHttpHealthCheck(hcName); err != nil {
// Deleting the nodes health check will fail if any other target pool is still using it.
if isInUsedByError(err) {
glog.V(4).Infof("Health check %v is in used: %v.", hcName, err)
return nil
} else if !isHTTPErrorCode(err, http.StatusNotFound) {
glog.Warningf("Failed to delete health check %v: %v", hcName, err)
return err
}
// StatusNotFound could happen when:
// - This is the first attempt but we pass in a healthcheck that is already deleted
// to prevent leaking.
// - This is the first attempt but the user manually deleted the healthcheck.
// - This is a retry and in the previous round we failed to delete the healthcheck firewall
// after deleting the healthcheck.
// We continue to delete the healthcheck firewall to prevent leaking.
glog.V(4).Infof("Health check %v is already deleted.", hcName)
}
clusterID, err := gce.ClusterID.GetID()
if err != nil {
return fmt.Errorf("error getting cluster ID: %v", err)
}
// If the health check is deleted without error, it means no load balancer is using it.
// So we should delete the health check firewall as well.
fwName := MakeHealthCheckFirewallName(clusterID, hcName, isNodesHealthCheck)
glog.Infof("Deleting firewall %v.", fwName)
if err := gce.DeleteFirewall(fwName); err != nil {
if isHTTPErrorCode(err, http.StatusNotFound) {
glog.V(4).Infof("Firewall %v is already deleted.", fwName)
return nil
}
return err
}
return nil
}(); err != nil {
return err
}
} else {
// This is a HC cleanup attempt to prevent stale HCs when errors are encountered
// during HC deletion in a prior pass through EnsureLoadBalancer.
// The HC name matches the load balancer name - normally this is expected to fail.
if err := gce.DeleteHttpHealthCheck(name); err == nil {
// We only print a warning if this deletion actually succeeded (which
// means there was indeed a stale health check with the LB name.
glog.Warningf("Deleted stale http health check for LB: %s", name)
}
}
return nil
}
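A hedged sketch of a caller mirroring the EnsureLoadBalancerDeleted path above: when it is unknown which health check the target pool referenced, both candidate names are passed, and deleteTargetPool tolerates the 404 for whichever one does not exist. The wrapper name is illustrative only:

func cleanupTargetPool(gce *GCECloud, loadBalancerName, clusterID string) error {
	// Candidates: the OnlyLocal check (named after the LB) and the
	// shared nodes health check; missing ones are skipped via 404.
	hcNames := []string{loadBalancerName, makeNodesHealthCheckName(clusterID)}
	return gce.DeleteTargetPool(loadBalancerName, hcNames...)
}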
func (gce *GCECloud) createTargetPool(name, serviceName, region string, hosts []*gceInstance, affinityType v1.ServiceAffinity, hc *compute.HttpHealthCheck) error {
var instances []string
for _, host := range hosts {
instances = append(instances, makeHostURL(gce.projectID, host.Zone, host.Name))
}
func (gce *GCECloud) createTargetPool(name, serviceName, ipAddress, region string, hosts []*gceInstance, affinityType v1.ServiceAffinity, hc *compute.HttpHealthCheck) error {
// health check management is coupled with targetPools to prevent leaks. A
// target pool is the only thing that requires a health check, so we delete
// associated checks on teardown, and ensure checks on setup.
hcLinks := []string{}
if hc != nil {
// Check whether this is the nodes health check, whose name differs from the load balancer's.
isNodesHealthCheck := hc.Name != name
if isNodesHealthCheck {
// Lock to prevent the shared nodes health check / firewall from being deleted.
gce.sharedResourceLock.Lock()
defer gce.sharedResourceLock.Unlock()
}
if err := gce.ensureHttpHealthCheckFirewall(serviceName, ipAddress, gce.region, hosts, hc.Name, int32(hc.Port), isNodesHealthCheck); err != nil {
return err
}
var err error
if hc, err = gce.ensureHttpHealthCheck(name, hc.RequestPath, int32(hc.Port)); err != nil || hc == nil {
if hc, err = gce.ensureHttpHealthCheck(hc.Name, hc.RequestPath, int32(hc.Port)); err != nil || hc == nil {
return fmt.Errorf("Failed to ensure health check for %v port %d path %v: %v", name, hc.Port, hc.RequestPath, err)
}
hcLinks = append(hcLinks, hc.SelfLink)
}
var instances []string
for _, host := range hosts {
instances = append(instances, makeHostURL(gce.projectID, host.Zone, host.Name))
}
glog.Infof("Creating targetpool %v with %d healthchecks", name, len(hcLinks))
pool := &compute.TargetPool{
Name: name,
@@ -651,8 +711,8 @@ func (gce *GCECloud) targetPoolURL(name, region string) string {
return fmt.Sprintf("https://www.googleapis.com/compute/v1/projects/%s/regions/%s/targetPools/%s", gce.projectID, region, name)
}
func (gce *GCECloud) ensureHttpHealthCheck(name, path string, port int32) (hc *compute.HttpHealthCheck, err error) {
newHC := &compute.HttpHealthCheck{
func makeHttpHealthCheck(name, path string, port int32) *compute.HttpHealthCheck {
return &compute.HttpHealthCheck{
Name: name,
Port: int64(port),
RequestPath: path,
@@ -663,7 +723,10 @@ func (gce *GCECloud) ensureHttpHealthCheck(name, path string, port int32) (hc *c
HealthyThreshold: gceHcHealthyThreshold,
UnhealthyThreshold: gceHcUnhealthyThreshold,
}
}
func (gce *GCECloud) ensureHttpHealthCheck(name, path string, port int32) (hc *compute.HttpHealthCheck, err error) {
newHC := makeHttpHealthCheck(name, path, port)
hc, err = gce.GetHttpHealthCheck(name)
if hc == nil || err != nil && isHTTPErrorCode(err, http.StatusNotFound) {
glog.Infof("Did not find health check %v, creating port %v path %v", name, port, path)
@@ -843,7 +906,7 @@ func (gce *GCECloud) firewallNeedsUpdate(name, serviceName, region, ipAddress st
if isHTTPErrorCode(err, http.StatusNotFound) {
return false, true, nil
}
return false, false, fmt.Errorf("error getting load balancer's target pool: %v", err)
return false, false, fmt.Errorf("error getting load balancer's firewall: %v", err)
}
if fw.Description != makeFirewallDescription(serviceName, ipAddress) {
return true, true, nil
@@ -856,7 +919,7 @@ func (gce *GCECloud) firewallNeedsUpdate(name, serviceName, region, ipAddress st
for ix := range ports {
allowedPorts[ix] = strconv.Itoa(int(ports[ix].Port))
}
if !slicesEqual(allowedPorts, fw.Allowed[0].Ports) {
if !equalStringSets(allowedPorts, fw.Allowed[0].Ports) {
return true, true, nil
}
// The service controller already verified that the protocol matches on all ports, no need to check.
@@ -875,18 +938,47 @@ func (gce *GCECloud) firewallNeedsUpdate(name, serviceName, region, ipAddress st
return true, false, nil
}
func slicesEqual(x, y []string) bool {
if len(x) != len(y) {
return false
func (gce *GCECloud) ensureHttpHealthCheckFirewall(serviceName, ipAddress, region string, hosts []*gceInstance, hcName string, hcPort int32, isNodesHealthCheck bool) error {
clusterID, err := gce.ClusterID.GetID()
if err != nil {
return fmt.Errorf("error getting cluster ID: %v", err)
}
sort.Strings(x)
sort.Strings(y)
for i := range x {
if x[i] != y[i] {
return false
// Prepare the firewall params for creating / checking.
desc := fmt.Sprintf(`{"kubernetes.io/cluster-id":"%s"}`, clusterID)
if !isNodesHealthCheck {
desc = makeFirewallDescription(serviceName, ipAddress)
}
sourceRanges := lbSrcRngsFlag.ipn
ports := []v1.ServicePort{{Protocol: "tcp", Port: hcPort}}
fwName := MakeHealthCheckFirewallName(clusterID, hcName, isNodesHealthCheck)
fw, err := gce.service.Firewalls.Get(gce.projectID, fwName).Do()
if err != nil {
if !isHTTPErrorCode(err, http.StatusNotFound) {
return fmt.Errorf("error getting firewall for health checks: %v", err)
}
glog.Infof("Creating firewall %v for health checks.", fwName)
if err := gce.createFirewall(fwName, region, desc, sourceRanges, ports, hosts); err != nil {
return err
}
glog.Infof("Created firewall %v for health checks.", fwName)
return nil
}
return true
// Validate firewall fields.
if fw.Description != desc ||
len(fw.Allowed) != 1 ||
fw.Allowed[0].IPProtocol != string(ports[0].Protocol) ||
!equalStringSets(fw.Allowed[0].Ports, []string{strconv.Itoa(int(ports[0].Port))}) ||
!equalStringSets(fw.SourceRanges, sourceRanges.StringSlice()) {
glog.Warningf("Firewall %v exists but parameters have drifted - updating...", fwName)
if err := gce.updateFirewall(fwName, region, desc, sourceRanges, ports, hosts); err != nil {
glog.Warningf("Failed to reconcile firewall %v parameters.", fwName)
return err
}
glog.V(4).Infof("Corrected firewall %v parameters successful", fwName)
}
return nil
}
func (gce *GCECloud) createForwardingRule(name, serviceName, region, ipAddress string, ports []v1.ServicePort) error {
@@ -942,7 +1034,7 @@ func (gce *GCECloud) updateFirewall(name, region, desc string, sourceRanges nets
if err != nil {
return mc.Observe(err)
}
op, err := gce.service.Firewalls.Update(gce.projectID, makeFirewallName(name), firewall).Do()
op, err := gce.service.Firewalls.Update(gce.projectID, name, firewall).Do()
if err != nil && !isHTTPErrorCode(err, http.StatusConflict) {
return mc.Observe(err)
}
@@ -971,7 +1063,7 @@ func (gce *GCECloud) firewallObject(name, region, desc string, sourceRanges nets
}
firewall := &compute.Firewall{
Name: makeFirewallName(name),
Name: name,
Description: desc,
Network: gce.networkURL,
SourceRanges: sourceRanges.StringSlice(),
@@ -1141,17 +1233,16 @@ func (gce *GCECloud) ensureStaticIP(name, serviceName, region, existingIP string
func (gce *GCECloud) deleteFirewall(name, region string) error {
mc := newFirewallMetricContext("delete", region)
fwName := makeFirewallName(name)
op, err := gce.service.Firewalls.Delete(gce.projectID, fwName).Do()
op, err := gce.service.Firewalls.Delete(gce.projectID, name).Do()
if err != nil && isHTTPErrorCode(err, http.StatusNotFound) {
glog.V(2).Infof("Firewall %s already deleted. Continuing to delete other resources.", name)
} else if err != nil {
glog.Warningf("Failed to delete firewall %s, got error %v", fwName, err)
glog.Warningf("Failed to delete firewall %s, got error %v", name, err)
return mc.Observe(err)
} else {
if err := gce.waitForGlobalOp(op, mc); err != nil {
glog.Warningf("Failed waiting for Firewall %s to be deleted. Got error: %v", fwName, err)
glog.Warningf("Failed waiting for Firewall %s to be deleted. Got error: %v", name, err)
return err
}
}


@@ -19,10 +19,12 @@ package gce
import (
"errors"
"fmt"
"net/http"
"regexp"
"strings"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"cloud.google.com/go/compute/metadata"
compute "google.golang.org/api/compute/v1"
@@ -105,6 +107,14 @@ func isHTTPErrorCode(err error, code int) bool {
return ok && apiErr.Code == code
}
func isInUsedByError(err error) bool {
apiErr, ok := err.(*googleapi.Error)
if !ok || apiErr.Code != http.StatusBadRequest {
return false
}
return strings.Contains(apiErr.Message, "being used by")
}
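For reference, a sketch of the error shape this helper matches. The message text below is an assumption modeled on GCE's resourceInUseByAnotherResource errors, not captured API output:

func exampleInUseError() bool {
	err := &googleapi.Error{
		Code:    http.StatusBadRequest, // GCE reports in-use conflicts as 400
		Message: "The http_health_check resource 'k8s-abc123-node' is already being used by 'some-target-pool'", // assumed sample text
	}
	return isInUsedByError(err) // true: 400 plus a "being used by" message
}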
// splitProviderID splits a provider's id into core components.
// A providerID is build out of '${ProviderName}://${project-id}/${zone}/${instance-name}'
// See cloudprovider.GetInstanceProviderID.
@@ -115,3 +125,12 @@ func splitProviderID(providerID string) (project, zone, instance string, err err
}
return matches[1], matches[2], matches[3], nil
}
func equalStringSets(x, y []string) bool {
if len(x) != len(y) {
return false
}
xString := sets.NewString(x...)
yString := sets.NewString(y...)
return xString.Equal(yString)
}