mirror of https://github.com/k3s-io/k3s
1578 lines
55 KiB
Go
1578 lines
55 KiB
Go
// +build !providerless
|
|
|
|
/*
|
|
Copyright 2016 The Kubernetes Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package openstack
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net"
|
|
"reflect"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/gophercloud/gophercloud"
|
|
"github.com/gophercloud/gophercloud/openstack/networking/v2/extensions"
|
|
"github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/external"
|
|
"github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/layer3/floatingips"
|
|
"github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/lbaas_v2/listeners"
|
|
"github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/lbaas_v2/loadbalancers"
|
|
v2monitors "github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/lbaas_v2/monitors"
|
|
v2pools "github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/lbaas_v2/pools"
|
|
"github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/security/groups"
|
|
"github.com/gophercloud/gophercloud/openstack/networking/v2/extensions/security/rules"
|
|
"github.com/gophercloud/gophercloud/openstack/networking/v2/networks"
|
|
neutronports "github.com/gophercloud/gophercloud/openstack/networking/v2/ports"
|
|
"github.com/gophercloud/gophercloud/pagination"
|
|
"k8s.io/klog"
|
|
|
|
v1 "k8s.io/api/core/v1"
|
|
"k8s.io/apimachinery/pkg/types"
|
|
"k8s.io/apimachinery/pkg/util/sets"
|
|
"k8s.io/apimachinery/pkg/util/wait"
|
|
cloudprovider "k8s.io/cloud-provider"
|
|
servicehelpers "k8s.io/cloud-provider/service/helpers"
|
|
)
|
|
|
|
// Note: when creating a new Loadbalancer (VM), it can take some time before it is ready for use,
|
|
// this timeout is used for waiting until the Loadbalancer provisioning status goes to ACTIVE state.
|
|
const (
|
|
// loadbalancerActive* is configuration of exponential backoff for
|
|
// going into ACTIVE loadbalancer provisioning status. Starting with 1
|
|
// seconds, multiplying by 1.2 with each step and taking 19 steps at maximum
|
|
// it will time out after 128s, which roughly corresponds to 120s
|
|
loadbalancerActiveInitDelay = 1 * time.Second
|
|
loadbalancerActiveFactor = 1.2
|
|
loadbalancerActiveSteps = 19
|
|
|
|
// loadbalancerDelete* is configuration of exponential backoff for
|
|
// waiting for delete operation to complete. Starting with 1
|
|
// seconds, multiplying by 1.2 with each step and taking 13 steps at maximum
|
|
// it will time out after 32s, which roughly corresponds to 30s
|
|
loadbalancerDeleteInitDelay = 1 * time.Second
|
|
loadbalancerDeleteFactor = 1.2
|
|
loadbalancerDeleteSteps = 13
|
|
|
|
activeStatus = "ACTIVE"
|
|
errorStatus = "ERROR"
|
|
|
|
ServiceAnnotationLoadBalancerFloatingNetworkID = "loadbalancer.openstack.org/floating-network-id"
|
|
ServiceAnnotationLoadBalancerSubnetID = "loadbalancer.openstack.org/subnet-id"
|
|
|
|
// ServiceAnnotationLoadBalancerInternal is the annotation used on the service
|
|
// to indicate that we want an internal loadbalancer service.
|
|
// If the value of ServiceAnnotationLoadBalancerInternal is false, it indicates that we want an external loadbalancer service. Default to false.
|
|
ServiceAnnotationLoadBalancerInternal = "service.beta.kubernetes.io/openstack-internal-load-balancer"
|
|
)
|
|
|
|
var _ cloudprovider.LoadBalancer = (*LbaasV2)(nil)
|
|
|
|
// LbaasV2 is a LoadBalancer implementation for Neutron LBaaS v2 API
|
|
type LbaasV2 struct {
|
|
LoadBalancer
|
|
}
|
|
|
|
func networkExtensions(client *gophercloud.ServiceClient) (map[string]bool, error) {
|
|
seen := make(map[string]bool)
|
|
|
|
pager := extensions.List(client)
|
|
err := pager.EachPage(func(page pagination.Page) (bool, error) {
|
|
exts, err := extensions.ExtractExtensions(page)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
for _, ext := range exts {
|
|
seen[ext.Alias] = true
|
|
}
|
|
return true, nil
|
|
})
|
|
|
|
return seen, err
|
|
}
|
|
|
|
func getFloatingIPByPortID(client *gophercloud.ServiceClient, portID string) (*floatingips.FloatingIP, error) {
|
|
opts := floatingips.ListOpts{
|
|
PortID: portID,
|
|
}
|
|
pager := floatingips.List(client, opts)
|
|
|
|
floatingIPList := make([]floatingips.FloatingIP, 0, 1)
|
|
|
|
err := pager.EachPage(func(page pagination.Page) (bool, error) {
|
|
f, err := floatingips.ExtractFloatingIPs(page)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
floatingIPList = append(floatingIPList, f...)
|
|
if len(floatingIPList) > 1 {
|
|
return false, ErrMultipleResults
|
|
}
|
|
return true, nil
|
|
})
|
|
if err != nil {
|
|
if isNotFound(err) {
|
|
return nil, ErrNotFound
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
if len(floatingIPList) == 0 {
|
|
return nil, ErrNotFound
|
|
} else if len(floatingIPList) > 1 {
|
|
return nil, ErrMultipleResults
|
|
}
|
|
|
|
return &floatingIPList[0], nil
|
|
}
|
|
|
|
func getLoadbalancerByName(client *gophercloud.ServiceClient, name string) (*loadbalancers.LoadBalancer, error) {
|
|
opts := loadbalancers.ListOpts{
|
|
Name: name,
|
|
}
|
|
pager := loadbalancers.List(client, opts)
|
|
|
|
loadbalancerList := make([]loadbalancers.LoadBalancer, 0, 1)
|
|
|
|
err := pager.EachPage(func(page pagination.Page) (bool, error) {
|
|
v, err := loadbalancers.ExtractLoadBalancers(page)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
loadbalancerList = append(loadbalancerList, v...)
|
|
if len(loadbalancerList) > 1 {
|
|
return false, ErrMultipleResults
|
|
}
|
|
return true, nil
|
|
})
|
|
if err != nil {
|
|
if isNotFound(err) {
|
|
return nil, ErrNotFound
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
if len(loadbalancerList) == 0 {
|
|
return nil, ErrNotFound
|
|
} else if len(loadbalancerList) > 1 {
|
|
return nil, ErrMultipleResults
|
|
}
|
|
|
|
return &loadbalancerList[0], nil
|
|
}
|
|
|
|
func getListenersByLoadBalancerID(client *gophercloud.ServiceClient, id string) ([]listeners.Listener, error) {
|
|
var existingListeners []listeners.Listener
|
|
err := listeners.List(client, listeners.ListOpts{LoadbalancerID: id}).EachPage(func(page pagination.Page) (bool, error) {
|
|
listenerList, err := listeners.ExtractListeners(page)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
for _, l := range listenerList {
|
|
for _, lb := range l.Loadbalancers {
|
|
if lb.ID == id {
|
|
existingListeners = append(existingListeners, l)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
return true, nil
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return existingListeners, nil
|
|
}
|
|
|
|
// get listener for a port or nil if does not exist
|
|
func getListenerForPort(existingListeners []listeners.Listener, port v1.ServicePort) *listeners.Listener {
|
|
for _, l := range existingListeners {
|
|
if listeners.Protocol(l.Protocol) == toListenersProtocol(port.Protocol) && l.ProtocolPort == int(port.Port) {
|
|
return &l
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// Get pool for a listener. A listener always has exactly one pool.
|
|
func getPoolByListenerID(client *gophercloud.ServiceClient, loadbalancerID string, listenerID string) (*v2pools.Pool, error) {
|
|
listenerPools := make([]v2pools.Pool, 0, 1)
|
|
err := v2pools.List(client, v2pools.ListOpts{LoadbalancerID: loadbalancerID}).EachPage(func(page pagination.Page) (bool, error) {
|
|
poolsList, err := v2pools.ExtractPools(page)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
for _, p := range poolsList {
|
|
for _, l := range p.Listeners {
|
|
if l.ID == listenerID {
|
|
listenerPools = append(listenerPools, p)
|
|
}
|
|
}
|
|
}
|
|
if len(listenerPools) > 1 {
|
|
return false, ErrMultipleResults
|
|
}
|
|
return true, nil
|
|
})
|
|
if err != nil {
|
|
if isNotFound(err) {
|
|
return nil, ErrNotFound
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
if len(listenerPools) == 0 {
|
|
return nil, ErrNotFound
|
|
} else if len(listenerPools) > 1 {
|
|
return nil, ErrMultipleResults
|
|
}
|
|
|
|
return &listenerPools[0], nil
|
|
}
|
|
|
|
func getMembersByPoolID(client *gophercloud.ServiceClient, id string) ([]v2pools.Member, error) {
|
|
var members []v2pools.Member
|
|
err := v2pools.ListMembers(client, id, v2pools.ListMembersOpts{}).EachPage(func(page pagination.Page) (bool, error) {
|
|
membersList, err := v2pools.ExtractMembers(page)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
members = append(members, membersList...)
|
|
|
|
return true, nil
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return members, nil
|
|
}
|
|
|
|
// Check if a member exists for node
|
|
func memberExists(members []v2pools.Member, addr string, port int) bool {
|
|
for _, member := range members {
|
|
if member.Address == addr && member.ProtocolPort == port {
|
|
return true
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
func popListener(existingListeners []listeners.Listener, id string) []listeners.Listener {
|
|
for i, existingListener := range existingListeners {
|
|
if existingListener.ID == id {
|
|
existingListeners[i] = existingListeners[len(existingListeners)-1]
|
|
existingListeners = existingListeners[:len(existingListeners)-1]
|
|
break
|
|
}
|
|
}
|
|
|
|
return existingListeners
|
|
}
|
|
|
|
func popMember(members []v2pools.Member, addr string, port int) []v2pools.Member {
|
|
for i, member := range members {
|
|
if member.Address == addr && member.ProtocolPort == port {
|
|
members[i] = members[len(members)-1]
|
|
members = members[:len(members)-1]
|
|
}
|
|
}
|
|
|
|
return members
|
|
}
|
|
|
|
func getSecurityGroupName(service *v1.Service) string {
|
|
securityGroupName := fmt.Sprintf("lb-sg-%s-%s-%s", service.UID, service.Namespace, service.Name)
|
|
//OpenStack requires that the name of a security group is shorter than 255 bytes.
|
|
if len(securityGroupName) > 255 {
|
|
securityGroupName = securityGroupName[:255]
|
|
}
|
|
|
|
return securityGroupName
|
|
}
|
|
|
|
func getSecurityGroupRules(client *gophercloud.ServiceClient, opts rules.ListOpts) ([]rules.SecGroupRule, error) {
|
|
|
|
pager := rules.List(client, opts)
|
|
|
|
var securityRules []rules.SecGroupRule
|
|
|
|
err := pager.EachPage(func(page pagination.Page) (bool, error) {
|
|
ruleList, err := rules.ExtractRules(page)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
securityRules = append(securityRules, ruleList...)
|
|
return true, nil
|
|
})
|
|
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return securityRules, nil
|
|
}
|
|
|
|
func waitLoadbalancerActiveProvisioningStatus(client *gophercloud.ServiceClient, loadbalancerID string) (string, error) {
|
|
backoff := wait.Backoff{
|
|
Duration: loadbalancerActiveInitDelay,
|
|
Factor: loadbalancerActiveFactor,
|
|
Steps: loadbalancerActiveSteps,
|
|
}
|
|
|
|
var provisioningStatus string
|
|
err := wait.ExponentialBackoff(backoff, func() (bool, error) {
|
|
loadbalancer, err := loadbalancers.Get(client, loadbalancerID).Extract()
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
provisioningStatus = loadbalancer.ProvisioningStatus
|
|
if loadbalancer.ProvisioningStatus == activeStatus {
|
|
return true, nil
|
|
} else if loadbalancer.ProvisioningStatus == errorStatus {
|
|
return true, fmt.Errorf("loadbalancer has gone into ERROR state")
|
|
} else {
|
|
return false, nil
|
|
}
|
|
|
|
})
|
|
|
|
if err == wait.ErrWaitTimeout {
|
|
err = fmt.Errorf("loadbalancer failed to go into ACTIVE provisioning status within alloted time")
|
|
}
|
|
return provisioningStatus, err
|
|
}
|
|
|
|
func waitLoadbalancerDeleted(client *gophercloud.ServiceClient, loadbalancerID string) error {
|
|
backoff := wait.Backoff{
|
|
Duration: loadbalancerDeleteInitDelay,
|
|
Factor: loadbalancerDeleteFactor,
|
|
Steps: loadbalancerDeleteSteps,
|
|
}
|
|
err := wait.ExponentialBackoff(backoff, func() (bool, error) {
|
|
_, err := loadbalancers.Get(client, loadbalancerID).Extract()
|
|
if err != nil {
|
|
if isNotFound(err) {
|
|
return true, nil
|
|
}
|
|
return false, err
|
|
}
|
|
return false, nil
|
|
})
|
|
|
|
if err == wait.ErrWaitTimeout {
|
|
err = fmt.Errorf("loadbalancer failed to delete within the alloted time")
|
|
}
|
|
|
|
return err
|
|
}
|
|
|
|
func toRuleProtocol(protocol v1.Protocol) rules.RuleProtocol {
|
|
switch protocol {
|
|
case v1.ProtocolTCP:
|
|
return rules.ProtocolTCP
|
|
case v1.ProtocolUDP:
|
|
return rules.ProtocolUDP
|
|
default:
|
|
return rules.RuleProtocol(strings.ToLower(string(protocol)))
|
|
}
|
|
}
|
|
|
|
func toListenersProtocol(protocol v1.Protocol) listeners.Protocol {
|
|
switch protocol {
|
|
case v1.ProtocolTCP:
|
|
return listeners.ProtocolTCP
|
|
default:
|
|
return listeners.Protocol(string(protocol))
|
|
}
|
|
}
|
|
|
|
func createNodeSecurityGroup(client *gophercloud.ServiceClient, nodeSecurityGroupID string, port int, protocol v1.Protocol, lbSecGroup string) error {
|
|
v4NodeSecGroupRuleCreateOpts := rules.CreateOpts{
|
|
Direction: rules.DirIngress,
|
|
PortRangeMax: port,
|
|
PortRangeMin: port,
|
|
Protocol: toRuleProtocol(protocol),
|
|
RemoteGroupID: lbSecGroup,
|
|
SecGroupID: nodeSecurityGroupID,
|
|
EtherType: rules.EtherType4,
|
|
}
|
|
|
|
v6NodeSecGroupRuleCreateOpts := rules.CreateOpts{
|
|
Direction: rules.DirIngress,
|
|
PortRangeMax: port,
|
|
PortRangeMin: port,
|
|
Protocol: toRuleProtocol(protocol),
|
|
RemoteGroupID: lbSecGroup,
|
|
SecGroupID: nodeSecurityGroupID,
|
|
EtherType: rules.EtherType6,
|
|
}
|
|
|
|
_, err := rules.Create(client, v4NodeSecGroupRuleCreateOpts).Extract()
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
_, err = rules.Create(client, v6NodeSecGroupRuleCreateOpts).Extract()
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (lbaas *LbaasV2) createLoadBalancer(service *v1.Service, name string, internalAnnotation bool) (*loadbalancers.LoadBalancer, error) {
|
|
createOpts := loadbalancers.CreateOpts{
|
|
Name: name,
|
|
Description: fmt.Sprintf("Kubernetes external service %s", name),
|
|
VipSubnetID: lbaas.opts.SubnetID,
|
|
Provider: lbaas.opts.LBProvider,
|
|
}
|
|
|
|
loadBalancerIP := service.Spec.LoadBalancerIP
|
|
if loadBalancerIP != "" && internalAnnotation {
|
|
createOpts.VipAddress = loadBalancerIP
|
|
}
|
|
|
|
loadbalancer, err := loadbalancers.Create(lbaas.lb, createOpts).Extract()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error creating loadbalancer %v: %v", createOpts, err)
|
|
}
|
|
return loadbalancer, nil
|
|
}
|
|
|
|
// GetLoadBalancer returns whether the specified load balancer exists and its status
|
|
func (lbaas *LbaasV2) GetLoadBalancer(ctx context.Context, clusterName string, service *v1.Service) (*v1.LoadBalancerStatus, bool, error) {
|
|
loadBalancerName := lbaas.GetLoadBalancerName(ctx, clusterName, service)
|
|
loadbalancer, err := getLoadbalancerByName(lbaas.lb, loadBalancerName)
|
|
if err == ErrNotFound {
|
|
return nil, false, nil
|
|
}
|
|
if loadbalancer == nil {
|
|
return nil, false, err
|
|
}
|
|
|
|
status := &v1.LoadBalancerStatus{}
|
|
|
|
portID := loadbalancer.VipPortID
|
|
if portID != "" {
|
|
floatIP, err := getFloatingIPByPortID(lbaas.network, portID)
|
|
if err != nil && err != ErrNotFound {
|
|
return nil, false, fmt.Errorf("error getting floating ip for port %s: %v", portID, err)
|
|
}
|
|
|
|
if floatIP != nil {
|
|
status.Ingress = []v1.LoadBalancerIngress{{IP: floatIP.FloatingIP}}
|
|
}
|
|
} else {
|
|
status.Ingress = []v1.LoadBalancerIngress{{IP: loadbalancer.VipAddress}}
|
|
}
|
|
|
|
return status, true, err
|
|
}
|
|
|
|
// GetLoadBalancerName is an implementation of LoadBalancer.GetLoadBalancerName.
|
|
func (lbaas *LbaasV2) GetLoadBalancerName(ctx context.Context, clusterName string, service *v1.Service) string {
|
|
// TODO: replace DefaultLoadBalancerName to generate more meaningful loadbalancer names.
|
|
return cloudprovider.DefaultLoadBalancerName(service)
|
|
}
|
|
|
|
// The LB needs to be configured with instance addresses on the same
|
|
// subnet as the LB (aka opts.SubnetID). Currently we're just
|
|
// guessing that the node's InternalIP is the right address.
|
|
// In case no InternalIP can be found, ExternalIP is tried.
|
|
// If neither InternalIP nor ExternalIP can be found an error is
|
|
// returned.
|
|
func nodeAddressForLB(node *v1.Node) (string, error) {
|
|
addrs := node.Status.Addresses
|
|
if len(addrs) == 0 {
|
|
return "", ErrNoAddressFound
|
|
}
|
|
|
|
allowedAddrTypes := []v1.NodeAddressType{v1.NodeInternalIP, v1.NodeExternalIP}
|
|
|
|
for _, allowedAddrType := range allowedAddrTypes {
|
|
for _, addr := range addrs {
|
|
if addr.Type == allowedAddrType {
|
|
return addr.Address, nil
|
|
}
|
|
}
|
|
}
|
|
|
|
return "", ErrNoAddressFound
|
|
}
|
|
|
|
//getStringFromServiceAnnotation searches a given v1.Service for a specific annotationKey and either returns the annotation's value or a specified defaultSetting
|
|
func getStringFromServiceAnnotation(service *v1.Service, annotationKey string, defaultSetting string) string {
|
|
klog.V(4).Infof("getStringFromServiceAnnotation(%v, %v, %v)", service, annotationKey, defaultSetting)
|
|
if annotationValue, ok := service.Annotations[annotationKey]; ok {
|
|
//if there is an annotation for this setting, set the "setting" var to it
|
|
// annotationValue can be empty, it is working as designed
|
|
// it makes possible for instance provisioning loadbalancer without floatingip
|
|
klog.V(4).Infof("Found a Service Annotation: %v = %v", annotationKey, annotationValue)
|
|
return annotationValue
|
|
}
|
|
//if there is no annotation, set "settings" var to the value from cloud config
|
|
klog.V(4).Infof("Could not find a Service Annotation; falling back on cloud-config setting: %v = %v", annotationKey, defaultSetting)
|
|
return defaultSetting
|
|
}
|
|
|
|
// getSubnetIDForLB returns subnet-id for a specific node
|
|
func getSubnetIDForLB(compute *gophercloud.ServiceClient, node v1.Node) (string, error) {
|
|
ipAddress, err := nodeAddressForLB(&node)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
instanceID := node.Spec.ProviderID
|
|
if ind := strings.LastIndex(instanceID, "/"); ind >= 0 {
|
|
instanceID = instanceID[(ind + 1):]
|
|
}
|
|
|
|
interfaces, err := getAttachedInterfacesByID(compute, instanceID)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
for _, intf := range interfaces {
|
|
for _, fixedIP := range intf.FixedIPs {
|
|
if fixedIP.IPAddress == ipAddress {
|
|
return fixedIP.SubnetID, nil
|
|
}
|
|
}
|
|
}
|
|
|
|
return "", ErrNotFound
|
|
}
|
|
|
|
// getNodeSecurityGroupIDForLB lists node-security-groups for specific nodes
|
|
func getNodeSecurityGroupIDForLB(compute *gophercloud.ServiceClient, network *gophercloud.ServiceClient, nodes []*v1.Node) ([]string, error) {
|
|
secGroupNames := sets.NewString()
|
|
|
|
for _, node := range nodes {
|
|
nodeName := types.NodeName(node.Name)
|
|
srv, err := getServerByName(compute, nodeName)
|
|
if err != nil {
|
|
return []string{}, err
|
|
}
|
|
|
|
// use the first node-security-groups
|
|
// case 0: node1:SG1 node2:SG1 return SG1
|
|
// case 1: node1:SG1 node2:SG2 return SG1,SG2
|
|
// case 2: node1:SG1,SG2 node2:SG3,SG4 return SG1,SG3
|
|
// case 3: node1:SG1,SG2 node2:SG2,SG3 return SG1,SG2
|
|
secGroupNames.Insert(srv.SecurityGroups[0]["name"].(string))
|
|
}
|
|
|
|
secGroupIDs := make([]string, secGroupNames.Len())
|
|
for i, name := range secGroupNames.List() {
|
|
secGroupID, err := groups.IDFromName(network, name)
|
|
if err != nil {
|
|
return []string{}, err
|
|
}
|
|
secGroupIDs[i] = secGroupID
|
|
}
|
|
|
|
return secGroupIDs, nil
|
|
}
|
|
|
|
// isSecurityGroupNotFound return true while 'err' is object of gophercloud.ErrResourceNotFound
|
|
func isSecurityGroupNotFound(err error) bool {
|
|
errType := reflect.TypeOf(err).String()
|
|
errTypeSlice := strings.Split(errType, ".")
|
|
errTypeValue := ""
|
|
if len(errTypeSlice) != 0 {
|
|
errTypeValue = errTypeSlice[len(errTypeSlice)-1]
|
|
}
|
|
if errTypeValue == "ErrResourceNotFound" {
|
|
return true
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// getFloatingNetworkIDForLB returns a floating-network-id for cluster.
|
|
func getFloatingNetworkIDForLB(client *gophercloud.ServiceClient) (string, error) {
|
|
var floatingNetworkIds []string
|
|
|
|
type NetworkWithExternalExt struct {
|
|
networks.Network
|
|
external.NetworkExternalExt
|
|
}
|
|
|
|
err := networks.List(client, networks.ListOpts{}).EachPage(func(page pagination.Page) (bool, error) {
|
|
var externalNetwork []NetworkWithExternalExt
|
|
err := networks.ExtractNetworksInto(page, &externalNetwork)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
for _, externalNet := range externalNetwork {
|
|
if externalNet.External {
|
|
floatingNetworkIds = append(floatingNetworkIds, externalNet.ID)
|
|
}
|
|
}
|
|
|
|
if len(floatingNetworkIds) > 1 {
|
|
return false, ErrMultipleResults
|
|
}
|
|
return true, nil
|
|
})
|
|
if err != nil {
|
|
if isNotFound(err) {
|
|
return "", ErrNotFound
|
|
}
|
|
|
|
if err == ErrMultipleResults {
|
|
klog.V(4).Infof("find multiple external networks, pick the first one when there are no explicit configuration.")
|
|
return floatingNetworkIds[0], nil
|
|
}
|
|
return "", err
|
|
}
|
|
|
|
if len(floatingNetworkIds) == 0 {
|
|
return "", ErrNotFound
|
|
}
|
|
|
|
return floatingNetworkIds[0], nil
|
|
}
|
|
|
|
// TODO: This code currently ignores 'region' and always creates a
|
|
// loadbalancer in only the current OpenStack region. We should take
|
|
// a list of regions (from config) and query/create loadbalancers in
|
|
// each region.
|
|
|
|
// EnsureLoadBalancer creates a new load balancer 'name', or updates the existing one.
|
|
func (lbaas *LbaasV2) EnsureLoadBalancer(ctx context.Context, clusterName string, apiService *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) {
|
|
klog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v)", clusterName, apiService.Namespace, apiService.Name, apiService.Spec.LoadBalancerIP, apiService.Spec.Ports, nodes, apiService.Annotations)
|
|
|
|
if len(nodes) == 0 {
|
|
return nil, fmt.Errorf("there are no available nodes for LoadBalancer service %s/%s", apiService.Namespace, apiService.Name)
|
|
}
|
|
|
|
lbaas.opts.SubnetID = getStringFromServiceAnnotation(apiService, ServiceAnnotationLoadBalancerSubnetID, lbaas.opts.SubnetID)
|
|
if len(lbaas.opts.SubnetID) == 0 {
|
|
// Get SubnetID automatically.
|
|
// The LB needs to be configured with instance addresses on the same subnet, so get SubnetID by one node.
|
|
subnetID, err := getSubnetIDForLB(lbaas.compute, *nodes[0])
|
|
if err != nil {
|
|
klog.Warningf("Failed to find subnet-id for loadbalancer service %s/%s: %v", apiService.Namespace, apiService.Name, err)
|
|
return nil, fmt.Errorf("no subnet-id for service %s/%s : subnet-id not set in cloud provider config, "+
|
|
"and failed to find subnet-id from OpenStack: %v", apiService.Namespace, apiService.Name, err)
|
|
}
|
|
lbaas.opts.SubnetID = subnetID
|
|
}
|
|
|
|
ports := apiService.Spec.Ports
|
|
if len(ports) == 0 {
|
|
return nil, fmt.Errorf("no ports provided to openstack load balancer")
|
|
}
|
|
|
|
floatingPool := getStringFromServiceAnnotation(apiService, ServiceAnnotationLoadBalancerFloatingNetworkID, lbaas.opts.FloatingNetworkID)
|
|
if len(floatingPool) == 0 {
|
|
var err error
|
|
floatingPool, err = getFloatingNetworkIDForLB(lbaas.network)
|
|
if err != nil {
|
|
klog.Warningf("Failed to find floating-network-id for loadbalancer service %s/%s: %v", apiService.Namespace, apiService.Name, err)
|
|
}
|
|
}
|
|
|
|
var internalAnnotation bool
|
|
internal := getStringFromServiceAnnotation(apiService, ServiceAnnotationLoadBalancerInternal, "false")
|
|
switch internal {
|
|
case "true":
|
|
klog.V(4).Info("Ensure an internal loadbalancer service.")
|
|
internalAnnotation = true
|
|
case "false":
|
|
if len(floatingPool) != 0 {
|
|
klog.V(4).Infof("Ensure an external loadbalancer service, using floatingPool: %v", floatingPool)
|
|
internalAnnotation = false
|
|
} else {
|
|
return nil, fmt.Errorf("floating-network-id or loadbalancer.openstack.org/floating-network-id should be specified when ensuring an external loadbalancer service")
|
|
}
|
|
default:
|
|
return nil, fmt.Errorf("unknown service.beta.kubernetes.io/openstack-internal-load-balancer annotation: %v, specify \"true\" or \"false\" ",
|
|
internal)
|
|
}
|
|
|
|
// Check for TCP protocol on each port
|
|
// TODO: Convert all error messages to use an event recorder
|
|
for _, port := range ports {
|
|
if port.Protocol != v1.ProtocolTCP {
|
|
return nil, fmt.Errorf("only TCP LoadBalancer is supported for openstack load balancers")
|
|
}
|
|
}
|
|
|
|
sourceRanges, err := servicehelpers.GetLoadBalancerSourceRanges(apiService)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to get source ranges for loadbalancer service %s/%s: %v", apiService.Namespace, apiService.Name, err)
|
|
}
|
|
|
|
if !servicehelpers.IsAllowAll(sourceRanges) && !lbaas.opts.ManageSecurityGroups {
|
|
return nil, fmt.Errorf("source range restrictions are not supported for openstack load balancers without managing security groups")
|
|
}
|
|
|
|
affinity := apiService.Spec.SessionAffinity
|
|
var persistence *v2pools.SessionPersistence
|
|
switch affinity {
|
|
case v1.ServiceAffinityNone:
|
|
persistence = nil
|
|
case v1.ServiceAffinityClientIP:
|
|
persistence = &v2pools.SessionPersistence{Type: "SOURCE_IP"}
|
|
default:
|
|
return nil, fmt.Errorf("unsupported load balancer affinity: %v", affinity)
|
|
}
|
|
|
|
name := lbaas.GetLoadBalancerName(ctx, clusterName, apiService)
|
|
loadbalancer, err := getLoadbalancerByName(lbaas.lb, name)
|
|
if err != nil {
|
|
if err != ErrNotFound {
|
|
return nil, fmt.Errorf("error getting loadbalancer %s: %v", name, err)
|
|
}
|
|
klog.V(2).Infof("Creating loadbalancer %s", name)
|
|
loadbalancer, err = lbaas.createLoadBalancer(apiService, name, internalAnnotation)
|
|
if err != nil {
|
|
// Unknown error, retry later
|
|
return nil, fmt.Errorf("error creating loadbalancer %s: %v", name, err)
|
|
}
|
|
} else {
|
|
klog.V(2).Infof("LoadBalancer %s already exists", name)
|
|
}
|
|
|
|
provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
|
|
}
|
|
|
|
lbmethod := v2pools.LBMethod(lbaas.opts.LBMethod)
|
|
if lbmethod == "" {
|
|
lbmethod = v2pools.LBMethodRoundRobin
|
|
}
|
|
|
|
oldListeners, err := getListenersByLoadBalancerID(lbaas.lb, loadbalancer.ID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error getting LB %s listeners: %v", name, err)
|
|
}
|
|
for portIndex, port := range ports {
|
|
listener := getListenerForPort(oldListeners, port)
|
|
if listener == nil {
|
|
klog.V(4).Infof("Creating listener for port %d", int(port.Port))
|
|
listener, err = listeners.Create(lbaas.lb, listeners.CreateOpts{
|
|
Name: fmt.Sprintf("listener_%s_%d", name, portIndex),
|
|
Protocol: listeners.Protocol(port.Protocol),
|
|
ProtocolPort: int(port.Port),
|
|
LoadbalancerID: loadbalancer.ID,
|
|
}).Extract()
|
|
if err != nil {
|
|
// Unknown error, retry later
|
|
return nil, fmt.Errorf("error creating LB listener: %v", err)
|
|
}
|
|
provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
|
|
}
|
|
}
|
|
|
|
klog.V(4).Infof("Listener for %s port %d: %s", string(port.Protocol), int(port.Port), listener.ID)
|
|
|
|
// After all ports have been processed, remaining listeners are removed as obsolete.
|
|
// Pop valid listeners.
|
|
oldListeners = popListener(oldListeners, listener.ID)
|
|
pool, err := getPoolByListenerID(lbaas.lb, loadbalancer.ID, listener.ID)
|
|
if err != nil && err != ErrNotFound {
|
|
// Unknown error, retry later
|
|
return nil, fmt.Errorf("error getting pool for listener %s: %v", listener.ID, err)
|
|
}
|
|
if pool == nil {
|
|
klog.V(4).Infof("Creating pool for listener %s", listener.ID)
|
|
pool, err = v2pools.Create(lbaas.lb, v2pools.CreateOpts{
|
|
Name: fmt.Sprintf("pool_%s_%d", name, portIndex),
|
|
Protocol: v2pools.Protocol(port.Protocol),
|
|
LBMethod: lbmethod,
|
|
ListenerID: listener.ID,
|
|
Persistence: persistence,
|
|
}).Extract()
|
|
if err != nil {
|
|
// Unknown error, retry later
|
|
return nil, fmt.Errorf("error creating pool for listener %s: %v", listener.ID, err)
|
|
}
|
|
provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
|
|
}
|
|
|
|
}
|
|
|
|
klog.V(4).Infof("Pool for listener %s: %s", listener.ID, pool.ID)
|
|
members, err := getMembersByPoolID(lbaas.lb, pool.ID)
|
|
if err != nil && !isNotFound(err) {
|
|
return nil, fmt.Errorf("error getting pool members %s: %v", pool.ID, err)
|
|
}
|
|
for _, node := range nodes {
|
|
addr, err := nodeAddressForLB(node)
|
|
if err != nil {
|
|
if err == ErrNotFound {
|
|
// Node failure, do not create member
|
|
klog.Warningf("Failed to create LB pool member for node %s: %v", node.Name, err)
|
|
continue
|
|
} else {
|
|
return nil, fmt.Errorf("error getting address for node %s: %v", node.Name, err)
|
|
}
|
|
}
|
|
|
|
if !memberExists(members, addr, int(port.NodePort)) {
|
|
klog.V(4).Infof("Creating member for pool %s", pool.ID)
|
|
_, err := v2pools.CreateMember(lbaas.lb, pool.ID, v2pools.CreateMemberOpts{
|
|
Name: fmt.Sprintf("member_%s_%d_%s", name, portIndex, node.Name),
|
|
ProtocolPort: int(port.NodePort),
|
|
Address: addr,
|
|
SubnetID: lbaas.opts.SubnetID,
|
|
}).Extract()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error creating LB pool member for node: %s, %v", node.Name, err)
|
|
}
|
|
|
|
provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
|
|
}
|
|
} else {
|
|
// After all members have been processed, remaining members are deleted as obsolete.
|
|
members = popMember(members, addr, int(port.NodePort))
|
|
}
|
|
|
|
klog.V(4).Infof("Ensured pool %s has member for %s at %s", pool.ID, node.Name, addr)
|
|
}
|
|
|
|
// Delete obsolete members for this pool
|
|
for _, member := range members {
|
|
klog.V(4).Infof("Deleting obsolete member %s for pool %s address %s", member.ID, pool.ID, member.Address)
|
|
err := v2pools.DeleteMember(lbaas.lb, pool.ID, member.ID).ExtractErr()
|
|
if err != nil && !isNotFound(err) {
|
|
return nil, fmt.Errorf("error deleting obsolete member %s for pool %s address %s: %v", member.ID, pool.ID, member.Address, err)
|
|
}
|
|
provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
|
|
}
|
|
}
|
|
|
|
monitorID := pool.MonitorID
|
|
if monitorID == "" && lbaas.opts.CreateMonitor {
|
|
klog.V(4).Infof("Creating monitor for pool %s", pool.ID)
|
|
monitor, err := v2monitors.Create(lbaas.lb, v2monitors.CreateOpts{
|
|
Name: fmt.Sprintf("monitor_%s_%d", name, portIndex),
|
|
PoolID: pool.ID,
|
|
Type: string(port.Protocol),
|
|
Delay: int(lbaas.opts.MonitorDelay.Duration.Seconds()),
|
|
Timeout: int(lbaas.opts.MonitorTimeout.Duration.Seconds()),
|
|
MaxRetries: int(lbaas.opts.MonitorMaxRetries),
|
|
}).Extract()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error creating LB pool healthmonitor: %v", err)
|
|
}
|
|
provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
|
|
}
|
|
monitorID = monitor.ID
|
|
} else if !lbaas.opts.CreateMonitor {
|
|
klog.V(4).Infof("Do not create monitor for pool %s when create-monitor is false", pool.ID)
|
|
}
|
|
|
|
if monitorID != "" {
|
|
klog.V(4).Infof("Monitor for pool %s: %s", pool.ID, monitorID)
|
|
}
|
|
}
|
|
|
|
// All remaining listeners are obsolete, delete
|
|
for _, listener := range oldListeners {
|
|
klog.V(4).Infof("Deleting obsolete listener %s:", listener.ID)
|
|
// get pool for listener
|
|
pool, err := getPoolByListenerID(lbaas.lb, loadbalancer.ID, listener.ID)
|
|
if err != nil && err != ErrNotFound {
|
|
return nil, fmt.Errorf("error getting pool for obsolete listener %s: %v", listener.ID, err)
|
|
}
|
|
if pool != nil {
|
|
// get and delete monitor
|
|
monitorID := pool.MonitorID
|
|
if monitorID != "" {
|
|
klog.V(4).Infof("Deleting obsolete monitor %s for pool %s", monitorID, pool.ID)
|
|
err = v2monitors.Delete(lbaas.lb, monitorID).ExtractErr()
|
|
if err != nil && !isNotFound(err) {
|
|
return nil, fmt.Errorf("error deleting obsolete monitor %s for pool %s: %v", monitorID, pool.ID, err)
|
|
}
|
|
provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
|
|
}
|
|
}
|
|
// get and delete pool members
|
|
members, err := getMembersByPoolID(lbaas.lb, pool.ID)
|
|
if err != nil && !isNotFound(err) {
|
|
return nil, fmt.Errorf("error getting members for pool %s: %v", pool.ID, err)
|
|
}
|
|
for _, member := range members {
|
|
klog.V(4).Infof("Deleting obsolete member %s for pool %s address %s", member.ID, pool.ID, member.Address)
|
|
err := v2pools.DeleteMember(lbaas.lb, pool.ID, member.ID).ExtractErr()
|
|
if err != nil && !isNotFound(err) {
|
|
return nil, fmt.Errorf("error deleting obsolete member %s for pool %s address %s: %v", member.ID, pool.ID, member.Address, err)
|
|
}
|
|
provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
|
|
}
|
|
}
|
|
klog.V(4).Infof("Deleting obsolete pool %s for listener %s", pool.ID, listener.ID)
|
|
// delete pool
|
|
err = v2pools.Delete(lbaas.lb, pool.ID).ExtractErr()
|
|
if err != nil && !isNotFound(err) {
|
|
return nil, fmt.Errorf("error deleting obsolete pool %s for listener %s: %v", pool.ID, listener.ID, err)
|
|
}
|
|
provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
|
|
}
|
|
}
|
|
// delete listener
|
|
err = listeners.Delete(lbaas.lb, listener.ID).ExtractErr()
|
|
if err != nil && !isNotFound(err) {
|
|
return nil, fmt.Errorf("error deleteting obsolete listener: %v", err)
|
|
}
|
|
provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
|
|
}
|
|
klog.V(2).Infof("Deleted obsolete listener: %s", listener.ID)
|
|
}
|
|
|
|
portID := loadbalancer.VipPortID
|
|
floatIP, err := getFloatingIPByPortID(lbaas.network, portID)
|
|
if err != nil && err != ErrNotFound {
|
|
return nil, fmt.Errorf("error getting floating ip for port %s: %v", portID, err)
|
|
}
|
|
if floatIP == nil && floatingPool != "" && !internalAnnotation {
|
|
klog.V(4).Infof("Creating floating ip for loadbalancer %s port %s", loadbalancer.ID, portID)
|
|
floatIPOpts := floatingips.CreateOpts{
|
|
FloatingNetworkID: floatingPool,
|
|
PortID: portID,
|
|
}
|
|
|
|
loadBalancerIP := apiService.Spec.LoadBalancerIP
|
|
if loadBalancerIP != "" {
|
|
floatIPOpts.FloatingIP = loadBalancerIP
|
|
}
|
|
|
|
floatIP, err = floatingips.Create(lbaas.network, floatIPOpts).Extract()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("error creating LB floatingip %+v: %v", floatIPOpts, err)
|
|
}
|
|
}
|
|
|
|
status := &v1.LoadBalancerStatus{}
|
|
|
|
if floatIP != nil {
|
|
status.Ingress = []v1.LoadBalancerIngress{{IP: floatIP.FloatingIP}}
|
|
} else {
|
|
status.Ingress = []v1.LoadBalancerIngress{{IP: loadbalancer.VipAddress}}
|
|
}
|
|
|
|
if lbaas.opts.ManageSecurityGroups {
|
|
err := lbaas.ensureSecurityGroup(clusterName, apiService, nodes, loadbalancer)
|
|
if err != nil {
|
|
return status, fmt.Errorf("Error reconciling security groups for LB service %v/%v: %v", apiService.Namespace, apiService.Name, err)
|
|
}
|
|
}
|
|
|
|
return status, nil
|
|
}
|
|
|
|
// ensureSecurityGroup ensures security group exist for specific loadbalancer service.
|
|
// Creating security group for specific loadbalancer service when it does not exist.
|
|
func (lbaas *LbaasV2) ensureSecurityGroup(clusterName string, apiService *v1.Service, nodes []*v1.Node, loadbalancer *loadbalancers.LoadBalancer) error {
|
|
// find node-security-group for service
|
|
var err error
|
|
if len(lbaas.opts.NodeSecurityGroupIDs) == 0 {
|
|
lbaas.opts.NodeSecurityGroupIDs, err = getNodeSecurityGroupIDForLB(lbaas.compute, lbaas.network, nodes)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to find node-security-group for loadbalancer service %s/%s: %v", apiService.Namespace, apiService.Name, err)
|
|
}
|
|
}
|
|
klog.V(4).Infof("find node-security-group %v for loadbalancer service %s/%s", lbaas.opts.NodeSecurityGroupIDs, apiService.Namespace, apiService.Name)
|
|
|
|
// get service ports
|
|
ports := apiService.Spec.Ports
|
|
if len(ports) == 0 {
|
|
return fmt.Errorf("no ports provided to openstack load balancer")
|
|
}
|
|
|
|
// get service source ranges
|
|
sourceRanges, err := servicehelpers.GetLoadBalancerSourceRanges(apiService)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get source ranges for loadbalancer service %s/%s: %v", apiService.Namespace, apiService.Name, err)
|
|
}
|
|
|
|
// ensure security group for LB
|
|
lbSecGroupName := getSecurityGroupName(apiService)
|
|
lbSecGroupID, err := groups.IDFromName(lbaas.network, lbSecGroupName)
|
|
if err != nil {
|
|
// If the security group of LB not exist, create it later
|
|
if isSecurityGroupNotFound(err) {
|
|
lbSecGroupID = ""
|
|
} else {
|
|
return fmt.Errorf("error occurred finding security group: %s: %v", lbSecGroupName, err)
|
|
}
|
|
}
|
|
if len(lbSecGroupID) == 0 {
|
|
// create security group
|
|
lbSecGroupCreateOpts := groups.CreateOpts{
|
|
Name: getSecurityGroupName(apiService),
|
|
Description: fmt.Sprintf("Security Group for %s/%s Service LoadBalancer in cluster %s", apiService.Namespace, apiService.Name, clusterName),
|
|
}
|
|
|
|
lbSecGroup, err := groups.Create(lbaas.network, lbSecGroupCreateOpts).Extract()
|
|
if err != nil {
|
|
return fmt.Errorf("failed to create Security Group for loadbalancer service %s/%s: %v", apiService.Namespace, apiService.Name, err)
|
|
}
|
|
lbSecGroupID = lbSecGroup.ID
|
|
|
|
//add rule in security group
|
|
for _, port := range ports {
|
|
for _, sourceRange := range sourceRanges.StringSlice() {
|
|
ethertype := rules.EtherType4
|
|
network, _, err := net.ParseCIDR(sourceRange)
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("error parsing source range %s as a CIDR: %v", sourceRange, err)
|
|
}
|
|
|
|
if network.To4() == nil {
|
|
ethertype = rules.EtherType6
|
|
}
|
|
|
|
lbSecGroupRuleCreateOpts := rules.CreateOpts{
|
|
Direction: rules.DirIngress,
|
|
PortRangeMax: int(port.Port),
|
|
PortRangeMin: int(port.Port),
|
|
Protocol: toRuleProtocol(port.Protocol),
|
|
RemoteIPPrefix: sourceRange,
|
|
SecGroupID: lbSecGroup.ID,
|
|
EtherType: ethertype,
|
|
}
|
|
|
|
_, err = rules.Create(lbaas.network, lbSecGroupRuleCreateOpts).Extract()
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("error occurred creating rule for SecGroup %s: %v", lbSecGroup.ID, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
lbSecGroupRuleCreateOpts := rules.CreateOpts{
|
|
Direction: rules.DirIngress,
|
|
PortRangeMax: 4, // ICMP: Code - Values for ICMP "Destination Unreachable: Fragmentation Needed and Don't Fragment was Set"
|
|
PortRangeMin: 3, // ICMP: Type
|
|
Protocol: rules.ProtocolICMP,
|
|
RemoteIPPrefix: "0.0.0.0/0", // The Fragmentation packet can come from anywhere along the path back to the sourceRange - we need to all this from all
|
|
SecGroupID: lbSecGroup.ID,
|
|
EtherType: rules.EtherType4,
|
|
}
|
|
|
|
_, err = rules.Create(lbaas.network, lbSecGroupRuleCreateOpts).Extract()
|
|
|
|
if err != nil {
|
|
return fmt.Errorf("error occurred creating rule for SecGroup %s: %v", lbSecGroup.ID, err)
|
|
}
|
|
|
|
lbSecGroupRuleCreateOpts = rules.CreateOpts{
|
|
Direction: rules.DirIngress,
|
|
PortRangeMax: 0, // ICMP: Code - Values for ICMP "Packet Too Big"
|
|
PortRangeMin: 2, // ICMP: Type
|
|
Protocol: rules.ProtocolICMP,
|
|
RemoteIPPrefix: "::/0", // The Fragmentation packet can come from anywhere along the path back to the sourceRange - we need to all this from all
|
|
SecGroupID: lbSecGroup.ID,
|
|
EtherType: rules.EtherType6,
|
|
}
|
|
|
|
_, err = rules.Create(lbaas.network, lbSecGroupRuleCreateOpts).Extract()
|
|
if err != nil {
|
|
return fmt.Errorf("error occurred creating rule for SecGroup %s: %v", lbSecGroup.ID, err)
|
|
}
|
|
|
|
// get security groups of port
|
|
portID := loadbalancer.VipPortID
|
|
port, err := getPortByID(lbaas.network, portID)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// ensure the vip port has the security groups
|
|
found := false
|
|
for _, portSecurityGroups := range port.SecurityGroups {
|
|
if portSecurityGroups == lbSecGroup.ID {
|
|
found = true
|
|
break
|
|
}
|
|
}
|
|
|
|
// update loadbalancer vip port
|
|
if !found {
|
|
port.SecurityGroups = append(port.SecurityGroups, lbSecGroup.ID)
|
|
updateOpts := neutronports.UpdateOpts{SecurityGroups: &port.SecurityGroups}
|
|
res := neutronports.Update(lbaas.network, portID, updateOpts)
|
|
if res.Err != nil {
|
|
msg := fmt.Sprintf("Error occurred updating port %s for loadbalancer service %s/%s: %v", portID, apiService.Namespace, apiService.Name, res.Err)
|
|
return fmt.Errorf(msg)
|
|
}
|
|
}
|
|
}
|
|
|
|
// ensure rules for every node security group
|
|
for _, port := range ports {
|
|
for _, nodeSecurityGroupID := range lbaas.opts.NodeSecurityGroupIDs {
|
|
opts := rules.ListOpts{
|
|
Direction: string(rules.DirIngress),
|
|
SecGroupID: nodeSecurityGroupID,
|
|
RemoteGroupID: lbSecGroupID,
|
|
PortRangeMax: int(port.NodePort),
|
|
PortRangeMin: int(port.NodePort),
|
|
Protocol: string(port.Protocol),
|
|
}
|
|
secGroupRules, err := getSecurityGroupRules(lbaas.network, opts)
|
|
if err != nil && !isNotFound(err) {
|
|
msg := fmt.Sprintf("Error finding rules for remote group id %s in security group id %s: %v", lbSecGroupID, nodeSecurityGroupID, err)
|
|
return fmt.Errorf(msg)
|
|
}
|
|
if len(secGroupRules) != 0 {
|
|
// Do not add rule when find rules for remote group in the Node Security Group
|
|
continue
|
|
}
|
|
|
|
// Add the rules in the Node Security Group
|
|
err = createNodeSecurityGroup(lbaas.network, nodeSecurityGroupID, int(port.NodePort), port.Protocol, lbSecGroupID)
|
|
if err != nil {
|
|
return fmt.Errorf("error occurred creating security group for loadbalancer service %s/%s: %v", apiService.Namespace, apiService.Name, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// UpdateLoadBalancer updates hosts under the specified load balancer.
|
|
func (lbaas *LbaasV2) UpdateLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) error {
|
|
loadBalancerName := lbaas.GetLoadBalancerName(ctx, clusterName, service)
|
|
klog.V(4).Infof("UpdateLoadBalancer(%v, %v, %v)", clusterName, loadBalancerName, nodes)
|
|
|
|
lbaas.opts.SubnetID = getStringFromServiceAnnotation(service, ServiceAnnotationLoadBalancerSubnetID, lbaas.opts.SubnetID)
|
|
if len(lbaas.opts.SubnetID) == 0 && len(nodes) > 0 {
|
|
// Get SubnetID automatically.
|
|
// The LB needs to be configured with instance addresses on the same subnet, so get SubnetID by one node.
|
|
subnetID, err := getSubnetIDForLB(lbaas.compute, *nodes[0])
|
|
if err != nil {
|
|
klog.Warningf("Failed to find subnet-id for loadbalancer service %s/%s: %v", service.Namespace, service.Name, err)
|
|
return fmt.Errorf("no subnet-id for service %s/%s : subnet-id not set in cloud provider config, "+
|
|
"and failed to find subnet-id from OpenStack: %v", service.Namespace, service.Name, err)
|
|
}
|
|
lbaas.opts.SubnetID = subnetID
|
|
}
|
|
|
|
ports := service.Spec.Ports
|
|
if len(ports) == 0 {
|
|
return fmt.Errorf("no ports provided to openstack load balancer")
|
|
}
|
|
|
|
loadbalancer, err := getLoadbalancerByName(lbaas.lb, loadBalancerName)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if loadbalancer == nil {
|
|
return fmt.Errorf("loadbalancer %s does not exist", loadBalancerName)
|
|
}
|
|
|
|
// Get all listeners for this loadbalancer, by "port key".
|
|
type portKey struct {
|
|
Protocol listeners.Protocol
|
|
Port int
|
|
}
|
|
var listenerIDs []string
|
|
lbListeners := make(map[portKey]listeners.Listener)
|
|
allListeners, err := getListenersByLoadBalancerID(lbaas.lb, loadbalancer.ID)
|
|
if err != nil {
|
|
return fmt.Errorf("error getting listeners for LB %s: %v", loadBalancerName, err)
|
|
}
|
|
for _, l := range allListeners {
|
|
key := portKey{Protocol: listeners.Protocol(l.Protocol), Port: l.ProtocolPort}
|
|
lbListeners[key] = l
|
|
listenerIDs = append(listenerIDs, l.ID)
|
|
}
|
|
|
|
// Get all pools for this loadbalancer, by listener ID.
|
|
lbPools := make(map[string]v2pools.Pool)
|
|
for _, listenerID := range listenerIDs {
|
|
pool, err := getPoolByListenerID(lbaas.lb, loadbalancer.ID, listenerID)
|
|
if err != nil {
|
|
return fmt.Errorf("error getting pool for listener %s: %v", listenerID, err)
|
|
}
|
|
lbPools[listenerID] = *pool
|
|
}
|
|
|
|
// Compose Set of member (addresses) that _should_ exist
|
|
addrs := make(map[string]*v1.Node)
|
|
for _, node := range nodes {
|
|
addr, err := nodeAddressForLB(node)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
addrs[addr] = node
|
|
}
|
|
|
|
// Check for adding/removing members associated with each port
|
|
for portIndex, port := range ports {
|
|
// Get listener associated with this port
|
|
listener, ok := lbListeners[portKey{
|
|
Protocol: toListenersProtocol(port.Protocol),
|
|
Port: int(port.Port),
|
|
}]
|
|
if !ok {
|
|
return fmt.Errorf("loadbalancer %s does not contain required listener for port %d and protocol %s", loadBalancerName, port.Port, port.Protocol)
|
|
}
|
|
|
|
// Get pool associated with this listener
|
|
pool, ok := lbPools[listener.ID]
|
|
if !ok {
|
|
return fmt.Errorf("loadbalancer %s does not contain required pool for listener %s", loadBalancerName, listener.ID)
|
|
}
|
|
|
|
// Find existing pool members (by address) for this port
|
|
getMembers, err := getMembersByPoolID(lbaas.lb, pool.ID)
|
|
if err != nil {
|
|
return fmt.Errorf("error getting pool members %s: %v", pool.ID, err)
|
|
}
|
|
members := make(map[string]v2pools.Member)
|
|
for _, member := range getMembers {
|
|
members[member.Address] = member
|
|
}
|
|
|
|
// Add any new members for this port
|
|
for addr, node := range addrs {
|
|
if _, ok := members[addr]; ok && members[addr].ProtocolPort == int(port.NodePort) {
|
|
// Already exists, do not create member
|
|
continue
|
|
}
|
|
_, err := v2pools.CreateMember(lbaas.lb, pool.ID, v2pools.CreateMemberOpts{
|
|
Name: fmt.Sprintf("member_%s_%d_%s", loadbalancer.Name, portIndex, node.Name),
|
|
Address: addr,
|
|
ProtocolPort: int(port.NodePort),
|
|
SubnetID: lbaas.opts.SubnetID,
|
|
}).Extract()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
|
|
}
|
|
}
|
|
|
|
// Remove any old members for this port
|
|
for _, member := range members {
|
|
if _, ok := addrs[member.Address]; ok && member.ProtocolPort == int(port.NodePort) {
|
|
// Still present, do not delete member
|
|
continue
|
|
}
|
|
err = v2pools.DeleteMember(lbaas.lb, pool.ID, member.ID).ExtractErr()
|
|
if err != nil && !isNotFound(err) {
|
|
return err
|
|
}
|
|
provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
if lbaas.opts.ManageSecurityGroups {
|
|
err := lbaas.updateSecurityGroup(clusterName, service, nodes, loadbalancer)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to update Security Group for loadbalancer service %s/%s: %v", service.Namespace, service.Name, err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// updateSecurityGroup updating security group for specific loadbalancer service.
|
|
func (lbaas *LbaasV2) updateSecurityGroup(clusterName string, apiService *v1.Service, nodes []*v1.Node, loadbalancer *loadbalancers.LoadBalancer) error {
|
|
originalNodeSecurityGroupIDs := lbaas.opts.NodeSecurityGroupIDs
|
|
|
|
var err error
|
|
lbaas.opts.NodeSecurityGroupIDs, err = getNodeSecurityGroupIDForLB(lbaas.compute, lbaas.network, nodes)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to find node-security-group for loadbalancer service %s/%s: %v", apiService.Namespace, apiService.Name, err)
|
|
}
|
|
klog.V(4).Infof("find node-security-group %v for loadbalancer service %s/%s", lbaas.opts.NodeSecurityGroupIDs, apiService.Namespace, apiService.Name)
|
|
|
|
original := sets.NewString(originalNodeSecurityGroupIDs...)
|
|
current := sets.NewString(lbaas.opts.NodeSecurityGroupIDs...)
|
|
removals := original.Difference(current)
|
|
|
|
// Generate Name
|
|
lbSecGroupName := getSecurityGroupName(apiService)
|
|
lbSecGroupID, err := groups.IDFromName(lbaas.network, lbSecGroupName)
|
|
if err != nil {
|
|
return fmt.Errorf("error occurred finding security group: %s: %v", lbSecGroupName, err)
|
|
}
|
|
|
|
ports := apiService.Spec.Ports
|
|
if len(ports) == 0 {
|
|
return fmt.Errorf("no ports provided to openstack load balancer")
|
|
}
|
|
|
|
for _, port := range ports {
|
|
for removal := range removals {
|
|
// Delete the rules in the Node Security Group
|
|
opts := rules.ListOpts{
|
|
Direction: string(rules.DirIngress),
|
|
SecGroupID: removal,
|
|
RemoteGroupID: lbSecGroupID,
|
|
PortRangeMax: int(port.NodePort),
|
|
PortRangeMin: int(port.NodePort),
|
|
Protocol: string(port.Protocol),
|
|
}
|
|
secGroupRules, err := getSecurityGroupRules(lbaas.network, opts)
|
|
if err != nil && !isNotFound(err) {
|
|
return fmt.Errorf("error finding rules for remote group id %s in security group id %s: %v", lbSecGroupID, removal, err)
|
|
}
|
|
|
|
for _, rule := range secGroupRules {
|
|
res := rules.Delete(lbaas.network, rule.ID)
|
|
if res.Err != nil && !isNotFound(res.Err) {
|
|
return fmt.Errorf("error occurred deleting security group rule: %s: %v", rule.ID, res.Err)
|
|
}
|
|
}
|
|
}
|
|
|
|
for _, nodeSecurityGroupID := range lbaas.opts.NodeSecurityGroupIDs {
|
|
opts := rules.ListOpts{
|
|
Direction: string(rules.DirIngress),
|
|
SecGroupID: nodeSecurityGroupID,
|
|
RemoteGroupID: lbSecGroupID,
|
|
PortRangeMax: int(port.NodePort),
|
|
PortRangeMin: int(port.NodePort),
|
|
Protocol: string(port.Protocol),
|
|
}
|
|
secGroupRules, err := getSecurityGroupRules(lbaas.network, opts)
|
|
if err != nil && !isNotFound(err) {
|
|
return fmt.Errorf("error finding rules for remote group id %s in security group id %s: %v", lbSecGroupID, nodeSecurityGroupID, err)
|
|
}
|
|
if len(secGroupRules) != 0 {
|
|
// Do not add rule when find rules for remote group in the Node Security Group
|
|
continue
|
|
}
|
|
|
|
// Add the rules in the Node Security Group
|
|
err = createNodeSecurityGroup(lbaas.network, nodeSecurityGroupID, int(port.NodePort), port.Protocol, lbSecGroupID)
|
|
if err != nil {
|
|
return fmt.Errorf("error occurred creating security group for loadbalancer service %s/%s: %v", apiService.Namespace, apiService.Name, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// EnsureLoadBalancerDeleted deletes the specified load balancer
|
|
func (lbaas *LbaasV2) EnsureLoadBalancerDeleted(ctx context.Context, clusterName string, service *v1.Service) error {
|
|
loadBalancerName := lbaas.GetLoadBalancerName(ctx, clusterName, service)
|
|
klog.V(4).Infof("EnsureLoadBalancerDeleted(%v, %v)", clusterName, loadBalancerName)
|
|
|
|
loadbalancer, err := getLoadbalancerByName(lbaas.lb, loadBalancerName)
|
|
if err != nil && err != ErrNotFound {
|
|
return err
|
|
}
|
|
if loadbalancer == nil {
|
|
return nil
|
|
}
|
|
|
|
if loadbalancer.VipPortID != "" {
|
|
portID := loadbalancer.VipPortID
|
|
floatingIP, err := getFloatingIPByPortID(lbaas.network, portID)
|
|
if err != nil && err != ErrNotFound {
|
|
return err
|
|
}
|
|
if floatingIP != nil {
|
|
err = floatingips.Delete(lbaas.network, floatingIP.ID).ExtractErr()
|
|
if err != nil && !isNotFound(err) {
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
|
|
// get all listeners associated with this loadbalancer
|
|
listenerList, err := getListenersByLoadBalancerID(lbaas.lb, loadbalancer.ID)
|
|
if err != nil {
|
|
return fmt.Errorf("error getting LB %s listeners: %v", loadbalancer.ID, err)
|
|
}
|
|
|
|
// get all pools (and health monitors) associated with this loadbalancer
|
|
var poolIDs []string
|
|
var monitorIDs []string
|
|
for _, listener := range listenerList {
|
|
pool, err := getPoolByListenerID(lbaas.lb, loadbalancer.ID, listener.ID)
|
|
if err != nil && err != ErrNotFound {
|
|
return fmt.Errorf("error getting pool for listener %s: %v", listener.ID, err)
|
|
}
|
|
if pool != nil {
|
|
poolIDs = append(poolIDs, pool.ID)
|
|
// If create-monitor of cloud-config is false, pool has not monitor.
|
|
if pool.MonitorID != "" {
|
|
monitorIDs = append(monitorIDs, pool.MonitorID)
|
|
}
|
|
}
|
|
}
|
|
|
|
// delete all monitors
|
|
for _, monitorID := range monitorIDs {
|
|
err := v2monitors.Delete(lbaas.lb, monitorID).ExtractErr()
|
|
if err != nil && !isNotFound(err) {
|
|
return err
|
|
}
|
|
provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
|
|
}
|
|
}
|
|
|
|
// delete all members and pools
|
|
for _, poolID := range poolIDs {
|
|
// get members for current pool
|
|
membersList, err := getMembersByPoolID(lbaas.lb, poolID)
|
|
if err != nil && !isNotFound(err) {
|
|
return fmt.Errorf("error getting pool members %s: %v", poolID, err)
|
|
}
|
|
// delete all members for this pool
|
|
for _, member := range membersList {
|
|
err := v2pools.DeleteMember(lbaas.lb, poolID, member.ID).ExtractErr()
|
|
if err != nil && !isNotFound(err) {
|
|
return err
|
|
}
|
|
provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
|
|
}
|
|
}
|
|
|
|
// delete pool
|
|
err = v2pools.Delete(lbaas.lb, poolID).ExtractErr()
|
|
if err != nil && !isNotFound(err) {
|
|
return err
|
|
}
|
|
provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
|
|
}
|
|
}
|
|
|
|
// delete all listeners
|
|
for _, listener := range listenerList {
|
|
err := listeners.Delete(lbaas.lb, listener.ID).ExtractErr()
|
|
if err != nil && !isNotFound(err) {
|
|
return err
|
|
}
|
|
provisioningStatus, err := waitLoadbalancerActiveProvisioningStatus(lbaas.lb, loadbalancer.ID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to loadbalance ACTIVE provisioning status %v: %v", provisioningStatus, err)
|
|
}
|
|
}
|
|
|
|
// delete loadbalancer
|
|
err = loadbalancers.Delete(lbaas.lb, loadbalancer.ID).ExtractErr()
|
|
if err != nil && !isNotFound(err) {
|
|
return err
|
|
}
|
|
err = waitLoadbalancerDeleted(lbaas.lb, loadbalancer.ID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to delete loadbalancer: %v", err)
|
|
}
|
|
|
|
// Delete the Security Group
|
|
if lbaas.opts.ManageSecurityGroups {
|
|
err := lbaas.EnsureSecurityGroupDeleted(clusterName, service)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to delete Security Group for loadbalancer service %s/%s: %v", service.Namespace, service.Name, err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// EnsureSecurityGroupDeleted deleting security group for specific loadbalancer service.
|
|
func (lbaas *LbaasV2) EnsureSecurityGroupDeleted(clusterName string, service *v1.Service) error {
|
|
// Generate Name
|
|
lbSecGroupName := getSecurityGroupName(service)
|
|
lbSecGroupID, err := groups.IDFromName(lbaas.network, lbSecGroupName)
|
|
if err != nil {
|
|
if isSecurityGroupNotFound(err) {
|
|
// It is OK when the security group has been deleted by others.
|
|
return nil
|
|
}
|
|
return fmt.Errorf("error occurred finding security group: %s: %v", lbSecGroupName, err)
|
|
}
|
|
|
|
lbSecGroup := groups.Delete(lbaas.network, lbSecGroupID)
|
|
if lbSecGroup.Err != nil && !isNotFound(lbSecGroup.Err) {
|
|
return lbSecGroup.Err
|
|
}
|
|
|
|
if len(lbaas.opts.NodeSecurityGroupIDs) == 0 {
|
|
// Just happen when nodes have not Security Group, or should not happen
|
|
// UpdateLoadBalancer and EnsureLoadBalancer can set lbaas.opts.NodeSecurityGroupIDs when it is empty
|
|
// And service controller call UpdateLoadBalancer to set lbaas.opts.NodeSecurityGroupIDs when controller manager service is restarted.
|
|
klog.Warningf("Can not find node-security-group from all the nodes of this cluster when delete loadbalancer service %s/%s",
|
|
service.Namespace, service.Name)
|
|
} else {
|
|
// Delete the rules in the Node Security Group
|
|
for _, nodeSecurityGroupID := range lbaas.opts.NodeSecurityGroupIDs {
|
|
opts := rules.ListOpts{
|
|
SecGroupID: nodeSecurityGroupID,
|
|
RemoteGroupID: lbSecGroupID,
|
|
}
|
|
secGroupRules, err := getSecurityGroupRules(lbaas.network, opts)
|
|
|
|
if err != nil && !isNotFound(err) {
|
|
msg := fmt.Sprintf("Error finding rules for remote group id %s in security group id %s: %v", lbSecGroupID, nodeSecurityGroupID, err)
|
|
return fmt.Errorf(msg)
|
|
}
|
|
|
|
for _, rule := range secGroupRules {
|
|
res := rules.Delete(lbaas.network, rule.ID)
|
|
if res.Err != nil && !isNotFound(res.Err) {
|
|
return fmt.Errorf("error occurred deleting security group rule: %s: %v", rule.ID, res.Err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|