mirror of https://github.com/k3s-io/k3s
1588 lines
52 KiB
Go
1588 lines
52 KiB
Go
/*
|
|
Copyright 2014 The Kubernetes Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package aws
|
|
|
|
import (
|
|
"crypto/sha1"
|
|
"fmt"
|
|
"reflect"
|
|
"strconv"
|
|
"strings"
|
|
|
|
"github.com/aws/aws-sdk-go/aws"
|
|
"github.com/aws/aws-sdk-go/aws/awserr"
|
|
"github.com/aws/aws-sdk-go/service/ec2"
|
|
"github.com/aws/aws-sdk-go/service/elb"
|
|
"github.com/aws/aws-sdk-go/service/elbv2"
|
|
"k8s.io/klog"
|
|
|
|
"k8s.io/api/core/v1"
|
|
"k8s.io/apimachinery/pkg/types"
|
|
"k8s.io/apimachinery/pkg/util/sets"
|
|
)
|
|
|
|
const (
|
|
// ProxyProtocolPolicyName is the tag named used for the proxy protocol
|
|
// policy
|
|
ProxyProtocolPolicyName = "k8s-proxyprotocol-enabled"
|
|
|
|
// SSLNegotiationPolicyNameFormat is a format string used for the SSL
|
|
// negotiation policy tag name
|
|
SSLNegotiationPolicyNameFormat = "k8s-SSLNegotiationPolicy-%s"
|
|
)
|
|
|
|
var (
|
|
// Defaults for ELB Healthcheck
|
|
defaultHCHealthyThreshold = int64(2)
|
|
defaultHCUnhealthyThreshold = int64(6)
|
|
defaultHCTimeout = int64(5)
|
|
defaultHCInterval = int64(10)
|
|
)
|
|
|
|
func isNLB(annotations map[string]string) bool {
|
|
if annotations[ServiceAnnotationLoadBalancerType] == "nlb" {
|
|
return true
|
|
}
|
|
return false
|
|
}
|
|
|
|
type nlbPortMapping struct {
|
|
FrontendPort int64
|
|
TrafficPort int64
|
|
ClientCIDR string
|
|
|
|
HealthCheckPort int64
|
|
HealthCheckPath string
|
|
HealthCheckProtocol string
|
|
}
|
|
|
|
// getLoadBalancerAdditionalTags converts the comma separated list of key-value
|
|
// pairs in the ServiceAnnotationLoadBalancerAdditionalTags annotation and returns
|
|
// it as a map.
|
|
func getLoadBalancerAdditionalTags(annotations map[string]string) map[string]string {
|
|
additionalTags := make(map[string]string)
|
|
if additionalTagsList, ok := annotations[ServiceAnnotationLoadBalancerAdditionalTags]; ok {
|
|
additionalTagsList = strings.TrimSpace(additionalTagsList)
|
|
|
|
// Break up list of "Key1=Val,Key2=Val2"
|
|
tagList := strings.Split(additionalTagsList, ",")
|
|
|
|
// Break up "Key=Val"
|
|
for _, tagSet := range tagList {
|
|
tag := strings.Split(strings.TrimSpace(tagSet), "=")
|
|
|
|
// Accept "Key=val" or "Key=" or just "Key"
|
|
if len(tag) >= 2 && len(tag[0]) != 0 {
|
|
// There is a key and a value, so save it
|
|
additionalTags[tag[0]] = tag[1]
|
|
} else if len(tag) == 1 && len(tag[0]) != 0 {
|
|
// Just "Key"
|
|
additionalTags[tag[0]] = ""
|
|
}
|
|
}
|
|
}
|
|
|
|
return additionalTags
|
|
}
|
|
|
|
// ensureLoadBalancerv2 ensures a v2 load balancer is created
|
|
func (c *Cloud) ensureLoadBalancerv2(namespacedName types.NamespacedName, loadBalancerName string, mappings []nlbPortMapping, instanceIDs, subnetIDs []string, internalELB bool, annotations map[string]string) (*elbv2.LoadBalancer, error) {
|
|
loadBalancer, err := c.describeLoadBalancerv2(loadBalancerName)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
dirty := false
|
|
|
|
// Get additional tags set by the user
|
|
tags := getLoadBalancerAdditionalTags(annotations)
|
|
// Add default tags
|
|
tags[TagNameKubernetesService] = namespacedName.String()
|
|
tags = c.tagging.buildTags(ResourceLifecycleOwned, tags)
|
|
|
|
if loadBalancer == nil {
|
|
// Create the LB
|
|
createRequest := &elbv2.CreateLoadBalancerInput{
|
|
Type: aws.String(elbv2.LoadBalancerTypeEnumNetwork),
|
|
Name: aws.String(loadBalancerName),
|
|
}
|
|
if internalELB {
|
|
createRequest.Scheme = aws.String("internal")
|
|
}
|
|
|
|
// We are supposed to specify one subnet per AZ.
|
|
// TODO: What happens if we have more than one subnet per AZ?
|
|
createRequest.SubnetMappings = createSubnetMappings(subnetIDs)
|
|
|
|
for k, v := range tags {
|
|
createRequest.Tags = append(createRequest.Tags, &elbv2.Tag{
|
|
Key: aws.String(k), Value: aws.String(v),
|
|
})
|
|
}
|
|
|
|
klog.Infof("Creating load balancer for %v with name: %s", namespacedName, loadBalancerName)
|
|
createResponse, err := c.elbv2.CreateLoadBalancer(createRequest)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Error creating load balancer: %q", err)
|
|
}
|
|
|
|
loadBalancer = createResponse.LoadBalancers[0]
|
|
|
|
// Create Target Groups
|
|
resourceArns := make([]*string, 0, len(mappings))
|
|
|
|
for i := range mappings {
|
|
// It is easier to keep track of updates by having possibly
|
|
// duplicate target groups where the backend port is the same
|
|
_, targetGroupArn, err := c.createListenerV2(createResponse.LoadBalancers[0].LoadBalancerArn, mappings[i], namespacedName, instanceIDs, *createResponse.LoadBalancers[0].VpcId)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Error creating listener: %q", err)
|
|
}
|
|
resourceArns = append(resourceArns, targetGroupArn)
|
|
|
|
}
|
|
|
|
// Add tags to targets
|
|
targetGroupTags := make([]*elbv2.Tag, 0, len(tags))
|
|
|
|
for k, v := range tags {
|
|
targetGroupTags = append(targetGroupTags, &elbv2.Tag{
|
|
Key: aws.String(k), Value: aws.String(v),
|
|
})
|
|
}
|
|
if len(resourceArns) > 0 && len(targetGroupTags) > 0 {
|
|
// elbv2.AddTags doesn't allow to tag multiple resources at once
|
|
for _, arn := range resourceArns {
|
|
_, err = c.elbv2.AddTags(&elbv2.AddTagsInput{
|
|
ResourceArns: []*string{arn},
|
|
Tags: targetGroupTags,
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Error adding tags after creating Load Balancer: %q", err)
|
|
}
|
|
}
|
|
}
|
|
} else {
|
|
// TODO: Sync internal vs non-internal
|
|
|
|
// sync mappings
|
|
{
|
|
listenerDescriptions, err := c.elbv2.DescribeListeners(
|
|
&elbv2.DescribeListenersInput{
|
|
LoadBalancerArn: loadBalancer.LoadBalancerArn,
|
|
},
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Error describing listeners: %q", err)
|
|
}
|
|
|
|
// actual maps FrontendPort to an elbv2.Listener
|
|
actual := map[int64]*elbv2.Listener{}
|
|
for _, listener := range listenerDescriptions.Listeners {
|
|
actual[*listener.Port] = listener
|
|
}
|
|
|
|
actualTargetGroups, err := c.elbv2.DescribeTargetGroups(
|
|
&elbv2.DescribeTargetGroupsInput{
|
|
LoadBalancerArn: loadBalancer.LoadBalancerArn,
|
|
},
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Error listing target groups: %q", err)
|
|
}
|
|
|
|
nodePortTargetGroup := map[int64]*elbv2.TargetGroup{}
|
|
for _, targetGroup := range actualTargetGroups.TargetGroups {
|
|
nodePortTargetGroup[*targetGroup.Port] = targetGroup
|
|
}
|
|
|
|
// Create Target Groups
|
|
addTagsInput := &elbv2.AddTagsInput{
|
|
ResourceArns: []*string{},
|
|
Tags: []*elbv2.Tag{},
|
|
}
|
|
|
|
// Handle additions/modifications
|
|
for _, mapping := range mappings {
|
|
frontendPort := mapping.FrontendPort
|
|
nodePort := mapping.TrafficPort
|
|
|
|
// modifications
|
|
if listener, ok := actual[frontendPort]; ok {
|
|
// nodePort must have changed, we'll need to delete old TG
|
|
// and recreate
|
|
if targetGroup, ok := nodePortTargetGroup[nodePort]; !ok {
|
|
// Create new Target group
|
|
targetName := createTargetName(namespacedName, frontendPort, nodePort)
|
|
targetGroup, err = c.ensureTargetGroup(
|
|
nil,
|
|
mapping,
|
|
instanceIDs,
|
|
targetName,
|
|
*loadBalancer.VpcId,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Associate new target group to LB
|
|
_, err := c.elbv2.ModifyListener(&elbv2.ModifyListenerInput{
|
|
ListenerArn: listener.ListenerArn,
|
|
Port: aws.Int64(frontendPort),
|
|
Protocol: aws.String("TCP"),
|
|
DefaultActions: []*elbv2.Action{{
|
|
TargetGroupArn: targetGroup.TargetGroupArn,
|
|
Type: aws.String("forward"),
|
|
}},
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Error updating load balancer listener: %q", err)
|
|
}
|
|
|
|
// Delete old target group
|
|
_, err = c.elbv2.DeleteTargetGroup(&elbv2.DeleteTargetGroupInput{
|
|
TargetGroupArn: listener.DefaultActions[0].TargetGroupArn,
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Error deleting old target group: %q", err)
|
|
}
|
|
|
|
} else {
|
|
// Run ensureTargetGroup to make sure instances in service are up-to-date
|
|
targetName := createTargetName(namespacedName, frontendPort, nodePort)
|
|
_, err = c.ensureTargetGroup(
|
|
targetGroup,
|
|
mapping,
|
|
instanceIDs,
|
|
targetName,
|
|
*loadBalancer.VpcId,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
dirty = true
|
|
continue
|
|
}
|
|
|
|
// Additions
|
|
_, targetGroupArn, err := c.createListenerV2(loadBalancer.LoadBalancerArn, mapping, namespacedName, instanceIDs, *loadBalancer.VpcId)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
addTagsInput.ResourceArns = append(addTagsInput.ResourceArns, targetGroupArn)
|
|
dirty = true
|
|
}
|
|
|
|
frontEndPorts := map[int64]bool{}
|
|
for i := range mappings {
|
|
frontEndPorts[mappings[i].FrontendPort] = true
|
|
}
|
|
|
|
// handle deletions
|
|
for port, listener := range actual {
|
|
if _, ok := frontEndPorts[port]; !ok {
|
|
err := c.deleteListenerV2(listener)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
dirty = true
|
|
}
|
|
}
|
|
|
|
// Add tags to new targets
|
|
for k, v := range tags {
|
|
addTagsInput.Tags = append(addTagsInput.Tags, &elbv2.Tag{
|
|
Key: aws.String(k), Value: aws.String(v),
|
|
})
|
|
}
|
|
if len(addTagsInput.ResourceArns) > 0 && len(addTagsInput.Tags) > 0 {
|
|
_, err = c.elbv2.AddTags(addTagsInput)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Error adding tags after modifying load balancer targets: %q", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
desiredLoadBalancerAttributes := map[string]string{}
|
|
// Default values to ensured a remove annotation reverts back to the default
|
|
desiredLoadBalancerAttributes["load_balancing.cross_zone.enabled"] = "false"
|
|
|
|
// Determine if cross zone load balancing enabled/disabled has been specified
|
|
crossZoneLoadBalancingEnabledAnnotation := annotations[ServiceAnnotationLoadBalancerCrossZoneLoadBalancingEnabled]
|
|
if crossZoneLoadBalancingEnabledAnnotation != "" {
|
|
crossZoneEnabled, err := strconv.ParseBool(crossZoneLoadBalancingEnabledAnnotation)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error parsing service annotation: %s=%s",
|
|
ServiceAnnotationLoadBalancerCrossZoneLoadBalancingEnabled,
|
|
crossZoneLoadBalancingEnabledAnnotation,
|
|
)
|
|
}
|
|
|
|
if crossZoneEnabled {
|
|
desiredLoadBalancerAttributes["load_balancing.cross_zone.enabled"] = "true"
|
|
}
|
|
}
|
|
|
|
// Whether the ELB was new or existing, sync attributes regardless. This accounts for things
|
|
// that cannot be specified at the time of creation and can only be modified after the fact,
|
|
// e.g. idle connection timeout.
|
|
describeAttributesRequest := &elbv2.DescribeLoadBalancerAttributesInput{
|
|
LoadBalancerArn: loadBalancer.LoadBalancerArn,
|
|
}
|
|
describeAttributesOutput, err := c.elbv2.DescribeLoadBalancerAttributes(describeAttributesRequest)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Unable to retrieve load balancer attributes during attribute sync: %q", err)
|
|
}
|
|
|
|
changedAttributes := []*elbv2.LoadBalancerAttribute{}
|
|
|
|
// Identify to be changed attributes
|
|
for _, foundAttribute := range describeAttributesOutput.Attributes {
|
|
if targetValue, ok := desiredLoadBalancerAttributes[*foundAttribute.Key]; ok {
|
|
if targetValue != *foundAttribute.Value {
|
|
changedAttributes = append(changedAttributes, &elbv2.LoadBalancerAttribute{
|
|
Key: foundAttribute.Key,
|
|
Value: aws.String(targetValue),
|
|
})
|
|
}
|
|
}
|
|
}
|
|
|
|
// Update attributes requiring changes
|
|
if len(changedAttributes) > 0 {
|
|
klog.V(2).Infof("Updating load-balancer attributes for %q", loadBalancerName)
|
|
|
|
_, err = c.elbv2.ModifyLoadBalancerAttributes(&elbv2.ModifyLoadBalancerAttributesInput{
|
|
LoadBalancerArn: loadBalancer.LoadBalancerArn,
|
|
Attributes: changedAttributes,
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Unable to update load balancer attributes during attribute sync: %q", err)
|
|
}
|
|
}
|
|
|
|
// Subnets cannot be modified on NLBs
|
|
if dirty {
|
|
loadBalancers, err := c.elbv2.DescribeLoadBalancers(
|
|
&elbv2.DescribeLoadBalancersInput{
|
|
LoadBalancerArns: []*string{
|
|
loadBalancer.LoadBalancerArn,
|
|
},
|
|
},
|
|
)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Error retrieving load balancer after update: %q", err)
|
|
}
|
|
loadBalancer = loadBalancers.LoadBalancers[0]
|
|
}
|
|
}
|
|
return loadBalancer, nil
|
|
}
|
|
|
|
// create a valid target group name - ensure name is not over 32 characters
|
|
func createTargetName(namespacedName types.NamespacedName, frontendPort, nodePort int64) string {
|
|
sha := fmt.Sprintf("%x", sha1.Sum([]byte(namespacedName.String())))[:13]
|
|
return fmt.Sprintf("k8s-tg-%s-%d-%d", sha, frontendPort, nodePort)
|
|
}
|
|
|
|
func (c *Cloud) createListenerV2(loadBalancerArn *string, mapping nlbPortMapping, namespacedName types.NamespacedName, instanceIDs []string, vpcID string) (listener *elbv2.Listener, targetGroupArn *string, err error) {
|
|
targetName := createTargetName(namespacedName, mapping.FrontendPort, mapping.TrafficPort)
|
|
|
|
klog.Infof("Creating load balancer target group for %v with name: %s", namespacedName, targetName)
|
|
target, err := c.ensureTargetGroup(
|
|
nil,
|
|
mapping,
|
|
instanceIDs,
|
|
targetName,
|
|
vpcID,
|
|
)
|
|
if err != nil {
|
|
return nil, aws.String(""), err
|
|
}
|
|
|
|
createListernerInput := &elbv2.CreateListenerInput{
|
|
LoadBalancerArn: loadBalancerArn,
|
|
Port: aws.Int64(mapping.FrontendPort),
|
|
Protocol: aws.String("TCP"),
|
|
DefaultActions: []*elbv2.Action{{
|
|
TargetGroupArn: target.TargetGroupArn,
|
|
Type: aws.String(elbv2.ActionTypeEnumForward),
|
|
}},
|
|
}
|
|
klog.Infof("Creating load balancer listener for %v", namespacedName)
|
|
createListenerOutput, err := c.elbv2.CreateListener(createListernerInput)
|
|
if err != nil {
|
|
return nil, aws.String(""), fmt.Errorf("Error creating load balancer listener: %q", err)
|
|
}
|
|
return createListenerOutput.Listeners[0], target.TargetGroupArn, nil
|
|
}
|
|
|
|
// cleans up listener and corresponding target group
|
|
func (c *Cloud) deleteListenerV2(listener *elbv2.Listener) error {
|
|
_, err := c.elbv2.DeleteListener(&elbv2.DeleteListenerInput{ListenerArn: listener.ListenerArn})
|
|
if err != nil {
|
|
return fmt.Errorf("Error deleting load balancer listener: %q", err)
|
|
}
|
|
_, err = c.elbv2.DeleteTargetGroup(&elbv2.DeleteTargetGroupInput{TargetGroupArn: listener.DefaultActions[0].TargetGroupArn})
|
|
if err != nil {
|
|
return fmt.Errorf("Error deleting load balancer target group: %q", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// ensureTargetGroup creates a target group with a set of instances
|
|
func (c *Cloud) ensureTargetGroup(targetGroup *elbv2.TargetGroup, mapping nlbPortMapping, instances []string, name string, vpcID string) (*elbv2.TargetGroup, error) {
|
|
dirty := false
|
|
if targetGroup == nil {
|
|
|
|
input := &elbv2.CreateTargetGroupInput{
|
|
VpcId: aws.String(vpcID),
|
|
Name: aws.String(name),
|
|
Port: aws.Int64(mapping.TrafficPort),
|
|
Protocol: aws.String("TCP"),
|
|
TargetType: aws.String("instance"),
|
|
HealthCheckIntervalSeconds: aws.Int64(30),
|
|
HealthCheckPort: aws.String("traffic-port"),
|
|
HealthCheckProtocol: aws.String("TCP"),
|
|
HealthyThresholdCount: aws.Int64(3),
|
|
UnhealthyThresholdCount: aws.Int64(3),
|
|
}
|
|
|
|
input.HealthCheckProtocol = aws.String(mapping.HealthCheckProtocol)
|
|
if mapping.HealthCheckProtocol != elbv2.ProtocolEnumTcp {
|
|
input.HealthCheckPath = aws.String(mapping.HealthCheckPath)
|
|
}
|
|
|
|
// Account for externalTrafficPolicy = "Local"
|
|
if mapping.HealthCheckPort != mapping.TrafficPort {
|
|
input.HealthCheckPort = aws.String(strconv.Itoa(int(mapping.HealthCheckPort)))
|
|
}
|
|
|
|
result, err := c.elbv2.CreateTargetGroup(input)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Error creating load balancer target group: %q", err)
|
|
}
|
|
if len(result.TargetGroups) != 1 {
|
|
return nil, fmt.Errorf("Expected only one target group on CreateTargetGroup, got %d groups", len(result.TargetGroups))
|
|
}
|
|
|
|
registerInput := &elbv2.RegisterTargetsInput{
|
|
TargetGroupArn: result.TargetGroups[0].TargetGroupArn,
|
|
Targets: []*elbv2.TargetDescription{},
|
|
}
|
|
for _, instanceID := range instances {
|
|
registerInput.Targets = append(registerInput.Targets, &elbv2.TargetDescription{
|
|
Id: aws.String(string(instanceID)),
|
|
Port: aws.Int64(mapping.TrafficPort),
|
|
})
|
|
}
|
|
|
|
_, err = c.elbv2.RegisterTargets(registerInput)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Error registering targets for load balancer: %q", err)
|
|
}
|
|
|
|
return result.TargetGroups[0], nil
|
|
}
|
|
|
|
// handle instances in service
|
|
{
|
|
healthResponse, err := c.elbv2.DescribeTargetHealth(&elbv2.DescribeTargetHealthInput{TargetGroupArn: targetGroup.TargetGroupArn})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Error describing target group health: %q", err)
|
|
}
|
|
actualIDs := []string{}
|
|
for _, healthDescription := range healthResponse.TargetHealthDescriptions {
|
|
if healthDescription.TargetHealth.Reason != nil {
|
|
switch aws.StringValue(healthDescription.TargetHealth.Reason) {
|
|
case elbv2.TargetHealthReasonEnumTargetDeregistrationInProgress:
|
|
// We don't need to count this instance in service if it is
|
|
// on its way out
|
|
default:
|
|
actualIDs = append(actualIDs, *healthDescription.Target.Id)
|
|
}
|
|
}
|
|
}
|
|
|
|
actual := sets.NewString(actualIDs...)
|
|
expected := sets.NewString(instances...)
|
|
|
|
additions := expected.Difference(actual)
|
|
removals := actual.Difference(expected)
|
|
|
|
if len(additions) > 0 {
|
|
registerInput := &elbv2.RegisterTargetsInput{
|
|
TargetGroupArn: targetGroup.TargetGroupArn,
|
|
Targets: []*elbv2.TargetDescription{},
|
|
}
|
|
for instanceID := range additions {
|
|
registerInput.Targets = append(registerInput.Targets, &elbv2.TargetDescription{
|
|
Id: aws.String(instanceID),
|
|
Port: aws.Int64(mapping.TrafficPort),
|
|
})
|
|
}
|
|
_, err := c.elbv2.RegisterTargets(registerInput)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Error registering new targets in target group: %q", err)
|
|
}
|
|
dirty = true
|
|
}
|
|
|
|
if len(removals) > 0 {
|
|
deregisterInput := &elbv2.DeregisterTargetsInput{
|
|
TargetGroupArn: targetGroup.TargetGroupArn,
|
|
Targets: []*elbv2.TargetDescription{},
|
|
}
|
|
for instanceID := range removals {
|
|
deregisterInput.Targets = append(deregisterInput.Targets, &elbv2.TargetDescription{
|
|
Id: aws.String(instanceID),
|
|
Port: aws.Int64(mapping.TrafficPort),
|
|
})
|
|
}
|
|
_, err := c.elbv2.DeregisterTargets(deregisterInput)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Error trying to deregister targets in target group: %q", err)
|
|
}
|
|
dirty = true
|
|
}
|
|
}
|
|
|
|
// ensure the health check is correct
|
|
{
|
|
dirtyHealthCheck := false
|
|
|
|
input := &elbv2.ModifyTargetGroupInput{
|
|
TargetGroupArn: targetGroup.TargetGroupArn,
|
|
}
|
|
|
|
if aws.StringValue(targetGroup.HealthCheckProtocol) != mapping.HealthCheckProtocol {
|
|
input.HealthCheckProtocol = aws.String(mapping.HealthCheckProtocol)
|
|
dirtyHealthCheck = true
|
|
}
|
|
if aws.StringValue(targetGroup.HealthCheckPort) != strconv.Itoa(int(mapping.HealthCheckPort)) {
|
|
input.HealthCheckPort = aws.String(strconv.Itoa(int(mapping.HealthCheckPort)))
|
|
dirtyHealthCheck = true
|
|
}
|
|
if mapping.HealthCheckPath != "" && mapping.HealthCheckProtocol != elbv2.ProtocolEnumTcp {
|
|
input.HealthCheckPath = aws.String(mapping.HealthCheckPath)
|
|
dirtyHealthCheck = true
|
|
}
|
|
|
|
if dirtyHealthCheck {
|
|
_, err := c.elbv2.ModifyTargetGroup(input)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Error modifying target group health check: %q", err)
|
|
}
|
|
|
|
dirty = true
|
|
}
|
|
}
|
|
|
|
if dirty {
|
|
result, err := c.elbv2.DescribeTargetGroups(&elbv2.DescribeTargetGroupsInput{
|
|
Names: []*string{aws.String(name)},
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Error retrieving target group after creation/update: %q", err)
|
|
}
|
|
targetGroup = result.TargetGroups[0]
|
|
}
|
|
|
|
return targetGroup, nil
|
|
}
|
|
|
|
func portsForNLB(lbName string, sg *ec2.SecurityGroup, clientTraffic bool) sets.Int64 {
|
|
response := sets.NewInt64()
|
|
var annotation string
|
|
if clientTraffic {
|
|
annotation = fmt.Sprintf("%s=%s", NLBClientRuleDescription, lbName)
|
|
} else {
|
|
annotation = fmt.Sprintf("%s=%s", NLBHealthCheckRuleDescription, lbName)
|
|
}
|
|
|
|
for i := range sg.IpPermissions {
|
|
for j := range sg.IpPermissions[i].IpRanges {
|
|
description := aws.StringValue(sg.IpPermissions[i].IpRanges[j].Description)
|
|
if description == annotation {
|
|
// TODO should probably check FromPort == ToPort
|
|
response.Insert(aws.Int64Value(sg.IpPermissions[i].FromPort))
|
|
}
|
|
}
|
|
}
|
|
return response
|
|
}
|
|
|
|
// filterForIPRangeDescription filters in security groups that have IpRange Descriptions that match a loadBalancerName
|
|
func filterForIPRangeDescription(securityGroups []*ec2.SecurityGroup, lbName string) []*ec2.SecurityGroup {
|
|
response := []*ec2.SecurityGroup{}
|
|
clientRule := fmt.Sprintf("%s=%s", NLBClientRuleDescription, lbName)
|
|
healthRule := fmt.Sprintf("%s=%s", NLBHealthCheckRuleDescription, lbName)
|
|
alreadyAdded := sets.NewString()
|
|
for i := range securityGroups {
|
|
for j := range securityGroups[i].IpPermissions {
|
|
for k := range securityGroups[i].IpPermissions[j].IpRanges {
|
|
description := aws.StringValue(securityGroups[i].IpPermissions[j].IpRanges[k].Description)
|
|
if description == clientRule || description == healthRule {
|
|
sgIDString := aws.StringValue(securityGroups[i].GroupId)
|
|
if !alreadyAdded.Has(sgIDString) {
|
|
response = append(response, securityGroups[i])
|
|
alreadyAdded.Insert(sgIDString)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return response
|
|
}
|
|
|
|
func (c *Cloud) getVpcCidrBlocks() ([]string, error) {
|
|
vpcs, err := c.ec2.DescribeVpcs(&ec2.DescribeVpcsInput{
|
|
VpcIds: []*string{aws.String(c.vpcID)},
|
|
})
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Error querying VPC for ELB: %q", err)
|
|
}
|
|
if len(vpcs.Vpcs) != 1 {
|
|
return nil, fmt.Errorf("Error querying VPC for ELB, got %d vpcs for %s", len(vpcs.Vpcs), c.vpcID)
|
|
}
|
|
|
|
cidrBlocks := make([]string, 0, len(vpcs.Vpcs[0].CidrBlockAssociationSet))
|
|
for _, cidr := range vpcs.Vpcs[0].CidrBlockAssociationSet {
|
|
cidrBlocks = append(cidrBlocks, aws.StringValue(cidr.CidrBlock))
|
|
}
|
|
return cidrBlocks, nil
|
|
}
|
|
|
|
// abstraction for updating SG rules
|
|
// if clientTraffic is false, then only update HealthCheck rules
|
|
func (c *Cloud) updateInstanceSecurityGroupsForNLBTraffic(actualGroups []*ec2.SecurityGroup, desiredSgIds []string, ports []int64, lbName string, clientCidrs []string, clientTraffic bool) error {
|
|
|
|
klog.V(8).Infof("updateInstanceSecurityGroupsForNLBTraffic: actualGroups=%v, desiredSgIds=%v, ports=%v, clientTraffic=%v", actualGroups, desiredSgIds, ports, clientTraffic)
|
|
// Map containing the groups we want to make changes on; the ports to make
|
|
// changes on; and whether to add or remove it. true to add, false to remove
|
|
portChanges := map[string]map[int64]bool{}
|
|
|
|
for _, id := range desiredSgIds {
|
|
// consider everything an addition for now
|
|
if _, ok := portChanges[id]; !ok {
|
|
portChanges[id] = make(map[int64]bool)
|
|
}
|
|
for _, port := range ports {
|
|
portChanges[id][port] = true
|
|
}
|
|
}
|
|
|
|
// Compare to actual groups
|
|
for _, actualGroup := range actualGroups {
|
|
actualGroupID := aws.StringValue(actualGroup.GroupId)
|
|
if actualGroupID == "" {
|
|
klog.Warning("Ignoring group without ID: ", actualGroup)
|
|
continue
|
|
}
|
|
|
|
addingMap, ok := portChanges[actualGroupID]
|
|
if ok {
|
|
desiredSet := sets.NewInt64()
|
|
for port := range addingMap {
|
|
desiredSet.Insert(port)
|
|
}
|
|
existingSet := portsForNLB(lbName, actualGroup, clientTraffic)
|
|
|
|
// remove from portChanges ports that are already allowed
|
|
if intersection := desiredSet.Intersection(existingSet); intersection.Len() > 0 {
|
|
for p := range intersection {
|
|
delete(portChanges[actualGroupID], p)
|
|
}
|
|
}
|
|
|
|
// allowed ports that need to be removed
|
|
if difference := existingSet.Difference(desiredSet); difference.Len() > 0 {
|
|
for p := range difference {
|
|
portChanges[actualGroupID][p] = false
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Make changes we've planned on
|
|
for instanceSecurityGroupID, portMap := range portChanges {
|
|
adds := []*ec2.IpPermission{}
|
|
removes := []*ec2.IpPermission{}
|
|
for port, add := range portMap {
|
|
if add {
|
|
if clientTraffic {
|
|
klog.V(2).Infof("Adding rule for client MTU discovery from the network load balancer (%s) to instances (%s)", clientCidrs, instanceSecurityGroupID)
|
|
klog.V(2).Infof("Adding rule for client traffic from the network load balancer (%s) to instances (%s), port (%v)", clientCidrs, instanceSecurityGroupID, port)
|
|
} else {
|
|
klog.V(2).Infof("Adding rule for health check traffic from the network load balancer (%s) to instances (%s), port (%v)", clientCidrs, instanceSecurityGroupID, port)
|
|
}
|
|
} else {
|
|
if clientTraffic {
|
|
klog.V(2).Infof("Removing rule for client MTU discovery from the network load balancer (%s) to instances (%s)", clientCidrs, instanceSecurityGroupID)
|
|
klog.V(2).Infof("Removing rule for client traffic from the network load balancer (%s) to instance (%s), port (%v)", clientCidrs, instanceSecurityGroupID, port)
|
|
}
|
|
klog.V(2).Infof("Removing rule for health check traffic from the network load balancer (%s) to instance (%s), port (%v)", clientCidrs, instanceSecurityGroupID, port)
|
|
}
|
|
|
|
if clientTraffic {
|
|
clientRuleAnnotation := fmt.Sprintf("%s=%s", NLBClientRuleDescription, lbName)
|
|
// Client Traffic
|
|
permission := &ec2.IpPermission{
|
|
FromPort: aws.Int64(port),
|
|
ToPort: aws.Int64(port),
|
|
IpProtocol: aws.String("tcp"),
|
|
}
|
|
ranges := []*ec2.IpRange{}
|
|
for _, cidr := range clientCidrs {
|
|
ranges = append(ranges, &ec2.IpRange{
|
|
CidrIp: aws.String(cidr),
|
|
Description: aws.String(clientRuleAnnotation),
|
|
})
|
|
}
|
|
permission.IpRanges = ranges
|
|
if add {
|
|
adds = append(adds, permission)
|
|
} else {
|
|
removes = append(removes, permission)
|
|
}
|
|
} else {
|
|
healthRuleAnnotation := fmt.Sprintf("%s=%s", NLBHealthCheckRuleDescription, lbName)
|
|
|
|
// NLB HealthCheck
|
|
permission := &ec2.IpPermission{
|
|
FromPort: aws.Int64(port),
|
|
ToPort: aws.Int64(port),
|
|
IpProtocol: aws.String("tcp"),
|
|
}
|
|
ranges := []*ec2.IpRange{}
|
|
for _, cidr := range clientCidrs {
|
|
ranges = append(ranges, &ec2.IpRange{
|
|
CidrIp: aws.String(cidr),
|
|
Description: aws.String(healthRuleAnnotation),
|
|
})
|
|
}
|
|
permission.IpRanges = ranges
|
|
if add {
|
|
adds = append(adds, permission)
|
|
} else {
|
|
removes = append(removes, permission)
|
|
}
|
|
}
|
|
}
|
|
|
|
if len(adds) > 0 {
|
|
changed, err := c.addSecurityGroupIngress(instanceSecurityGroupID, adds)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !changed {
|
|
klog.Warning("Allowing ingress was not needed; concurrent change? groupId=", instanceSecurityGroupID)
|
|
}
|
|
}
|
|
|
|
if len(removes) > 0 {
|
|
changed, err := c.removeSecurityGroupIngress(instanceSecurityGroupID, removes)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if !changed {
|
|
klog.Warning("Revoking ingress was not needed; concurrent change? groupId=", instanceSecurityGroupID)
|
|
}
|
|
}
|
|
|
|
if clientTraffic {
|
|
// MTU discovery
|
|
mtuRuleAnnotation := fmt.Sprintf("%s=%s", NLBMtuDiscoveryRuleDescription, lbName)
|
|
mtuPermission := &ec2.IpPermission{
|
|
IpProtocol: aws.String("icmp"),
|
|
FromPort: aws.Int64(3),
|
|
ToPort: aws.Int64(4),
|
|
}
|
|
ranges := []*ec2.IpRange{}
|
|
for _, cidr := range clientCidrs {
|
|
ranges = append(ranges, &ec2.IpRange{
|
|
CidrIp: aws.String(cidr),
|
|
Description: aws.String(mtuRuleAnnotation),
|
|
})
|
|
}
|
|
mtuPermission.IpRanges = ranges
|
|
|
|
group, err := c.findSecurityGroup(instanceSecurityGroupID)
|
|
if err != nil {
|
|
klog.Warningf("Error retrieving security group: %q", err)
|
|
return err
|
|
}
|
|
|
|
if group == nil {
|
|
klog.Warning("Security group not found: ", instanceSecurityGroupID)
|
|
return nil
|
|
}
|
|
|
|
icmpExists := false
|
|
permCount := 0
|
|
for _, perm := range group.IpPermissions {
|
|
if *perm.IpProtocol == "icmp" {
|
|
icmpExists = true
|
|
continue
|
|
}
|
|
|
|
if perm.FromPort != nil {
|
|
permCount++
|
|
}
|
|
}
|
|
|
|
if !icmpExists && permCount > 0 {
|
|
// the icmp permission is missing
|
|
changed, err := c.addSecurityGroupIngress(instanceSecurityGroupID, []*ec2.IpPermission{mtuPermission})
|
|
if err != nil {
|
|
klog.Warningf("Error adding MTU permission to security group: %q", err)
|
|
return err
|
|
}
|
|
if !changed {
|
|
klog.Warning("Allowing ingress was not needed; concurrent change? groupId=", instanceSecurityGroupID)
|
|
}
|
|
} else if icmpExists && permCount == 0 {
|
|
// there is no additional permissions, remove icmp
|
|
changed, err := c.removeSecurityGroupIngress(instanceSecurityGroupID, []*ec2.IpPermission{mtuPermission})
|
|
if err != nil {
|
|
klog.Warningf("Error removing MTU permission to security group: %q", err)
|
|
return err
|
|
}
|
|
if !changed {
|
|
klog.Warning("Revoking ingress was not needed; concurrent change? groupId=", instanceSecurityGroupID)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Add SG rules for a given NLB
|
|
func (c *Cloud) updateInstanceSecurityGroupsForNLB(mappings []nlbPortMapping, instances map[InstanceID]*ec2.Instance, lbName string, clientCidrs []string) error {
|
|
if c.cfg.Global.DisableSecurityGroupIngress {
|
|
return nil
|
|
}
|
|
|
|
vpcCidrBlocks, err := c.getVpcCidrBlocks()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Unlike the classic ELB, NLB does not have a security group that we can
|
|
// filter against all existing groups to see if they allow access. Instead
|
|
// we use the IpRange.Description field to annotate NLB health check and
|
|
// client traffic rules
|
|
|
|
// Get the actual list of groups that allow ingress for the load-balancer
|
|
var actualGroups []*ec2.SecurityGroup
|
|
{
|
|
// Server side filter
|
|
describeRequest := &ec2.DescribeSecurityGroupsInput{}
|
|
filters := []*ec2.Filter{
|
|
newEc2Filter("ip-permission.protocol", "tcp"),
|
|
}
|
|
describeRequest.Filters = c.tagging.addFilters(filters)
|
|
response, err := c.ec2.DescribeSecurityGroups(describeRequest)
|
|
if err != nil {
|
|
return fmt.Errorf("Error querying security groups for NLB: %q", err)
|
|
}
|
|
for _, sg := range response {
|
|
if !c.tagging.hasClusterTag(sg.Tags) {
|
|
continue
|
|
}
|
|
actualGroups = append(actualGroups, sg)
|
|
}
|
|
|
|
// client-side filter
|
|
// Filter out groups that don't have IP Rules we've annotated for this service
|
|
actualGroups = filterForIPRangeDescription(actualGroups, lbName)
|
|
}
|
|
|
|
taggedSecurityGroups, err := c.getTaggedSecurityGroups()
|
|
if err != nil {
|
|
return fmt.Errorf("Error querying for tagged security groups: %q", err)
|
|
}
|
|
|
|
externalTrafficPolicyIsLocal := false
|
|
trafficPorts := []int64{}
|
|
for i := range mappings {
|
|
trafficPorts = append(trafficPorts, mappings[i].TrafficPort)
|
|
if mappings[i].TrafficPort != mappings[i].HealthCheckPort {
|
|
externalTrafficPolicyIsLocal = true
|
|
}
|
|
}
|
|
|
|
healthCheckPorts := trafficPorts
|
|
// if externalTrafficPolicy is Local, all listeners use the same health
|
|
// check port
|
|
if externalTrafficPolicyIsLocal && len(mappings) > 0 {
|
|
healthCheckPorts = []int64{mappings[0].HealthCheckPort}
|
|
}
|
|
|
|
desiredGroupIds := []string{}
|
|
// Scan instances for groups we want open
|
|
for _, instance := range instances {
|
|
securityGroup, err := findSecurityGroupForInstance(instance, taggedSecurityGroups)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if securityGroup == nil {
|
|
klog.Warningf("Ignoring instance without security group: %s", aws.StringValue(instance.InstanceId))
|
|
continue
|
|
}
|
|
|
|
id := aws.StringValue(securityGroup.GroupId)
|
|
if id == "" {
|
|
klog.Warningf("found security group without id: %v", securityGroup)
|
|
continue
|
|
}
|
|
|
|
desiredGroupIds = append(desiredGroupIds, id)
|
|
}
|
|
|
|
// Run once for Client traffic
|
|
err = c.updateInstanceSecurityGroupsForNLBTraffic(actualGroups, desiredGroupIds, trafficPorts, lbName, clientCidrs, true)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// Run once for health check traffic
|
|
err = c.updateInstanceSecurityGroupsForNLBTraffic(actualGroups, desiredGroupIds, healthCheckPorts, lbName, vpcCidrBlocks, false)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *Cloud) ensureLoadBalancer(namespacedName types.NamespacedName, loadBalancerName string, listeners []*elb.Listener, subnetIDs []string, securityGroupIDs []string, internalELB, proxyProtocol bool, loadBalancerAttributes *elb.LoadBalancerAttributes, annotations map[string]string) (*elb.LoadBalancerDescription, error) {
|
|
loadBalancer, err := c.describeLoadBalancer(loadBalancerName)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
dirty := false
|
|
|
|
if loadBalancer == nil {
|
|
createRequest := &elb.CreateLoadBalancerInput{}
|
|
createRequest.LoadBalancerName = aws.String(loadBalancerName)
|
|
|
|
createRequest.Listeners = listeners
|
|
|
|
if internalELB {
|
|
createRequest.Scheme = aws.String("internal")
|
|
}
|
|
|
|
// We are supposed to specify one subnet per AZ.
|
|
// TODO: What happens if we have more than one subnet per AZ?
|
|
if subnetIDs == nil {
|
|
createRequest.Subnets = nil
|
|
} else {
|
|
createRequest.Subnets = aws.StringSlice(subnetIDs)
|
|
}
|
|
|
|
if securityGroupIDs == nil {
|
|
createRequest.SecurityGroups = nil
|
|
} else {
|
|
createRequest.SecurityGroups = aws.StringSlice(securityGroupIDs)
|
|
}
|
|
|
|
// Get additional tags set by the user
|
|
tags := getLoadBalancerAdditionalTags(annotations)
|
|
|
|
// Add default tags
|
|
tags[TagNameKubernetesService] = namespacedName.String()
|
|
tags = c.tagging.buildTags(ResourceLifecycleOwned, tags)
|
|
|
|
for k, v := range tags {
|
|
createRequest.Tags = append(createRequest.Tags, &elb.Tag{
|
|
Key: aws.String(k), Value: aws.String(v),
|
|
})
|
|
}
|
|
|
|
klog.Infof("Creating load balancer for %v with name: %s", namespacedName, loadBalancerName)
|
|
_, err := c.elb.CreateLoadBalancer(createRequest)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if proxyProtocol {
|
|
err = c.createProxyProtocolPolicy(loadBalancerName)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for _, listener := range listeners {
|
|
klog.V(2).Infof("Adjusting AWS loadbalancer proxy protocol on node port %d. Setting to true", *listener.InstancePort)
|
|
err := c.setBackendPolicies(loadBalancerName, *listener.InstancePort, []*string{aws.String(ProxyProtocolPolicyName)})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
}
|
|
|
|
dirty = true
|
|
} else {
|
|
// TODO: Sync internal vs non-internal
|
|
|
|
{
|
|
// Sync subnets
|
|
expected := sets.NewString(subnetIDs...)
|
|
actual := stringSetFromPointers(loadBalancer.Subnets)
|
|
|
|
additions := expected.Difference(actual)
|
|
removals := actual.Difference(expected)
|
|
|
|
if removals.Len() != 0 {
|
|
request := &elb.DetachLoadBalancerFromSubnetsInput{}
|
|
request.LoadBalancerName = aws.String(loadBalancerName)
|
|
request.Subnets = stringSetToPointers(removals)
|
|
klog.V(2).Info("Detaching load balancer from removed subnets")
|
|
_, err := c.elb.DetachLoadBalancerFromSubnets(request)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error detaching AWS loadbalancer from subnets: %q", err)
|
|
}
|
|
dirty = true
|
|
}
|
|
|
|
if additions.Len() != 0 {
|
|
request := &elb.AttachLoadBalancerToSubnetsInput{}
|
|
request.LoadBalancerName = aws.String(loadBalancerName)
|
|
request.Subnets = stringSetToPointers(additions)
|
|
klog.V(2).Info("Attaching load balancer to added subnets")
|
|
_, err := c.elb.AttachLoadBalancerToSubnets(request)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error attaching AWS loadbalancer to subnets: %q", err)
|
|
}
|
|
dirty = true
|
|
}
|
|
}
|
|
|
|
{
|
|
// Sync security groups
|
|
expected := sets.NewString(securityGroupIDs...)
|
|
actual := stringSetFromPointers(loadBalancer.SecurityGroups)
|
|
|
|
if !expected.Equal(actual) {
|
|
// This call just replaces the security groups, unlike e.g. subnets (!)
|
|
request := &elb.ApplySecurityGroupsToLoadBalancerInput{}
|
|
request.LoadBalancerName = aws.String(loadBalancerName)
|
|
if securityGroupIDs == nil {
|
|
request.SecurityGroups = nil
|
|
} else {
|
|
request.SecurityGroups = aws.StringSlice(securityGroupIDs)
|
|
}
|
|
klog.V(2).Info("Applying updated security groups to load balancer")
|
|
_, err := c.elb.ApplySecurityGroupsToLoadBalancer(request)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error applying AWS loadbalancer security groups: %q", err)
|
|
}
|
|
dirty = true
|
|
}
|
|
}
|
|
|
|
{
|
|
// Sync listeners
|
|
listenerDescriptions := loadBalancer.ListenerDescriptions
|
|
|
|
foundSet := make(map[int]bool)
|
|
removals := []*int64{}
|
|
for _, listenerDescription := range listenerDescriptions {
|
|
actual := listenerDescription.Listener
|
|
if actual == nil {
|
|
klog.Warning("Ignoring empty listener in AWS loadbalancer: ", loadBalancerName)
|
|
continue
|
|
}
|
|
|
|
found := -1
|
|
for i, expected := range listeners {
|
|
if elbProtocolsAreEqual(actual.Protocol, expected.Protocol) {
|
|
continue
|
|
}
|
|
if elbProtocolsAreEqual(actual.InstanceProtocol, expected.InstanceProtocol) {
|
|
continue
|
|
}
|
|
if aws.Int64Value(actual.InstancePort) != aws.Int64Value(expected.InstancePort) {
|
|
continue
|
|
}
|
|
if aws.Int64Value(actual.LoadBalancerPort) != aws.Int64Value(expected.LoadBalancerPort) {
|
|
continue
|
|
}
|
|
if awsArnEquals(actual.SSLCertificateId, expected.SSLCertificateId) {
|
|
continue
|
|
}
|
|
found = i
|
|
}
|
|
if found != -1 {
|
|
foundSet[found] = true
|
|
} else {
|
|
removals = append(removals, actual.LoadBalancerPort)
|
|
}
|
|
}
|
|
|
|
additions := []*elb.Listener{}
|
|
for i := range listeners {
|
|
if foundSet[i] {
|
|
continue
|
|
}
|
|
additions = append(additions, listeners[i])
|
|
}
|
|
|
|
if len(removals) != 0 {
|
|
request := &elb.DeleteLoadBalancerListenersInput{}
|
|
request.LoadBalancerName = aws.String(loadBalancerName)
|
|
request.LoadBalancerPorts = removals
|
|
klog.V(2).Info("Deleting removed load balancer listeners")
|
|
_, err := c.elb.DeleteLoadBalancerListeners(request)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error deleting AWS loadbalancer listeners: %q", err)
|
|
}
|
|
dirty = true
|
|
}
|
|
|
|
if len(additions) != 0 {
|
|
request := &elb.CreateLoadBalancerListenersInput{}
|
|
request.LoadBalancerName = aws.String(loadBalancerName)
|
|
request.Listeners = additions
|
|
klog.V(2).Info("Creating added load balancer listeners")
|
|
_, err := c.elb.CreateLoadBalancerListeners(request)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error creating AWS loadbalancer listeners: %q", err)
|
|
}
|
|
dirty = true
|
|
}
|
|
}
|
|
|
|
{
|
|
// Sync proxy protocol state for new and existing listeners
|
|
|
|
proxyPolicies := make([]*string, 0)
|
|
if proxyProtocol {
|
|
// Ensure the backend policy exists
|
|
|
|
// NOTE The documentation for the AWS API indicates we could get an HTTP 400
|
|
// back if a policy of the same name already exists. However, the aws-sdk does not
|
|
// seem to return an error to us in these cases. Therefore, this will issue an API
|
|
// request every time.
|
|
err := c.createProxyProtocolPolicy(loadBalancerName)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
proxyPolicies = append(proxyPolicies, aws.String(ProxyProtocolPolicyName))
|
|
}
|
|
|
|
foundBackends := make(map[int64]bool)
|
|
proxyProtocolBackends := make(map[int64]bool)
|
|
for _, backendListener := range loadBalancer.BackendServerDescriptions {
|
|
foundBackends[*backendListener.InstancePort] = false
|
|
proxyProtocolBackends[*backendListener.InstancePort] = proxyProtocolEnabled(backendListener)
|
|
}
|
|
|
|
for _, listener := range listeners {
|
|
setPolicy := false
|
|
instancePort := *listener.InstancePort
|
|
|
|
if currentState, ok := proxyProtocolBackends[instancePort]; !ok {
|
|
// This is a new ELB backend so we only need to worry about
|
|
// potentially adding a policy and not removing an
|
|
// existing one
|
|
setPolicy = proxyProtocol
|
|
} else {
|
|
foundBackends[instancePort] = true
|
|
// This is an existing ELB backend so we need to determine
|
|
// if the state changed
|
|
setPolicy = (currentState != proxyProtocol)
|
|
}
|
|
|
|
if setPolicy {
|
|
klog.V(2).Infof("Adjusting AWS loadbalancer proxy protocol on node port %d. Setting to %t", instancePort, proxyProtocol)
|
|
err := c.setBackendPolicies(loadBalancerName, instancePort, proxyPolicies)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
dirty = true
|
|
}
|
|
}
|
|
|
|
// We now need to figure out if any backend policies need removed
|
|
// because these old policies will stick around even if there is no
|
|
// corresponding listener anymore
|
|
for instancePort, found := range foundBackends {
|
|
if !found {
|
|
klog.V(2).Infof("Adjusting AWS loadbalancer proxy protocol on node port %d. Setting to false", instancePort)
|
|
err := c.setBackendPolicies(loadBalancerName, instancePort, []*string{})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
dirty = true
|
|
}
|
|
}
|
|
}
|
|
|
|
{
|
|
// Add additional tags
|
|
klog.V(2).Infof("Creating additional load balancer tags for %s", loadBalancerName)
|
|
tags := getLoadBalancerAdditionalTags(annotations)
|
|
if len(tags) > 0 {
|
|
err := c.addLoadBalancerTags(loadBalancerName, tags)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("unable to create additional load balancer tags: %v", err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Whether the ELB was new or existing, sync attributes regardless. This accounts for things
|
|
// that cannot be specified at the time of creation and can only be modified after the fact,
|
|
// e.g. idle connection timeout.
|
|
{
|
|
describeAttributesRequest := &elb.DescribeLoadBalancerAttributesInput{}
|
|
describeAttributesRequest.LoadBalancerName = aws.String(loadBalancerName)
|
|
describeAttributesOutput, err := c.elb.DescribeLoadBalancerAttributes(describeAttributesRequest)
|
|
if err != nil {
|
|
klog.Warning("Unable to retrieve load balancer attributes during attribute sync")
|
|
return nil, err
|
|
}
|
|
|
|
foundAttributes := &describeAttributesOutput.LoadBalancerAttributes
|
|
|
|
// Update attributes if they're dirty
|
|
if !reflect.DeepEqual(loadBalancerAttributes, foundAttributes) {
|
|
klog.V(2).Infof("Updating load-balancer attributes for %q", loadBalancerName)
|
|
|
|
modifyAttributesRequest := &elb.ModifyLoadBalancerAttributesInput{}
|
|
modifyAttributesRequest.LoadBalancerName = aws.String(loadBalancerName)
|
|
modifyAttributesRequest.LoadBalancerAttributes = loadBalancerAttributes
|
|
_, err = c.elb.ModifyLoadBalancerAttributes(modifyAttributesRequest)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("Unable to update load balancer attributes during attribute sync: %q", err)
|
|
}
|
|
dirty = true
|
|
}
|
|
}
|
|
|
|
if dirty {
|
|
loadBalancer, err = c.describeLoadBalancer(loadBalancerName)
|
|
if err != nil {
|
|
klog.Warning("Unable to retrieve load balancer after creation/update")
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
return loadBalancer, nil
|
|
}
|
|
|
|
func createSubnetMappings(subnetIDs []string) []*elbv2.SubnetMapping {
|
|
response := []*elbv2.SubnetMapping{}
|
|
|
|
for _, id := range subnetIDs {
|
|
// Ignore AllocationId for now
|
|
response = append(response, &elbv2.SubnetMapping{SubnetId: aws.String(id)})
|
|
}
|
|
|
|
return response
|
|
}
|
|
|
|
// elbProtocolsAreEqual checks if two ELB protocol strings are considered the same
|
|
// Comparison is case insensitive
|
|
func elbProtocolsAreEqual(l, r *string) bool {
|
|
if l == nil || r == nil {
|
|
return l == r
|
|
}
|
|
return strings.EqualFold(aws.StringValue(l), aws.StringValue(r))
|
|
}
|
|
|
|
// awsArnEquals checks if two ARN strings are considered the same
|
|
// Comparison is case insensitive
|
|
func awsArnEquals(l, r *string) bool {
|
|
if l == nil || r == nil {
|
|
return l == r
|
|
}
|
|
return strings.EqualFold(aws.StringValue(l), aws.StringValue(r))
|
|
}
|
|
|
|
// getExpectedHealthCheck returns an elb.Healthcheck for the provided target
|
|
// and using either sensible defaults or overrides via Service annotations
|
|
func (c *Cloud) getExpectedHealthCheck(target string, annotations map[string]string) (*elb.HealthCheck, error) {
|
|
healthcheck := &elb.HealthCheck{Target: &target}
|
|
getOrDefault := func(annotation string, defaultValue int64) (*int64, error) {
|
|
i64 := defaultValue
|
|
var err error
|
|
if s, ok := annotations[annotation]; ok {
|
|
i64, err = strconv.ParseInt(s, 10, 0)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed parsing health check annotation value: %v", err)
|
|
}
|
|
}
|
|
return &i64, nil
|
|
}
|
|
var err error
|
|
healthcheck.HealthyThreshold, err = getOrDefault(ServiceAnnotationLoadBalancerHCHealthyThreshold, defaultHCHealthyThreshold)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
healthcheck.UnhealthyThreshold, err = getOrDefault(ServiceAnnotationLoadBalancerHCUnhealthyThreshold, defaultHCUnhealthyThreshold)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
healthcheck.Timeout, err = getOrDefault(ServiceAnnotationLoadBalancerHCTimeout, defaultHCTimeout)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
healthcheck.Interval, err = getOrDefault(ServiceAnnotationLoadBalancerHCInterval, defaultHCInterval)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if err = healthcheck.Validate(); err != nil {
|
|
return nil, fmt.Errorf("some of the load balancer health check parameters are invalid: %v", err)
|
|
}
|
|
return healthcheck, nil
|
|
}
|
|
|
|
// Makes sure that the health check for an ELB matches the configured health check node port
|
|
func (c *Cloud) ensureLoadBalancerHealthCheck(loadBalancer *elb.LoadBalancerDescription, protocol string, port int32, path string, annotations map[string]string) error {
|
|
name := aws.StringValue(loadBalancer.LoadBalancerName)
|
|
|
|
actual := loadBalancer.HealthCheck
|
|
expectedTarget := protocol + ":" + strconv.FormatInt(int64(port), 10) + path
|
|
expected, err := c.getExpectedHealthCheck(expectedTarget, annotations)
|
|
if err != nil {
|
|
return fmt.Errorf("cannot update health check for load balancer %q: %q", name, err)
|
|
}
|
|
|
|
// comparing attributes 1 by 1 to avoid breakage in case a new field is
|
|
// added to the HC which breaks the equality
|
|
if aws.StringValue(expected.Target) == aws.StringValue(actual.Target) &&
|
|
aws.Int64Value(expected.HealthyThreshold) == aws.Int64Value(actual.HealthyThreshold) &&
|
|
aws.Int64Value(expected.UnhealthyThreshold) == aws.Int64Value(actual.UnhealthyThreshold) &&
|
|
aws.Int64Value(expected.Interval) == aws.Int64Value(actual.Interval) &&
|
|
aws.Int64Value(expected.Timeout) == aws.Int64Value(actual.Timeout) {
|
|
return nil
|
|
}
|
|
|
|
request := &elb.ConfigureHealthCheckInput{}
|
|
request.HealthCheck = expected
|
|
request.LoadBalancerName = loadBalancer.LoadBalancerName
|
|
|
|
_, err = c.elb.ConfigureHealthCheck(request)
|
|
if err != nil {
|
|
return fmt.Errorf("error configuring load balancer health check for %q: %q", name, err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Makes sure that exactly the specified hosts are registered as instances with the load balancer
|
|
func (c *Cloud) ensureLoadBalancerInstances(loadBalancerName string, lbInstances []*elb.Instance, instanceIDs map[InstanceID]*ec2.Instance) error {
|
|
expected := sets.NewString()
|
|
for id := range instanceIDs {
|
|
expected.Insert(string(id))
|
|
}
|
|
|
|
actual := sets.NewString()
|
|
for _, lbInstance := range lbInstances {
|
|
actual.Insert(aws.StringValue(lbInstance.InstanceId))
|
|
}
|
|
|
|
additions := expected.Difference(actual)
|
|
removals := actual.Difference(expected)
|
|
|
|
addInstances := []*elb.Instance{}
|
|
for _, instanceID := range additions.List() {
|
|
addInstance := &elb.Instance{}
|
|
addInstance.InstanceId = aws.String(instanceID)
|
|
addInstances = append(addInstances, addInstance)
|
|
}
|
|
|
|
removeInstances := []*elb.Instance{}
|
|
for _, instanceID := range removals.List() {
|
|
removeInstance := &elb.Instance{}
|
|
removeInstance.InstanceId = aws.String(instanceID)
|
|
removeInstances = append(removeInstances, removeInstance)
|
|
}
|
|
|
|
if len(addInstances) > 0 {
|
|
registerRequest := &elb.RegisterInstancesWithLoadBalancerInput{}
|
|
registerRequest.Instances = addInstances
|
|
registerRequest.LoadBalancerName = aws.String(loadBalancerName)
|
|
_, err := c.elb.RegisterInstancesWithLoadBalancer(registerRequest)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
klog.V(1).Infof("Instances added to load-balancer %s", loadBalancerName)
|
|
}
|
|
|
|
if len(removeInstances) > 0 {
|
|
deregisterRequest := &elb.DeregisterInstancesFromLoadBalancerInput{}
|
|
deregisterRequest.Instances = removeInstances
|
|
deregisterRequest.LoadBalancerName = aws.String(loadBalancerName)
|
|
_, err := c.elb.DeregisterInstancesFromLoadBalancer(deregisterRequest)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
klog.V(1).Infof("Instances removed from load-balancer %s", loadBalancerName)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *Cloud) getLoadBalancerTLSPorts(loadBalancer *elb.LoadBalancerDescription) []int64 {
|
|
ports := []int64{}
|
|
|
|
for _, listenerDescription := range loadBalancer.ListenerDescriptions {
|
|
protocol := aws.StringValue(listenerDescription.Listener.Protocol)
|
|
if protocol == "SSL" || protocol == "HTTPS" {
|
|
ports = append(ports, aws.Int64Value(listenerDescription.Listener.LoadBalancerPort))
|
|
}
|
|
}
|
|
return ports
|
|
}
|
|
|
|
func (c *Cloud) ensureSSLNegotiationPolicy(loadBalancer *elb.LoadBalancerDescription, policyName string) error {
|
|
klog.V(2).Info("Describing load balancer policies on load balancer")
|
|
result, err := c.elb.DescribeLoadBalancerPolicies(&elb.DescribeLoadBalancerPoliciesInput{
|
|
LoadBalancerName: loadBalancer.LoadBalancerName,
|
|
PolicyNames: []*string{
|
|
aws.String(fmt.Sprintf(SSLNegotiationPolicyNameFormat, policyName)),
|
|
},
|
|
})
|
|
if err != nil {
|
|
if aerr, ok := err.(awserr.Error); ok {
|
|
switch aerr.Code() {
|
|
case elb.ErrCodePolicyNotFoundException:
|
|
default:
|
|
return fmt.Errorf("error describing security policies on load balancer: %q", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
if len(result.PolicyDescriptions) > 0 {
|
|
return nil
|
|
}
|
|
|
|
klog.V(2).Infof("Creating SSL negotiation policy '%s' on load balancer", fmt.Sprintf(SSLNegotiationPolicyNameFormat, policyName))
|
|
// there is an upper limit of 98 policies on an ELB, we're pretty safe from
|
|
// running into it
|
|
_, err = c.elb.CreateLoadBalancerPolicy(&elb.CreateLoadBalancerPolicyInput{
|
|
LoadBalancerName: loadBalancer.LoadBalancerName,
|
|
PolicyName: aws.String(fmt.Sprintf(SSLNegotiationPolicyNameFormat, policyName)),
|
|
PolicyTypeName: aws.String("SSLNegotiationPolicyType"),
|
|
PolicyAttributes: []*elb.PolicyAttribute{
|
|
{
|
|
AttributeName: aws.String("Reference-Security-Policy"),
|
|
AttributeValue: aws.String(policyName),
|
|
},
|
|
},
|
|
})
|
|
if err != nil {
|
|
return fmt.Errorf("error creating security policy on load balancer: %q", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *Cloud) setSSLNegotiationPolicy(loadBalancerName, sslPolicyName string, port int64) error {
|
|
policyName := fmt.Sprintf(SSLNegotiationPolicyNameFormat, sslPolicyName)
|
|
request := &elb.SetLoadBalancerPoliciesOfListenerInput{
|
|
LoadBalancerName: aws.String(loadBalancerName),
|
|
LoadBalancerPort: aws.Int64(port),
|
|
PolicyNames: []*string{
|
|
aws.String(policyName),
|
|
},
|
|
}
|
|
klog.V(2).Infof("Setting SSL negotiation policy '%s' on load balancer", policyName)
|
|
_, err := c.elb.SetLoadBalancerPoliciesOfListener(request)
|
|
if err != nil {
|
|
return fmt.Errorf("error setting SSL negotiation policy '%s' on load balancer: %q", policyName, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (c *Cloud) createProxyProtocolPolicy(loadBalancerName string) error {
|
|
request := &elb.CreateLoadBalancerPolicyInput{
|
|
LoadBalancerName: aws.String(loadBalancerName),
|
|
PolicyName: aws.String(ProxyProtocolPolicyName),
|
|
PolicyTypeName: aws.String("ProxyProtocolPolicyType"),
|
|
PolicyAttributes: []*elb.PolicyAttribute{
|
|
{
|
|
AttributeName: aws.String("ProxyProtocol"),
|
|
AttributeValue: aws.String("true"),
|
|
},
|
|
},
|
|
}
|
|
klog.V(2).Info("Creating proxy protocol policy on load balancer")
|
|
_, err := c.elb.CreateLoadBalancerPolicy(request)
|
|
if err != nil {
|
|
return fmt.Errorf("error creating proxy protocol policy on load balancer: %q", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *Cloud) setBackendPolicies(loadBalancerName string, instancePort int64, policies []*string) error {
|
|
request := &elb.SetLoadBalancerPoliciesForBackendServerInput{
|
|
InstancePort: aws.Int64(instancePort),
|
|
LoadBalancerName: aws.String(loadBalancerName),
|
|
PolicyNames: policies,
|
|
}
|
|
if len(policies) > 0 {
|
|
klog.V(2).Infof("Adding AWS loadbalancer backend policies on node port %d", instancePort)
|
|
} else {
|
|
klog.V(2).Infof("Removing AWS loadbalancer backend policies on node port %d", instancePort)
|
|
}
|
|
_, err := c.elb.SetLoadBalancerPoliciesForBackendServer(request)
|
|
if err != nil {
|
|
return fmt.Errorf("error adjusting AWS loadbalancer backend policies: %q", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func proxyProtocolEnabled(backend *elb.BackendServerDescription) bool {
|
|
for _, policy := range backend.PolicyNames {
|
|
if aws.StringValue(policy) == ProxyProtocolPolicyName {
|
|
return true
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// findInstancesForELB gets the EC2 instances corresponding to the Nodes, for setting up an ELB
|
|
// We ignore Nodes (with a log message) where the instanceid cannot be determined from the provider,
|
|
// and we ignore instances which are not found
|
|
func (c *Cloud) findInstancesForELB(nodes []*v1.Node) (map[InstanceID]*ec2.Instance, error) {
|
|
// Map to instance ids ignoring Nodes where we cannot find the id (but logging)
|
|
instanceIDs := mapToAWSInstanceIDsTolerant(nodes)
|
|
|
|
cacheCriteria := cacheCriteria{
|
|
// MaxAge not required, because we only care about security groups, which should not change
|
|
HasInstances: instanceIDs, // Refresh if any of the instance ids are missing
|
|
}
|
|
snapshot, err := c.instanceCache.describeAllInstancesCached(cacheCriteria)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
instances := snapshot.FindInstances(instanceIDs)
|
|
// We ignore instances that cannot be found
|
|
|
|
return instances, nil
|
|
}
|