mirror of https://github.com/k3s-io/k3s
Merge pull request #20731 from Clarifai/ensure-lb-servicename
Auto commit by PR queue botpull/6/head
commit
9520bb5ddf
|
@ -23,6 +23,7 @@ import (
|
|||
"strings"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/types"
|
||||
)
|
||||
|
||||
// Interface is an abstract, pluggable interface for cloud providers.
|
||||
|
@ -82,8 +83,8 @@ type LoadBalancer interface {
|
|||
// GetLoadBalancer returns whether the specified load balancer exists, and
|
||||
// if so, what its status is.
|
||||
GetLoadBalancer(name, region string) (status *api.LoadBalancerStatus, exists bool, err error)
|
||||
// EnsureLoadBalancer creates a new load balancer, or updates an existing one. Returns the status of the balancer
|
||||
EnsureLoadBalancer(name, region string, loadBalancerIP net.IP, ports []*api.ServicePort, hosts []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error)
|
||||
// EnsureLoadBalancer creates a new load balancer 'name', or updates the existing one. Returns the status of the balancer
|
||||
EnsureLoadBalancer(name, region string, loadBalancerIP net.IP, ports []*api.ServicePort, hosts []string, serviceName types.NamespacedName, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error)
|
||||
// UpdateLoadBalancer updates hosts under the specified load balancer.
|
||||
UpdateLoadBalancer(name, region string, hosts []string) error
|
||||
// EnsureLoadBalancerDeleted deletes the specified load balancer if it
|
||||
|
|
|
@ -43,6 +43,7 @@ import (
|
|||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/cloudprovider"
|
||||
"k8s.io/kubernetes/pkg/credentialprovider/aws"
|
||||
"k8s.io/kubernetes/pkg/types"
|
||||
"k8s.io/kubernetes/pkg/util/sets"
|
||||
|
||||
"github.com/golang/glog"
|
||||
|
@ -54,6 +55,9 @@ const ProviderName = "aws"
|
|||
// The tag name we use to differentiate multiple logically independent clusters running in the same AZ
|
||||
const TagNameKubernetesCluster = "KubernetesCluster"
|
||||
|
||||
// The tag name we use to differentiate multiple services. Used currently for ELBs only.
|
||||
const TagNameKubernetesService = "kubernetes.io/service-name"
|
||||
|
||||
// We sometimes read to see if something exists; then try to create it if we didn't find it
|
||||
// This can fail once in a consistent system if done in parallel
|
||||
// In an eventually consistent system, it could fail unboundedly
|
||||
|
@ -1775,8 +1779,8 @@ func (s *AWSCloud) listSubnetIDsinVPC(vpcId string) ([]string, error) {
|
|||
|
||||
// EnsureLoadBalancer implements LoadBalancer.EnsureLoadBalancer
|
||||
// TODO(justinsb) It is weird that these take a region. I suspect it won't work cross-region anyway.
|
||||
func (s *AWSCloud) EnsureLoadBalancer(name, region string, publicIP net.IP, ports []*api.ServicePort, hosts []string, affinity api.ServiceAffinity) (*api.LoadBalancerStatus, error) {
|
||||
glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v)", name, region, publicIP, ports, hosts)
|
||||
func (s *AWSCloud) EnsureLoadBalancer(name, region string, publicIP net.IP, ports []*api.ServicePort, hosts []string, serviceName types.NamespacedName, affinity api.ServiceAffinity) (*api.LoadBalancerStatus, error) {
|
||||
glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v)", name, region, publicIP, ports, hosts, serviceName)
|
||||
|
||||
if region != s.region {
|
||||
return nil, fmt.Errorf("requested load balancer region '%s' does not match cluster region '%s'", region, s.region)
|
||||
|
@ -1823,7 +1827,7 @@ func (s *AWSCloud) EnsureLoadBalancer(name, region string, publicIP net.IP, port
|
|||
var securityGroupID string
|
||||
{
|
||||
sgName := "k8s-elb-" + name
|
||||
sgDescription := "Security group for Kubernetes ELB " + name
|
||||
sgDescription := fmt.Sprintf("Security group for Kubernetes ELB %s (%v)", name, serviceName)
|
||||
securityGroupID, err = s.ensureSecurityGroup(sgName, sgDescription, vpcId)
|
||||
if err != nil {
|
||||
glog.Error("Error creating load balancer security group: ", err)
|
||||
|
@ -1872,7 +1876,7 @@ func (s *AWSCloud) EnsureLoadBalancer(name, region string, publicIP net.IP, port
|
|||
}
|
||||
|
||||
// Build the load balancer itself
|
||||
loadBalancer, err := s.ensureLoadBalancer(name, listeners, subnetIDs, securityGroupIDs)
|
||||
loadBalancer, err := s.ensureLoadBalancer(serviceName, name, listeners, subnetIDs, securityGroupIDs)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -1894,7 +1898,7 @@ func (s *AWSCloud) EnsureLoadBalancer(name, region string, publicIP net.IP, port
|
|||
return nil, err
|
||||
}
|
||||
|
||||
glog.V(1).Infof("Loadbalancer %s has DNS name %s", name, orEmpty(loadBalancer.DNSName))
|
||||
glog.V(1).Infof("Loadbalancer %s (%v) has DNS name %s", name, serviceName, orEmpty(loadBalancer.DNSName))
|
||||
|
||||
// TODO: Wait for creation?
|
||||
|
||||
|
@ -2328,3 +2332,8 @@ func (s *AWSCloud) addFilters(filters []*ec2.Filter) []*ec2.Filter {
|
|||
}
|
||||
return filters
|
||||
}
|
||||
|
||||
// Returns the cluster name or an empty string
|
||||
func (s *AWSCloud) getClusterName() string {
|
||||
return s.filterTags[TagNameKubernetesCluster]
|
||||
}
|
||||
|
|
|
@ -24,10 +24,11 @@ import (
|
|||
"github.com/aws/aws-sdk-go/service/ec2"
|
||||
"github.com/aws/aws-sdk-go/service/elb"
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/kubernetes/pkg/types"
|
||||
"k8s.io/kubernetes/pkg/util/sets"
|
||||
)
|
||||
|
||||
func (s *AWSCloud) ensureLoadBalancer(name string, listeners []*elb.Listener, subnetIDs []string, securityGroupIDs []string) (*elb.LoadBalancerDescription, error) {
|
||||
func (s *AWSCloud) ensureLoadBalancer(namespacedName types.NamespacedName, name string, listeners []*elb.Listener, subnetIDs []string, securityGroupIDs []string) (*elb.LoadBalancerDescription, error) {
|
||||
loadBalancer, err := s.describeLoadBalancer(name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
@ -47,7 +48,12 @@ func (s *AWSCloud) ensureLoadBalancer(name string, listeners []*elb.Listener, su
|
|||
|
||||
createRequest.SecurityGroups = stringPointerArray(securityGroupIDs)
|
||||
|
||||
glog.Info("Creating load balancer with name: ", name)
|
||||
createRequest.Tags = []*elb.Tag{
|
||||
{Key: aws.String(TagNameKubernetesCluster), Value: aws.String(s.getClusterName())},
|
||||
{Key: aws.String(TagNameKubernetesService), Value: aws.String(namespacedName.String())},
|
||||
}
|
||||
|
||||
glog.Infof("Creating load balancer for %v with name: ", namespacedName, name)
|
||||
_, err := s.elb.CreateLoadBalancer(createRequest)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
|
|
@ -30,6 +30,7 @@ import (
|
|||
"github.com/aws/aws-sdk-go/service/autoscaling"
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/types"
|
||||
)
|
||||
|
||||
const TestClusterId = "clusterid.test"
|
||||
|
@ -712,7 +713,9 @@ func TestLoadBalancerMatchesClusterRegion(t *testing.T) {
|
|||
t.Errorf("Expected GetLoadBalancer region mismatch error.")
|
||||
}
|
||||
|
||||
_, err = c.EnsureLoadBalancer("elb-name", badELBRegion, nil, nil, nil, api.ServiceAffinityNone)
|
||||
serviceName := types.NamespacedName{Namespace: "foo", Name: "bar"}
|
||||
|
||||
_, err = c.EnsureLoadBalancer("elb-name", badELBRegion, nil, nil, nil, serviceName, api.ServiceAffinityNone)
|
||||
if err == nil || err.Error() != errorMessage {
|
||||
t.Errorf("Expected EnsureLoadBalancer region mismatch error.")
|
||||
}
|
||||
|
|
|
@ -25,6 +25,7 @@ import (
|
|||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/cloudprovider"
|
||||
"k8s.io/kubernetes/pkg/types"
|
||||
)
|
||||
|
||||
const ProviderName = "fake"
|
||||
|
@ -130,7 +131,7 @@ func (f *FakeCloud) GetLoadBalancer(name, region string) (*api.LoadBalancerStatu
|
|||
|
||||
// EnsureLoadBalancer is a test-spy implementation of LoadBalancer.EnsureLoadBalancer.
|
||||
// It adds an entry "create" into the internal method call record.
|
||||
func (f *FakeCloud) EnsureLoadBalancer(name, region string, externalIP net.IP, ports []*api.ServicePort, hosts []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) {
|
||||
func (f *FakeCloud) EnsureLoadBalancer(name, region string, externalIP net.IP, ports []*api.ServicePort, hosts []string, serviceName types.NamespacedName, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) {
|
||||
f.addCall("create")
|
||||
if f.Balancers == nil {
|
||||
f.Balancers = make(map[string]FakeBalancer)
|
||||
|
|
|
@ -32,6 +32,7 @@ import (
|
|||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/unversioned"
|
||||
"k8s.io/kubernetes/pkg/cloudprovider"
|
||||
"k8s.io/kubernetes/pkg/types"
|
||||
utilerrors "k8s.io/kubernetes/pkg/util/errors"
|
||||
"k8s.io/kubernetes/pkg/util/sets"
|
||||
"k8s.io/kubernetes/pkg/util/wait"
|
||||
|
@ -431,12 +432,12 @@ func isHTTPErrorCode(err error, code int) bool {
|
|||
// Due to an interesting series of design decisions, this handles both creating
|
||||
// new load balancers and updating existing load balancers, recognizing when
|
||||
// each is needed.
|
||||
func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP, ports []*api.ServicePort, hostNames []string, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) {
|
||||
func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP, ports []*api.ServicePort, hostNames []string, serviceName types.NamespacedName, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) {
|
||||
portStr := []string{}
|
||||
for _, p := range ports {
|
||||
portStr = append(portStr, fmt.Sprintf("%s/%d", p.Protocol, p.Port))
|
||||
}
|
||||
glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v)", name, region, requestedIP, portStr, hostNames)
|
||||
glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v)", name, region, requestedIP, portStr, hostNames, serviceName)
|
||||
|
||||
if len(hostNames) == 0 {
|
||||
return nil, fmt.Errorf("Cannot EnsureLoadBalancer() with no hosts")
|
||||
|
@ -483,11 +484,11 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP,
|
|||
}
|
||||
if isSafeToReleaseIP {
|
||||
if err := gce.deleteStaticIP(name, region); err != nil {
|
||||
glog.Errorf("failed to release static IP %s for load balancer (%v, %v): %v", ipAddress, name, region, err)
|
||||
glog.Errorf("failed to release static IP %s for load balancer (%v(%v), %v): %v", ipAddress, name, serviceName, region, err)
|
||||
}
|
||||
glog.V(2).Infof("EnsureLoadBalancer(%v): released static IP %s", name, ipAddress)
|
||||
glog.V(2).Infof("EnsureLoadBalancer(%v(%v)): released static IP %s", name, serviceName, ipAddress)
|
||||
} else {
|
||||
glog.Warningf("orphaning static IP %s during update of load balancer (%v, %v): %v", ipAddress, name, region, err)
|
||||
glog.Warningf("orphaning static IP %s during update of load balancer (%v(%v), %v): %v", ipAddress, name, serviceName, region, err)
|
||||
}
|
||||
}()
|
||||
|
||||
|
@ -504,7 +505,7 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP,
|
|||
isUserOwnedIP = true
|
||||
isSafeToReleaseIP = false
|
||||
ipAddress = requestedIP.String()
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v): using user-provided static IP %s", name, ipAddress)
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): using user-provided static IP %s", name, serviceName, ipAddress)
|
||||
} else if requestedIP.String() == fwdRuleIP {
|
||||
// The requested IP is not a static IP, but is currently assigned
|
||||
// to this forwarding rule, so we can keep it.
|
||||
|
@ -514,13 +515,13 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP,
|
|||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to ensure static IP %s: %v", fwdRuleIP, err)
|
||||
}
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v): using user-provided non-static IP %s", name, ipAddress)
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): using user-provided non-static IP %s", name, serviceName, ipAddress)
|
||||
} else {
|
||||
// The requested IP is not static and it is not assigned to the
|
||||
// current forwarding rule. It might be attached to a different
|
||||
// rule or it might not be part of this project at all. Either
|
||||
// way, we can't use it.
|
||||
return nil, fmt.Errorf("requested ip %s is neither static nor assigned to LB %s: %v", requestedIP.String(), name, err)
|
||||
return nil, fmt.Errorf("requested ip %s is neither static nor assigned to LB %s(%v): %v", requestedIP.String(), name, serviceName, err)
|
||||
}
|
||||
} else {
|
||||
// The user did not request a specific IP.
|
||||
|
@ -541,13 +542,13 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP,
|
|||
// use this IP and try to run through the process again, but we
|
||||
// should not release the IP unless it is explicitly flagged as OK.
|
||||
isSafeToReleaseIP = false
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v): adopting static IP %s", name, ipAddress)
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): adopting static IP %s", name, serviceName, ipAddress)
|
||||
} else {
|
||||
// For total clarity. The IP did not pre-exist and the user did
|
||||
// not ask for a particular one, so we can release the IP in case
|
||||
// of failure or success.
|
||||
isSafeToReleaseIP = true
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v): allocated static IP %s", name, ipAddress)
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): allocated static IP %s", name, serviceName, ipAddress)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -567,12 +568,12 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP,
|
|||
if err := gce.updateFirewall(name, region, desc, "0.0.0.0/0", ports, hosts); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v): updated firewall", name)
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): updated firewall", name, serviceName)
|
||||
} else {
|
||||
if err := gce.createFirewall(name, region, desc, "0.0.0.0/0", ports, hosts); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v): created firewall", name)
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): created firewall", name, serviceName)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -596,13 +597,13 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP,
|
|||
if err := gce.deleteForwardingRule(name, region); err != nil {
|
||||
return nil, fmt.Errorf("failed to delete existing forwarding rule %s for load balancer update: %v", name, err)
|
||||
}
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v): deleted forwarding rule", name)
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): deleted forwarding rule", name, serviceName)
|
||||
}
|
||||
if tpExists && tpNeedsUpdate {
|
||||
if err := gce.deleteTargetPool(name, region); err != nil {
|
||||
return nil, fmt.Errorf("failed to delete existing target pool %s for load balancer update: %v", name, err)
|
||||
}
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v): deleted target pool", name)
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): deleted target pool", name, serviceName)
|
||||
}
|
||||
|
||||
// Once we've deleted the resources (if necessary), build them back up (or for
|
||||
|
@ -611,7 +612,7 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP,
|
|||
if err := gce.createTargetPool(name, region, hosts, affinityType); err != nil {
|
||||
return nil, fmt.Errorf("failed to create target pool %s: %v", name, err)
|
||||
}
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v): created target pool", name)
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): created target pool", name, serviceName)
|
||||
}
|
||||
if tpNeedsUpdate || fwdRuleNeedsUpdate {
|
||||
if err := gce.createForwardingRule(name, region, ipAddress, ports); err != nil {
|
||||
|
@ -622,7 +623,7 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP,
|
|||
// of a user-requested IP, the "is user-owned" flag will be set,
|
||||
// preventing it from actually being released.
|
||||
isSafeToReleaseIP = true
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v): created forwarding rule, IP %s", name, ipAddress)
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): created forwarding rule, IP %s", name, serviceName, ipAddress)
|
||||
}
|
||||
|
||||
status := &api.LoadBalancerStatus{}
|
||||
|
|
|
@ -46,6 +46,7 @@ import (
|
|||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/resource"
|
||||
"k8s.io/kubernetes/pkg/cloudprovider"
|
||||
"k8s.io/kubernetes/pkg/types"
|
||||
)
|
||||
|
||||
const ProviderName = "openstack"
|
||||
|
@ -654,8 +655,8 @@ func (lb *LoadBalancer) GetLoadBalancer(name, region string) (*api.LoadBalancerS
|
|||
// a list of regions (from config) and query/create loadbalancers in
|
||||
// each region.
|
||||
|
||||
func (lb *LoadBalancer) EnsureLoadBalancer(name, region string, loadBalancerIP net.IP, ports []*api.ServicePort, hosts []string, affinity api.ServiceAffinity) (*api.LoadBalancerStatus, error) {
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v)", name, region, loadBalancerIP, ports, hosts, affinity)
|
||||
func (lb *LoadBalancer) EnsureLoadBalancer(name, region string, loadBalancerIP net.IP, ports []*api.ServicePort, hosts []string, serviceName types.NamespacedName, affinity api.ServiceAffinity) (*api.LoadBalancerStatus, error) {
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v)", name, region, loadBalancerIP, ports, hosts, serviceName)
|
||||
|
||||
if len(ports) > 1 {
|
||||
return nil, fmt.Errorf("multiple ports are not yet supported in openstack load balancers")
|
||||
|
|
|
@ -321,7 +321,7 @@ func (s *ServiceController) createLoadBalancerIfNeeded(namespacedName types.Name
|
|||
|
||||
// The load balancer doesn't exist yet, so create it.
|
||||
s.eventRecorder.Event(service, api.EventTypeNormal, "CreatingLoadBalancer", "Creating load balancer")
|
||||
err := s.createLoadBalancer(service)
|
||||
err := s.createLoadBalancer(service, namespacedName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to create load balancer for service %s: %v", namespacedName, err), retryable
|
||||
}
|
||||
|
@ -371,7 +371,7 @@ func (s *ServiceController) persistUpdate(service *api.Service) error {
|
|||
return err
|
||||
}
|
||||
|
||||
func (s *ServiceController) createLoadBalancer(service *api.Service) error {
|
||||
func (s *ServiceController) createLoadBalancer(service *api.Service, serviceName types.NamespacedName) error {
|
||||
ports, err := getPortsForLB(service)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -385,7 +385,7 @@ func (s *ServiceController) createLoadBalancer(service *api.Service) error {
|
|||
// - Not all cloud providers support all protocols and the next step is expected to return
|
||||
// an error for unsupported protocols
|
||||
status, err := s.balancer.EnsureLoadBalancer(name, s.zone.Region, net.ParseIP(service.Spec.LoadBalancerIP),
|
||||
ports, hostsFromNodeList(&nodes), service.Spec.SessionAffinity)
|
||||
ports, hostsFromNodeList(&nodes), serviceName, service.Spec.SessionAffinity)
|
||||
if err != nil {
|
||||
return err
|
||||
} else {
|
||||
|
|
Loading…
Reference in New Issue