From 7ffb123abebb93c2483ff500863b30f139ca2fbe Mon Sep 17 00:00:00 2001 From: Minhan Xia Date: Wed, 17 Feb 2016 14:15:49 -0800 Subject: [PATCH] add source range support for loadbalancer on gce --- pkg/cloudprovider/cloud.go | 9 ++- pkg/cloudprovider/providers/aws/aws.go | 4 +- pkg/cloudprovider/providers/aws/aws_test.go | 2 +- pkg/cloudprovider/providers/fake/fake.go | 2 +- pkg/cloudprovider/providers/gce/gce.go | 67 ++++++++++++++----- .../providers/openstack/openstack.go | 4 +- pkg/controller/service/servicecontroller.go | 9 ++- 7 files changed, 73 insertions(+), 24 deletions(-) diff --git a/pkg/cloudprovider/cloud.go b/pkg/cloudprovider/cloud.go index e145b26dea..7141441729 100644 --- a/pkg/cloudprovider/cloud.go +++ b/pkg/cloudprovider/cloud.go @@ -84,7 +84,7 @@ type LoadBalancer interface { // if so, what its status is. GetLoadBalancer(name, region string) (status *api.LoadBalancerStatus, exists bool, err error) // EnsureLoadBalancer creates a new load balancer 'name', or updates the existing one. Returns the status of the balancer - EnsureLoadBalancer(name, region string, loadBalancerIP net.IP, ports []*api.ServicePort, hosts []string, serviceName types.NamespacedName, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) + EnsureLoadBalancer(name, region string, loadBalancerIP net.IP, ports []*api.ServicePort, hosts []string, serviceName types.NamespacedName, affinityType api.ServiceAffinity, annotations ServiceAnnotation) (*api.LoadBalancerStatus, error) // UpdateLoadBalancer updates hosts under the specified load balancer. UpdateLoadBalancer(name, region string, hosts []string) error // EnsureLoadBalancerDeleted deletes the specified load balancer if it @@ -160,3 +160,10 @@ type Zones interface { // GetZone returns the Zone containing the current failure zone and locality region that the program is running in GetZone() (Zone, error) } + +type ServiceAnnotation map[string]string + +func (s ServiceAnnotation) GetValue(key string) (string, bool) { + val, ok := s[key] + return val, ok +} diff --git a/pkg/cloudprovider/providers/aws/aws.go b/pkg/cloudprovider/providers/aws/aws.go index 811c2e107e..93f42b9568 100644 --- a/pkg/cloudprovider/providers/aws/aws.go +++ b/pkg/cloudprovider/providers/aws/aws.go @@ -1818,8 +1818,8 @@ func (s *AWSCloud) listSubnetIDsinVPC(vpcId string) ([]string, error) { // EnsureLoadBalancer implements LoadBalancer.EnsureLoadBalancer // TODO(justinsb) It is weird that these take a region. I suspect it won't work cross-region anyway. -func (s *AWSCloud) EnsureLoadBalancer(name, region string, publicIP net.IP, ports []*api.ServicePort, hosts []string, serviceName types.NamespacedName, affinity api.ServiceAffinity) (*api.LoadBalancerStatus, error) { - glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v)", name, region, publicIP, ports, hosts, serviceName) +func (s *AWSCloud) EnsureLoadBalancer(name, region string, publicIP net.IP, ports []*api.ServicePort, hosts []string, serviceName types.NamespacedName, affinity api.ServiceAffinity, annotations cloudprovider.ServiceAnnotation) (*api.LoadBalancerStatus, error) { + glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v)", name, region, publicIP, ports, hosts, serviceName, annotations) if region != s.region { return nil, fmt.Errorf("requested load balancer region '%s' does not match cluster region '%s'", region, s.region) diff --git a/pkg/cloudprovider/providers/aws/aws_test.go b/pkg/cloudprovider/providers/aws/aws_test.go index 3151c1246b..58c6d062a6 100644 --- a/pkg/cloudprovider/providers/aws/aws_test.go +++ b/pkg/cloudprovider/providers/aws/aws_test.go @@ -715,7 +715,7 @@ func TestLoadBalancerMatchesClusterRegion(t *testing.T) { serviceName := types.NamespacedName{Namespace: "foo", Name: "bar"} - _, err = c.EnsureLoadBalancer("elb-name", badELBRegion, nil, nil, nil, serviceName, api.ServiceAffinityNone) + _, err = c.EnsureLoadBalancer("elb-name", badELBRegion, nil, nil, nil, serviceName, api.ServiceAffinityNone, nil) if err == nil || err.Error() != errorMessage { t.Errorf("Expected EnsureLoadBalancer region mismatch error.") } diff --git a/pkg/cloudprovider/providers/fake/fake.go b/pkg/cloudprovider/providers/fake/fake.go index 498dce53c3..a3cd416f37 100644 --- a/pkg/cloudprovider/providers/fake/fake.go +++ b/pkg/cloudprovider/providers/fake/fake.go @@ -132,7 +132,7 @@ func (f *FakeCloud) GetLoadBalancer(name, region string) (*api.LoadBalancerStatu // EnsureLoadBalancer is a test-spy implementation of LoadBalancer.EnsureLoadBalancer. // It adds an entry "create" into the internal method call record. -func (f *FakeCloud) EnsureLoadBalancer(name, region string, externalIP net.IP, ports []*api.ServicePort, hosts []string, serviceName types.NamespacedName, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) { +func (f *FakeCloud) EnsureLoadBalancer(name, region string, externalIP net.IP, ports []*api.ServicePort, hosts []string, serviceName types.NamespacedName, affinityType api.ServiceAffinity, annotations cloudprovider.ServiceAnnotation) (*api.LoadBalancerStatus, error) { f.addCall("create") if f.Balancers == nil { f.Balancers = make(map[string]FakeBalancer) diff --git a/pkg/cloudprovider/providers/gce/gce.go b/pkg/cloudprovider/providers/gce/gce.go index f7bd672fb1..d970a040a1 100644 --- a/pkg/cloudprovider/providers/gce/gce.go +++ b/pkg/cloudprovider/providers/gce/gce.go @@ -61,8 +61,30 @@ const ( operationPollInterval = 3 * time.Second operationPollTimeoutDuration = 30 * time.Minute + + defaultLBSourceRange = "0.0.0.0/0" + + //Expected annotations for GCE + gceLBAllowSourceRange = "net.beta.kubernetes.io/gce-source-ranges" ) +//validateAllowSourceRange validates annotation of allow source ranges +func validateSourceRangeAnnotation(annotation cloudprovider.ServiceAnnotation) error { + val := annotation[gceLBAllowSourceRange] + errMsg := fmt.Errorf("Service annotation %s:%s is not valid. Expecting source IP ranges. Comma Seperated. For example, 0.0.0.0/0,192.168.2.0/24", gceLBAllowSourceRange, val) + ranges := strings.Split(val, ",") + if len(ranges) <= 0 { + return errMsg + } + for _, subnet := range ranges { + _, _, err := net.ParseCIDR(subnet) + if err != nil { + return errMsg + } + } + return nil +} + // GCECloud is an implementation of Interface, LoadBalancer and Instances for Google Compute Engine. type GCECloud struct { service *compute.Service @@ -432,12 +454,12 @@ func isHTTPErrorCode(err error, code int) bool { // Due to an interesting series of design decisions, this handles both creating // new load balancers and updating existing load balancers, recognizing when // each is needed. -func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP, ports []*api.ServicePort, hostNames []string, serviceName types.NamespacedName, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) { +func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP, ports []*api.ServicePort, hostNames []string, serviceName types.NamespacedName, affinityType api.ServiceAffinity, annotations cloudprovider.ServiceAnnotation) (*api.LoadBalancerStatus, error) { portStr := []string{} for _, p := range ports { portStr = append(portStr, fmt.Sprintf("%s/%d", p.Protocol, p.Port)) } - glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v)", name, region, requestedIP, portStr, hostNames, serviceName) + glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v)", name, region, requestedIP, portStr, hostNames, serviceName, annotations) if len(hostNames) == 0 { return nil, fmt.Errorf("Cannot EnsureLoadBalancer() with no hosts") @@ -555,7 +577,17 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP, // Deal with the firewall next. The reason we do this here rather than last // is because the forwarding rule is used as the indicator that the load // balancer is fully created - it's what getLoadBalancer checks for. - firewallExists, firewallNeedsUpdate, err := gce.firewallNeedsUpdate(name, region, ipAddress, ports) + // Check if user specified the allow source range + sourceRanges := []string{defaultLBSourceRange} + val, ok := annotations.GetValue(gceLBAllowSourceRange) + if ok { + if err := validateSourceRangeAnnotation(annotations); err != nil { + return nil, err + } + sourceRanges = strings.Split(val, ",") + } + + firewallExists, firewallNeedsUpdate, err := gce.firewallNeedsUpdate(name, region, ipAddress, ports, sourceRanges) if err != nil { return nil, err } @@ -565,12 +597,12 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP, // Unlike forwarding rules and target pools, firewalls can be updated // without needing to be deleted and recreated. if firewallExists { - if err := gce.updateFirewall(name, region, desc, "0.0.0.0/0", ports, hosts); err != nil { + if err := gce.updateFirewall(name, region, desc, sourceRanges, ports, hosts); err != nil { return nil, err } glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): updated firewall", name, serviceName) } else { - if err := gce.createFirewall(name, region, desc, "0.0.0.0/0", ports, hosts); err != nil { + if err := gce.createFirewall(name, region, desc, sourceRanges, ports, hosts); err != nil { return nil, err } glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): created firewall", name, serviceName) @@ -713,7 +745,7 @@ func translateAffinityType(affinityType api.ServiceAffinity) string { } } -func (gce *GCECloud) firewallNeedsUpdate(name, region, ipAddress string, ports []*api.ServicePort) (exists bool, needsUpdate bool, err error) { +func (gce *GCECloud) firewallNeedsUpdate(name, region, ipAddress string, ports []*api.ServicePort, sourceRanges []string) (exists bool, needsUpdate bool, err error) { fw, err := gce.service.Firewalls.Get(gce.projectID, makeFirewallName(name)).Do() if err != nil { if isHTTPErrorCode(err, http.StatusNotFound) { @@ -737,6 +769,9 @@ func (gce *GCECloud) firewallNeedsUpdate(name, region, ipAddress string, ports [ } // The service controller already verified that the protocol matches on all ports, no need to check. + if !slicesEqual(fw.SourceRanges, sourceRanges) { + return true, true, nil + } return true, false, nil } @@ -810,8 +845,8 @@ func (gce *GCECloud) createTargetPool(name, region string, hosts []*gceInstance, return nil } -func (gce *GCECloud) createFirewall(name, region, desc, srcRange string, ports []*api.ServicePort, hosts []*gceInstance) error { - firewall, err := gce.firewallObject(name, region, desc, srcRange, ports, hosts) +func (gce *GCECloud) createFirewall(name, region, desc string, sourceRanges []string, ports []*api.ServicePort, hosts []*gceInstance) error { + firewall, err := gce.firewallObject(name, region, desc, sourceRanges, ports, hosts) if err != nil { return err } @@ -828,8 +863,8 @@ func (gce *GCECloud) createFirewall(name, region, desc, srcRange string, ports [ return nil } -func (gce *GCECloud) updateFirewall(name, region, desc, srcRange string, ports []*api.ServicePort, hosts []*gceInstance) error { - firewall, err := gce.firewallObject(name, region, desc, srcRange, ports, hosts) +func (gce *GCECloud) updateFirewall(name, region, desc string, sourceRanges []string, ports []*api.ServicePort, hosts []*gceInstance) error { + firewall, err := gce.firewallObject(name, region, desc, sourceRanges, ports, hosts) if err != nil { return err } @@ -846,7 +881,7 @@ func (gce *GCECloud) updateFirewall(name, region, desc, srcRange string, ports [ return nil } -func (gce *GCECloud) firewallObject(name, region, desc, srcRange string, ports []*api.ServicePort, hosts []*gceInstance) (*compute.Firewall, error) { +func (gce *GCECloud) firewallObject(name, region, desc string, sourceRanges []string, ports []*api.ServicePort, hosts []*gceInstance) (*compute.Firewall, error) { allowedPorts := make([]string, len(ports)) for ix := range ports { allowedPorts[ix] = strconv.Itoa(ports[ix].Port) @@ -859,7 +894,7 @@ func (gce *GCECloud) firewallObject(name, region, desc, srcRange string, ports [ Name: makeFirewallName(name), Description: desc, Network: gce.networkURL, - SourceRanges: []string{srcRange}, + SourceRanges: sourceRanges, TargetTags: hostTags, Allowed: []*compute.FirewallAllowed{ { @@ -1138,7 +1173,7 @@ func (gce *GCECloud) GetFirewall(name string) (*compute.Firewall, error) { } // CreateFirewall creates the given firewall rule. -func (gce *GCECloud) CreateFirewall(name, desc, srcRange string, ports []int64, hostNames []string) error { +func (gce *GCECloud) CreateFirewall(name, desc string, sourceRanges []string, ports []int64, hostNames []string) error { region, err := GetGCERegion(gce.localZone) if err != nil { return err @@ -1153,7 +1188,7 @@ func (gce *GCECloud) CreateFirewall(name, desc, srcRange string, ports []int64, if err != nil { return err } - return gce.createFirewall(name, region, desc, srcRange, svcPorts, hosts) + return gce.createFirewall(name, region, desc, sourceRanges, svcPorts, hosts) } // DeleteFirewall deletes the given firewall rule. @@ -1167,7 +1202,7 @@ func (gce *GCECloud) DeleteFirewall(name string) error { // UpdateFirewall applies the given firewall rule as an update to an existing // firewall rule with the same name. -func (gce *GCECloud) UpdateFirewall(name, desc, srcRange string, ports []int64, hostNames []string) error { +func (gce *GCECloud) UpdateFirewall(name, desc string, sourceRanges []string, ports []int64, hostNames []string) error { region, err := GetGCERegion(gce.localZone) if err != nil { return err @@ -1182,7 +1217,7 @@ func (gce *GCECloud) UpdateFirewall(name, desc, srcRange string, ports []int64, if err != nil { return err } - return gce.updateFirewall(name, region, desc, srcRange, svcPorts, hosts) + return gce.updateFirewall(name, region, desc, sourceRanges, svcPorts, hosts) } // Global static IP management diff --git a/pkg/cloudprovider/providers/openstack/openstack.go b/pkg/cloudprovider/providers/openstack/openstack.go index 2a9376c2b7..7bb9ceff3d 100644 --- a/pkg/cloudprovider/providers/openstack/openstack.go +++ b/pkg/cloudprovider/providers/openstack/openstack.go @@ -660,8 +660,8 @@ func (lb *LoadBalancer) GetLoadBalancer(name, region string) (*api.LoadBalancerS // a list of regions (from config) and query/create loadbalancers in // each region. -func (lb *LoadBalancer) EnsureLoadBalancer(name, region string, loadBalancerIP net.IP, ports []*api.ServicePort, hosts []string, serviceName types.NamespacedName, affinity api.ServiceAffinity) (*api.LoadBalancerStatus, error) { - glog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v)", name, region, loadBalancerIP, ports, hosts, serviceName) +func (lb *LoadBalancer) EnsureLoadBalancer(name, region string, loadBalancerIP net.IP, ports []*api.ServicePort, hosts []string, serviceName types.NamespacedName, affinity api.ServiceAffinity, annotations cloudprovider.ServiceAnnotation) (*api.LoadBalancerStatus, error) { + glog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v)", name, region, loadBalancerIP, ports, hosts, serviceName, annotations) if len(ports) > 1 { return nil, fmt.Errorf("multiple ports are not yet supported in openstack load balancers") diff --git a/pkg/controller/service/servicecontroller.go b/pkg/controller/service/servicecontroller.go index 7cec070bf3..a0ebe3acce 100644 --- a/pkg/controller/service/servicecontroller.go +++ b/pkg/controller/service/servicecontroller.go @@ -34,6 +34,7 @@ import ( "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util/runtime" + "reflect" ) const ( @@ -247,6 +248,7 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) { } message += err.Error() s.eventRecorder.Event(service, api.EventTypeWarning, "CreatingLoadBalancerFailed", message) + return err, retry } // Always update the cache upon success. @@ -321,6 +323,7 @@ func (s *ServiceController) createLoadBalancerIfNeeded(namespacedName types.Name // The load balancer doesn't exist yet, so create it. s.eventRecorder.Event(service, api.EventTypeNormal, "CreatingLoadBalancer", "Creating load balancer") + err := s.createLoadBalancer(service, namespacedName) if err != nil { return fmt.Errorf("Failed to create load balancer for service %s: %v", namespacedName, err), retryable @@ -385,7 +388,7 @@ func (s *ServiceController) createLoadBalancer(service *api.Service, serviceName // - Not all cloud providers support all protocols and the next step is expected to return // an error for unsupported protocols status, err := s.balancer.EnsureLoadBalancer(name, s.zone.Region, net.ParseIP(service.Spec.LoadBalancerIP), - ports, hostsFromNodeList(&nodes), serviceName, service.Spec.SessionAffinity) + ports, hostsFromNodeList(&nodes), serviceName, service.Spec.SessionAffinity, service.ObjectMeta.Annotations) if err != nil { return err } else { @@ -488,6 +491,10 @@ func (s *ServiceController) needsUpdate(oldService *api.Service, newService *api return true } } + if !reflect.DeepEqual(oldService.Annotations, newService.Annotations) { + return true + } + return false }