mirror of https://github.com/k3s-io/k3s
commit
2e3053a204
|
@ -84,7 +84,7 @@ type LoadBalancer interface {
|
|||
// if so, what its status is.
|
||||
GetLoadBalancer(name, region string) (status *api.LoadBalancerStatus, exists bool, err error)
|
||||
// EnsureLoadBalancer creates a new load balancer 'name', or updates the existing one. Returns the status of the balancer
|
||||
EnsureLoadBalancer(name, region string, loadBalancerIP net.IP, ports []*api.ServicePort, hosts []string, serviceName types.NamespacedName, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error)
|
||||
EnsureLoadBalancer(name, region string, loadBalancerIP net.IP, ports []*api.ServicePort, hosts []string, serviceName types.NamespacedName, affinityType api.ServiceAffinity, annotations ServiceAnnotation) (*api.LoadBalancerStatus, error)
|
||||
// UpdateLoadBalancer updates hosts under the specified load balancer.
|
||||
UpdateLoadBalancer(name, region string, hosts []string) error
|
||||
// EnsureLoadBalancerDeleted deletes the specified load balancer if it
|
||||
|
@ -160,3 +160,10 @@ type Zones interface {
|
|||
// GetZone returns the Zone containing the current failure zone and locality region that the program is running in
|
||||
GetZone() (Zone, error)
|
||||
}
|
||||
|
||||
type ServiceAnnotation map[string]string
|
||||
|
||||
func (s ServiceAnnotation) GetValue(key string) (string, bool) {
|
||||
val, ok := s[key]
|
||||
return val, ok
|
||||
}
|
||||
|
|
|
@ -1820,8 +1820,8 @@ func (s *AWSCloud) listSubnetIDsinVPC(vpcId string) ([]string, error) {
|
|||
|
||||
// EnsureLoadBalancer implements LoadBalancer.EnsureLoadBalancer
|
||||
// TODO(justinsb) It is weird that these take a region. I suspect it won't work cross-region anyway.
|
||||
func (s *AWSCloud) EnsureLoadBalancer(name, region string, publicIP net.IP, ports []*api.ServicePort, hosts []string, serviceName types.NamespacedName, affinity api.ServiceAffinity) (*api.LoadBalancerStatus, error) {
|
||||
glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v)", name, region, publicIP, ports, hosts, serviceName)
|
||||
func (s *AWSCloud) EnsureLoadBalancer(name, region string, publicIP net.IP, ports []*api.ServicePort, hosts []string, serviceName types.NamespacedName, affinity api.ServiceAffinity, annotations cloudprovider.ServiceAnnotation) (*api.LoadBalancerStatus, error) {
|
||||
glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v)", name, region, publicIP, ports, hosts, serviceName, annotations)
|
||||
|
||||
if region != s.region {
|
||||
return nil, fmt.Errorf("requested load balancer region '%s' does not match cluster region '%s'", region, s.region)
|
||||
|
|
|
@ -715,7 +715,7 @@ func TestLoadBalancerMatchesClusterRegion(t *testing.T) {
|
|||
|
||||
serviceName := types.NamespacedName{Namespace: "foo", Name: "bar"}
|
||||
|
||||
_, err = c.EnsureLoadBalancer("elb-name", badELBRegion, nil, nil, nil, serviceName, api.ServiceAffinityNone)
|
||||
_, err = c.EnsureLoadBalancer("elb-name", badELBRegion, nil, nil, nil, serviceName, api.ServiceAffinityNone, nil)
|
||||
if err == nil || err.Error() != errorMessage {
|
||||
t.Errorf("Expected EnsureLoadBalancer region mismatch error.")
|
||||
}
|
||||
|
|
|
@ -132,7 +132,7 @@ func (f *FakeCloud) GetLoadBalancer(name, region string) (*api.LoadBalancerStatu
|
|||
|
||||
// EnsureLoadBalancer is a test-spy implementation of LoadBalancer.EnsureLoadBalancer.
|
||||
// It adds an entry "create" into the internal method call record.
|
||||
func (f *FakeCloud) EnsureLoadBalancer(name, region string, externalIP net.IP, ports []*api.ServicePort, hosts []string, serviceName types.NamespacedName, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) {
|
||||
func (f *FakeCloud) EnsureLoadBalancer(name, region string, externalIP net.IP, ports []*api.ServicePort, hosts []string, serviceName types.NamespacedName, affinityType api.ServiceAffinity, annotations cloudprovider.ServiceAnnotation) (*api.LoadBalancerStatus, error) {
|
||||
f.addCall("create")
|
||||
if f.Balancers == nil {
|
||||
f.Balancers = make(map[string]FakeBalancer)
|
||||
|
|
|
@ -61,8 +61,30 @@ const (
|
|||
|
||||
operationPollInterval = 3 * time.Second
|
||||
operationPollTimeoutDuration = 30 * time.Minute
|
||||
|
||||
defaultLBSourceRange = "0.0.0.0/0"
|
||||
|
||||
//Expected annotations for GCE
|
||||
gceLBAllowSourceRange = "net.beta.kubernetes.io/gce-source-ranges"
|
||||
)
|
||||
|
||||
//validateAllowSourceRange validates annotation of allow source ranges
|
||||
func validateSourceRangeAnnotation(annotation cloudprovider.ServiceAnnotation) error {
|
||||
val := annotation[gceLBAllowSourceRange]
|
||||
errMsg := fmt.Errorf("Service annotation %s:%s is not valid. Expecting source IP ranges. Comma Seperated. For example, 0.0.0.0/0,192.168.2.0/24", gceLBAllowSourceRange, val)
|
||||
ranges := strings.Split(val, ",")
|
||||
if len(ranges) <= 0 {
|
||||
return errMsg
|
||||
}
|
||||
for _, subnet := range ranges {
|
||||
_, _, err := net.ParseCIDR(subnet)
|
||||
if err != nil {
|
||||
return errMsg
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// GCECloud is an implementation of Interface, LoadBalancer and Instances for Google Compute Engine.
|
||||
type GCECloud struct {
|
||||
service *compute.Service
|
||||
|
@ -432,12 +454,12 @@ func isHTTPErrorCode(err error, code int) bool {
|
|||
// Due to an interesting series of design decisions, this handles both creating
|
||||
// new load balancers and updating existing load balancers, recognizing when
|
||||
// each is needed.
|
||||
func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP, ports []*api.ServicePort, hostNames []string, serviceName types.NamespacedName, affinityType api.ServiceAffinity) (*api.LoadBalancerStatus, error) {
|
||||
func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP, ports []*api.ServicePort, hostNames []string, serviceName types.NamespacedName, affinityType api.ServiceAffinity, annotations cloudprovider.ServiceAnnotation) (*api.LoadBalancerStatus, error) {
|
||||
portStr := []string{}
|
||||
for _, p := range ports {
|
||||
portStr = append(portStr, fmt.Sprintf("%s/%d", p.Protocol, p.Port))
|
||||
}
|
||||
glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v)", name, region, requestedIP, portStr, hostNames, serviceName)
|
||||
glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v)", name, region, requestedIP, portStr, hostNames, serviceName, annotations)
|
||||
|
||||
if len(hostNames) == 0 {
|
||||
return nil, fmt.Errorf("Cannot EnsureLoadBalancer() with no hosts")
|
||||
|
@ -555,7 +577,17 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP,
|
|||
// Deal with the firewall next. The reason we do this here rather than last
|
||||
// is because the forwarding rule is used as the indicator that the load
|
||||
// balancer is fully created - it's what getLoadBalancer checks for.
|
||||
firewallExists, firewallNeedsUpdate, err := gce.firewallNeedsUpdate(name, region, ipAddress, ports)
|
||||
// Check if user specified the allow source range
|
||||
sourceRanges := []string{defaultLBSourceRange}
|
||||
val, ok := annotations.GetValue(gceLBAllowSourceRange)
|
||||
if ok {
|
||||
if err := validateSourceRangeAnnotation(annotations); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
sourceRanges = strings.Split(val, ",")
|
||||
}
|
||||
|
||||
firewallExists, firewallNeedsUpdate, err := gce.firewallNeedsUpdate(name, region, ipAddress, ports, sourceRanges)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -565,12 +597,12 @@ func (gce *GCECloud) EnsureLoadBalancer(name, region string, requestedIP net.IP,
|
|||
// Unlike forwarding rules and target pools, firewalls can be updated
|
||||
// without needing to be deleted and recreated.
|
||||
if firewallExists {
|
||||
if err := gce.updateFirewall(name, region, desc, "0.0.0.0/0", ports, hosts); err != nil {
|
||||
if err := gce.updateFirewall(name, region, desc, sourceRanges, ports, hosts); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): updated firewall", name, serviceName)
|
||||
} else {
|
||||
if err := gce.createFirewall(name, region, desc, "0.0.0.0/0", ports, hosts); err != nil {
|
||||
if err := gce.createFirewall(name, region, desc, sourceRanges, ports, hosts); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v(%v)): created firewall", name, serviceName)
|
||||
|
@ -713,7 +745,7 @@ func translateAffinityType(affinityType api.ServiceAffinity) string {
|
|||
}
|
||||
}
|
||||
|
||||
func (gce *GCECloud) firewallNeedsUpdate(name, region, ipAddress string, ports []*api.ServicePort) (exists bool, needsUpdate bool, err error) {
|
||||
func (gce *GCECloud) firewallNeedsUpdate(name, region, ipAddress string, ports []*api.ServicePort, sourceRanges []string) (exists bool, needsUpdate bool, err error) {
|
||||
fw, err := gce.service.Firewalls.Get(gce.projectID, makeFirewallName(name)).Do()
|
||||
if err != nil {
|
||||
if isHTTPErrorCode(err, http.StatusNotFound) {
|
||||
|
@ -737,6 +769,9 @@ func (gce *GCECloud) firewallNeedsUpdate(name, region, ipAddress string, ports [
|
|||
}
|
||||
// The service controller already verified that the protocol matches on all ports, no need to check.
|
||||
|
||||
if !slicesEqual(fw.SourceRanges, sourceRanges) {
|
||||
return true, true, nil
|
||||
}
|
||||
return true, false, nil
|
||||
}
|
||||
|
||||
|
@ -810,8 +845,8 @@ func (gce *GCECloud) createTargetPool(name, region string, hosts []*gceInstance,
|
|||
return nil
|
||||
}
|
||||
|
||||
func (gce *GCECloud) createFirewall(name, region, desc, srcRange string, ports []*api.ServicePort, hosts []*gceInstance) error {
|
||||
firewall, err := gce.firewallObject(name, region, desc, srcRange, ports, hosts)
|
||||
func (gce *GCECloud) createFirewall(name, region, desc string, sourceRanges []string, ports []*api.ServicePort, hosts []*gceInstance) error {
|
||||
firewall, err := gce.firewallObject(name, region, desc, sourceRanges, ports, hosts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -828,8 +863,8 @@ func (gce *GCECloud) createFirewall(name, region, desc, srcRange string, ports [
|
|||
return nil
|
||||
}
|
||||
|
||||
func (gce *GCECloud) updateFirewall(name, region, desc, srcRange string, ports []*api.ServicePort, hosts []*gceInstance) error {
|
||||
firewall, err := gce.firewallObject(name, region, desc, srcRange, ports, hosts)
|
||||
func (gce *GCECloud) updateFirewall(name, region, desc string, sourceRanges []string, ports []*api.ServicePort, hosts []*gceInstance) error {
|
||||
firewall, err := gce.firewallObject(name, region, desc, sourceRanges, ports, hosts)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -846,7 +881,7 @@ func (gce *GCECloud) updateFirewall(name, region, desc, srcRange string, ports [
|
|||
return nil
|
||||
}
|
||||
|
||||
func (gce *GCECloud) firewallObject(name, region, desc, srcRange string, ports []*api.ServicePort, hosts []*gceInstance) (*compute.Firewall, error) {
|
||||
func (gce *GCECloud) firewallObject(name, region, desc string, sourceRanges []string, ports []*api.ServicePort, hosts []*gceInstance) (*compute.Firewall, error) {
|
||||
allowedPorts := make([]string, len(ports))
|
||||
for ix := range ports {
|
||||
allowedPorts[ix] = strconv.Itoa(ports[ix].Port)
|
||||
|
@ -859,7 +894,7 @@ func (gce *GCECloud) firewallObject(name, region, desc, srcRange string, ports [
|
|||
Name: makeFirewallName(name),
|
||||
Description: desc,
|
||||
Network: gce.networkURL,
|
||||
SourceRanges: []string{srcRange},
|
||||
SourceRanges: sourceRanges,
|
||||
TargetTags: hostTags,
|
||||
Allowed: []*compute.FirewallAllowed{
|
||||
{
|
||||
|
@ -1138,7 +1173,7 @@ func (gce *GCECloud) GetFirewall(name string) (*compute.Firewall, error) {
|
|||
}
|
||||
|
||||
// CreateFirewall creates the given firewall rule.
|
||||
func (gce *GCECloud) CreateFirewall(name, desc, srcRange string, ports []int64, hostNames []string) error {
|
||||
func (gce *GCECloud) CreateFirewall(name, desc string, sourceRanges []string, ports []int64, hostNames []string) error {
|
||||
region, err := GetGCERegion(gce.localZone)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -1153,7 +1188,7 @@ func (gce *GCECloud) CreateFirewall(name, desc, srcRange string, ports []int64,
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return gce.createFirewall(name, region, desc, srcRange, svcPorts, hosts)
|
||||
return gce.createFirewall(name, region, desc, sourceRanges, svcPorts, hosts)
|
||||
}
|
||||
|
||||
// DeleteFirewall deletes the given firewall rule.
|
||||
|
@ -1167,7 +1202,7 @@ func (gce *GCECloud) DeleteFirewall(name string) error {
|
|||
|
||||
// UpdateFirewall applies the given firewall rule as an update to an existing
|
||||
// firewall rule with the same name.
|
||||
func (gce *GCECloud) UpdateFirewall(name, desc, srcRange string, ports []int64, hostNames []string) error {
|
||||
func (gce *GCECloud) UpdateFirewall(name, desc string, sourceRanges []string, ports []int64, hostNames []string) error {
|
||||
region, err := GetGCERegion(gce.localZone)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -1182,7 +1217,7 @@ func (gce *GCECloud) UpdateFirewall(name, desc, srcRange string, ports []int64,
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return gce.updateFirewall(name, region, desc, srcRange, svcPorts, hosts)
|
||||
return gce.updateFirewall(name, region, desc, sourceRanges, svcPorts, hosts)
|
||||
}
|
||||
|
||||
// Global static IP management
|
||||
|
|
|
@ -660,8 +660,8 @@ func (lb *LoadBalancer) GetLoadBalancer(name, region string) (*api.LoadBalancerS
|
|||
// a list of regions (from config) and query/create loadbalancers in
|
||||
// each region.
|
||||
|
||||
func (lb *LoadBalancer) EnsureLoadBalancer(name, region string, loadBalancerIP net.IP, ports []*api.ServicePort, hosts []string, serviceName types.NamespacedName, affinity api.ServiceAffinity) (*api.LoadBalancerStatus, error) {
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v)", name, region, loadBalancerIP, ports, hosts, serviceName)
|
||||
func (lb *LoadBalancer) EnsureLoadBalancer(name, region string, loadBalancerIP net.IP, ports []*api.ServicePort, hosts []string, serviceName types.NamespacedName, affinity api.ServiceAffinity, annotations cloudprovider.ServiceAnnotation) (*api.LoadBalancerStatus, error) {
|
||||
glog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v)", name, region, loadBalancerIP, ports, hosts, serviceName, annotations)
|
||||
|
||||
if len(ports) > 1 {
|
||||
return nil, fmt.Errorf("multiple ports are not yet supported in openstack load balancers")
|
||||
|
|
|
@ -34,6 +34,7 @@ import (
|
|||
"k8s.io/kubernetes/pkg/fields"
|
||||
"k8s.io/kubernetes/pkg/types"
|
||||
"k8s.io/kubernetes/pkg/util/runtime"
|
||||
"reflect"
|
||||
)
|
||||
|
||||
const (
|
||||
|
@ -247,6 +248,7 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) {
|
|||
}
|
||||
message += err.Error()
|
||||
s.eventRecorder.Event(service, api.EventTypeWarning, "CreatingLoadBalancerFailed", message)
|
||||
|
||||
return err, retry
|
||||
}
|
||||
// Always update the cache upon success.
|
||||
|
@ -321,6 +323,7 @@ func (s *ServiceController) createLoadBalancerIfNeeded(namespacedName types.Name
|
|||
|
||||
// The load balancer doesn't exist yet, so create it.
|
||||
s.eventRecorder.Event(service, api.EventTypeNormal, "CreatingLoadBalancer", "Creating load balancer")
|
||||
|
||||
err := s.createLoadBalancer(service, namespacedName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to create load balancer for service %s: %v", namespacedName, err), retryable
|
||||
|
@ -385,7 +388,7 @@ func (s *ServiceController) createLoadBalancer(service *api.Service, serviceName
|
|||
// - Not all cloud providers support all protocols and the next step is expected to return
|
||||
// an error for unsupported protocols
|
||||
status, err := s.balancer.EnsureLoadBalancer(name, s.zone.Region, net.ParseIP(service.Spec.LoadBalancerIP),
|
||||
ports, hostsFromNodeList(&nodes), serviceName, service.Spec.SessionAffinity)
|
||||
ports, hostsFromNodeList(&nodes), serviceName, service.Spec.SessionAffinity, service.ObjectMeta.Annotations)
|
||||
if err != nil {
|
||||
return err
|
||||
} else {
|
||||
|
@ -488,6 +491,10 @@ func (s *ServiceController) needsUpdate(oldService *api.Service, newService *api
|
|||
return true
|
||||
}
|
||||
}
|
||||
if !reflect.DeepEqual(oldService.Annotations, newService.Annotations) {
|
||||
return true
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue