mirror of https://github.com/k3s-io/k3s
AWS: Add support for load balancer source ranges
This refactors #21431 to pull a lot of the code into cloudprovider so it can be reused by AWS. It also changes the name of the annotation to be non-GCE specific: service.beta.kubernetes.io/load-balancer-source-ranges Fix #21651pull/6/head
parent
cbf2b269ad
commit
49e1149227
|
@ -26,6 +26,20 @@ import (
|
|||
"k8s.io/kubernetes/pkg/types"
|
||||
)
|
||||
|
||||
const (
|
||||
// The value of a LBAnnotationAllowSourceRange annotation determines
|
||||
// the source IP ranges to allow to access a service exposed as
|
||||
// type=LoadBalancer (when accesssed through the LoadBalancer created
|
||||
// by the cloud provider).
|
||||
//
|
||||
// It should be a comma-separated list of CIDRs, e.g. `0.0.0.0/0` to
|
||||
// allow full access (the default) or `18.0.0.0/8,56.0.0.0/8` to allow
|
||||
// access only from the CIDRs currently allocated to MIT & the USPS.
|
||||
//
|
||||
// Not all cloud providers support this annotation, though AWS & GCE do.
|
||||
LBAnnotationAllowSourceRange = "service.beta.kubernetes.io/load-balancer-source-ranges"
|
||||
)
|
||||
|
||||
// Interface is an abstract, pluggable interface for cloud providers.
|
||||
type Interface interface {
|
||||
// LoadBalancer returns a balancer interface. Also returns true if the interface is supported, false otherwise.
|
||||
|
@ -84,7 +98,7 @@ type LoadBalancer interface {
|
|||
// if so, what its status is.
|
||||
GetLoadBalancer(name, region string) (status *api.LoadBalancerStatus, exists bool, err error)
|
||||
// EnsureLoadBalancer creates a new load balancer 'name', or updates the existing one. Returns the status of the balancer
|
||||
EnsureLoadBalancer(name, region string, loadBalancerIP net.IP, ports []*api.ServicePort, hosts []string, serviceName types.NamespacedName, affinityType api.ServiceAffinity, annotations ServiceAnnotation) (*api.LoadBalancerStatus, error)
|
||||
EnsureLoadBalancer(name, region string, loadBalancerIP net.IP, ports []*api.ServicePort, hosts []string, serviceName types.NamespacedName, affinityType api.ServiceAffinity, annotations map[string]string) (*api.LoadBalancerStatus, error)
|
||||
// UpdateLoadBalancer updates hosts under the specified load balancer.
|
||||
UpdateLoadBalancer(name, region string, hosts []string) error
|
||||
// EnsureLoadBalancerDeleted deletes the specified load balancer if it
|
||||
|
@ -160,10 +174,3 @@ type Zones interface {
|
|||
// GetZone returns the Zone containing the current failure zone and locality region that the program is running in
|
||||
GetZone() (Zone, error)
|
||||
}
|
||||
|
||||
type ServiceAnnotation map[string]string
|
||||
|
||||
func (s ServiceAnnotation) GetValue(key string) (string, bool) {
|
||||
val, ok := s[key]
|
||||
return val, ok
|
||||
}
|
||||
|
|
|
@ -1948,7 +1948,7 @@ func isSubnetPublic(rt []*ec2.RouteTable, subnetID string) (bool, error) {
|
|||
|
||||
// EnsureLoadBalancer implements LoadBalancer.EnsureLoadBalancer
|
||||
// TODO(justinsb) It is weird that these take a region. I suspect it won't work cross-region anyway.
|
||||
func (s *AWSCloud) EnsureLoadBalancer(name, region string, publicIP net.IP, ports []*api.ServicePort, hosts []string, serviceName types.NamespacedName, affinity api.ServiceAffinity, annotations cloudprovider.ServiceAnnotation) (*api.LoadBalancerStatus, error) {
|
||||
func (s *AWSCloud) EnsureLoadBalancer(name, region string, publicIP net.IP, ports []*api.ServicePort, hosts []string, serviceName types.NamespacedName, affinity api.ServiceAffinity, annotations map[string]string) (*api.LoadBalancerStatus, error) {
|
||||
glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v)", name, region, publicIP, ports, hosts, serviceName, annotations)
|
||||
|
||||
if region != s.region {
|
||||
|
@ -1979,6 +1979,11 @@ func (s *AWSCloud) EnsureLoadBalancer(name, region string, publicIP net.IP, port
|
|||
return nil, err
|
||||
}
|
||||
|
||||
sourceRanges, err := cloudprovider.GetSourceRangeAnnotations(annotations)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
vpcId, err := s.findVPCID()
|
||||
if err != nil {
|
||||
glog.Error("Error finding VPC", err)
|
||||
|
@ -2003,16 +2008,20 @@ func (s *AWSCloud) EnsureLoadBalancer(name, region string, publicIP net.IP, port
|
|||
return nil, err
|
||||
}
|
||||
|
||||
ec2SourceRanges := []*ec2.IpRange{}
|
||||
for _, sourceRange := range sourceRanges.StringSlice() {
|
||||
ec2SourceRanges = append(ec2SourceRanges, &ec2.IpRange{CidrIp: aws.String(sourceRange)})
|
||||
}
|
||||
|
||||
permissions := []*ec2.IpPermission{}
|
||||
for _, port := range ports {
|
||||
portInt64 := int64(port.Port)
|
||||
protocol := strings.ToLower(string(port.Protocol))
|
||||
sourceIp := "0.0.0.0/0"
|
||||
|
||||
permission := &ec2.IpPermission{}
|
||||
permission.FromPort = &portInt64
|
||||
permission.ToPort = &portInt64
|
||||
permission.IpRanges = []*ec2.IpRange{{CidrIp: &sourceIp}}
|
||||
permission.IpRanges = ec2SourceRanges
|
||||
permission.IpProtocol = &protocol
|
||||
|
||||
permissions = append(permissions, permission)
|
||||
|
|
|
@ -132,7 +132,7 @@ func (f *FakeCloud) GetLoadBalancer(name, region string) (*api.LoadBalancerStatu
|
|||
|
||||
// EnsureLoadBalancer is a test-spy implementation of LoadBalancer.EnsureLoadBalancer.
|
||||
// It adds an entry "create" into the internal method call record.
|
||||
func (f *FakeCloud) EnsureLoadBalancer(name, region string, externalIP net.IP, ports []*api.ServicePort, hosts []string, serviceName types.NamespacedName, affinityType api.ServiceAffinity, annotations cloudprovider.ServiceAnnotation) (*api.LoadBalancerStatus, error) {
|
||||
func (f *FakeCloud) EnsureLoadBalancer(name, region string, externalIP net.IP, ports []*api.ServicePort, hosts []string, serviceName types.NamespacedName, affinityType api.ServiceAffinity, annotations map[string]string) (*api.LoadBalancerStatus, error) {
|
||||
f.addCall("create")
|
||||
if f.Balancers == nil {
|
||||
f.Balancers = make(map[string]FakeBalancer)
|
||||
|
|
|
@ -62,34 +62,12 @@ const (
|
|||
operationPollInterval = 3 * time.Second
|
||||
operationPollTimeoutDuration = 30 * time.Minute
|
||||
|
||||
defaultLBSourceRange = "0.0.0.0/0"
|
||||
|
||||
//Expected annotations for GCE
|
||||
gceLBAllowSourceRange = "net.beta.kubernetes.io/gce-source-ranges"
|
||||
|
||||
// Each page can have 500 results, but we cap how many pages
|
||||
// are iterated through to prevent infinite loops if the API
|
||||
// were to continuously return a nextPageToken.
|
||||
maxPages = 25
|
||||
)
|
||||
|
||||
//validateAllowSourceRange validates annotation of allow source ranges
|
||||
func validateSourceRangeAnnotation(annotation cloudprovider.ServiceAnnotation) error {
|
||||
val := annotation[gceLBAllowSourceRange]
|
||||
errMsg := fmt.Errorf("Service annotation %s:%s is not valid. Expecting source IP ranges. Comma Seperated. For example, 0.0.0.0/0,192.168.2.0/24", gceLBAllowSourceRange, val)
|
||||
ranges := strings.Split(val, ",")
|
||||
if len(ranges) <= 0 {
|
||||
return errMsg
|
||||
}
|
||||
for _, subnet := range ranges {
|
||||
_, _, err := net.ParseCIDR(subnet)
|
||||
if err != nil {
|
||||
return errMsg
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// GCECloud is an implementation of Interface, LoadBalancer and Instances for Google Compute Engine.
|
||||
type GCECloud struct {
|
||||
service *compute.Service
|
||||
|
@ -474,7 +452,7 @@ func isHTTPErrorCode(err error, code int) bool {
|
|||
// Due to an interesting series of design decisions, this handles both creating
|
||||
// new load balancers and updating existing load balancers, recognizing when
|
||||
// each is needed.
|
||||
func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP, ports []*api.ServicePort, hostNames []string, svc types.NamespacedName, affinityType api.ServiceAffinity, annotations cloudprovider.ServiceAnnotation) (*api.LoadBalancerStatus, error) {
|
||||
func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP, ports []*api.ServicePort, hostNames []string, svc types.NamespacedName, affinityType api.ServiceAffinity, annotations map[string]string) (*api.LoadBalancerStatus, error) {
|
||||
portStr := []string{}
|
||||
for _, p := range ports {
|
||||
portStr = append(portStr, fmt.Sprintf("%s/%d", p.Protocol, p.Port))
|
||||
|
@ -599,13 +577,9 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP,
|
|||
// is because the forwarding rule is used as the indicator that the load
|
||||
// balancer is fully created - it's what getLoadBalancer checks for.
|
||||
// Check if user specified the allow source range
|
||||
sourceRanges := []string{defaultLBSourceRange}
|
||||
val, ok := annotations.GetValue(gceLBAllowSourceRange)
|
||||
if ok {
|
||||
if err := validateSourceRangeAnnotation(annotations); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
sourceRanges = strings.Split(val, ",")
|
||||
sourceRanges, err := cloudprovider.GetSourceRangeAnnotations(annotations)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
firewallExists, firewallNeedsUpdate, err := gce.firewallNeedsUpdate(name, serviceName, region, ipAddress, ports, sourceRanges)
|
||||
|
@ -766,7 +740,7 @@ func translateAffinityType(affinityType api.ServiceAffinity) string {
|
|||
}
|
||||
}
|
||||
|
||||
func (gce *GCECloud) firewallNeedsUpdate(name, serviceName, region, ipAddress string, ports []*api.ServicePort, sourceRanges []string) (exists bool, needsUpdate bool, err error) {
|
||||
func (gce *GCECloud) firewallNeedsUpdate(name, serviceName, region, ipAddress string, ports []*api.ServicePort, sourceRanges cloudprovider.IPNetSet) (exists bool, needsUpdate bool, err error) {
|
||||
fw, err := gce.service.Firewalls.Get(gce.projectID, makeFirewallName(name)).Do()
|
||||
if err != nil {
|
||||
if isHTTPErrorCode(err, http.StatusNotFound) {
|
||||
|
@ -790,7 +764,15 @@ func (gce *GCECloud) firewallNeedsUpdate(name, serviceName, region, ipAddress st
|
|||
}
|
||||
// The service controller already verified that the protocol matches on all ports, no need to check.
|
||||
|
||||
if !slicesEqual(fw.SourceRanges, sourceRanges) {
|
||||
actualSourceRanges, err := cloudprovider.ParseIPNetSet(fw.SourceRanges)
|
||||
if err != nil {
|
||||
// This really shouldn't happen... GCE has returned something unexpected
|
||||
glog.Warningf("Error parsing firewall SourceRanges: %v", fw.SourceRanges)
|
||||
// We don't return the error, because we can hopefully recover from this by reconfiguring the firewall
|
||||
return true, true, nil
|
||||
}
|
||||
|
||||
if !sourceRanges.Equal(actualSourceRanges) {
|
||||
return true, true, nil
|
||||
}
|
||||
return true, false, nil
|
||||
|
@ -870,7 +852,7 @@ func (gce *GCECloud) createTargetPool(name, serviceName, region string, hosts []
|
|||
return nil
|
||||
}
|
||||
|
||||
func (gce *GCECloud) createFirewall(name, region, desc string, sourceRanges []string, ports []*api.ServicePort, hosts []*gceInstance) error {
|
||||
func (gce *GCECloud) createFirewall(name, region, desc string, sourceRanges cloudprovider.IPNetSet, ports []*api.ServicePort, hosts []*gceInstance) error {
|
||||
firewall, err := gce.firewallObject(name, region, desc, sourceRanges, ports, hosts)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -888,7 +870,7 @@ func (gce *GCECloud) createFirewall(name, region, desc string, sourceRanges []st
|
|||
return nil
|
||||
}
|
||||
|
||||
func (gce *GCECloud) updateFirewall(name, region, desc string, sourceRanges []string, ports []*api.ServicePort, hosts []*gceInstance) error {
|
||||
func (gce *GCECloud) updateFirewall(name, region, desc string, sourceRanges cloudprovider.IPNetSet, ports []*api.ServicePort, hosts []*gceInstance) error {
|
||||
firewall, err := gce.firewallObject(name, region, desc, sourceRanges, ports, hosts)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -906,7 +888,7 @@ func (gce *GCECloud) updateFirewall(name, region, desc string, sourceRanges []st
|
|||
return nil
|
||||
}
|
||||
|
||||
func (gce *GCECloud) firewallObject(name, region, desc string, sourceRanges []string, ports []*api.ServicePort, hosts []*gceInstance) (*compute.Firewall, error) {
|
||||
func (gce *GCECloud) firewallObject(name, region, desc string, sourceRanges cloudprovider.IPNetSet, ports []*api.ServicePort, hosts []*gceInstance) (*compute.Firewall, error) {
|
||||
allowedPorts := make([]string, len(ports))
|
||||
for ix := range ports {
|
||||
allowedPorts[ix] = strconv.Itoa(ports[ix].Port)
|
||||
|
@ -919,7 +901,7 @@ func (gce *GCECloud) firewallObject(name, region, desc string, sourceRanges []st
|
|||
Name: makeFirewallName(name),
|
||||
Description: desc,
|
||||
Network: gce.networkURL,
|
||||
SourceRanges: sourceRanges,
|
||||
SourceRanges: sourceRanges.StringSlice(),
|
||||
TargetTags: hostTags,
|
||||
Allowed: []*compute.FirewallAllowed{
|
||||
{
|
||||
|
@ -1224,7 +1206,7 @@ func (gce *GCECloud) GetFirewall(name string) (*compute.Firewall, error) {
|
|||
}
|
||||
|
||||
// CreateFirewall creates the given firewall rule.
|
||||
func (gce *GCECloud) CreateFirewall(name, desc string, sourceRanges []string, ports []int64, hostNames []string) error {
|
||||
func (gce *GCECloud) CreateFirewall(name, desc string, sourceRanges cloudprovider.IPNetSet, ports []int64, hostNames []string) error {
|
||||
region, err := GetGCERegion(gce.localZone)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -1253,7 +1235,7 @@ func (gce *GCECloud) DeleteFirewall(name string) error {
|
|||
|
||||
// UpdateFirewall applies the given firewall rule as an update to an existing
|
||||
// firewall rule with the same name.
|
||||
func (gce *GCECloud) UpdateFirewall(name, desc string, sourceRanges []string, ports []int64, hostNames []string) error {
|
||||
func (gce *GCECloud) UpdateFirewall(name, desc string, sourceRanges cloudprovider.IPNetSet, ports []int64, hostNames []string) error {
|
||||
region, err := GetGCERegion(gce.localZone)
|
||||
if err != nil {
|
||||
return err
|
||||
|
|
|
@ -660,7 +660,7 @@ func (lb *LoadBalancer) GetLoadBalancer(name, region string) (*api.LoadBalancerS
|
|||
// a list of regions (from config) and query/create loadbalancers in
|
||||
// each region.
|
||||
|
||||
func (lb *LoadBalancer) EnsureLoadBalancer(name, region string, loadBalancerIP net.IP, ports []*api.ServicePort, hosts []string, serviceName types.NamespacedName, affinity api.ServiceAffinity, annotations cloudprovider.ServiceAnnotation) (*api.LoadBalancerStatus, error) {
|
||||
func (lb *LoadBalancer) EnsureLoadBalancer(name, region string, loadBalancerIP net.IP, ports []*api.ServicePort, hosts []string, serviceName types.NamespacedName, affinity api.ServiceAffinity, annotations map[string]string) (*api.LoadBalancerStatus, error) {
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v)", name, region, loadBalancerIP, ports, hosts, serviceName, annotations)
|
||||
|
||||
if len(ports) > 1 {
|
||||
|
|
|
@ -0,0 +1,84 @@
|
|||
/*
|
||||
Copyright 2016 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package cloudprovider
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"strings"
|
||||
)
|
||||
|
||||
const (
|
||||
defaultLBSourceRange = "0.0.0.0/0"
|
||||
)
|
||||
|
||||
type IPNetSet map[string]*net.IPNet
|
||||
|
||||
func ParseIPNetSet(specs []string) (IPNetSet, error) {
|
||||
ipnetset := make(IPNetSet)
|
||||
for _, spec := range specs {
|
||||
spec = strings.TrimSpace(spec)
|
||||
_, ipnet, err := net.ParseCIDR(spec)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
k := ipnet.String() // In case of normalization
|
||||
ipnetset[k] = ipnet
|
||||
}
|
||||
return ipnetset, nil
|
||||
}
|
||||
|
||||
// StringSlice returns a []string with the String representation of each element in the set.
|
||||
// Order is undefined.
|
||||
func (s IPNetSet) StringSlice() []string {
|
||||
a := make([]string, 0, len(s))
|
||||
for k := range s {
|
||||
a = append(a, k)
|
||||
}
|
||||
return a
|
||||
}
|
||||
|
||||
// Equal checks if two IPNetSets are equal (ignoring order)
|
||||
func (l IPNetSet) Equal(r IPNetSet) bool {
|
||||
if len(l) != len(r) {
|
||||
return false
|
||||
}
|
||||
|
||||
for k := range l {
|
||||
_, found := r[k]
|
||||
if !found {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// GetSourceRangeAnnotations verifies and parses the LBAnnotationAllowSourceRange annotation from a service,
|
||||
// extracting the source ranges to allow, and if not present returns a default (allow-all) value.
|
||||
func GetSourceRangeAnnotations(annotation map[string]string) (IPNetSet, error) {
|
||||
val := annotation[LBAnnotationAllowSourceRange]
|
||||
val = strings.TrimSpace(val)
|
||||
if val == "" {
|
||||
val = defaultLBSourceRange
|
||||
}
|
||||
specs := strings.Split(val, ",")
|
||||
ipnets, err := ParseIPNetSet(specs)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Service annotation %s:%s is not valid. Expecting a comma-separated list of source IP ranges. For example, 10.0.0.0/24,192.168.2.0/24", LBAnnotationAllowSourceRange, val)
|
||||
}
|
||||
return ipnets, nil
|
||||
}
|
Loading…
Reference in New Issue