Enable managing public IP’s and work with projects

This commit adds logic for allocating and associating a public IP, if the `—load-balancer-ip` option is not used. It will do proper management of IP’s that are allocated by this provider, so IP’s that are no longer needed/used will also be released again.

Additionally the provider can now also work with CloudStack projects and advanced (VPC) networks.

Lastly the Zone interface now returns an actual zone (supplied by the cloud config), a few logical errors are fixed and the first few tests are added.

All the functionality is extensively tested against both basic and advanced (VPC) networks.
pull/6/head
Sander van Harmelen 2016-07-28 16:20:26 +02:00
parent 52cb7b0755
commit 7c3e644162
6 changed files with 747 additions and 430 deletions

View File

@ -80,6 +80,7 @@ pkg/client/unversioned/auth
pkg/client/unversioned/remotecommand
pkg/cloudprovider/providers
pkg/cloudprovider/providers/azure
pkg/cloudprovider/providers/cloudstack
pkg/controller/framework
pkg/controller/volume
pkg/controller/volume/attachdetach/cache

View File

@ -1,3 +0,0 @@
assignees:
- ngtuna
- runseb

View File

@ -1,34 +1,51 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cloudstack
import (
"fmt"
"io"
"gopkg.in/gcfg.v1"
"github.com/xanzy/go-cloudstack/cloudstack"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/api"
//"github.com/kubernetes/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/service"
//"github.com/kubernetes/kubernetes/pkg/api/service"
"github.com/golang/glog"
"github.com/xanzy/go-cloudstack/cloudstack"
"gopkg.in/gcfg.v1"
"k8s.io/kubernetes/pkg/cloudprovider"
)
// ProviderName is the name of this cloud provider.
const ProviderName = "cloudstack"
type Config struct {
// CSConfig wraps the config for the CloudStack cloud provider.
type CSConfig struct {
Global struct {
APIUrl string `gcfg:"api-url"`
APIKey string `gcfg:"api-key"`
SecretKey string `gcfg:"secret-key"`
VerifySSL bool `gcfg:"verify-ssl"`
}
APIURL string `gcfg:"api-url"`
APIKey string `gcfg:"api-key"`
SecretKey string `gcfg:"secret-key"`
SSLNoVerify bool `gcfg:"ssl-no-verify"`
ProjectID string `gcfg:"project-id"`
Zone string `gcfg:"zone"`
}
}
// CSCloud is an implementation of cloud provider Interface for CloudStack.
// CSCloud is an implementation of Interface for CloudStack.
type CSCloud struct {
client *cloudstack.CloudStackClient
// InstanceID of the server where this CloudStack object is instantiated.
localInstanceID string
client *cloudstack.CloudStackClient
projectID string // If non-"", all resources will be created within this project
zone string
}
func init() {
@ -37,69 +54,59 @@ func init() {
if err != nil {
return nil, err
}
return newCSCloud(cfg)
})
}
func readConfig(config io.Reader) (Config, error) {
func readConfig(config io.Reader) (*CSConfig, error) {
if config == nil {
err := fmt.Errorf("no cloud provider config given")
return Config{}, err
return nil, err
}
cfg := Config{}
if err := gcfg.ReadInto(&cfg, config); err != nil {
cfg := &CSConfig{}
if err := gcfg.ReadInto(cfg, config); err != nil {
glog.Errorf("Couldn't parse config: %v", err)
return Config{}, err
return nil, err
}
return cfg, nil
}
// newCSCloud creates a new instance of CSCloud
func newCSCloud(cfg Config) (*CSCloud, error) {
client := cloudstack.NewAsyncClient(cfg.Global.APIUrl, cfg.Global.APIKey, cfg.Global.SecretKey, cfg.Global.VerifySSL)
// newCSCloud creates a new instance of CSCloud.
func newCSCloud(cfg *CSConfig) (*CSCloud, error) {
client := cloudstack.NewAsyncClient(cfg.Global.APIURL, cfg.Global.APIKey, cfg.Global.SecretKey, !cfg.Global.SSLNoVerify)
id, err := readInstanceID()
if err != nil {
return nil, err
}
cs := CSCloud{
client: client,
localInstanceID: id,
}
return &cs, nil
}
func readInstanceID() (string, error) {
// TODO: get instanceID from virtual router metadata
return "", nil
return &CSCloud{client, cfg.Global.ProjectID, cfg.Global.Zone}, nil
}
// LoadBalancer returns an implementation of LoadBalancer for CloudStack.
func (cs *CSCloud) LoadBalancer() (cloudprovider.LoadBalancer, bool) {
glog.V(4).Info("cloudstack.LoadBalancer() called")
return &LoadBalancer{cs}, true
}
func (cs *CSCloud) Clusters() (cloudprovider.Clusters, bool) {
return nil, false
return cs, true
}
// Instances returns an implementation of Instances for CloudStack.
func (cs *CSCloud) Instances() (cloudprovider.Instances, bool) {
return &Instances{cs}, true
}
func (cs *CSCloud) Routes() (cloudprovider.Routes, bool) {
return nil, false
}
// Zones returns an implementation of Zones for CloudStack.
func (cs *CSCloud) Zones() (cloudprovider.Zones, bool) {
return cs, true
}
// Clusters returns an implementation of Clusters for CloudStack.
func (cs *CSCloud) Clusters() (cloudprovider.Clusters, bool) {
return nil, false
}
// Routes returns an implementation of Routes for CloudStack.
func (cs *CSCloud) Routes() (cloudprovider.Routes, bool) {
return nil, false
}
// ProviderName returns the cloud provider ID.
func (cs *CSCloud) ProviderName() string {
return ProviderName
}
@ -109,380 +116,8 @@ func (cs *CSCloud) ScrubDNS(nameservers, searches []string) (nsOut, srchOut []st
return nameservers, searches
}
func (i *Instances) AddSSHKeyToAllInstances(user string, keyData []byte) error {
return fmt.Errorf("unimplemented")
}
// GetZone returns the Zone containing the region that the program is running in.
func (cs *CSCloud) GetZone() (cloudprovider.Zone, error) {
glog.V(1).Infof("Current zone is null")
return cloudprovider.Zone{Region: ""}, nil
}
func (i *Instances) CurrentNodeName(hostname string) (string, error) {
return hostname, nil
}
// ExternalID returns the cloud provider ID of the specified instance (deprecated).
func (i *Instances) ExternalID(name string) (string, error) {
var lb LoadBalancer
var hosts []string
hosts = append(hosts, name)
vmIDs, err := lb.getVirtualMachineIds(hosts)
if err != nil {
return "", err
}
return vmIDs[0], nil
}
// InstanceID returns the cloud provider ID of the specified instance.
// Note that if the instance does not exist or is no longer running, we must return ("", cloudprovider.InstanceNotFound)
func (i *Instances) InstanceID(name string) (string, error) {
var lb LoadBalancer
var hosts []string
hosts = append(hosts, name)
vmIDs, err := lb.getVirtualMachineIds(hosts)
if err != nil {
return "", cloudprovider.InstanceNotFound
}
return vmIDs[0], nil
}
// InstanceType returns the type of the specified instance.
func (i *Instances) InstanceType(name string) (string, error) {
return "", nil
}
// List lists instances that match 'filter' which is a regular expression which must match the entire instance name (fqdn)
func (i *Instances) List(name_filter string) ([]string, error) {
vmParams := i.cs.client.VirtualMachine.NewListVirtualMachinesParams()
vmParams.SetName(name_filter)
vmParamsResponse, err := i.cs.client.VirtualMachine.ListVirtualMachines(vmParams)
if err != nil {
return nil, err
}
var vms []string
for _, vm := range vmParamsResponse.VirtualMachines {
vms = append(vms, vm.Name)
}
return vms, nil
}
// NodeAddresses returns the addresses of the specified instance.
// TODO(roberthbailey): This currently is only used in such a way that it
// returns the address of the calling instance. We should do a rename to
// make this clearer.
func (i *Instances) NodeAddresses(name string) ([]api.NodeAddress, error) {
vmParams := i.cs.client.VirtualMachine.NewListVirtualMachinesParams()
vmParams.SetName(name)
vmParamsResponse, err := i.cs.client.VirtualMachine.ListVirtualMachines(vmParams)
if err != nil {
return nil, err
}
addrs := []api.NodeAddress{}
publicIP := vmParamsResponse.VirtualMachines[0].Publicip
addrs = append(addrs, api.NodeAddress{Type: api.NodeExternalIP, Address: publicIP})
for _, nic := range vmParamsResponse.VirtualMachines[0].Nic {
addrs = append(addrs, api.NodeAddress{Type: api.NodeInternalIP, Address: nic.Ipaddress})
addrs = append(addrs, api.NodeAddress{Type: api.NodeLegacyHostIP, Address: nic.Ipaddress})
}
return addrs, nil
}
type LoadBalancer struct {
cs *CSCloud
}
type Instances struct {
cs *CSCloud
}
func (lb *LoadBalancer) GetLoadBalancer(apiService *api.Service) (*api.LoadBalancerStatus, bool, error) {
loadBalancerName := cloudprovider.GetLoadBalancerName(apiService)
loadBalancer, _, err := lb.cs.client.LoadBalancer.GetLoadBalancerByName(loadBalancerName)
if err != nil {
return nil, false, nil
}
vip := loadBalancer.Sourceipaddress
status := &api.LoadBalancerStatus{}
status.Ingress = []api.LoadBalancerIngress{{IP: vip}}
return status, true, err
}
func (lb *LoadBalancer) EnsureLoadBalancer(apiService *api.Service, hosts []string, annotations map[string]string) (*api.LoadBalancerStatus, error) {
glog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v)", apiService.Namespace, apiService.Name, apiService.Spec.LoadBalancerIP, apiService.Spec.Ports, hosts, annotations)
sourceRanges, err := service.GetLoadBalancerSourceRanges(annotations)
if err != nil {
return nil, err
}
if !service.IsAllowAll(sourceRanges) {
return nil, fmt.Errorf("Source range restrictions are not supported for CloudStack load balancers")
}
glog.V(2).Infof("Checking if CloudStack load balancer already exists: %s", cloudprovider.GetLoadBalancerName(apiService))
_, exists, err := lb.GetLoadBalancer(apiService)
if err != nil {
return nil, fmt.Errorf("error checking if CloudStack load balancer already exists: %v", err)
}
// TODO: Implement a more efficient update strategy for common changes than delete & create
if exists {
err := lb.EnsureLoadBalancerDeleted(apiService)
if err != nil {
return nil, fmt.Errorf("error deleting existing CloudStack load balancer: %v", err)
}
}
//Config algorithm for the new LB
var algorithm string
switch apiService.Spec.SessionAffinity {
case api.ServiceAffinityNone:
algorithm = "roundrobin"
case api.ServiceAffinityClientIP:
algorithm = "source"
default:
return nil, fmt.Errorf("unsupported load balancer affinity: %v", apiService.Spec.SessionAffinity)
}
//Get public IP address will be associated to the new LB
lbIpAddr := apiService.Spec.LoadBalancerIP
if lbIpAddr == "" {
return nil, fmt.Errorf("unsupported service without predefined Load Balancer IPaddress")
}
publicIpId, err := lb.getPublicIpId(lbIpAddr)
if err != nil {
return nil, fmt.Errorf("error getting public IP address information for creating CloudStack load balancer")
}
//Config name for new LB
lbName := apiService.ObjectMeta.Name
if lbName == "" {
return nil, fmt.Errorf("name is a required field for a CloudStack load balancer")
}
ports := apiService.Spec.Ports
if len(ports) == 0 {
return nil, fmt.Errorf("no ports provided to CloudStack load balancer")
}
//support multiple ports
for _, port := range ports {
//Init a new LB configuration
lbParams := lb.cs.client.LoadBalancer.NewCreateLoadBalancerRuleParams(
algorithm,
lbName,
port.NodePort,
port.Port,
)
//Config protocol for new LB
switch port.Protocol {
case api.ProtocolTCP:
lbParams.SetProtocol("TCP")
case api.ProtocolUDP:
lbParams.SetProtocol("UDP")
}
//Config LB IP
lbParams.SetPublicipid(publicIpId)
//Do not create corresponding firewall rule
lbParams.SetOpenfirewall(false)
// create a Load Balancer rule
createLBRuleResponse, err := lb.cs.client.LoadBalancer.CreateLoadBalancerRule(lbParams)
if err != nil {
return nil, err
}
// associate vms to new LB
assignLbParams := lb.cs.client.LoadBalancer.NewAssignToLoadBalancerRuleParams(createLBRuleResponse.Id)
vmIds, err := lb.getVirtualMachineIds(hosts)
if err != nil {
return nil, fmt.Errorf("error getting list of vms associated with CloudStack load balancer")
}
assignLbParams.SetVirtualmachineids(vmIds)
assignLBRuleResponse, err := lb.cs.client.LoadBalancer.AssignToLoadBalancerRule(assignLbParams)
if err != nil || !assignLBRuleResponse.Success {
return nil, err
}
}
status := &api.LoadBalancerStatus{}
status.Ingress = []api.LoadBalancerIngress{{IP: lbIpAddr}}
return status, nil
}
func (lb *LoadBalancer) UpdateLoadBalancer(apiService *api.Service, hosts []string) error {
loadBalancerName := cloudprovider.GetLoadBalancerName(apiService)
glog.V(4).Infof("UpdateLoadBalancer(%v, %v)", loadBalancerName, hosts)
lbParams := lb.cs.client.LoadBalancer.NewListLoadBalancerRulesParams()
//Get new list of vms associated with LB of service
//Set of member (addresses) that _should_ exist
vmIds, err := lb.getVirtualMachineIds(hosts)
if err != nil {
return fmt.Errorf("error getting list of vms associated with CloudStack load balancer")
}
vms := map[string]bool{}
for _, vmId := range vmIds {
vms[vmId] = true
}
//Now get the current list of vms. And then make comparison to update the list.
//Public IPaddress associated with LB of service
lbIpAddr := apiService.Spec.LoadBalancerIP
if lbIpAddr == "" {
return fmt.Errorf("unsupported service without predefined Load Balancer IPaddress")
}
//list all LB rules associated with this public IPaddress
publicIpId, err := lb.getPublicIpId(lbIpAddr)
if err != nil {
return fmt.Errorf("error getting public IP address information for creating CloudStack load balancer")
}
lbParams.SetPublicipid(publicIpId)
lbRulesResponse, err := lb.cs.client.LoadBalancer.ListLoadBalancerRules(lbParams)
if err != nil {
return err
}
lbRuleId := lbRulesResponse.LoadBalancerRules[0].Id
lbInstancesParams := lb.cs.client.LoadBalancer.NewListLoadBalancerRuleInstancesParams(lbRuleId)
lbInstancesParams.SetLbvmips(true)
//list out all VMs currently associated to this LB
lbInstancesResponse, err := lb.cs.client.LoadBalancer.ListLoadBalancerRuleInstances(lbInstancesParams)
if err != nil {
return err
}
var oldvmIds []string
for _, lbInstance := range lbInstancesResponse.LoadBalancerRuleInstances {
oldvmIds = append(oldvmIds, lbInstance.Loadbalancerruleinstance.Id)
}
//Compare two list of vms to thus update LB
var removedVmIds []string
for _, oldvmId := range oldvmIds {
if _, found := vms[oldvmId]; found {
delete(vms, oldvmId)
} else {
removedVmIds = append(removedVmIds, oldvmId)
}
}
//remove old vms from all LB rules associated with the public IP
for _, lbRule := range lbRulesResponse.LoadBalancerRules {
removeFromLbRuleParams := lb.cs.client.LoadBalancer.NewRemoveFromLoadBalancerRuleParams(lbRule.Id)
removeFromLbRuleParams.SetVirtualmachineids(removedVmIds)
_, err := lb.cs.client.LoadBalancer.RemoveFromLoadBalancerRule(removeFromLbRuleParams)
if err != nil {
return err
}
}
//assign new vms (the rest of vms map) to all LB rules associated with the public IP
var assignVmIds []string
for vm := range vms {
assignVmIds = append(assignVmIds, vm)
}
for _, lbRule := range lbRulesResponse.LoadBalancerRules {
assignToLbRuleParams := lb.cs.client.LoadBalancer.NewAssignToLoadBalancerRuleParams(lbRule.Id)
assignToLbRuleParams.SetVirtualmachineids(assignVmIds)
_, err := lb.cs.client.LoadBalancer.AssignToLoadBalancerRule(assignToLbRuleParams)
if err != nil {
return err
}
}
return nil
}
func (lb *LoadBalancer) EnsureLoadBalancerDeleted(apiService *api.Service) error {
loadBalancerName := cloudprovider.GetLoadBalancerName(apiService)
glog.V(4).Infof("EnsureLoadBalancerDeleted(%v)", loadBalancerName)
lbIpAddr := apiService.Spec.LoadBalancerIP
if lbIpAddr != "" {
//list all LB rules associated to this public ipaddress.
listLBParams := lb.cs.client.LoadBalancer.NewListLoadBalancerRulesParams()
publicIpId, err := lb.getPublicIpId(lbIpAddr)
if err != nil {
return fmt.Errorf("error getting public IP address information for creating CloudStack load balancer")
}
listLBParams.SetPublicipid(publicIpId)
listLoadBalancerResponse, err := lb.cs.client.LoadBalancer.ListLoadBalancerRules(listLBParams)
if err != nil {
return err
}
lbRules := listLoadBalancerResponse.LoadBalancerRules
//delete all found load balancer rules associated to this public ipaddress.
for _, lbRule := range lbRules {
lbParams := lb.cs.client.LoadBalancer.NewDeleteLoadBalancerRuleParams(lbRule.Id)
_, err := lb.cs.client.LoadBalancer.DeleteLoadBalancerRule(lbParams)
if err != nil {
return err
}
}
} else {
//only support delete load balancer with existing IP address
return nil
}
return nil
}
func (lb *LoadBalancer) getPublicIpId(lbIP string) (string, error) {
addressParams := lb.cs.client.Address.NewListPublicIpAddressesParams()
addressParams.SetIpaddress(lbIP)
addressResponse, err := lb.cs.client.Address.ListPublicIpAddresses(addressParams)
if err != nil {
return "", err
}
if addressResponse.Count > 1 {
return "", fmt.Errorf("Found more than one address objects with IP = %s", lbIP)
} else if addressResponse.Count == 0 {
//TODO: acquire new IP address with lbIP from CloudStack
}
return addressResponse.PublicIpAddresses[0].Id, nil
}
func (lb *LoadBalancer) getVirtualMachineIds(hosts []string) ([]string, error) {
var vmIDs []string
ipAddrs := map[string]bool{}
for _, host := range hosts {
ipAddrs[host] = true
}
//list all vms
listVMParams := lb.cs.client.VirtualMachine.NewListVirtualMachinesParams()
listVMParams.SetListall(true)
listVMResponse, err := lb.cs.client.VirtualMachine.ListVirtualMachines(listVMParams)
if err != nil {
return nil, err
}
//check if ipaddress belongs to the hosts slice, then add the corresponding vmid
for i := 0; i < listVMResponse.Count; i++ {
//check only the first Nic
ipAddr := listVMResponse.VirtualMachines[i].Nic[0].Ipaddress
if _, found := ipAddrs[ipAddr]; found {
vmIDs = append(vmIDs, listVMResponse.VirtualMachines[i].Id)
}
}
return vmIDs, nil
glog.V(2).Infof("Current zone is %v", cs.zone)
return cloudprovider.Zone{Region: cs.zone}, nil
}

View File

@ -0,0 +1,543 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cloudstack
import (
"fmt"
"strconv"
"github.com/golang/glog"
"github.com/xanzy/go-cloudstack/cloudstack"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/cloudprovider"
)
type loadBalancer struct {
*cloudstack.CloudStackClient
name string
algorithm string
hostIDs []string
ipAddr string
ipAddrID string
networkID string
projectID string
rules map[string]*cloudstack.LoadBalancerRule
}
// GetLoadBalancer returns whether the specified load balancer exists, and if so, what its status is.
func (cs *CSCloud) GetLoadBalancer(clusterName string, service *api.Service) (*api.LoadBalancerStatus, bool, error) {
glog.V(4).Infof("GetLoadBalancer(%v, %v, %v)", clusterName, service.Namespace, service.Name)
// Get the load balancer details and existing rules.
lb, err := cs.getLoadBalancer(service)
if err != nil {
return nil, false, err
}
// If we don't have any rules, the load balancer does not exist.
if len(lb.rules) == 0 {
return nil, false, nil
}
glog.V(4).Infof("Found a load balancer associated with IP %v", lb.ipAddr)
status := &api.LoadBalancerStatus{}
status.Ingress = append(status.Ingress, api.LoadBalancerIngress{IP: lb.ipAddr})
return status, true, nil
}
// EnsureLoadBalancer creates a new load balancer, or updates the existing one. Returns the status of the balancer.
func (cs *CSCloud) EnsureLoadBalancer(clusterName string, service *api.Service, hosts []string) (status *api.LoadBalancerStatus, err error) {
glog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v)", clusterName, service.Namespace, service.Name, service.Spec.LoadBalancerIP, service.Spec.Ports, hosts)
if len(service.Spec.Ports) == 0 {
return nil, fmt.Errorf("requested load balancer with no ports")
}
// Get the load balancer details and existing rules.
lb, err := cs.getLoadBalancer(service)
if err != nil {
return nil, err
}
// Set the load balancer algorithm.
switch service.Spec.SessionAffinity {
case api.ServiceAffinityNone:
lb.algorithm = "roundrobin"
case api.ServiceAffinityClientIP:
lb.algorithm = "source"
default:
return nil, fmt.Errorf("unsupported load balancer affinity: %v", service.Spec.SessionAffinity)
}
// Verify that all the hosts belong to the same network, and retrieve their ID's.
lb.hostIDs, lb.networkID, err = cs.verifyHosts(hosts)
if err != nil {
return nil, err
}
if !lb.hasLoadBalancerIP() {
// Create or retrieve the load balancer IP.
if err := lb.getLoadBalancerIP(service.Spec.LoadBalancerIP); err != nil {
return nil, err
}
if lb.ipAddr != "" && lb.ipAddr != service.Spec.LoadBalancerIP {
defer func(lb *loadBalancer) {
if err != nil {
if err := lb.releaseLoadBalancerIP(); err != nil {
glog.Errorf(err.Error())
}
}
}(lb)
}
}
glog.V(4).Infof("Load balancer %v is associated with IP %v", lb.name, lb.ipAddr)
for _, port := range service.Spec.Ports {
// All ports have their own load balancer rule, so add the port to lbName to keep the names unique.
lbRuleName := fmt.Sprintf("%s-%d", lb.name, port.Port)
// If the load balancer rule exists and is up-to-date, we move on to the next rule.
exists, needsUpdate, err := lb.checkLoadBalancerRule(lbRuleName, port)
if err != nil {
return nil, err
}
if exists && !needsUpdate {
glog.V(4).Infof("Load balancer rule %v is up-to-date", lbRuleName)
// Delete the rule from the map, to prevent it being deleted.
delete(lb.rules, lbRuleName)
continue
}
if needsUpdate {
glog.V(4).Infof("Updating load balancer rule: %v", lbRuleName)
if err := lb.updateLoadBalancerRule(lbRuleName); err != nil {
return nil, err
}
// Delete the rule from the map, to prevent it being deleted.
delete(lb.rules, lbRuleName)
continue
}
glog.V(4).Infof("Creating load balancer rule: %v", lbRuleName)
lbRule, err := lb.createLoadBalancerRule(lbRuleName, port)
if err != nil {
return nil, err
}
glog.V(4).Infof("Assigning hosts (%v) to load balancer rule: %v", lb.hostIDs, lbRuleName)
if err = lb.assignHostsToRule(lbRule, lb.hostIDs); err != nil {
return nil, err
}
}
// Cleanup any rules that are now still in the rules map, as they are no longer needed.
for _, lbRule := range lb.rules {
glog.V(4).Infof("Deleting obsolete load balancer rule: %v", lbRule.Name)
if err := lb.deleteLoadBalancerRule(lbRule); err != nil {
return nil, err
}
}
status = &api.LoadBalancerStatus{}
status.Ingress = []api.LoadBalancerIngress{{IP: lb.ipAddr}}
return status, nil
}
// UpdateLoadBalancer updates hosts under the specified load balancer.
func (cs *CSCloud) UpdateLoadBalancer(clusterName string, service *api.Service, hosts []string) error {
glog.V(4).Infof("UpdateLoadBalancer(%v, %v, %v, %v)", clusterName, service.Namespace, service.Name, hosts)
// Get the load balancer details and existing rules.
lb, err := cs.getLoadBalancer(service)
if err != nil {
return err
}
// Verify that all the hosts belong to the same network, and retrieve their ID's.
lb.hostIDs, _, err = cs.verifyHosts(hosts)
if err != nil {
return err
}
for _, lbRule := range lb.rules {
p := lb.LoadBalancer.NewListLoadBalancerRuleInstancesParams(lbRule.Id)
// Retrieve all VMs currently associated to this load balancer rule.
l, err := lb.LoadBalancer.ListLoadBalancerRuleInstances(p)
if err != nil {
return fmt.Errorf("error retrieving associated instances: %v", err)
}
assign, remove := symmetricDifference(lb.hostIDs, l.LoadBalancerRuleInstances)
if len(assign) > 0 {
glog.V(4).Infof("Assigning new hosts (%v) to load balancer rule: %v", assign, lbRule.Name)
if err := lb.assignHostsToRule(lbRule, assign); err != nil {
return err
}
}
if len(remove) > 0 {
glog.V(4).Infof("Removing old hosts (%v) from load balancer rule: %v", assign, lbRule.Name)
if err := lb.removeHostsFromRule(lbRule, remove); err != nil {
return err
}
}
}
return nil
}
// EnsureLoadBalancerDeleted deletes the specified load balancer if it exists, returning
// nil if the load balancer specified either didn't exist or was successfully deleted.
func (cs *CSCloud) EnsureLoadBalancerDeleted(clusterName string, service *api.Service) error {
glog.V(4).Infof("EnsureLoadBalancerDeleted(%v, %v, %v)", clusterName, service.Namespace, service.Name)
// Get the load balancer details and existing rules.
lb, err := cs.getLoadBalancer(service)
if err != nil {
return err
}
for _, lbRule := range lb.rules {
glog.V(4).Infof("Deleting load balancer rule: %v", lbRule.Name)
if err := lb.deleteLoadBalancerRule(lbRule); err != nil {
return err
}
}
if lb.ipAddr != service.Spec.LoadBalancerIP {
glog.V(4).Infof("Releasing load balancer IP: %v", lb.ipAddr)
if err := lb.releaseLoadBalancerIP(); err != nil {
return err
}
}
return nil
}
// getLoadBalancer retrieves the IP address and ID and all the existing rules it can find.
func (cs *CSCloud) getLoadBalancer(service *api.Service) (*loadBalancer, error) {
lb := &loadBalancer{
CloudStackClient: cs.client,
name: cloudprovider.GetLoadBalancerName(service),
projectID: cs.projectID,
rules: make(map[string]*cloudstack.LoadBalancerRule),
}
p := cs.client.LoadBalancer.NewListLoadBalancerRulesParams()
p.SetKeyword(lb.name)
p.SetListall(true)
if cs.projectID != "" {
p.SetProjectid(cs.projectID)
}
l, err := cs.client.LoadBalancer.ListLoadBalancerRules(p)
if err != nil {
return nil, fmt.Errorf("error retrieving load balancer rules: %v", err)
}
for _, lbRule := range l.LoadBalancerRules {
lb.rules[lbRule.Name] = lbRule
if lb.ipAddr != "" && lb.ipAddr != lbRule.Publicip {
glog.Warningf("Load balancer for service %v/%v has rules associated with different IP's: %v, %v", service.Namespace, service.Name, lb.ipAddr, lbRule.Publicip)
}
lb.ipAddr = lbRule.Publicip
lb.ipAddrID = lbRule.Publicipid
}
glog.V(4).Infof("Load balancer %v contains %d rule(s)", lb.name, len(lb.rules))
return lb, nil
}
// verifyHosts verifies if all hosts belong to the same network, and returns the network and host ID's.
func (cs *CSCloud) verifyHosts(hosts []string) ([]string, string, error) {
ipAddrs := map[string]bool{}
for _, host := range hosts {
ipAddrs[host] = true
}
p := cs.client.VirtualMachine.NewListVirtualMachinesParams()
p.SetListall(true)
if cs.projectID != "" {
p.SetProjectid(cs.projectID)
}
l, err := cs.client.VirtualMachine.ListVirtualMachines(p)
if err != nil {
return nil, "", fmt.Errorf("error retrieving a list of hosts: %v", err)
}
var hostIDs []string
var networkID string
// Check if the address belongs to the hosts slice, then add the corresponding vm ID.
for _, vm := range l.VirtualMachines {
// We only check the primary NIC.
if ipAddrs[vm.Nic[0].Ipaddress] {
if networkID != "" && networkID != vm.Nic[0].Networkid {
return nil, "", fmt.Errorf("found hosts that belong to different networks")
}
networkID = vm.Nic[0].Networkid
hostIDs = append(hostIDs, vm.Id)
}
}
return hostIDs, networkID, nil
}
// getLoadBalancerIP retieves an existing IP or associates a new IP and returns the address and it's ID.
func (lb *loadBalancer) hasLoadBalancerIP() bool {
return lb.ipAddr != "" && lb.ipAddrID != ""
}
// getLoadBalancerIP retieves an existing IP or associates a new IP and returns the address and it's ID.
func (lb *loadBalancer) getLoadBalancerIP(loadBalancerIP string) error {
if loadBalancerIP != "" {
return lb.getPublicIPAddress(loadBalancerIP)
}
return lb.associatePublicIPAddress()
}
// getPublicIPAddressID retrieves the ID of the given IP, and returns the address and it's ID.
func (lb *loadBalancer) getPublicIPAddress(loadBalancerIP string) error {
glog.V(4).Infof("Retrieve load balancer IP details: %v", loadBalancerIP)
p := lb.Address.NewListPublicIpAddressesParams()
p.SetIpaddress(loadBalancerIP)
p.SetListall(true)
if lb.projectID != "" {
p.SetProjectid(lb.projectID)
}
l, err := lb.Address.ListPublicIpAddresses(p)
if err != nil {
return fmt.Errorf("error retrieving the IP address: %v", err)
}
if l.Count != 1 {
return fmt.Errorf("could not find IP address %v", loadBalancerIP)
}
lb.ipAddr = l.PublicIpAddresses[0].Ipaddress
lb.ipAddrID = l.PublicIpAddresses[0].Id
return nil
}
// associatePublicIPAddress associates a new IP and returns the address and it's ID.
func (lb *loadBalancer) associatePublicIPAddress() error {
glog.V(4).Infof("Allocate new IP for load balancer: %v", lb.name)
// If a network belongs to a VPC, the IP address needs to be associated with
// the VPC instead of with the network.
network, count, err := lb.Network.GetNetworkByID(lb.networkID, cloudstack.WithProject(lb.projectID))
if err != nil {
if count == 0 {
return fmt.Errorf("could not find network %v", lb.networkID)
}
return fmt.Errorf("error retrieving network: %v", err)
}
p := lb.Address.NewAssociateIpAddressParams()
if network.Vpcid != "" {
p.SetVpcid(network.Vpcid)
} else {
p.SetNetworkid(lb.networkID)
}
if lb.projectID != "" {
p.SetProjectid(lb.projectID)
}
// Associate a new IP address
r, err := lb.Address.AssociateIpAddress(p)
if err != nil {
return fmt.Errorf("error associating a new IP address: %v", err)
}
lb.ipAddr = r.Ipaddress
lb.ipAddrID = r.Id
return nil
}
// releasePublicIPAddress releases an associated IP.
func (lb *loadBalancer) releaseLoadBalancerIP() error {
p := lb.Address.NewDisassociateIpAddressParams(lb.ipAddrID)
if _, err := lb.Address.DisassociateIpAddress(p); err != nil {
return fmt.Errorf("error releasing load balancer IP %v: %v", lb.ipAddr, err)
}
return nil
}
// checkLoadBalancerRule checks if the rule already exists and if it does, if it can be updated. If
// it does exist but cannot be updated, it will delete the existing rule so it can be created again.
func (lb *loadBalancer) checkLoadBalancerRule(lbRuleName string, port api.ServicePort) (bool, bool, error) {
lbRule, ok := lb.rules[lbRuleName]
if !ok {
return false, false, nil
}
// Check if any of the values we cannot update (those that require a new load balancer rule) are changed.
if lbRule.Publicip == lb.ipAddr && lbRule.Privateport == strconv.Itoa(int(port.NodePort)) && lbRule.Publicport == strconv.Itoa(int(port.Port)) {
return true, lbRule.Algorithm != lb.algorithm, nil
}
// Delete the load balancer rule so we can create a new one using the new values.
if err := lb.deleteLoadBalancerRule(lbRule); err != nil {
return false, false, err
}
return false, false, nil
}
// updateLoadBalancerRule updates a load balancer rule.
func (lb *loadBalancer) updateLoadBalancerRule(lbRuleName string) error {
lbRule := lb.rules[lbRuleName]
p := lb.LoadBalancer.NewUpdateLoadBalancerRuleParams(lbRule.Id)
p.SetAlgorithm(lb.algorithm)
_, err := lb.LoadBalancer.UpdateLoadBalancerRule(p)
return err
}
// createLoadBalancerRule creates a new load balancer rule and returns it's ID.
func (lb *loadBalancer) createLoadBalancerRule(lbRuleName string, port api.ServicePort) (*cloudstack.LoadBalancerRule, error) {
p := lb.LoadBalancer.NewCreateLoadBalancerRuleParams(
lb.algorithm,
lbRuleName,
int(port.NodePort),
int(port.Port),
)
p.SetNetworkid(lb.networkID)
p.SetPublicipid(lb.ipAddrID)
switch port.Protocol {
case api.ProtocolTCP:
p.SetProtocol("TCP")
case api.ProtocolUDP:
p.SetProtocol("UDP")
default:
return nil, fmt.Errorf("unsupported load balancer protocol: %v", port.Protocol)
}
// Do not create corresponding firewall rule.
p.SetOpenfirewall(false)
// Create a new load balancer rule.
r, err := lb.LoadBalancer.CreateLoadBalancerRule(p)
if err != nil {
return nil, fmt.Errorf("error creating the load balancer rule %v: %v", lbRuleName, err)
}
lbRule := &cloudstack.LoadBalancerRule{
Id: r.Id,
Algorithm: r.Algorithm,
Cidrlist: r.Cidrlist,
Name: r.Name,
Networkid: r.Networkid,
Privateport: r.Privateport,
Publicport: r.Publicport,
Publicip: r.Publicip,
Publicipid: r.Publicipid,
}
return lbRule, nil
}
// deleteLoadBalancerRule deletes a load balancer rule.
func (lb *loadBalancer) deleteLoadBalancerRule(lbRule *cloudstack.LoadBalancerRule) error {
p := lb.LoadBalancer.NewDeleteLoadBalancerRuleParams(lbRule.Id)
if _, err := lb.LoadBalancer.DeleteLoadBalancerRule(p); err != nil {
return fmt.Errorf("error deleting load balancer rule %v: %v", lbRule.Name, err)
}
// Delete the rule from the map as it no longer exists
delete(lb.rules, lbRule.Name)
return nil
}
// assignHostsToRule assigns hosts to a load balancer rule.
func (lb *loadBalancer) assignHostsToRule(lbRule *cloudstack.LoadBalancerRule, hostIDs []string) error {
p := lb.LoadBalancer.NewAssignToLoadBalancerRuleParams(lbRule.Id)
p.SetVirtualmachineids(hostIDs)
if _, err := lb.LoadBalancer.AssignToLoadBalancerRule(p); err != nil {
return fmt.Errorf("error assigning hosts to load balancer rule %v: %v", lbRule.Name, err)
}
return nil
}
// removeHostsFromRule removes hosts from a load balancer rule.
func (lb *loadBalancer) removeHostsFromRule(lbRule *cloudstack.LoadBalancerRule, hostIDs []string) error {
p := lb.LoadBalancer.NewRemoveFromLoadBalancerRuleParams(lbRule.Id)
p.SetVirtualmachineids(hostIDs)
if _, err := lb.LoadBalancer.RemoveFromLoadBalancerRule(p); err != nil {
return fmt.Errorf("error removing hosts from load balancer rule %v: %v", lbRule.Name, err)
}
return nil
}
// symmetricDifference returns the symmetric difference between the old (existing) and new (wanted) host ID's.
func symmetricDifference(hostIDs []string, lbInstances []*cloudstack.VirtualMachine) ([]string, []string) {
new := make(map[string]bool)
for _, hostID := range hostIDs {
new[hostID] = true
}
var remove []string
for _, instance := range lbInstances {
if new[instance.Id] {
delete(new, instance.Id)
continue
}
remove = append(remove, instance.Id)
}
var assign []string
for hostID := range new {
assign = append(assign, hostID)
}
return assign, remove
}

View File

@ -0,0 +1,141 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cloudstack
import (
"os"
"strconv"
"strings"
"testing"
"k8s.io/kubernetes/pkg/api"
)
const testClusterName = "testCluster"
func TestReadConfig(t *testing.T) {
_, err := readConfig(nil)
if err == nil {
t.Errorf("Should fail when no config is provided: %v", err)
}
cfg, err := readConfig(strings.NewReader(`
[Global]
api-url = https://cloudstack.url
api-key = a-valid-api-key
secret-key = a-valid-secret-key
ssl-no-verify = true
project-id = a-valid-project-id
zone = a-valid-zone
`))
if err != nil {
t.Fatalf("Should succeed when a valid config is provided: %v", err)
}
if cfg.Global.APIURL != "https://cloudstack.url" {
t.Errorf("incorrect api-url: %s", cfg.Global.APIURL)
}
if cfg.Global.APIKey != "a-valid-api-key" {
t.Errorf("incorrect api-key: %s", cfg.Global.APIKey)
}
if cfg.Global.SecretKey != "a-valid-secret-key" {
t.Errorf("incorrect secret-key: %s", cfg.Global.SecretKey)
}
if !cfg.Global.SSLNoVerify {
t.Errorf("incorrect ssl-no-verify: %t", cfg.Global.SSLNoVerify)
}
if cfg.Global.Zone != "a-valid-zone" {
t.Errorf("incorrect zone: %s", cfg.Global.Zone)
}
}
// This allows acceptance testing against an existing CloudStack environment.
func configFromEnv() (*CSConfig, bool) {
cfg := &CSConfig{}
cfg.Global.APIURL = os.Getenv("CS_API_URL")
cfg.Global.APIKey = os.Getenv("CS_API_KEY")
cfg.Global.SecretKey = os.Getenv("CS_SECRET_KEY")
cfg.Global.ProjectID = os.Getenv("CS_PROJECT_ID")
cfg.Global.Zone = os.Getenv("CS_ZONE")
// It is save to ignore the error here. If the input cannot be parsed SSLNoVerify
// will still be a bool with it's zero value (false) which is the expected default.
cfg.Global.SSLNoVerify, _ = strconv.ParseBool(os.Getenv("CS_SSL_NO_VERIFY"))
// Check if we have the minimum required info to be able to connect to CloudStack.
ok := cfg.Global.APIURL != "" && cfg.Global.APIKey != "" && cfg.Global.SecretKey != ""
return cfg, ok
}
func TestNewCSCloud(t *testing.T) {
cfg, ok := configFromEnv()
if !ok {
t.Skipf("No config found in environment")
}
_, err := newCSCloud(cfg)
if err != nil {
t.Fatalf("Failed to construct/authenticate CloudStack: %v", err)
}
}
func TestLoadBalancer(t *testing.T) {
cfg, ok := configFromEnv()
if !ok {
t.Skipf("No config found in environment")
}
cs, err := newCSCloud(cfg)
if err != nil {
t.Fatalf("Failed to construct/authenticate CloudStack: %v", err)
}
lb, ok := cs.LoadBalancer()
if !ok {
t.Fatalf("LoadBalancer() returned false")
}
_, exists, err := lb.GetLoadBalancer(testClusterName, &api.Service{ObjectMeta: api.ObjectMeta{Name: "noexist"}})
if err != nil {
t.Fatalf("GetLoadBalancer(\"noexist\") returned error: %s", err)
}
if exists {
t.Fatalf("GetLoadBalancer(\"noexist\") returned exists")
}
}
func TestZones(t *testing.T) {
cs := &CSCloud{
zone: "myRegion",
}
z, ok := cs.Zones()
if !ok {
t.Fatalf("Zones() returned false")
}
zone, err := z.GetZone()
if err != nil {
t.Fatalf("GetZone() returned error: %s", err)
}
if zone.Region != "myRegion" {
t.Fatalf("GetZone() returned wrong region (%s)", zone.Region)
}
}

View File

@ -20,11 +20,11 @@ import (
// Cloud providers
_ "k8s.io/kubernetes/pkg/cloudprovider/providers/aws"
_ "k8s.io/kubernetes/pkg/cloudprovider/providers/azure"
_ "k8s.io/kubernetes/pkg/cloudprovider/providers/cloudstack"
_ "k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
_ "k8s.io/kubernetes/pkg/cloudprovider/providers/mesos"
_ "k8s.io/kubernetes/pkg/cloudprovider/providers/openstack"
_ "k8s.io/kubernetes/pkg/cloudprovider/providers/ovirt"
_ "k8s.io/kubernetes/pkg/cloudprovider/providers/rackspace"
_ "k8s.io/kubernetes/pkg/cloudprovider/providers/vsphere"
_ "k8s.io/kubernetes/pkg/cloudprovider/providers/cloudstack"
)