diff --git a/pkg/cloudprovider/openstack/openstack.go b/pkg/cloudprovider/openstack/openstack.go index e696eb0114..7bad4d003a 100644 --- a/pkg/cloudprovider/openstack/openstack.go +++ b/pkg/cloudprovider/openstack/openstack.go @@ -22,12 +22,17 @@ import ( "io" "net" "regexp" + "time" "code.google.com/p/gcfg" "github.com/rackspace/gophercloud" "github.com/rackspace/gophercloud/openstack" "github.com/rackspace/gophercloud/openstack/compute/v2/flavors" "github.com/rackspace/gophercloud/openstack/compute/v2/servers" + "github.com/rackspace/gophercloud/openstack/networking/v2/extensions/lbaas/members" + "github.com/rackspace/gophercloud/openstack/networking/v2/extensions/lbaas/monitors" + "github.com/rackspace/gophercloud/openstack/networking/v2/extensions/lbaas/pools" + "github.com/rackspace/gophercloud/openstack/networking/v2/extensions/lbaas/vips" "github.com/rackspace/gophercloud/pagination" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" @@ -35,28 +40,54 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) -var ErrServerNotFound = errors.New("Server not found") -var ErrMultipleServersFound = errors.New("Multiple servers matched query") -var ErrFlavorNotFound = errors.New("Flavor not found") +var ErrNotFound = errors.New("Failed to find object") +var ErrMultipleResults = errors.New("Multiple results where only one expected") var ErrNoAddressFound = errors.New("No address found for host") -var ErrInvalidAddress = errors.New("Invalid address") var ErrAttrNotFound = errors.New("Expected attribute not found") +// encoding.TextUnmarshaler interface for time.Duration +type MyDuration struct { + time.Duration +} + +func (d *MyDuration) UnmarshalText(text []byte) error { + res, err := time.ParseDuration(string(text)) + if err != nil { + return err + } + d.Duration = res + return nil +} + +type LoadBalancerOpts struct { + SubnetId string `gcfg:"subnet-id"` // required + CreateMonitor bool `gcfg:"create-monitor"` + MonitorDelay MyDuration `gcfg:"monitor-delay"` + MonitorTimeout MyDuration `gcfg:"monitor-timeout"` + MonitorMaxRetries uint `gcfg:"monitor-max-retries"` +} + // OpenStack is an implementation of cloud provider Interface for OpenStack. type OpenStack struct { provider *gophercloud.ProviderClient region string + lbOpts LoadBalancerOpts } type Config struct { Global struct { - AuthUrl string - Username, UserId string - Password, ApiKey string - TenantId, TenantName string - DomainId, DomainName string - Region string + AuthUrl string `gcfg:"auth-url"` + Username string + UserId string `gcfg:"user-id"` + Password string + ApiKey string `gcfg:"api-key"` + TenantId string `gcfg:"tenant-id"` + TenantName string `gcfg:"tenant-name"` + DomainId string `gcfg:"domain-id"` + DomainName string `gcfg:"domain-name"` + Region string } + LoadBalancer LoadBalancerOpts } func init() { @@ -104,6 +135,7 @@ func newOpenStack(cfg Config) (*OpenStack, error) { os := OpenStack{ provider: provider, region: cfg.Global.Region, + lbOpts: cfg.LoadBalancer, } return &os, nil } @@ -192,7 +224,7 @@ func getServerByName(client *gophercloud.ServiceClient, name string) (*servers.S } serverList = append(serverList, s...) if len(serverList) > 1 { - return false, ErrMultipleServersFound + return false, ErrMultipleResults } return true, nil }) @@ -201,9 +233,9 @@ func getServerByName(client *gophercloud.ServiceClient, name string) (*servers.S } if len(serverList) == 0 { - return nil, ErrServerNotFound + return nil, ErrNotFound } else if len(serverList) > 1 { - return nil, ErrMultipleServersFound + return nil, ErrMultipleResults } return &serverList[0], nil @@ -230,10 +262,10 @@ func firstAddr(netblob interface{}) string { return addr } -func (i *Instances) IPAddress(name string) (net.IP, error) { - srv, err := getServerByName(i.compute, name) +func getAddressByName(api *gophercloud.ServiceClient, name string) (string, error) { + srv, err := getServerByName(api, name) if err != nil { - return nil, err + return "", err } var s string @@ -250,13 +282,17 @@ func (i *Instances) IPAddress(name string) (net.IP, error) { s = srv.AccessIPv6 } if s == "" { - return nil, ErrNoAddressFound + return "", ErrNoAddressFound } - ip := net.ParseIP(s) - if ip == nil { - return nil, ErrInvalidAddress + return s, nil +} + +func (i *Instances) IPAddress(name string) (net.IP, error) { + ip, err := getAddressByName(i.compute, name) + if err != nil { + return nil, err } - return ip, nil + return net.ParseIP(ip), err } func (i *Instances) GetNodeResources(name string) (*api.NodeResources, error) { @@ -275,20 +311,242 @@ func (i *Instances) GetNodeResources(name string) (*api.NodeResources, error) { } rsrc, ok := i.flavor_to_resource[flavId] if !ok { - return nil, ErrFlavorNotFound + return nil, ErrNotFound } return rsrc, nil } -func (aws *OpenStack) Clusters() (cloudprovider.Clusters, bool) { +func (os *OpenStack) Clusters() (cloudprovider.Clusters, bool) { return nil, false } +type LoadBalancer struct { + network *gophercloud.ServiceClient + compute *gophercloud.ServiceClient + opts LoadBalancerOpts +} + func (os *OpenStack) TCPLoadBalancer() (cloudprovider.TCPLoadBalancer, bool) { - return nil, false + // TODO: Search for and support Rackspace loadbalancer API, and others. + network, err := openstack.NewNetworkV2(os.provider, gophercloud.EndpointOpts{ + Region: os.region, + }) + if err != nil { + return nil, false + } + + compute, err := openstack.NewComputeV2(os.provider, gophercloud.EndpointOpts{ + Region: os.region, + }) + if err != nil { + return nil, false + } + + return &LoadBalancer{network, compute, os.lbOpts}, true +} + +func getVipByName(client *gophercloud.ServiceClient, name string) (*vips.VirtualIP, error) { + opts := vips.ListOpts{ + Name: name, + } + pager := vips.List(client, opts) + + vipList := make([]vips.VirtualIP, 0, 1) + + err := pager.EachPage(func(page pagination.Page) (bool, error) { + v, err := vips.ExtractVIPs(page) + if err != nil { + return false, err + } + vipList = append(vipList, v...) + if len(vipList) > 1 { + return false, ErrMultipleResults + } + return true, nil + }) + if err != nil { + return nil, err + } + + if len(vipList) == 0 { + return nil, ErrNotFound + } else if len(vipList) > 1 { + return nil, ErrMultipleResults + } + + return &vipList[0], nil +} + +func (lb *LoadBalancer) TCPLoadBalancerExists(name, region string) (bool, error) { + vip, err := getVipByName(lb.network, name) + if err == ErrNotFound { + return false, nil + } + return vip != nil, err +} + +// TODO: This code currently ignores 'region' and always creates a +// loadbalancer in only the current OpenStack region. We should take +// a list of regions (from config) and query/create loadbalancers in +// each region. + +func (lb *LoadBalancer) CreateTCPLoadBalancer(name, region string, externalIP net.IP, port int, hosts []string) (net.IP, error) { + pool, err := pools.Create(lb.network, pools.CreateOpts{ + Name: name, + Protocol: pools.ProtocolTCP, + SubnetID: lb.opts.SubnetId, + }).Extract() + if err != nil { + return nil, err + } + + for _, host := range hosts { + addr, err := getAddressByName(lb.compute, host) + if err != nil { + return nil, err + } + + _, err = members.Create(lb.network, members.CreateOpts{ + PoolID: pool.ID, + ProtocolPort: port, + Address: addr, + }).Extract() + if err != nil { + pools.Delete(lb.network, pool.ID) + return nil, err + } + } + + var mon *monitors.Monitor + if lb.opts.CreateMonitor { + mon, err = monitors.Create(lb.network, monitors.CreateOpts{ + Type: monitors.TypeTCP, + Delay: int(lb.opts.MonitorDelay.Duration.Seconds()), + Timeout: int(lb.opts.MonitorTimeout.Duration.Seconds()), + MaxRetries: int(lb.opts.MonitorMaxRetries), + }).Extract() + if err != nil { + pools.Delete(lb.network, pool.ID) + return nil, err + } + + _, err = pools.AssociateMonitor(lb.network, pool.ID, mon.ID).Extract() + if err != nil { + monitors.Delete(lb.network, mon.ID) + pools.Delete(lb.network, pool.ID) + return nil, err + } + } + + vip, err := vips.Create(lb.network, vips.CreateOpts{ + Name: name, + Description: fmt.Sprintf("Kubernetes external service %s", name), + Address: externalIP.String(), + Protocol: "TCP", + ProtocolPort: port, + PoolID: pool.ID, + }).Extract() + if err != nil { + if mon != nil { + monitors.Delete(lb.network, mon.ID) + } + pools.Delete(lb.network, pool.ID) + return nil, err + } + + return net.ParseIP(vip.Address), nil +} + +func (lb *LoadBalancer) UpdateTCPLoadBalancer(name, region string, hosts []string) error { + vip, err := getVipByName(lb.network, name) + if err != nil { + return err + } + + // Set of member (addresses) that _should_ exist + addrs := map[string]bool{} + for _, host := range hosts { + addr, err := getAddressByName(lb.compute, host) + if err != nil { + return err + } + + addrs[addr] = true + } + + // Iterate over members that _do_ exist + pager := members.List(lb.network, members.ListOpts{PoolID: vip.PoolID}) + err = pager.EachPage(func(page pagination.Page) (bool, error) { + memList, err := members.ExtractMembers(page) + if err != nil { + return false, err + } + + for _, member := range memList { + if _, found := addrs[member.Address]; found { + // Member already exists + delete(addrs, member.Address) + } else { + // Member needs to be deleted + err = members.Delete(lb.network, member.ID).ExtractErr() + if err != nil { + return false, err + } + } + } + + return true, nil + }) + if err != nil { + return err + } + + // Anything left in addrs is a new member that needs to be added + for addr := range addrs { + _, err := members.Create(lb.network, members.CreateOpts{ + PoolID: vip.PoolID, + Address: addr, + ProtocolPort: vip.ProtocolPort, + }).Extract() + if err != nil { + return err + } + } + + return nil +} + +func (lb *LoadBalancer) DeleteTCPLoadBalancer(name, region string) error { + vip, err := getVipByName(lb.network, name) + if err != nil { + return err + } + + pool, err := pools.Get(lb.network, vip.PoolID).Extract() + if err != nil { + return err + } + + // Have to delete VIP before pool can be deleted + err = vips.Delete(lb.network, vip.ID).ExtractErr() + if err != nil { + return err + } + + // Ignore errors for everything following here + + for _, monId := range pool.MonitorIDs { + pools.DisassociateMonitor(lb.network, pool.ID, monId) + } + pools.Delete(lb.network, pool.ID) + + return nil } func (os *OpenStack) Zones() (cloudprovider.Zones, bool) { - return nil, false + return os, true +} +func (os *OpenStack) GetZone() (cloudprovider.Zone, error) { + return cloudprovider.Zone{Region: os.region}, nil } diff --git a/pkg/cloudprovider/openstack/openstack_test.go b/pkg/cloudprovider/openstack/openstack_test.go index d6474f1f54..2c88d38deb 100644 --- a/pkg/cloudprovider/openstack/openstack_test.go +++ b/pkg/cloudprovider/openstack/openstack_test.go @@ -20,6 +20,9 @@ import ( "os" "strings" "testing" + "time" + + "github.com/rackspace/gophercloud" ) func TestReadConfig(t *testing.T) { @@ -30,8 +33,13 @@ func TestReadConfig(t *testing.T) { cfg, err := readConfig(strings.NewReader(` [Global] -authurl = http://auth.url +auth-url = http://auth.url username = user +[LoadBalancer] +create-monitor = yes +monitor-delay = 1m +monitor-timeout = 30s +monitor-max-retries = 3 `)) if err != nil { t.Fatalf("Should succeed when a valid config is provided: %s", err) @@ -39,6 +47,19 @@ username = user if cfg.Global.AuthUrl != "http://auth.url" { t.Errorf("incorrect authurl: %s", cfg.Global.AuthUrl) } + + if !cfg.LoadBalancer.CreateMonitor { + t.Errorf("incorrect lb.createmonitor: %s", cfg.LoadBalancer.CreateMonitor) + } + if cfg.LoadBalancer.MonitorDelay.Duration != 1*time.Minute { + t.Errorf("incorrect lb.monitordelay: %s", cfg.LoadBalancer.MonitorDelay) + } + if cfg.LoadBalancer.MonitorTimeout.Duration != 30*time.Second { + t.Errorf("incorrect lb.monitortimeout: %s", cfg.LoadBalancer.MonitorTimeout) + } + if cfg.LoadBalancer.MonitorMaxRetries != 3 { + t.Errorf("incorrect lb.monitormaxretries: %s", cfg.LoadBalancer.MonitorMaxRetries) + } } func TestToAuthOptions(t *testing.T) { @@ -56,8 +77,11 @@ func TestToAuthOptions(t *testing.T) { } } -// This allows testing against an existing OpenStack install, using the -// standard OS_* OpenStack client environment variables. +// This allows acceptance testing against an existing OpenStack +// install, using the standard OS_* OpenStack client environment +// variables. +// FIXME: it would be better to hermetically test against canned JSON +// requests/responses. func configFromEnv() (cfg Config, ok bool) { cfg.Global.AuthUrl = os.Getenv("OS_AUTH_URL") @@ -132,3 +156,51 @@ func TestInstances(t *testing.T) { } t.Logf("Found GetNodeResources(%s) = %s\n", srvs[0], rsrcs) } + +func TestTCPLoadBalancer(t *testing.T) { + cfg, ok := configFromEnv() + if !ok { + t.Skipf("No config found in environment") + } + + os, err := newOpenStack(cfg) + if err != nil { + t.Fatalf("Failed to construct/authenticate OpenStack: %s", err) + } + + lb, ok := os.TCPLoadBalancer() + if !ok { + t.Fatalf("TCPLoadBalancer() returned false - perhaps your stack doesn't support Neutron?") + } + + exists, err := lb.TCPLoadBalancerExists("noexist", "region") + if err != nil { + t.Fatalf("TCPLoadBalancerExists(\"noexist\") returned error: %s", err) + } + if exists { + t.Fatalf("TCPLoadBalancerExists(\"noexist\") returned true") + } +} + +func TestZones(t *testing.T) { + os := OpenStack{ + provider: &gophercloud.ProviderClient{ + IdentityBase: "http://auth.url/", + }, + region: "myRegion", + } + + z, ok := os.Zones() + if !ok { + t.Fatalf("Zones() returned false") + } + + zone, err := z.GetZone() + if err != nil { + t.Fatalf("GetZone() returned error: %s", err) + } + + if zone.Region != "myRegion" { + t.Fatalf("GetZone() returned wrong region (%s)", zone.Region) + } +}