2019-09-27 21:51:53 +00:00
// +build !providerless
/ *
Copyright 2016 The Kubernetes Authors .
Licensed under the Apache License , Version 2.0 ( the "License" ) ;
you may not use this file except in compliance with the License .
You may obtain a copy of the License at
http : //www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing , software
distributed under the License is distributed on an "AS IS" BASIS ,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND , either express or implied .
See the License for the specific language governing permissions and
limitations under the License .
* /
package openstack
import (
"context"
"fmt"
"net"
"reflect"
"strings"
"time"
"github.com/gophercloud/gophercloud"
"github.com/gophercloud/gophercloud/openstack/networking/v2/extensions"
"github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/external"
"github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/layer3/floatingips"
"github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/lbaas_v2/listeners"
"github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/lbaas_v2/loadbalancers"
v2monitors "github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/lbaas_v2/monitors"
v2pools "github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/lbaas_v2/pools"
"github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/security/groups"
"github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/security/rules"
"github.com/gophercloud/gophercloud/openstack/networking/v2/networks"
neutronports "github.com/gophercloud/gophercloud/openstack/networking/v2/ports"
"github.com/gophercloud/gophercloud/pagination"
2020-08-10 17:43:49 +00:00
"k8s.io/klog/v2"
2019-09-27 21:51:53 +00:00
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
cloudprovider "k8s.io/cloud-provider"
servicehelpers "k8s.io/cloud-provider/service/helpers"
)
// Note: when creating a new Loadbalancer (VM), it can take some time before it is ready for use,
// this timeout is used for waiting until the Loadbalancer provisioning status goes to ACTIVE state.
const (
// loadbalancerActive* is configuration of exponential backoff for
// going into ACTIVE loadbalancer provisioning status. Starting with 1
// seconds, multiplying by 1.2 with each step and taking 19 steps at maximum
// it will time out after 128s, which roughly corresponds to 120s
loadbalancerActiveInitDelay = 1 * time . Second
loadbalancerActiveFactor = 1.2
loadbalancerActiveSteps = 19
// loadbalancerDelete* is configuration of exponential backoff for
// waiting for delete operation to complete. Starting with 1
// seconds, multiplying by 1.2 with each step and taking 13 steps at maximum
// it will time out after 32s, which roughly corresponds to 30s
loadbalancerDeleteInitDelay = 1 * time . Second
loadbalancerDeleteFactor = 1.2
loadbalancerDeleteSteps = 13
activeStatus = "ACTIVE"
errorStatus = "ERROR"
ServiceAnnotationLoadBalancerFloatingNetworkID = "loadbalancer.openstack.org/floating-network-id"
ServiceAnnotationLoadBalancerSubnetID = "loadbalancer.openstack.org/subnet-id"
// ServiceAnnotationLoadBalancerInternal is the annotation used on the service
// to indicate that we want an internal loadbalancer service.
// If the value of ServiceAnnotationLoadBalancerInternal is false, it indicates that we want an external loadbalancer service. Default to false.
ServiceAnnotationLoadBalancerInternal = "service.beta.kubernetes.io/openstack-internal-load-balancer"
)
var _ cloudprovider . LoadBalancer = ( * LbaasV2 ) ( nil )
// LbaasV2 is a LoadBalancer implementation for Neutron LBaaS v2 API
type LbaasV2 struct {
LoadBalancer
}
func networkExtensions ( client * gophercloud . ServiceClient ) ( map [ string ] bool , error ) {
seen := make ( map [ string ] bool )
pager := extensions . List ( client )
err := pager . EachPage ( func ( page pagination . Page ) ( bool , error ) {
exts , err := extensions . ExtractExtensions ( page )
if err != nil {
return false , err
}
for _ , ext := range exts {
seen [ ext . Alias ] = true
}
return true , nil
} )
return seen , err
}
func getFloatingIPByPortID ( client * gophercloud . ServiceClient , portID string ) ( * floatingips . FloatingIP , error ) {
opts := floatingips . ListOpts {
PortID : portID ,
}
pager := floatingips . List ( client , opts )
floatingIPList := make ( [ ] floatingips . FloatingIP , 0 , 1 )
err := pager . EachPage ( func ( page pagination . Page ) ( bool , error ) {
f , err := floatingips . ExtractFloatingIPs ( page )
if err != nil {
return false , err
}
floatingIPList = append ( floatingIPList , f ... )
if len ( floatingIPList ) > 1 {
return false , ErrMultipleResults
}
return true , nil
} )
if err != nil {
if isNotFound ( err ) {
return nil , ErrNotFound
}
return nil , err
}
if len ( floatingIPList ) == 0 {
return nil , ErrNotFound
} else if len ( floatingIPList ) > 1 {
return nil , ErrMultipleResults
}
return & floatingIPList [ 0 ] , nil
}
func getLoadbalancerByName ( client * gophercloud . ServiceClient , name string ) ( * loadbalancers . LoadBalancer , error ) {
opts := loadbalancers . ListOpts {
Name : name ,
}
pager := loadbalancers . List ( client , opts )
loadbalancerList := make ( [ ] loadbalancers . LoadBalancer , 0 , 1 )
err := pager . EachPage ( func ( page pagination . Page ) ( bool , error ) {
v , err := loadbalancers . ExtractLoadBalancers ( page )
if err != nil {
return false , err
}
loadbalancerList = append ( loadbalancerList , v ... )
if len ( loadbalancerList ) > 1 {
return false , ErrMultipleResults
}
return true , nil
} )
if err != nil {
if isNotFound ( err ) {
return nil , ErrNotFound
}
return nil , err
}
if len ( loadbalancerList ) == 0 {
return nil , ErrNotFound
} else if len ( loadbalancerList ) > 1 {
return nil , ErrMultipleResults
}
return & loadbalancerList [ 0 ] , nil
}
func getListenersByLoadBalancerID ( client * gophercloud . ServiceClient , id string ) ( [ ] listeners . Listener , error ) {
var existingListeners [ ] listeners . Listener
err := listeners . List ( client , listeners . ListOpts { LoadbalancerID : id } ) . EachPage ( func ( page pagination . Page ) ( bool , error ) {
listenerList , err := listeners . ExtractListeners ( page )
if err != nil {
return false , err
}
for _ , l := range listenerList {
for _ , lb := range l . Loadbalancers {
if lb . ID == id {
existingListeners = append ( existingListeners , l )
break
}
}
}
return true , nil
} )
if err != nil {
return nil , err
}
return existingListeners , nil
}
// get listener for a port or nil if does not exist
func getListenerForPort ( existingListeners [ ] listeners . Listener , port v1 . ServicePort ) * listeners . Listener {
for _ , l := range existingListeners {
if listeners . Protocol ( l . Protocol ) == toListenersProtocol ( port . Protocol ) && l . ProtocolPort == int ( port . Port ) {
return & l
}
}
return nil
}
// Get pool for a listener. A listener always has exactly one pool.
func getPoolByListenerID ( client * gophercloud . ServiceClient , loadbalancerID string , listenerID string ) ( * v2pools . Pool , error ) {
listenerPools := make ( [ ] v2pools . Pool , 0 , 1 )
err := v2pools . List ( client , v2pools . ListOpts { LoadbalancerID : loadbalancerID } ) . EachPage ( func ( page pagination . Page ) ( bool , error ) {
poolsList , err := v2pools . ExtractPools ( page )
if err != nil {
return false , err
}
for _ , p := range poolsList {
for _ , l := range p . Listeners {
if l . ID == listenerID {
listenerPools = append ( listenerPools , p )
}
}
}
if len ( listenerPools ) > 1 {
return false , ErrMultipleResults
}
return true , nil
} )
if err != nil {
if isNotFound ( err ) {
return nil , ErrNotFound
}
return nil , err
}
if len ( listenerPools ) == 0 {
return nil , ErrNotFound
} else if len ( listenerPools ) > 1 {
return nil , ErrMultipleResults
}
return & listenerPools [ 0 ] , nil
}
func getMembersByPoolID ( client * gophercloud . ServiceClient , id string ) ( [ ] v2pools . Member , error ) {
var members [ ] v2pools . Member
err := v2pools . ListMembers ( client , id , v2pools . ListMembersOpts { } ) . EachPage ( func ( page pagination . Page ) ( bool , error ) {
membersList , err := v2pools . ExtractMembers ( page )
if err != nil {
return false , err
}
members = append ( members , membersList ... )
return true , nil
} )
if err != nil {
return nil , err
}
return members , nil
}
// Check if a member exists for node
func memberExists ( members [ ] v2pools . Member , addr string , port int ) bool {
for _ , member := range members {
if member . Address == addr && member . ProtocolPort == port {
return true
}
}
return false
}
func popListener ( existingListeners [ ] listeners . Listener , id string ) [ ] listeners . Listener {
for i , existingListener := range existingListeners {
if existingListener . ID == id {
existingListeners [ i ] = existingListeners [ len ( existingListeners ) - 1 ]
existingListeners = existingListeners [ : len ( existingListeners ) - 1 ]
break
}
}
return existingListeners
}
func popMember ( members [ ] v2pools . Member , addr string , port int ) [ ] v2pools . Member {
for i , member := range members {
if member . Address == addr && member . ProtocolPort == port {
members [ i ] = members [ len ( members ) - 1 ]
members = members [ : len ( members ) - 1 ]
}
}
return members
}
func getSecurityGroupName ( service * v1 . Service ) string {
securityGroupName := fmt . Sprintf ( "lb-sg-%s-%s-%s" , service . UID , service . Namespace , service . Name )
//OpenStack requires that the name of a security group is shorter than 255 bytes.
if len ( securityGroupName ) > 255 {
securityGroupName = securityGroupName [ : 255 ]
}
return securityGroupName
}
func getSecurityGroupRules ( client * gophercloud . ServiceClient , opts rules . ListOpts ) ( [ ] rules . SecGroupRule , error ) {
pager := rules . List ( client , opts )
var securityRules [ ] rules . SecGroupRule
err := pager . EachPage ( func ( page pagination . Page ) ( bool , error ) {
ruleList , err := rules . ExtractRules ( page )
if err != nil {
return false , err
}
securityRules = append ( securityRules , ruleList ... )
return true , nil
} )
if err != nil {
return nil , err
}
return securityRules , nil
}
func waitLoadbalancerActiveProvisioningStatus ( client * gophercloud . ServiceClient , loadbalancerID string ) ( string , error ) {
backoff := wait . Backoff {
Duration : loadbalancerActiveInitDelay ,
Factor : loadbalancerActiveFactor ,
Steps : loadbalancerActiveSteps ,
}
var provisioningStatus string
err := wait . ExponentialBackoff ( backoff , func ( ) ( bool , error ) {
loadbalancer , err := loadbalancers . Get ( client , loadbalancerID ) . Extract ( )
if err != nil {
return false , err
}
provisioningStatus = loadbalancer . ProvisioningStatus
if loadbalancer . ProvisioningStatus == activeStatus {
return true , nil
} else if loadbalancer . ProvisioningStatus == errorStatus {
return true , fmt . Errorf ( "loadbalancer has gone into ERROR state" )
} else {
return false , nil
}
} )
if err == wait . ErrWaitTimeout {
err = fmt . Errorf ( "loadbalancer failed to go into ACTIVE provisioning status within alloted time" )
}
return provisioningStatus , err
}
func waitLoadbalancerDeleted ( client * gophercloud . ServiceClient , loadbalancerID string ) error {
backoff := wait . Backoff {
Duration : loadbalancerDeleteInitDelay ,
Factor : loadbalancerDeleteFactor ,
Steps : loadbalancerDeleteSteps ,
}
err := wait . ExponentialBackoff ( backoff , func ( ) ( bool , error ) {
_ , err := loadbalancers . Get ( client , loadbalancerID ) . Extract ( )
if err != nil {
if isNotFound ( err ) {
return true , nil
}
return false , err
}
return false , nil
} )
if err == wait . ErrWaitTimeout {
err = fmt . Errorf ( "loadbalancer failed to delete within the alloted time" )
}
return err
}
func toRuleProtocol ( protocol v1 . Protocol ) rules . RuleProtocol {
switch protocol {
case v1 . ProtocolTCP :
return rules . ProtocolTCP
case v1 . ProtocolUDP :
return rules . ProtocolUDP
default :
return rules . RuleProtocol ( strings . ToLower ( string ( protocol ) ) )
}
}
func toListenersProtocol ( protocol v1 . Protocol ) listeners . Protocol {
switch protocol {
case v1 . ProtocolTCP :
return listeners . ProtocolTCP
default :
return listeners . Protocol ( string ( protocol ) )
}
}
func createNodeSecurityGroup ( client * gophercloud . ServiceClient , nodeSecurityGroupID string , port int , protocol v1 . Protocol , lbSecGroup string ) error {
v4NodeSecGroupRuleCreateOpts := rules . CreateOpts {
Direction : rules . DirIngress ,
PortRangeMax : port ,
PortRangeMin : port ,
Protocol : toRuleProtocol ( protocol ) ,
RemoteGroupID : lbSecGroup ,
SecGroupID : nodeSecurityGroupID ,
EtherType : rules . EtherType4 ,
}
v6NodeSecGroupRuleCreateOpts := rules . CreateOpts {
Direction : rules . DirIngress ,
PortRangeMax : port ,
PortRangeMin : port ,
Protocol : toRuleProtocol ( protocol ) ,
RemoteGroupID : lbSecGroup ,
SecGroupID : nodeSecurityGroupID ,
EtherType : rules . EtherType6 ,
}
_ , err := rules . Create ( client , v4NodeSecGroupRuleCreateOpts ) . Extract ( )
if err != nil {
return err
}
_ , err = rules . Create ( client , v6NodeSecGroupRuleCreateOpts ) . Extract ( )
if err != nil {
return err
}
return nil
}
func ( lbaas * LbaasV2 ) createLoadBalancer ( service * v1 . Service , name string , internalAnnotation bool ) ( * loadbalancers . LoadBalancer , error ) {
createOpts := loadbalancers . CreateOpts {
Name : name ,
Description : fmt . Sprintf ( "Kubernetes external service %s" , name ) ,
VipSubnetID : lbaas . opts . SubnetID ,
Provider : lbaas . opts . LBProvider ,
}
loadBalancerIP := service . Spec . LoadBalancerIP
if loadBalancerIP != "" && internalAnnotation {
createOpts . VipAddress = loadBalancerIP
}
loadbalancer , err := loadbalancers . Create ( lbaas . lb , createOpts ) . Extract ( )
if e rr != nil {
return nil , fmt . Errorf ( "error creating loadbalancer %v: %v" , createOpts , err )
}
return loadbalancer , nil
}
// GetLoadBalancer returns whether the specified load balancer exists and its status
func ( lbaas * LbaasV2 ) GetLoadBalancer ( ctx context . Context , clusterName string , service * v1 . Service ) ( * v1 . LoadBalancerStatus , bool , error ) {
loadBalancerName := lbaas . GetLoadBalancerName ( ctx , clusterName , service )
loadbalancer , err := getLoadbalancerByName ( lbaas . lb , loadBalancerName )
if err == ErrNotFound {
return nil , false , nil
}
if loadbalancer == nil {
return nil , false , err
}
status := & v1 . LoadBalancerStatus { }
portID := loadbalancer . VipPortID
if portID != "" {
floatIP , err := getFloatingIPByPortID ( lbaas . network , portID )
if err != nil && err != ErrNotFound {
return nil , false , fmt . Errorf ( "error getting floating ip for port %s: %v" , portID , err )
}
if floatIP != nil {
status . Ingress = [ ] v1 . LoadBalancerIngress { { IP : floatIP . FloatingIP } }
}
} else {
status . Ingress = [ ] v1 . LoadBalancerIngress { { IP : loadbalancer . VipAddress } }
}
return status , true , err
}
// GetLoadBalancerName is an implementation of LoadBalancer.GetLoadBalancerName.
func ( lbaas * LbaasV2 ) GetLoadBalancerName ( ctx context . Context , clusterName string , service * v1 . Service ) string {
// TODO: replace DefaultLoadBalancerName to generate more meaningful loadbalancer names.
return cloudprovider . DefaultLoadBalancerName ( service )
}
// The LB needs to be configured with instance addresses on the same
// subnet as the LB (aka opts.SubnetID). Currently we're just
// guessing that the node's InternalIP is the right address.
// In case no InternalIP can be found, ExternalIP is tried.
// If neither InternalIP nor ExternalIP can be found an error is
// returned.
func nodeAddressForLB ( node * v1 . Node ) ( string , error ) {
addrs := node . Status . Addresses
if len ( addrs ) == 0 {
return "" , ErrNoAddressFound
}
allowedAddrTypes := [ ] v1 . NodeAddressType { v1 . NodeInternalIP , v1 . NodeExternalIP }
for _ , allowedAddrType := range allowedAddrTypes {
for _ , addr := range addrs {
if addr . Type == allowedAddrType {
return addr . Address , nil
}
}
}
return "" , ErrNoAddressFound
}
//getStringFromServiceAnnotation searches a given v1.Service for a specific annotationKey and either returns the annotation's value or a specified defaultSetting
func getStringFromServiceAnnotation ( service * v1 . Service , annotationKey string , defaultSetting string ) string {
klog . V ( 4 ) . Infof ( "getStringFromServiceAnnotation(%v, %v, %v)" , service , annotationKey , defaultSetting )
if annotationValue , ok := service . Annotations [ annotationKey ] ; ok {
//if there is an annotation for this setting, set the "setting" var to it
// annotationValue can be empty, it is working as designed
// it makes possible for instance provisioning loadbalancer without floatingip
klog . V ( 4 ) . Infof ( "Found a Service Annotation: %v = %v" , annotationKey , annotationValue )
return annotationValue
}
//if there is no annotation, set "settings" var to the value from cloud config
klog . V ( 4 ) . Infof ( "Could not find a Service Annotation; falling back on cloud-config setting: %v = %v" , annotationKey , defaultSetting )
return defaultSetting
}
// getSubnetIDForLB returns subnet-id for a specific node
func getSubnetIDForLB ( compute * gophercloud . ServiceClient , node v1 . Node ) ( string , error ) {
ipAddress , err := nodeAddressForLB ( & node )
if err != nil {
return "" , err
}
instanceID := node . Spec . ProviderID
if ind := strings . LastIndex ( instanceID , "/" ) ; ind >= 0 {
instanceID = instanceID [ ( ind + 1 ) : ]
}
interfaces , err := getAttachedInterfacesByID ( compute , instanceID )
if err != nil {
return "" , err
}
for _ , intf := range interfaces {
for _ , fixedIP := range intf . FixedIPs {
if fixedIP . IPAddress == ipAddress {
return fixedIP . SubnetID , nil
}
}
}
return "" , ErrNotFound
}
// getNodeSecurityGroupIDForLB lists node-security-groups for specific nodes
func getNodeSecurityGroupIDForLB ( compute * gophercloud . ServiceClient , network * gophercloud . ServiceClient , nodes [ ] * v1 . Node ) ( [ ] string , error ) {
secGroupNames := sets . NewString ( )
for _ , node := range nodes {
nodeName := types . NodeName ( node . Name )
srv , err := getServerByName ( compute , nodeName )
if err != nil {
return [ ] string { } , err
}
// use the first node-security-groups
// case 0: node1:SG1 node2:SG1 return SG1
// case 1: node1:SG1 node2:SG2 return SG1,SG2
// case 2: node1:SG1,SG2 node2:SG3,SG4 return SG1,SG3
// case 3: node1:SG1,SG2 node2:SG2,SG3 return SG1,SG2
secGroupNames . Insert ( srv . SecurityGroups [ 0 ] [ "name" ] . ( string ) )
}
secGroupIDs := make ( [ ] string , secGroupNames . Len ( ) )
for i , name := range secGroupNames . List ( ) {
secGroupID , err := groups . IDFromName ( network , name )
if err != nil {
return [ ] string { } , err
}
secGroupIDs [ i ] = secGroupID
}
return secGroupIDs , nil
}
// isSecurityGroupNotFound return true while 'err' is object of gophercloud.ErrResourceNotFound
func isSecurityGroupNotFound ( err error ) bool {
errType := reflect . TypeOf ( err ) . String ( )
errTypeSlice := strings . Split ( errType , "." )
errTypeValue := ""
if len ( errTypeSlice ) != 0 {
errTypeValue = errTypeSlice [ len ( errTypeSlice ) - 1 ]
}
if errTypeValue == "ErrResourceNotFound" {
return true
}
return false
}
// getFloatingNetworkIDForLB returns a floating-network-id for cluster.
func getFloatingNetworkIDForLB ( client * gophercloud . ServiceClient ) ( string , error ) {
var floatingNetworkIds [ ] string
type NetworkWithExternalExt struct {
networks . Network
external . NetworkExternalExt
}
err := networks . List ( client , networks . ListOpts { } ) . EachPage ( func ( page pagination . Page ) ( bool , error ) {
var externalNetwork [ ] NetworkWithExternalExt
err := networks . ExtractNetworksInto ( page , & externalNetwork )
if err != nil {
return false , err
}
for _ , externalNet := range externalNetwork {
if externalNet . External {
floatingNetworkIds = append ( floatingNetworkIds , externalNet . ID )
}
}
if len ( floatingNetworkIds ) > 1 {
return false , ErrMultipleResults
}
return true , nil
} )
if err != nil {
if isNotFound ( err ) {
return "" , ErrNotFound
}
if err == ErrMultipleResults {
klog . V ( 4 ) . Infof ( "find multiple external networks, pick the first one when there are no explicit configuration." )
return floatingNetworkIds [ 0 ] , nil
}
return "" , err
}
if len ( floatingNetworkIds ) == 0 {
return "" , ErrNotFound
}
return floatingNetworkIds [ 0 ] , nil
}
// TODO: This code currently ignores 'region' and always creates a
// loadbalancer in only the current OpenStack region. We should take
// a list of regions (from config) and query/create loadbalancers in
// each region.
// EnsureLoadBalancer creates a new load balancer 'name', or updates the existing one.
func ( lbaas * LbaasV2 ) EnsureLoadBalancer ( ctx context . Context , clusterName string , apiService * v1 . Service , nodes [ ] * v1 . Node ) ( * v1 . LoadBalancerStatus , error ) {
klog . V ( 4 ) . Infof ( "EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v)" , clusterName , apiService . Namespace , apiService . Name , apiService . Spec . LoadBalancerIP , apiService . Spec . Ports , nodes , apiService . Annotations )
if len ( nodes ) == 0 {
return nil , fmt . Errorf ( "there are no available nodes for LoadBalancer service %s/%s" , apiService . Namespace , apiService . Name )
}
lbaas . opts . SubnetID = getStringFromServiceAnnotation ( apiService , ServiceAnnotationLoadBalancerSubnetID , lbaas . opts . SubnetID )
if len ( lbaas . opts . SubnetID ) == 0 {
// Get SubnetID automatically.
// The LB needs to be configured with instance addresses on the same subnet, so get SubnetID by one node.
subnetID , err := getSubnetIDForLB ( lbaas . compute , * nodes [ 0 ] )
if err != nil {
klog . Warningf ( "Failed to find subnet-id for loadbalancer service %s/%s: %v" , apiService . Namespace , apiService . Name , err )
return nil , fmt . Errorf ( "no subnet-id for service %s/%s : subnet-id not set in cloud provider config, " +
"and failed to find subnet-id from OpenStack: %v" , apiService . Namespace , apiService . Name , err )
}
lbaas . opts . SubnetID = subnetID
}
ports := apiService . Spec . Ports
if len ( ports ) == 0 {
return nil , fmt . Errorf ( "no ports provided to openstack load balancer" )
}
floatingPool := getStringFromServiceAnnotation ( apiService , ServiceAnnotationLoadBalancerFloatingNetworkID , lbaas . opts . FloatingNetworkID )
if len ( floatingPool ) == 0 {
var err error
floatingPool , err = getFloatingNetworkIDForLB ( lbaas . network )
if err != nil {
klog . Warningf ( "Failed to find floating-network-id for loadbalancer service %s/%s: %v" , apiService . Namespace , apiService . Name , err )
}
}
var internalAnnotation bool
internal := getStringFromServiceAnnotation ( apiService , ServiceAnnotationLoadBalancerInternal , "false" )
switch internal {
case "true" :
klog . V ( 4 ) . Info ( "Ensure an internal loadbalancer service." )
internalAnnotation = true
case "false" :
if len ( floatingPool ) != 0 {
klog . V ( 4 ) . Infof ( "Ensure an external loadbalancer service, using floatingPool: %v" , floatingPool )
internalAnnotation = false
} else {
return nil , fmt . Errorf ( "floating-network-id or loadbalancer.openstack.org/floating-network-id should be specified when ensuring an external loadbalancer service" )
}
default :
return nil , fmt . Errorf ( "unknown service.beta.kubernetes.io/openstack-internal-load-balancer annotation: %v, specify \"true\" or \"false\" " ,
internal )
}
// Check for TCP protocol on each port
// TODO: Convert all error messages to use an event recorder
for _ , port := range ports {
if port . Protocol != v1 . ProtocolTCP {
return nil , fmt . Errorf ( "only TCP LoadBalancer is supported for openstack load balancers" )
}
}
sourceRanges , err := servicehelpers . GetLoadBalancerSourceRanges ( apiService )
if err != nil {
return nil , fmt . Errorf ( "failed to get source ranges for loadbalancer service %s/%s: %v" , apiService . Namespace , apiService . Name , err )
}
if ! servicehelpers . IsAllowAll ( sourceRanges ) && ! lbaas . opts . ManageSecurityGroups {
return nil , fmt . Errorf ( "source range restrictions are not supported for openstack load balancers without managing security groups" )
}
affinity := apiService . Spec . SessionAffinity
var persistence * v2pools . SessionPersistence
switch affinity {
case v1 . ServiceAffinityNone :
persistence = nil
case v1 . ServiceAffinityClientIP :
persistence = & v2pools . SessionPersistence { Type : "SOURCE_IP" }
default :
return nil , fmt . Errorf ( "unsupported load balancer affinity: %v" , affinity )
}
name := lbaas . GetLoadBalancerName ( ctx , clusterName , apiService )
loadbalancer , err := getLoadbalancerByName ( lbaas . lb , name )
if err != nil {
if err != ErrNotFound {
return nil , fmt . Errorf ( "error getting loadbalancer %s: %v" , name , err )
}
klog . V ( 2 ) . Infof ( "Creating loadbalancer %s" , name )
loadbalancer , err = lbaas . createLoadBalancer ( apiService , name , internalAnnotation )
if err != nil {
// Unknown error, retry later
return nil , fmt . Errorf ( "error creating loadbalancer %s: %v" , name , err )
}
} else {
klog . V ( 2 ) . Infof ( "LoadBalancer %s already exists" , name )
}
provisioningStatus , err := waitLoadbalancerActiveProvisioningStatus ( lbaas . lb , loadbalancer . ID )
if err != nil {
return nil , fmt . Errorf ( "failed to loadbalance ACTIVE provisioning status %v: %v" , provisioningStatus , err )
}
lbmethod := v2pools . LBMethod ( lbaas . opts . LBMethod )
if lbmethod == "" {
lbmethod = v2pools . LBMethodRoundRobin
}
oldListeners , err := getListenersByLoadBalancerID ( lbaas . lb , loadbalancer . ID )
if err != nil {
return nil , fmt . Errorf ( "error getting LB %s listeners: %v" , name , err )
}
for portIndex , port := range ports {
listener := getListenerForPort ( oldListeners , port )
if listener == nil {
klog . V ( 4 ) . Infof ( "Creating listener for port %d" , int ( port . Port ) )
listener , err = listeners . Create ( lbaas . lb , listeners . CreateOpts {
Name : fmt . Sprintf ( "listener_%s_%d" , name , portIndex ) ,
Protocol : listeners . Protocol ( port . Protocol ) ,
ProtocolPort : int ( port . Port ) ,
LoadbalancerID : loadbalancer . ID ,
} ) . Extract ( )
if err != nil {
// Unknown error, retry later
return nil , fmt . Errorf ( "error creating LB listener: %v" , err )
}
provisioningStatus , err := waitLoadbalancerActiveProvisioningStatus ( lbaas . lb , loadbalancer . ID )
if err != nil {
return nil , fmt . Errorf ( "failed to loadbalance ACTIVE provisioning status %v: %v" , provisioningStatus , err )
}
}
klog . V ( 4 ) . Infof ( "Listener for %s port %d: %s" , string ( port . Protocol ) , int ( port . Port ) , listener . ID )
// After all ports have been processed, remaining listeners are removed as obsolete.
// Pop valid listeners.
oldListeners = popListener ( oldListeners , listener . ID )
pool , err := getPoolByListenerID ( lbaas . lb , loadbalancer . ID , listener . ID )
if err != nil && err != ErrNotFound {
// Unknown error, retry later
return nil , fmt . Errorf ( "error getting pool for listener %s: %v" , listener . ID , err )
}
if pool == nil {
klog . V ( 4 ) . Infof ( "Creating pool for listener %s" , listener . ID )
pool , err = v2pools . Create ( lbaas . lb , v2pools . CreateOpts {
Name : fmt . Sprintf ( "pool_%s_%d" , name , portIndex ) ,
Protocol : v2pools . Protocol ( port . Protocol ) ,
LBMethod : lbmethod ,
ListenerID : listener . ID ,
Persistence : persistence ,
} ) . Extract ( )
if err != nil {
// Unknown error, retry later
return nil , fmt . Errorf ( "error creating pool for listener %s: %v" , listener . ID , err )
}
provisioningStatus , err := waitLoadbalancerActiveProvisioningStatus ( lbaas . lb , loadbalancer . ID )
if err != nil {
return nil , fmt . Errorf ( "failed to loadbalance ACTIVE provisioning status %v: %v" , provisioningStatus , err )
}
}
klog . V ( 4 ) . Infof ( "Pool for listener %s: %s" , listener . ID , pool . ID )
members , err := getMembersByPoolID ( lbaas . lb , pool . ID )
if err != nil && ! isNotFound ( err ) {
return nil , fmt . Errorf ( "error getting pool members %s: %v" , pool . ID , err )
}
for _ , node := range nodes {
addr , err := nodeAddressForLB ( node )
if err != nil {
if err == ErrNotFound {
// Node failure, do not create member
klog . Warningf ( "Failed to create LB pool member for node %s: %v" , node . Name , err )
continue
} else {
return nil , fmt . Errorf ( "error getting address for node %s: %v" , node . Name , err )
}
}
if ! memberExists ( members , addr , int ( port . NodePort ) ) {
klog . V ( 4 ) . Infof ( "Creating member for pool %s" , pool . ID )
_ , err := v2pools . CreateMember ( lbaas . lb , pool . ID , v2pools . CreateMemberOpts {
Name : fmt . Sprintf ( "member_%s_%d_%s" , name , portIndex , node . Name ) ,
ProtocolPort : int ( port . NodePort ) ,
Address : addr ,
SubnetID : lbaas . opts . SubnetID ,
} ) . Extract ( )
if err != nil {
return nil , fmt . Errorf ( "error creating LB pool member for node: %s, %v" , node . Name , err )
}
provisioningStatus , err := waitLoadbalancerActiveProvisioningStatus ( lbaas . lb , loadbalancer . ID )
if err != nil {
return nil , fmt . Errorf ( "failed to loadbalance ACTIVE provisioning status %v: %v" , provisioningStatus , err )
}
} else {
// After all members have been processed, remaining members are deleted as obsolete.
members = popMember ( members , addr , int ( port . NodePort ) )
}
klog . V ( 4 ) . Infof ( "Ensured pool %s has member for %s at %s" , pool . ID , node . Name , addr )
}
// Delete obsolete members for this pool
for _ , member := range members {
klog . V ( 4 ) . Infof ( "Deleting obsolete member %s for pool %s address %s" , member . ID , pool . ID , member . Address )
err := v2pools . DeleteMember ( lbaas . lb , pool . ID , member . ID ) . ExtractErr ( )
if err != nil && ! isNotFound ( err ) {
return nil , fmt . Errorf ( "error deleting obsolete member %s for pool %s address %s: %v" , member . ID , pool . ID , member . Address , err )
}
provisioningStatus , err := waitLoadbalancerActiveProvisioningStatus ( lbaas . lb , loadbalancer . ID )
if err != nil {
return nil , fmt . Errorf ( "failed to loadbalance ACTIVE provisioning status %v: %v" , provisioningStatus , err )
}
}
monitorID := pool . MonitorID
if monitorID == "" && lbaas . opts . CreateMonitor {
klog . V ( 4 ) . Infof ( "Creating monitor for pool %s" , pool . ID )
monitor , err := v2monitors . Create ( lbaas . lb , v2monitors . CreateOpts {
Name : fmt . Sprintf ( "monitor_%s_%d" , name , portIndex ) ,
PoolID : pool . ID ,
Type : string ( port . Protocol ) ,
Delay : int ( lbaas . opts . MonitorDelay . Duration . Seconds ( ) ) ,
Timeout : int ( lbaas . opts . MonitorTimeout . Duration . Seconds ( ) ) ,
MaxRetries : int ( lbaas . opts . MonitorMaxRetries ) ,
} ) . Extract ( )
if err != nil {
return nil , fmt . Errorf ( "error creating LB pool healthmonitor: %v" , err )
}
provisioningStatus , err := waitLoadbalancerActiveProvisioningStatus ( lbaas . lb , loadbalancer . ID )
if err != nil {
return nil , fmt . Errorf ( "failed to loadbalance ACTIVE provisioning status %v: %v" , provisioningStatus , err )
}
monitorID = monitor . ID
} else if ! lbaas . opts . CreateMonitor {
klog . V ( 4 ) . Infof ( "Do not create monitor for pool %s when create-monitor is false" , pool . ID )
}
if monitorID != "" {
klog . V ( 4 ) . Infof ( "Monitor for pool %s: %s" , pool . ID , monitorID )
}
}
// All remaining listeners are obsolete, delete
for _ , listener := range oldListeners {
klog . V ( 4 ) . Infof ( "Deleting obsolete listener %s:" , listener . ID )
// get pool for listener
pool , err := getPoolByListenerID ( lbaas . lb , loadbalancer . ID , listener . ID )
if err != nil && err != ErrNotFound {
return nil , fmt . Errorf ( "error getting pool for obsolete listener %s: %v" , listener . ID , err )
}
if pool != nil {
// get and delete monitor
monitorID := pool . MonitorID
if monitorID != "" {
klog . V ( 4 ) . Infof ( "Deleting obsolete monitor %s for pool %s" , monitorID , pool . ID )
err = v2monitors . Delete ( lbaas . lb , monitorID ) . ExtractErr ( )
if err != nil && ! isNotFound ( err ) {
return nil , fmt . Errorf ( "error deleting obsolete monitor %s for pool %s: %v" , monitorID , pool . ID , err )
}
provisioningStatus , err := waitLoadbalancerActiveProvisioningStatus ( lbaas . lb , loadbalancer . ID )
if err != nil {
return nil , fmt . Errorf ( "failed to loadbalance ACTIVE provisioning status %v: %v" , provisioningStatus , err )
}
}
// get and delete pool members
members , err := getMembersByPoolID ( lbaas . lb , pool . ID )
if err != nil && ! isNotFound ( err ) {
return nil , fmt . Errorf ( "error getting members for pool %s: %v" , pool . ID , err )
}
for _ , member := range members {
klog . V ( 4 ) . Infof ( "Deleting obsolete member %s for pool %s address %s" , member . ID , pool . ID , member . Address )
err := v2pools . DeleteMember ( lbaas . lb , pool . ID , member . ID ) . ExtractErr ( )
if err != nil && ! isNotFound ( err ) {
return nil , fmt . Errorf ( "error deleting obsolete member %s for pool %s address %s: %v" , member . ID , pool . ID , member . Address , err )
}
provisioningStatus , err := waitLoadbalancerActiveProvisioningStatus ( lbaas . lb , loadbalancer . ID )
if err != nil {
return nil , fmt . Errorf ( "failed to loadbalance ACTIVE provisioning status %v: %v" , provisioningStatus , err )
}
}
klog . V ( 4 ) . Infof ( "Deleting obsolete pool %s for listener %s" , pool . ID , listener . ID )
// delete pool
err = v2pools . Delete ( lbaas . lb , pool . ID ) . ExtractErr ( )
if err != nil && ! isNotFound ( err ) {
return nil , fmt . Errorf ( "error deleting obsolete pool %s for listener %s: %v" , pool . ID , listener . ID , err )
}
provisioningStatus , err := waitLoadbalancerActiveProvisioningStatus ( lbaas . lb , loadbalancer . ID )
if err != nil {
return nil , fmt . Errorf ( "failed to loadbalance ACTIVE provisioning status %v: %v" , provisioningStatus , err )
}
}
// delete listener
err = listeners . Delete ( lbaas . lb , listener . ID ) . ExtractErr ( )
if err != nil && ! isNotFound ( err ) {
return nil , fmt . Errorf ( "error deleteting obsolete listener: %v" , err )
}
provisioningStatus , err := waitLoadbalancerActiveProvisioningStatus ( lbaas . lb , loadbalancer . ID )
if err != nil {
return nil , fmt . Errorf ( "failed to loadbalance ACTIVE provisioning status %v: %v" , provisioningStatus , err )
}
klog . V ( 2 ) . Infof ( "Deleted obsolete listener: %s" , listener . ID )
}
portID := loadbalancer . VipPortID
floatIP , err := getFloatingIPByPortID ( lbaas . network , portID )
if err != nil && err != ErrNotFound {
return nil , fmt . Errorf ( "error getting floating ip for port %s: %v" , portID , err )
}
if floatIP == nil && floatingPool != "" && ! internalAnnotation {
klog . V ( 4 ) . Infof ( "Creating floating ip for loadbalancer %s port %s" , loadbalancer . ID , portID )
floatIPOpts := floatingips . CreateOpts {
FloatingNetworkID : floatingPool ,
PortID : portID ,
}
loadBalancerIP := apiService . Spec . LoadBalancerIP
if loadBalancerIP != "" {
floatIPOpts . FloatingIP = loadBalancerIP
}
floatIP , err = floatingips . Create ( lbaas . network , floatIPOpts ) . Extract ( )
if err != nil {
return nil , fmt . Errorf ( "error creating LB floatingip %+v: %v" , floatIPOpts , err )
}
}
status := & v1 . LoadBalancerStatus { }
if floatIP != nil {
status . Ingress = [ ] v1 . LoadBalancerIngress { { IP : floatIP . FloatingIP } }
} else {
status . Ingress = [ ] v1 . LoadBalancerIngress { { IP : loadbalancer . VipAddress } }
}
if lbaas . opts . ManageSecurityGroups {
err := lbaas . ensureSecurityGroup ( clusterName , apiService , nodes , loadbalancer )
if err != nil {
2019-12-12 01:27:03 +00:00
return status , fmt . Errorf ( "Error reconciling security groups for LB service %v/%v: %v" , apiService . Namespace , apiService . Name , err )
2019-09-27 21:51:53 +00:00
}
}
return status , nil
}
// ensureSecurityGroup ensures security group exist for specific loadbalancer service.
// Creating security group for specific loadbalancer service when it does not exist.
func ( lbaas * LbaasV2 ) ensureSecurityGroup ( clusterName string , apiService * v1 . Service , nodes [ ] * v1 . Node , loadbalancer * loadbalancers . LoadBalancer ) error {
// find node-security-group for service
var err error
if len ( lbaas . opts . NodeSecurityGroupIDs ) == 0 {
lbaas . opts . NodeSecurityGroupIDs , err = getNodeSecurityGroupIDForLB ( lbaas . compute , lbaas . network , nodes )
if err != nil {
return fmt . Errorf ( "failed to find node-security-group for loadbalancer service %s/%s: %v" , apiService . Namespace , apiService . Name , err )
}
}
klog . V ( 4 ) . Infof ( "find node-security-group %v for loadbalancer service %s/%s" , lbaas . opts . NodeSecurityGroupIDs , apiService . Namespace , apiService . Name )
// get service ports
ports := apiService . Spec . Ports
if len ( ports ) == 0 {
return fmt . Errorf ( "no ports provided to openstack load balancer" )
}
// get service source ranges
sourceRanges , err := servicehelpers . GetLoadBalancerSourceRanges ( apiService )
if err != nil {
return fmt . Errorf ( "failed to get source ranges for loadbalancer service %s/%s: %v" , apiService . Namespace , apiService . Name , err )
}
// ensure security group for LB
lbSecGroupName := getSecurityGroupName ( apiService )
lbSecGroupID , err := groups . IDFromName ( lbaas . network , lbSecGroupName )
if err != nil {
// If the security group of LB not exist, create it later
if isSecurityGroupNotFound ( err ) {
lbSecGroupID = ""
} else {
return fmt . Errorf ( "error occurred finding security group: %s: %v" , lbSecGroupName , err )
}
}
if len ( lbSecGroupID ) == 0 {
// create security group
lbSecGroupCreateOpts := groups . CreateOpts {
Name : getSecurityGroupName ( apiService ) ,
Description : fmt . Sprintf ( "Security Group for %s/%s Service LoadBalancer in cluster %s" , apiService . Namespace , apiService . Name , clusterName ) ,
}
lbSecGroup , err := groups . Create ( lbaas . network , lbSecGroupCreateOpts ) . Extract ( )
if err != nil {
return fmt . Errorf ( "failed to create Security Group for loadbalancer service %s/%s: %v" , apiService . Namespace , apiService . Name , err )
}
lbSecGroupID = lbSecGroup . ID
//add rule in security group
for _ , port := range ports {
for _ , sourceRange := range sourceRanges . StringSlice ( ) {
ethertype := rules . EtherType4
network , _ , err := net . ParseCIDR ( sourceRange )
if err != nil {
return fmt . Errorf ( "error parsing source range %s as a CIDR: %v" , sourceRange , err )
}
if network . To4 ( ) == nil {
ethertype = rules . EtherType6
}
lbSecGroupRuleCreateOpts := rules . CreateOpts {
Direction : rules . DirIngress ,
PortRangeMax : int ( port . Port ) ,
PortRangeMin : int ( port . Port ) ,
Protocol : toRuleProtocol ( port . Protocol ) ,
RemoteIPPrefix : sourceRange ,
SecGroupID : lbSecGroup . ID ,
EtherType : ethertype ,
}
_ , err = rules . Create ( lbaas . network , lbSecGroupRuleCreateOpts ) . Extract ( )
if err != nil {
return fmt . Errorf ( "error occurred creating rule for SecGroup %s: %v" , lbSecGroup . ID , err )
}
}
}
lbSecGroupRuleCreateOpts := rules . CreateOpts {
Direction : rules . DirIngress ,
PortRangeMax : 4 , // ICMP: Code - Values for ICMP "Destination Unreachable: Fragmentation Needed and Don't Fragment was Set"
PortRangeMin : 3 , // ICMP: Type
Protocol : rules . ProtocolICMP ,
RemoteIPPrefix : "0.0.0.0/0" , // The Fragmentation packet can come from anywhere along the path back to the sourceRange - we need to all this from all
SecGroupID : lbSecGroup . ID ,
EtherType : rules . EtherType4 ,
}
_ , err = rules . Create ( lbaas . network , lbSecGroupRuleCreateOpts ) . Extract ( )
if err != nil {
return fmt . Errorf ( "error occurred creating rule for SecGroup %s: %v" , lbSecGroup . ID , err )
}
lbSecGroupRuleCreateOpts = rules . CreateOpts {
Direction : rules . DirIngress ,
PortRangeMax : 0 , // ICMP: Code - Values for ICMP "Packet Too Big"
PortRangeMin : 2 , // ICMP: Type
Protocol : rules . ProtocolICMP ,
RemoteIPPrefix : "::/0" , // The Fragmentation packet can come from anywhere along the path back to the sourceRange - we need to all this from all
SecGroupID : lbSecGroup . ID ,
EtherType : rules . EtherType6 ,
}
_ , err = rules . Create ( lbaas . network , lbSecGroupRuleCreateOpts ) . Extract ( )
if err != nil {
return fmt . Errorf ( "error occurred creating rule for SecGroup %s: %v" , lbSecGroup . ID , err )
}
// get security groups of port
portID := loadbalancer . VipPortID
port , err := getPortByID ( lbaas . network , portID )
if err != nil {
return err
}
// ensure the vip port has the security groups
found := false
for _ , portSecurityGroups := range port . SecurityGroups {
if portSecurityGroups == lbSecGroup . ID {
found = true
break
}
}
// update loadbalancer vip port
if ! found {
port . SecurityGroups = append ( port . SecurityGroups , lbSecGroup . ID )
updateOpts := neutronports . UpdateOpts { SecurityGroups : & port . SecurityGroups }
res := neutronports . Update ( lbaas . network , portID , updateOpts )
if res . Err != nil {
msg := fmt . Sprintf ( "Error occurred updating port %s for loadbalancer service %s/%s: %v" , portID , apiService . Namespace , apiService . Name , res . Err )
return fmt . Errorf ( msg )
}
}
}
// ensure rules for every node security group
for _ , port := range ports {
for _ , nodeSecurityGroupID := range lbaas . opts . NodeSecurityGroupIDs {
opts := rules . ListOpts {
Direction : string ( rules . DirIngress ) ,
SecGroupID : nodeSecurityGroupID ,
RemoteGroupID : lbSecGroupID ,
PortRangeMax : int ( port . NodePort ) ,
PortRangeMin : int ( port . NodePort ) ,
Protocol : string ( port . Protocol ) ,
}
secGroupRules , err := getSecurityGroupRules ( lbaas . network , opts )
if err != nil && ! isNotFound ( err ) {
msg := fmt . Sprintf ( "Error finding rules for remote group id %s in security group id %s: %v" , lbSecGroupID , nodeSecurityGroupID , err )
return fmt . Errorf ( msg )
}
if len ( secGroupRules ) != 0 {
// Do not add rule when find rules for remote group in the Node Security Group
continue
}
// Add the rules in the Node Security Group
err = createNodeSecurityGroup ( lbaas . network , nodeSecurityGroupID , int ( port . NodePort ) , port . Protocol , lbSecGroupID )
if err != nil {
return fmt . Errorf ( "error occurred creating security group for loadbalancer service %s/%s: %v" , apiService . Namespace , apiService . Name , err )
}
}
}
return nil
}
// UpdateLoadBalancer updates hosts under the specified load balancer.
func ( lbaas * LbaasV2 ) UpdateLoadBalancer ( ctx context . Context , clusterName string , service * v1 . Service , nodes [ ] * v1 . Node ) error {
loadBalancerName := lbaas . GetLoadBalancerName ( ctx , clusterName , service )
klog . V ( 4 ) . Infof ( "UpdateLoadBalancer(%v, %v, %v)" , clusterName , loadBalancerName , nodes )
lbaas . opts . SubnetID = getStringFromServiceAnnotation ( service , ServiceAnnotationLoadBalancerSubnetID , lbaas . opts . SubnetID )
if len ( lbaas . opts . SubnetID ) == 0 && len ( nodes ) > 0 {
// Get SubnetID automatically.
// The LB needs to be configured with instance addresses on the same subnet, so get SubnetID by one node.
subnetID , err := getSubnetIDForLB ( lbaas . compute , * nodes [ 0 ] )
if err != nil {
klog . Warningf ( "Failed to find subnet-id for loadbalancer service %s/%s: %v" , service . Namespace , service . Name , err )
return fmt . Errorf ( "no subnet-id for service %s/%s : subnet-id not set in cloud provider config, " +
"and failed to find subnet-id from OpenStack: %v" , service . Namespace , service . Name , err )
}
lbaas . opts . SubnetID = subnetID
}
ports := service . Spec . Ports
if len ( ports ) == 0 {
return fmt . Errorf ( "no ports provided to openstack load balancer" )
}
loadbalancer , err := getLoadbalancerByName ( lbaas . lb , loadBalancerName )
if err != nil {
return err
}
if loadbalancer == nil {
return fmt . Errorf ( "loadbalancer %s does not exist" , loadBalancerName )
}
// Get all listeners for this loadbalancer, by "port key".
type portKey struct {
Protocol listeners . Protocol
Port int
}
var listenerIDs [ ] string
lbListeners := make ( map [ portKey ] listeners . Listener )
allListeners , err := getListenersByLoadBalancerID ( lbaas . lb , loadbalancer . ID )
if err != nil {
return fmt . Errorf ( "error getting listeners for LB %s: %v" , loadBalancerName , err )
}
for _ , l := range allListeners {
key := portKey { Protocol : listeners . Protocol ( l . Protocol ) , Port : l . ProtocolPort }
lbListeners [ key ] = l
listenerIDs = append ( listenerIDs , l . ID )
}
// Get all pools for this loadbalancer, by listener ID.
lbPools := make ( map [ string ] v2pools . Pool )
for _ , listenerID := range listenerIDs {
pool , err := getPoolByListenerID ( lbaas . lb , loadbalancer . ID , listenerID )
if err != nil {
return fmt . Errorf ( "error getting pool for listener %s: %v" , listenerID , err )
}
lbPools [ listenerID ] = * pool
}
// Compose Set of member (addresses) that _should_ exist
addrs := make ( map [ string ] * v1 . Node )
for _ , node := range nodes {
addr , err := nodeAddressForLB ( node )
if err != nil {
return err
}
addrs [ addr ] = node
}
// Check for adding/removing members associated with each port
for portIndex , port := range ports {
// Get listener associated with this port
listener , ok := lbListeners [ portKey {
Protocol : toListenersProtocol ( port . Protocol ) ,
Port : int ( port . Port ) ,
} ]
if ! ok {
return fmt . Errorf ( "loadbalancer %s does not contain required listener for port %d and protocol %s" , loadBalancerName , port . Port , port . Protocol )
}
// Get pool associated with this listener
pool , ok := lbPools [ listener . ID ]
if ! ok {
return fmt . Errorf ( "loadbalancer %s does not contain required pool for listener %s" , loadBalancerName , listener . ID )
}
// Find existing pool members (by address) for this port
getMembers , err := getMembersByPoolID ( lbaas . lb , pool . ID )
if err != nil {
return fmt . Errorf ( "error getting pool members %s: %v" , pool . ID , err )
}
members := make ( map [ string ] v2pools . Member )
for _ , member := range getMembers {
members [ member . Address ] = member
}
// Add any new members for this port
for addr , node := range addrs {
if _ , ok := members [ addr ] ; ok && members [ addr ] . ProtocolPort == int ( port . NodePort ) {
// Already exists, do not create member
continue
}
_ , err := v2pools . CreateMember ( lbaas . lb , pool . ID , v2pools . CreateMemberOpts {
Name : fmt . Sprintf ( "member_%s_%d_%s" , loadbalancer . Name , portIndex , node . Name ) ,
Address : addr ,
ProtocolPort : int ( port . NodePort ) ,
SubnetID : lbaas . opts . SubnetID ,
} ) . Extract ( )
if err != nil {
return err
}
provisioningStatus , err := waitLoadbalancerActiveProvisioningStatus ( lbaas . lb , loadbalancer . ID )
if err != nil {
return fmt . Errorf ( "failed to loadbalance ACTIVE provisioning status %v: %v" , provisioningStatus , err )
}
}
// Remove any old members for this port
for _ , member := range members {
if _ , ok := addrs [ member . Address ] ; ok && member . ProtocolPort == int ( port . NodePort ) {
// Still present, do not delete member
continue
}
err = v2pools . DeleteMember ( lbaas . lb , pool . ID , member . ID ) . ExtractErr ( )
if err != nil && ! isNotFound ( err ) {
return err
}
provisioningStatus , err := waitLoadbalancerActiveProvisioningStatus ( lbaas . lb , loadbalancer . ID )
if err != nil {
return fmt . Errorf ( "failed to loadbalance ACTIVE provisioning status %v: %v" , provisioningStatus , err )
}
}
}
if lbaas . opts . ManageSecurityGroups {
err := lbaas . updateSecurityGroup ( clusterName , service , nodes , loadbalancer )
if err != nil {
return fmt . Errorf ( "failed to update Security Group for loadbalancer service %s/%s: %v" , service . Namespace , service . Name , err )
}
}
return nil
}
// updateSecurityGroup updating security group for specific loadbalancer service.
func ( lbaas * LbaasV2 ) updateSecurityGroup ( clusterName string , apiService * v1 . Service , nodes [ ] * v1 . Node , loadbalancer * loadbalancers . LoadBalancer ) error {
originalNodeSecurityGroupIDs := lbaas . opts . NodeSecurityGroupIDs
var err error
lbaas . opts . NodeSecurityGroupIDs , err = getNodeSecurityGroupIDForLB ( lbaas . compute , lbaas . network , nodes )
if err != nil {
return fmt . Errorf ( "failed to find node-security-group for loadbalancer service %s/%s: %v" , apiService . Namespace , apiService . Name , err )
}
klog . V ( 4 ) . Infof ( "find node-security-group %v for loadbalancer service %s/%s" , lbaas . opts . NodeSecurityGroupIDs , apiService . Namespace , apiService . Name )
original := sets . NewString ( originalNodeSecurityGroupIDs ... )
current := sets . NewString ( lbaas . opts . NodeSecurityGroupIDs ... )
removals := original . Difference ( current )
// Generate Name
lbSecGroupName := getSecurityGroupName ( apiService )
lbSecGroupID , err := groups . IDFromName ( lbaas . network , lbSecGroupName )
if err != nil {
return fmt . Errorf ( "error occurred finding security group: %s: %v" , lbSecGroupName , err )
}
ports := apiService . Spec . Ports
if len ( ports ) == 0 {
return fmt . Errorf ( "no ports provided to openstack load balancer" )
}
for _ , port := range ports {
for removal := range removals {
// Delete the rules in the Node Security Group
opts := rules . ListOpts {
Direction : string ( rules . DirIngress ) ,
SecGroupID : removal ,
RemoteGroupID : lbSecGroupID ,
PortRangeMax : int ( port . NodePort ) ,
PortRangeMin : int ( port . NodePort ) ,
Protocol : string ( port . Protocol ) ,
}
secGroupRules , err := getSecurityGroupRules ( lbaas . network , opts )
if err != nil && ! isNotFound ( err ) {
return fmt . Errorf ( "error finding rules for remote group id %s in security group id %s: %v" , lbSecGroupID , removal , err )
}
for _ , rule := range secGroupRules {
res := rules . Delete ( lbaas . network , rule . ID )
if res . Err != nil && ! isNotFound ( res . Err ) {
return fmt . Errorf ( "error occurred deleting security group rule: %s: %v" , rule . ID , res . Err )
}
}
}
for _ , nodeSecurityGroupID := range lbaas . opts . NodeSecurityGroupIDs {
opts := rules . ListOpts {
Direction : string ( rules . DirIngress ) ,
SecGroupID : nodeSecurityGroupID ,
RemoteGroupID : lbSecGroupID ,
PortRangeMax : int ( port . NodePort ) ,
PortRangeMin : int ( port . NodePort ) ,
Protocol : string ( port . Protocol ) ,
}
secGroupRules , err := getSecurityGroupRules ( lbaas . network , opts )
if err != nil && ! isNotFound ( err ) {
return fmt . Errorf ( "error finding rules for remote group id %s in security group id %s: %v" , lbSecGroupID , nodeSecurityGroupID , err )
}
if len ( secGroupRules ) != 0 {
// Do not add rule when find rules for remote group in the Node Security Group
continue
}
// Add the rules in the Node Security Group
err = createNodeSecurityGroup ( lbaas . network , nodeSecurityGroupID , int ( port . NodePort ) , port . Protocol , lbSecGroupID )
if err != nil {
return fmt . Errorf ( "error occurred creating security group for loadbalancer service %s/%s: %v" , apiService . Namespace , apiService . Name , err )
}
}
}
return nil
}
// EnsureLoadBalancerDeleted deletes the specified load balancer
func ( lbaas * LbaasV2 ) EnsureLoadBalancerDeleted ( ctx context . Context , clusterName string , service * v1 . Service ) error {
loadBalancerName := lbaas . GetLoadBalancerName ( ctx , clusterName , service )
klog . V ( 4 ) . Infof ( "EnsureLoadBalancerDeleted(%v, %v)" , clusterName , loadBalancerName )
loadbalancer , err := getLoadbalancerByName ( lbaas . lb , loadBalancerName )
if err != nil && err != ErrNotFound {
return err
}
if loadbalancer == nil {
return nil
}
if loadbalancer . VipPortID != "" {
portID := loadbalancer . VipPortID
floatingIP , err := getFloatingIPByPortID ( lbaas . network , portID )
if err != nil && err != ErrNotFound {
return err
}
if floatingIP != nil {
err = floatingips . Delete ( lbaas . network , floatingIP . ID ) . ExtractErr ( )
if err != nil && ! isNotFound ( err ) {
return err
}
}
}
// get all listeners associated with this loadbalancer
listenerList , err := getListenersByLoadBalancerID ( lbaas . lb , loadbalancer . ID )
if err != nil {
return fmt . Errorf ( "error getting LB %s listeners: %v" , loadbalancer . ID , err )
}
// get all pools (and health monitors) associated with this loadbalancer
var poolIDs [ ] string
var monitorIDs [ ] string
for _ , listener := range listenerList {
pool , err := getPoolByListenerID ( lbaas . lb , loadbalancer . ID , listener . ID )
if err != nil && err != ErrNotFound {
return fmt . Errorf ( "error getting pool for listener %s: %v" , listener . ID , err )
}
if pool != nil {
poolIDs = append ( poolIDs , pool . ID )
// If create-monitor of cloud-config is false, pool has not monitor.
if pool . MonitorID != "" {
monitorIDs = append ( monitorIDs , pool . MonitorID )
}
}
}
// delete all monitors
for _ , monitorID := range monitorIDs {
err := v2monitors . Delete ( lbaas . lb , monitorID ) . ExtractErr ( )
if err != nil && ! isNotFound ( err ) {
return err
}
provisioningStatus , err := waitLoadbalancerActiveProvisioningStatus ( lbaas . lb , loadbalancer . ID )
if err != nil {
return fmt . Errorf ( "failed to loadbalance ACTIVE provisioning status %v: %v" , provisioningStatus , err )
}
}
// delete all members and pools
for _ , poolID := range poolIDs {
// get members for current pool
membersList , err := getMembersByPoolID ( lbaas . lb , poolID )
if err != nil && ! isNotFound ( err ) {
return fmt . Errorf ( "error getting pool members %s: %v" , poolID , err )
}
// delete all members for this pool
for _ , member := range membersList {
err := v2pools . DeleteMember ( lbaas . lb , poolID , member . ID ) . ExtractErr ( )
if err != nil && ! isNotFound ( err ) {
return err
}
provisioningStatus , err := waitLoadbalancerActiveProvisioningStatus ( lbaas . lb , loadbalancer . ID )
if err != nil {
return fmt . Errorf ( "failed to loadbalance ACTIVE provisioning status %v: %v" , provisioningStatus , err )
}
}
// delete pool
err = v2pools . Delete ( lbaas . lb , poolID ) . ExtractErr ( )
if err != nil && ! isNotFound ( err ) {
return err
}
provisioningStatus , err := waitLoadbalancerActiveProvisioningStatus ( lbaas . lb , loadbalancer . ID )
if err != nil {
return fmt . Errorf ( "failed to loadbalance ACTIVE provisioning status %v: %v" , provisioningStatus , err )
}
}
// delete all listeners
for _ , listener := range listenerList {
err := listeners . Delete ( lbaas . lb , listener . ID ) . ExtractErr ( )
if err != nil && ! isNotFound ( err ) {
return err
}
provisioningStatus , err := waitLoadbalancerActiveProvisioningStatus ( lbaas . lb , loadbalancer . ID )
if err != nil {
return fmt . Errorf ( "failed to loadbalance ACTIVE provisioning status %v: %v" , provisioningStatus , err )
}
}
// delete loadbalancer
err = loadbalancers . Delete ( lbaas . lb , loadbalancer . ID ) . ExtractErr ( )
if err != nil && ! isNotFound ( err ) {
return err
}
err = waitLoadbalancerDeleted ( lbaas . lb , loadbalancer . ID )
if err != nil {
return fmt . Errorf ( "failed to delete loadbalancer: %v" , err )
}
// Delete the Security Group
if lbaas . opts . ManageSecurityGroups {
err := lbaas . EnsureSecurityGroupDeleted ( clusterName , service )
if err != nil {
return fmt . Errorf ( "failed to delete Security Group for loadbalancer service %s/%s: %v" , service . Namespace , service . Name , err )
}
}
return nil
}
// EnsureSecurityGroupDeleted deleting security group for specific loadbalancer service.
func ( lbaas * LbaasV2 ) EnsureSecurityGroupDeleted ( clusterName string , service * v1 . Service ) error {
// Generate Name
lbSecGroupName := getSecurityGroupName ( service )
lbSecGroupID , err := groups . IDFromName ( lbaas . network , lbSecGroupName )
if err != nil {
if isSecurityGroupNotFound ( err ) {
// It is OK when the security group has been deleted by others.
return nil
}
return fmt . Errorf ( "error occurred finding security group: %s: %v" , lbSecGroupName , err )
}
lbSecGroup := groups . Delete ( lbaas . network , lbSecGroupID )
if lbSecGroup . Err != nil && ! isNotFound ( lbSecGroup . Err ) {
return lbSecGroup . Err
}
if len ( lbaas . opts . NodeSecurityGroupIDs ) == 0 {
// Just happen when nodes have not Security Group, or should not happen
// UpdateLoadBalancer and EnsureLoadBalancer can set lbaas.opts.NodeSecurityGroupIDs when it is empty
// And service controller call UpdateLoadBalancer to set lbaas.opts.NodeSecurityGroupIDs when controller manager service is restarted.
klog . Warningf ( "Can not find node-security-group from all the nodes of this cluster when delete loadbalancer service %s/%s" ,
service . Namespace , service . Name )
} else {
// Delete the rules in the Node Security Group
for _ , nodeSecurityGroupID := range lbaas . opts . NodeSecurityGroupIDs {
opts := rules . ListOpts {
SecGroupID : nodeSecurityGroupID ,
RemoteGroupID : lbSecGroupID ,
}
secGroupRules , err := getSecurityGroupRules ( lbaas . network , opts )
if err != nil && ! isNotFound ( err ) {
msg := fmt . Sprintf ( "Error finding rules for remote group id %s in security group id %s: %v" , lbSecGroupID , nodeSecurityGroupID , err )
return fmt . Errorf ( msg )
}
for _ , rule := range secGroupRules {
res := rules . Delete ( lbaas . network , rule . ID )
if res . Err != nil && ! isNotFound ( res . Err ) {
return fmt . Errorf ( "error occurred deleting security group rule: %s: %v" , rule . ID , res . Err )
}
}
}
}
return nil
}