Use aws-sdk-go

pull/6/head
Adam Sunderland 2015-05-14 14:18:25 -05:00
parent 5e7a7b9206
commit ff9996c100
2 changed files with 202 additions and 138 deletions

View File

@ -20,7 +20,9 @@ import (
"errors"
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"net/url"
"regexp"
"strings"
@ -28,8 +30,8 @@ import (
"time"
"code.google.com/p/gcfg"
"github.com/mitchellh/goamz/aws"
"github.com/mitchellh/goamz/ec2"
"github.com/awslabs/aws-sdk-go/aws"
"github.com/awslabs/aws-sdk-go/service/ec2"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
@ -41,7 +43,7 @@ import (
// Abstraction over EC2, to allow mocking/other implementations
type EC2 interface {
// Query EC2 for instances matching the filter
Instances(instIds []string, filter *ec2InstanceFilter) (resp *ec2.InstancesResp, err error)
Instances(instanceIds []string, filter *ec2InstanceFilter) (instances []*ec2.Instance, err error)
// Attach a volume to an instance
AttachVolume(volumeID string, instanceId string, mountDevice string) (resp *ec2.AttachVolumeResp, err error)
@ -87,7 +89,7 @@ type AWSCloud struct {
metadata AWSMetadata
cfg *AWSCloudConfig
availabilityZone string
region aws.Region
region string
// The AWS instance that we are running on
selfAWSInstance *awsInstance
@ -109,73 +111,122 @@ type ec2InstanceFilter struct {
}
// True if the passed instance matches the filter
func (f *ec2InstanceFilter) Matches(instance ec2.Instance) bool {
if f.PrivateDNSName != "" && instance.PrivateDNSName != f.PrivateDNSName {
func (f *ec2InstanceFilter) Matches(instance *ec2.Instance) bool {
if f.PrivateDNSName != "" && *instance.PrivateDNSName != f.PrivateDNSName {
return false
}
return true
}
// goamzEC2 is an implementation of the EC2 interface, backed by goamz
type goamzEC2 struct {
// awsSdkEC2 is an implementation of the EC2 interface, backed by aws-sdk-go
type awsSdkEC2 struct {
ec2 *ec2.EC2
}
// Implementation of EC2.Instances
func (self *goamzEC2) Instances(instanceIds []string, filter *ec2InstanceFilter) (resp *ec2.InstancesResp, err error) {
var goamzFilter *ec2.Filter
if filter != nil {
goamzFilter = ec2.NewFilter()
if filter.PrivateDNSName != "" {
goamzFilter.Add("private-dns-name", filter.PrivateDNSName)
func (self *awsSdkEC2) Instances(instanceIds []string, filter *ec2InstanceFilter) (resp []*ec2.Instance, err error) {
var filters []*ec2.Filter
if filter != nil && filter.PrivateDNSName != "" {
filters = []*ec2.Filter{
{
Name: aws.String("private-dns-name"),
Values: []*string{
aws.String(filter.PrivateDNSName),
},
},
}
}
return self.ec2.Instances(instanceIds, goamzFilter)
}
type goamzMetadata struct {
fetchedInstances := []*ec2.Instance{}
nextToken := ""
for {
res, err := self.ec2.DescribeInstances(&ec2.DescribeInstancesInput{
InstanceIDs: instanceIds,
Filters: filters,
NextToken: &nextToken,
})
if err != nil {
return nil, err
}
for _, reservation := range res.Reservations {
fetchedInstances = append(fetchedInstances, reservation.Instances...)
}
if *res.NextToken == "" {
break
}
nextToken = *res.NextToken
}
return fetchedInstances, nil
}
type awsSdkMetadata struct {
}
var metadataClient = http.Client{
Timeout: time.Second * 1,
}
// Implements AWSMetadata.GetMetaData
func (self *goamzMetadata) GetMetaData(key string) ([]byte, error) {
v, err := aws.GetMetaData(key)
func (self *awsSdkMetadata) GetMetaData(key string) ([]byte, error) {
// TODO Get an implementation of this merged into aws-sdk-go
url := "http://169.254.169.254/latest/meta-data" + path
res, err := metadataClient.Get(url)
if err != nil {
return nil, err
}
defer res.Body.Close()
if res.StatusCode != 200 {
err = fmt.Errorf("Code %d returned for url %s", res.StatusCode, url)
return nil, fmt.Errorf("Error querying AWS metadata for key %s: %v", key, err)
}
body, err := ioutil.ReadAll(res.Body)
if err != nil {
return nil, fmt.Errorf("Error querying AWS metadata for key %s: %v", key, err)
}
return v, nil
return []byte(body), nil
}
type AuthFunc func() (auth aws.Auth, err error)
type AuthFunc func() (creds aws.CredentialsProvider, err error)
func (s *goamzEC2) AttachVolume(volumeID string, instanceId string, device string) (resp *ec2.AttachVolumeResp, err error) {
func (s *awsSdkEC2) AttachVolume(volumeID string, instanceId string, device string) (resp *ec2.AttachVolumeResp, err error) {
return s.ec2.AttachVolume(volumeID, instanceId, device)
}
func (s *goamzEC2) DetachVolume(volumeID string) (resp *ec2.SimpleResp, err error) {
func (s *awsSdkEC2) DetachVolume(volumeID string) (resp *ec2.SimpleResp, err error) {
return s.ec2.DetachVolume(volumeID)
}
func (s *goamzEC2) Volumes(volumeIDs []string, filter *ec2.Filter) (resp *ec2.VolumesResp, err error) {
func (s *awsSdkEC2) Volumes(volumeIDs []string, filter *ec2.Filter) (resp *ec2.VolumesResp, err error) {
return s.ec2.Volumes(volumeIDs, filter)
}
func (s *goamzEC2) CreateVolume(request *ec2.CreateVolume) (resp *ec2.CreateVolumeResp, err error) {
func (s *awsSdkEC2) CreateVolume(request *ec2.CreateVolume) (resp *ec2.CreateVolumeResp, err error) {
return s.ec2.CreateVolume(request)
}
func (s *goamzEC2) DeleteVolume(volumeID string) (resp *ec2.SimpleResp, err error) {
func (s *awsSdkEC2) DeleteVolume(volumeID string) (resp *ec2.SimpleResp, err error) {
return s.ec2.DeleteVolume(volumeID)
}
func init() {
cloudprovider.RegisterCloudProvider("aws", func(config io.Reader) (cloudprovider.Interface, error) {
metadata := &goamzMetadata{}
metadata := &awsSdkMetadata{}
return newAWSCloud(config, getAuth, metadata)
})
}
func getAuth() (auth aws.Auth, err error) {
return aws.GetAuth("", "")
func getAuth() (creds aws.CredentialsProvider) {
return aws.DetectCreds("", "", "")
}
// readAWSCloudConfig reads an instance of AWSCloudConfig from config reader.
@ -217,6 +268,10 @@ func getAvailabilityZone(metadata AWSMetadata) (string, error) {
return string(availabilityZoneBytes), nil
}
func isRegionValid(region string) bool {
return true
}
// newAWSCloud creates a new instance of AWSCloud.
// authFunc and instanceId are primarily for tests
func newAWSCloud(config io.Reader, authFunc AuthFunc, metadata AWSMetadata) (*AWSCloud, error) {
@ -225,10 +280,7 @@ func newAWSCloud(config io.Reader, authFunc AuthFunc, metadata AWSMetadata) (*AW
return nil, fmt.Errorf("unable to read AWS cloud provider config file: %v", err)
}
auth, err := authFunc()
if err != nil {
return nil, err
}
creds := authFunc()
zone := cfg.Global.Zone
if len(zone) <= 1 {
@ -236,17 +288,22 @@ func newAWSCloud(config io.Reader, authFunc AuthFunc, metadata AWSMetadata) (*AW
}
regionName := zone[:len(zone)-1]
region, ok := aws.Regions[regionName]
if !ok {
valid := isRegionValid(regionName)
if !valid {
return nil, fmt.Errorf("not a valid AWS zone (unknown region): %s", zone)
}
ec2 := &goamzEC2{ec2: ec2.New(auth, region)}
ec2 := &awsSdkEC2{
ec2: ec2.New(&aws.Config{
Region: regionName,
Credentials: creds,
}),
}
awsCloud := &AWSCloud{
ec2: ec2,
cfg: cfg,
region: region,
region: regionName,
availabilityZone: zone,
metadata: metadata,
}
@ -282,11 +339,11 @@ func (aws *AWSCloud) NodeAddresses(name string) ([]api.NodeAddress, error) {
addresses := []api.NodeAddress{}
if instance.PrivateIpAddress != "" {
ipAddress := instance.PrivateIpAddress
if *instance.PrivateIpAddress != "" {
ipAddress := *instance.PrivateIpAddress
ip := net.ParseIP(ipAddress)
if ip == nil {
return nil, fmt.Errorf("EC2 instance had invalid private address: %s (%s)", instance.InstanceId, ipAddress)
return nil, fmt.Errorf("EC2 instance had invalid private address: %s (%s)", *instance.InstanceId, ipAddress)
}
addresses = append(addresses, api.NodeAddress{Type: api.NodeInternalIP, Address: ip.String()})
@ -295,11 +352,11 @@ func (aws *AWSCloud) NodeAddresses(name string) ([]api.NodeAddress, error) {
}
// TODO: Other IP addresses (multiple ips)?
if instance.PublicIpAddress != "" {
ipAddress := instance.PublicIpAddress
if *instance.PublicIpAddress != "" {
ipAddress := *instance.PublicIpAddress
ip := net.ParseIP(ipAddress)
if ip == nil {
return nil, fmt.Errorf("EC2 instance had invalid public address: %s (%s)", instance.InstanceId, ipAddress)
return nil, fmt.Errorf("EC2 instance had invalid public address: %s (%s)", *instance.InstanceId, ipAddress)
}
addresses = append(addresses, api.NodeAddress{Type: api.NodeExternalIP, Address: ip.String()})
}
@ -321,60 +378,58 @@ func (aws *AWSCloud) getInstancesByDnsName(name string) (*ec2.Instance, error) {
f := &ec2InstanceFilter{}
f.PrivateDNSName = name
resp, err := aws.ec2.Instances(nil, f)
instances, err := aws.ec2.Instances(nil, f)
if err != nil {
return nil, err
}
instances := []*ec2.Instance{}
for _, reservation := range resp.Reservations {
for _, instance := range reservation.Instances {
matchingInstances := []*ec2.Instance{}
for _, instance := range instances {
// TODO: Push running logic down into filter?
if !isAlive(&instance) {
if !isAlive(instance) {
continue
}
if instance.PrivateDNSName != name {
if *instance.PrivateDNSName != name {
// TODO: Should we warn here? - the filter should have caught this
// (this will happen in the tests if they don't fully mock the EC2 API)
continue
}
instances = append(instances, &instance)
}
matchingInstances = append(matchingInstances, instance)
}
if len(instances) == 0 {
if len(matchingInstances) == 0 {
return nil, fmt.Errorf("no instances found for host: %s", name)
}
if len(instances) > 1 {
if len(matchingInstances) > 1 {
return nil, fmt.Errorf("multiple instances found for host: %s", name)
}
return instances[0], nil
return matchingInstances[0], nil
}
// Check if the instance is alive (running or pending)
// We typically ignore instances that are not alive
func isAlive(instance *ec2.Instance) bool {
switch instance.State.Name {
switch *instance.State.Name {
case "shutting-down", "terminated", "stopping", "stopped":
return false
case "pending", "running":
return true
default:
glog.Errorf("unknown EC2 instance state: %s", instance.State)
glog.Errorf("unknown EC2 instance state: %s", *instance.State)
return false
}
}
// Return a list of instances matching regex string.
func (aws *AWSCloud) getInstancesByRegex(regex string) ([]string, error) {
resp, err := aws.ec2.Instances(nil, nil)
instances, err := aws.ec2.Instances(nil, nil)
if err != nil {
return []string{}, err
}
if resp == nil {
return []string{}, fmt.Errorf("no InstanceResp returned")
if len(instances) == 0 {
return []string{}, fmt.Errorf("no instances returned")
}
if strings.HasPrefix(regex, "'") && strings.HasSuffix(regex, "'") {
@ -387,38 +442,36 @@ func (aws *AWSCloud) getInstancesByRegex(regex string) ([]string, error) {
return []string{}, err
}
instances := []string{}
for _, reservation := range resp.Reservations {
for _, instance := range reservation.Instances {
matchingInstances := []string{}
for _, instance := range instances {
// TODO: Push filtering down into EC2 API filter?
if !isAlive(&instance) {
if !isAlive(instance) {
glog.V(2).Infof("skipping EC2 instance (%s): %s",
instance.State.Name, instance.InstanceId)
*instance.State.Name, *instance.InstanceID)
continue
}
// Only return fully-ready instances when listing instances
// (vs a query by name, where we will return it if we find it)
if instance.State.Name == "pending" {
glog.V(2).Infof("skipping EC2 instance (pending): %s", instance.InstanceId)
if *instance.State.Name == "pending" {
glog.V(2).Infof("skipping EC2 instance (pending): %s", *instance.InstanceID)
continue
}
if instance.PrivateDNSName == "" {
if *instance.PrivateDNSName == "" {
glog.V(2).Infof("skipping EC2 instance (no PrivateDNSName): %s",
instance.InstanceId)
*instance.InstanceID)
continue
}
for _, tag := range instance.Tags {
if tag.Key == "Name" && re.MatchString(tag.Value) {
instances = append(instances, instance.PrivateDNSName)
if *tag.Key == "Name" && re.MatchString(*tag.Value) {
matchingInstances = append(matchingInstances, *instance.PrivateDNSName)
break
}
}
}
}
glog.V(2).Infof("Matched EC2 instances: %s", instances)
return instances, nil
glog.V(2).Infof("Matched EC2 instances: %s", matchingInstances)
return matchingInstances, nil
}
// List is an implementation of Instances.List.
@ -434,7 +487,7 @@ func (aws *AWSCloud) GetNodeResources(name string) (*api.NodeResources, error) {
return nil, err
}
resources, err := getResourcesByInstanceType(instance.InstanceType)
resources, err := getResourcesByInstanceType(*instance.InstanceType)
if err != nil {
return nil, err
}
@ -590,7 +643,7 @@ func (self *AWSCloud) GetZone() (cloudprovider.Zone, error) {
}
return cloudprovider.Zone{
FailureDomain: self.availabilityZone,
Region: self.region.Name,
Region: self.region,
}, nil
}

View File

@ -22,8 +22,8 @@ import (
"strings"
"testing"
"github.com/mitchellh/goamz/aws"
"github.com/mitchellh/goamz/ec2"
"github.com/awslabs/aws-sdk-go/aws"
"github.com/awslabs/aws-sdk-go/service/ec2"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
@ -96,8 +96,8 @@ func TestReadAWSCloudConfig(t *testing.T) {
}
func TestNewAWSCloud(t *testing.T) {
fakeAuthFunc := func() (auth aws.Auth, err error) {
return aws.Auth{"", "", ""}, nil
fakeAuthFunc := func() (creds aws.CredentialProvider, err error) {
return aws.DetectCreds("", "", "")
}
tests := []struct {
@ -161,7 +161,7 @@ func TestNewAWSCloud(t *testing.T) {
}
type FakeEC2 struct {
instances []ec2.Instance
instances []*ec2.Instance
}
func contains(haystack []string, needle string) bool {
@ -174,7 +174,7 @@ func contains(haystack []string, needle string) bool {
}
func (self *FakeEC2) Instances(instanceIds []string, filter *ec2InstanceFilter) (resp *ec2.InstancesResp, err error) {
matches := []ec2.Instance{}
matches := []*ec2.Instance{}
for _, instance := range self.instances {
if filter != nil && !filter.Matches(instance) {
continue
@ -184,9 +184,8 @@ func (self *FakeEC2) Instances(instanceIds []string, filter *ec2InstanceFilter)
}
matches = append(matches, instance)
}
return &ec2.InstancesResp{"",
[]ec2.Reservation{
{"", "", "", nil, matches}}}, nil
return matches, nil
}
type FakeMetadata struct {
@ -224,7 +223,7 @@ func (ec2 *FakeEC2) DeleteVolume(volumeID string) (resp *ec2.SimpleResp, err err
panic("Not implemented")
}
func mockInstancesResp(instances []ec2.Instance) (aws *AWSCloud) {
func mockInstancesResp(instances []*ec2.Instance) (aws *AWSCloud) {
availabilityZone := "us-west-2d"
return &AWSCloud{
ec2: &FakeEC2{
@ -238,25 +237,37 @@ func mockAvailabilityZone(region string, availabilityZone string) *AWSCloud {
return &AWSCloud{
ec2: &FakeEC2{},
availabilityZone: availabilityZone,
region: aws.Regions[region],
region: region,
}
}
func TestList(t *testing.T) {
instances := make([]ec2.Instance, 4)
instances[0].Tags = []ec2.Tag{{"Name", "foo"}}
instances[0].PrivateDNSName = "instance1"
instances[0].State.Name = "running"
instances[1].Tags = []ec2.Tag{{"Name", "bar"}}
instances[1].PrivateDNSName = "instance2"
instances[1].State.Name = "running"
instances[2].Tags = []ec2.Tag{{"Name", "baz"}}
instances[2].PrivateDNSName = "instance3"
instances[2].State.Name = "running"
instances[3].Tags = []ec2.Tag{{"Name", "quux"}}
instances[3].PrivateDNSName = "instance4"
instances[3].State.Name = "running"
instances := make([]*ec2.Instance, 4)
instances[0].Tags = []ec2.Tag{{
Key: aws.String("Name"),
Value: aws.String("foo"),
}}
instances[0].PrivateDNSName = aws.String("instance1")
instances[0].State.Name = aws.String("running")
instances[1].Tags = []ec2.Tag{{
Key: aws.String("Name"),
Value: aws.String("bar"),
}}
instances[1].PrivateDNSName = aws.String("instance2")
instances[1].State.Name = aws.String("running")
instances[2].Tags = []ec2.Tag{{
Key: aws.String("Name"),
Value: aws.String("baz"),
}}
instances[2].PrivateDNSName = aws.String("instance3")
instances[2].State.Name = aws.String("running")
instances[3].Tags = []ec2.Tag{{
Key: aws.String("Name"),
Value: aws.String("quux"),
}}
instances[3].PrivateDNSName = aws.String("instance4")
instances[3].State.Name = aws.String("running")
aws := mockInstancesResp(instances)
@ -292,16 +303,16 @@ func testHasNodeAddress(t *testing.T, addrs []api.NodeAddress, addressType api.N
func TestNodeAddresses(t *testing.T) {
// Note these instances have the same name
// (we test that this produces an error)
instances := make([]ec2.Instance, 2)
instances[0].PrivateDNSName = "instance1"
instances[0].PrivateIpAddress = "192.168.0.1"
instances[0].PublicIpAddress = "1.2.3.4"
instances[0].State.Name = "running"
instances[1].PrivateDNSName = "instance1"
instances[1].PrivateIpAddress = "192.168.0.2"
instances[1].State.Name = "running"
instances := make([]*ec2.Instance, 2)
instances[0].PrivateDNSName = aws.String("instance1")
instances[0].PrivateIpAddress = aws.String("192.168.0.1")
instances[0].PublicIpAddress = aws.String("1.2.3.4")
instances[0].State.Name = aws.String("running")
instances[1].PrivateDNSName = aws.String("instance1")
instances[1].PrivateIpAddress = aws.String("192.168.0.2")
instances[1].State.Name = aws.String("running")
aws1 := mockInstancesResp([]ec2.Instance{})
aws1 := mockInstancesResp([]*ec2.Instance{})
_, err1 := aws1.NodeAddresses("instance")
if err1 == nil {
t.Errorf("Should error when no instance found")
@ -345,16 +356,16 @@ func TestGetRegion(t *testing.T) {
}
func TestGetResources(t *testing.T) {
instances := make([]ec2.Instance, 3)
instances[0].PrivateDNSName = "m3.medium"
instances[0].InstanceType = "m3.medium"
instances[0].State.Name = "running"
instances[1].PrivateDNSName = "r3.8xlarge"
instances[1].InstanceType = "r3.8xlarge"
instances[1].State.Name = "running"
instances[2].PrivateDNSName = "unknown.type"
instances[2].InstanceType = "unknown.type"
instances[2].State.Name = "running"
instances := make([]*ec2.Instance, 3)
instances[0].PrivateDNSName = aws.String("m3.medium")
instances[0].InstanceType = aws.String("m3.medium")
instances[0].State.Name = aws.String("running")
instances[1].PrivateDNSName = aws.String("r3.8xlarge")
instances[1].InstanceType = aws.String("r3.8xlarge")
instances[1].State.Name = aws.String("running")
instances[2].PrivateDNSName = aws.String("unknown.type")
instances[2].InstanceType = aws.String("unknown.type")
instances[2].State.Name = aws.String("running")
aws1 := mockInstancesResp(instances)