From ff9996c100401769291c9ce9813b63da65fd0a0e Mon Sep 17 00:00:00 2001 From: Adam Sunderland Date: Thu, 14 May 2015 14:18:25 -0500 Subject: [PATCH] Use aws-sdk-go --- pkg/cloudprovider/aws/aws.go | 243 ++++++++++++++++++------------ pkg/cloudprovider/aws/aws_test.go | 97 ++++++------ 2 files changed, 202 insertions(+), 138 deletions(-) diff --git a/pkg/cloudprovider/aws/aws.go b/pkg/cloudprovider/aws/aws.go index d369634381..f2b4c0260b 100644 --- a/pkg/cloudprovider/aws/aws.go +++ b/pkg/cloudprovider/aws/aws.go @@ -20,7 +20,9 @@ import ( "errors" "fmt" "io" + "io/ioutil" "net" + "net/http" "net/url" "regexp" "strings" @@ -28,8 +30,8 @@ import ( "time" "code.google.com/p/gcfg" - "github.com/mitchellh/goamz/aws" - "github.com/mitchellh/goamz/ec2" + "github.com/awslabs/aws-sdk-go/aws" + "github.com/awslabs/aws-sdk-go/service/ec2" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource" @@ -41,7 +43,7 @@ import ( // Abstraction over EC2, to allow mocking/other implementations type EC2 interface { // Query EC2 for instances matching the filter - Instances(instIds []string, filter *ec2InstanceFilter) (resp *ec2.InstancesResp, err error) + Instances(instanceIds []string, filter *ec2InstanceFilter) (instances []*ec2.Instance, err error) // Attach a volume to an instance AttachVolume(volumeID string, instanceId string, mountDevice string) (resp *ec2.AttachVolumeResp, err error) @@ -87,7 +89,7 @@ type AWSCloud struct { metadata AWSMetadata cfg *AWSCloudConfig availabilityZone string - region aws.Region + region string // The AWS instance that we are running on selfAWSInstance *awsInstance @@ -109,73 +111,122 @@ type ec2InstanceFilter struct { } // True if the passed instance matches the filter -func (f *ec2InstanceFilter) Matches(instance ec2.Instance) bool { - if f.PrivateDNSName != "" && instance.PrivateDNSName != f.PrivateDNSName { +func (f *ec2InstanceFilter) Matches(instance *ec2.Instance) bool { + if f.PrivateDNSName != "" && *instance.PrivateDNSName != f.PrivateDNSName { return false } return true } -// goamzEC2 is an implementation of the EC2 interface, backed by goamz -type goamzEC2 struct { +// awsSdkEC2 is an implementation of the EC2 interface, backed by aws-sdk-go +type awsSdkEC2 struct { ec2 *ec2.EC2 } // Implementation of EC2.Instances -func (self *goamzEC2) Instances(instanceIds []string, filter *ec2InstanceFilter) (resp *ec2.InstancesResp, err error) { - var goamzFilter *ec2.Filter - if filter != nil { - goamzFilter = ec2.NewFilter() - if filter.PrivateDNSName != "" { - goamzFilter.Add("private-dns-name", filter.PrivateDNSName) +func (self *awsSdkEC2) Instances(instanceIds []string, filter *ec2InstanceFilter) (resp []*ec2.Instance, err error) { + var filters []*ec2.Filter + if filter != nil && filter.PrivateDNSName != "" { + filters = []*ec2.Filter{ + { + Name: aws.String("private-dns-name"), + Values: []*string{ + aws.String(filter.PrivateDNSName), + }, + }, } } - return self.ec2.Instances(instanceIds, goamzFilter) + + fetchedInstances := []*ec2.Instance{} + nextToken := "" + + for { + res, err := self.ec2.DescribeInstances(&ec2.DescribeInstancesInput{ + InstanceIDs: instanceIds, + Filters: filters, + NextToken: &nextToken, + }) + + if err != nil { + return nil, err + } + + for _, reservation := range res.Reservations { + fetchedInstances = append(fetchedInstances, reservation.Instances...) + } + + if *res.NextToken == "" { + break + } + + nextToken = *res.NextToken + } + + return fetchedInstances, nil } -type goamzMetadata struct { +type awsSdkMetadata struct { +} + +var metadataClient = http.Client{ + Timeout: time.Second * 1, } // Implements AWSMetadata.GetMetaData -func (self *goamzMetadata) GetMetaData(key string) ([]byte, error) { - v, err := aws.GetMetaData(key) +func (self *awsSdkMetadata) GetMetaData(key string) ([]byte, error) { + // TODO Get an implementation of this merged into aws-sdk-go + url := "http://169.254.169.254/latest/meta-data" + path + + res, err := metadataClient.Get(url) + if err != nil { + return nil, err + } + defer res.Body.Close() + + if res.StatusCode != 200 { + err = fmt.Errorf("Code %d returned for url %s", res.StatusCode, url) + return nil, fmt.Errorf("Error querying AWS metadata for key %s: %v", key, err) + } + + body, err := ioutil.ReadAll(res.Body) if err != nil { return nil, fmt.Errorf("Error querying AWS metadata for key %s: %v", key, err) } - return v, nil + + return []byte(body), nil } -type AuthFunc func() (auth aws.Auth, err error) +type AuthFunc func() (creds aws.CredentialsProvider, err error) -func (s *goamzEC2) AttachVolume(volumeID string, instanceId string, device string) (resp *ec2.AttachVolumeResp, err error) { +func (s *awsSdkEC2) AttachVolume(volumeID string, instanceId string, device string) (resp *ec2.AttachVolumeResp, err error) { return s.ec2.AttachVolume(volumeID, instanceId, device) } -func (s *goamzEC2) DetachVolume(volumeID string) (resp *ec2.SimpleResp, err error) { +func (s *awsSdkEC2) DetachVolume(volumeID string) (resp *ec2.SimpleResp, err error) { return s.ec2.DetachVolume(volumeID) } -func (s *goamzEC2) Volumes(volumeIDs []string, filter *ec2.Filter) (resp *ec2.VolumesResp, err error) { +func (s *awsSdkEC2) Volumes(volumeIDs []string, filter *ec2.Filter) (resp *ec2.VolumesResp, err error) { return s.ec2.Volumes(volumeIDs, filter) } -func (s *goamzEC2) CreateVolume(request *ec2.CreateVolume) (resp *ec2.CreateVolumeResp, err error) { +func (s *awsSdkEC2) CreateVolume(request *ec2.CreateVolume) (resp *ec2.CreateVolumeResp, err error) { return s.ec2.CreateVolume(request) } -func (s *goamzEC2) DeleteVolume(volumeID string) (resp *ec2.SimpleResp, err error) { +func (s *awsSdkEC2) DeleteVolume(volumeID string) (resp *ec2.SimpleResp, err error) { return s.ec2.DeleteVolume(volumeID) } func init() { cloudprovider.RegisterCloudProvider("aws", func(config io.Reader) (cloudprovider.Interface, error) { - metadata := &goamzMetadata{} + metadata := &awsSdkMetadata{} return newAWSCloud(config, getAuth, metadata) }) } -func getAuth() (auth aws.Auth, err error) { - return aws.GetAuth("", "") +func getAuth() (creds aws.CredentialsProvider) { + return aws.DetectCreds("", "", "") } // readAWSCloudConfig reads an instance of AWSCloudConfig from config reader. @@ -217,6 +268,10 @@ func getAvailabilityZone(metadata AWSMetadata) (string, error) { return string(availabilityZoneBytes), nil } +func isRegionValid(region string) bool { + return true +} + // newAWSCloud creates a new instance of AWSCloud. // authFunc and instanceId are primarily for tests func newAWSCloud(config io.Reader, authFunc AuthFunc, metadata AWSMetadata) (*AWSCloud, error) { @@ -225,10 +280,7 @@ func newAWSCloud(config io.Reader, authFunc AuthFunc, metadata AWSMetadata) (*AW return nil, fmt.Errorf("unable to read AWS cloud provider config file: %v", err) } - auth, err := authFunc() - if err != nil { - return nil, err - } + creds := authFunc() zone := cfg.Global.Zone if len(zone) <= 1 { @@ -236,17 +288,22 @@ func newAWSCloud(config io.Reader, authFunc AuthFunc, metadata AWSMetadata) (*AW } regionName := zone[:len(zone)-1] - region, ok := aws.Regions[regionName] - if !ok { + valid := isRegionValid(regionName) + if !valid { return nil, fmt.Errorf("not a valid AWS zone (unknown region): %s", zone) } - ec2 := &goamzEC2{ec2: ec2.New(auth, region)} + ec2 := &awsSdkEC2{ + ec2: ec2.New(&aws.Config{ + Region: regionName, + Credentials: creds, + }), + } awsCloud := &AWSCloud{ ec2: ec2, cfg: cfg, - region: region, + region: regionName, availabilityZone: zone, metadata: metadata, } @@ -282,11 +339,11 @@ func (aws *AWSCloud) NodeAddresses(name string) ([]api.NodeAddress, error) { addresses := []api.NodeAddress{} - if instance.PrivateIpAddress != "" { - ipAddress := instance.PrivateIpAddress + if *instance.PrivateIpAddress != "" { + ipAddress := *instance.PrivateIpAddress ip := net.ParseIP(ipAddress) if ip == nil { - return nil, fmt.Errorf("EC2 instance had invalid private address: %s (%s)", instance.InstanceId, ipAddress) + return nil, fmt.Errorf("EC2 instance had invalid private address: %s (%s)", *instance.InstanceId, ipAddress) } addresses = append(addresses, api.NodeAddress{Type: api.NodeInternalIP, Address: ip.String()}) @@ -295,11 +352,11 @@ func (aws *AWSCloud) NodeAddresses(name string) ([]api.NodeAddress, error) { } // TODO: Other IP addresses (multiple ips)? - if instance.PublicIpAddress != "" { - ipAddress := instance.PublicIpAddress + if *instance.PublicIpAddress != "" { + ipAddress := *instance.PublicIpAddress ip := net.ParseIP(ipAddress) if ip == nil { - return nil, fmt.Errorf("EC2 instance had invalid public address: %s (%s)", instance.InstanceId, ipAddress) + return nil, fmt.Errorf("EC2 instance had invalid public address: %s (%s)", *instance.InstanceId, ipAddress) } addresses = append(addresses, api.NodeAddress{Type: api.NodeExternalIP, Address: ip.String()}) } @@ -321,60 +378,58 @@ func (aws *AWSCloud) getInstancesByDnsName(name string) (*ec2.Instance, error) { f := &ec2InstanceFilter{} f.PrivateDNSName = name - resp, err := aws.ec2.Instances(nil, f) + instances, err := aws.ec2.Instances(nil, f) if err != nil { return nil, err } - instances := []*ec2.Instance{} - for _, reservation := range resp.Reservations { - for _, instance := range reservation.Instances { - // TODO: Push running logic down into filter? - if !isAlive(&instance) { - continue - } - - if instance.PrivateDNSName != name { - // TODO: Should we warn here? - the filter should have caught this - // (this will happen in the tests if they don't fully mock the EC2 API) - continue - } - - instances = append(instances, &instance) + matchingInstances := []*ec2.Instance{} + for _, instance := range instances { + // TODO: Push running logic down into filter? + if !isAlive(instance) { + continue } + + if *instance.PrivateDNSName != name { + // TODO: Should we warn here? - the filter should have caught this + // (this will happen in the tests if they don't fully mock the EC2 API) + continue + } + + matchingInstances = append(matchingInstances, instance) } - if len(instances) == 0 { + if len(matchingInstances) == 0 { return nil, fmt.Errorf("no instances found for host: %s", name) } - if len(instances) > 1 { + if len(matchingInstances) > 1 { return nil, fmt.Errorf("multiple instances found for host: %s", name) } - return instances[0], nil + return matchingInstances[0], nil } // Check if the instance is alive (running or pending) // We typically ignore instances that are not alive func isAlive(instance *ec2.Instance) bool { - switch instance.State.Name { + switch *instance.State.Name { case "shutting-down", "terminated", "stopping", "stopped": return false case "pending", "running": return true default: - glog.Errorf("unknown EC2 instance state: %s", instance.State) + glog.Errorf("unknown EC2 instance state: %s", *instance.State) return false } } // Return a list of instances matching regex string. func (aws *AWSCloud) getInstancesByRegex(regex string) ([]string, error) { - resp, err := aws.ec2.Instances(nil, nil) + instances, err := aws.ec2.Instances(nil, nil) if err != nil { return []string{}, err } - if resp == nil { - return []string{}, fmt.Errorf("no InstanceResp returned") + if len(instances) == 0 { + return []string{}, fmt.Errorf("no instances returned") } if strings.HasPrefix(regex, "'") && strings.HasSuffix(regex, "'") { @@ -387,38 +442,36 @@ func (aws *AWSCloud) getInstancesByRegex(regex string) ([]string, error) { return []string{}, err } - instances := []string{} - for _, reservation := range resp.Reservations { - for _, instance := range reservation.Instances { - // TODO: Push filtering down into EC2 API filter? - if !isAlive(&instance) { - glog.V(2).Infof("skipping EC2 instance (%s): %s", - instance.State.Name, instance.InstanceId) - continue - } + matchingInstances := []string{} + for _, instance := range instances { + // TODO: Push filtering down into EC2 API filter? + if !isAlive(instance) { + glog.V(2).Infof("skipping EC2 instance (%s): %s", + *instance.State.Name, *instance.InstanceID) + continue + } - // Only return fully-ready instances when listing instances - // (vs a query by name, where we will return it if we find it) - if instance.State.Name == "pending" { - glog.V(2).Infof("skipping EC2 instance (pending): %s", instance.InstanceId) - continue - } - if instance.PrivateDNSName == "" { - glog.V(2).Infof("skipping EC2 instance (no PrivateDNSName): %s", - instance.InstanceId) - continue - } + // Only return fully-ready instances when listing instances + // (vs a query by name, where we will return it if we find it) + if *instance.State.Name == "pending" { + glog.V(2).Infof("skipping EC2 instance (pending): %s", *instance.InstanceID) + continue + } + if *instance.PrivateDNSName == "" { + glog.V(2).Infof("skipping EC2 instance (no PrivateDNSName): %s", + *instance.InstanceID) + continue + } - for _, tag := range instance.Tags { - if tag.Key == "Name" && re.MatchString(tag.Value) { - instances = append(instances, instance.PrivateDNSName) - break - } + for _, tag := range instance.Tags { + if *tag.Key == "Name" && re.MatchString(*tag.Value) { + matchingInstances = append(matchingInstances, *instance.PrivateDNSName) + break } } } - glog.V(2).Infof("Matched EC2 instances: %s", instances) - return instances, nil + glog.V(2).Infof("Matched EC2 instances: %s", matchingInstances) + return matchingInstances, nil } // List is an implementation of Instances.List. @@ -434,7 +487,7 @@ func (aws *AWSCloud) GetNodeResources(name string) (*api.NodeResources, error) { return nil, err } - resources, err := getResourcesByInstanceType(instance.InstanceType) + resources, err := getResourcesByInstanceType(*instance.InstanceType) if err != nil { return nil, err } @@ -590,7 +643,7 @@ func (self *AWSCloud) GetZone() (cloudprovider.Zone, error) { } return cloudprovider.Zone{ FailureDomain: self.availabilityZone, - Region: self.region.Name, + Region: self.region, }, nil } diff --git a/pkg/cloudprovider/aws/aws_test.go b/pkg/cloudprovider/aws/aws_test.go index 95b0bc7449..05732a09e6 100644 --- a/pkg/cloudprovider/aws/aws_test.go +++ b/pkg/cloudprovider/aws/aws_test.go @@ -22,8 +22,8 @@ import ( "strings" "testing" - "github.com/mitchellh/goamz/aws" - "github.com/mitchellh/goamz/ec2" + "github.com/awslabs/aws-sdk-go/aws" + "github.com/awslabs/aws-sdk-go/service/ec2" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource" @@ -96,8 +96,8 @@ func TestReadAWSCloudConfig(t *testing.T) { } func TestNewAWSCloud(t *testing.T) { - fakeAuthFunc := func() (auth aws.Auth, err error) { - return aws.Auth{"", "", ""}, nil + fakeAuthFunc := func() (creds aws.CredentialProvider, err error) { + return aws.DetectCreds("", "", "") } tests := []struct { @@ -161,7 +161,7 @@ func TestNewAWSCloud(t *testing.T) { } type FakeEC2 struct { - instances []ec2.Instance + instances []*ec2.Instance } func contains(haystack []string, needle string) bool { @@ -174,7 +174,7 @@ func contains(haystack []string, needle string) bool { } func (self *FakeEC2) Instances(instanceIds []string, filter *ec2InstanceFilter) (resp *ec2.InstancesResp, err error) { - matches := []ec2.Instance{} + matches := []*ec2.Instance{} for _, instance := range self.instances { if filter != nil && !filter.Matches(instance) { continue @@ -184,9 +184,8 @@ func (self *FakeEC2) Instances(instanceIds []string, filter *ec2InstanceFilter) } matches = append(matches, instance) } - return &ec2.InstancesResp{"", - []ec2.Reservation{ - {"", "", "", nil, matches}}}, nil + + return matches, nil } type FakeMetadata struct { @@ -224,7 +223,7 @@ func (ec2 *FakeEC2) DeleteVolume(volumeID string) (resp *ec2.SimpleResp, err err panic("Not implemented") } -func mockInstancesResp(instances []ec2.Instance) (aws *AWSCloud) { +func mockInstancesResp(instances []*ec2.Instance) (aws *AWSCloud) { availabilityZone := "us-west-2d" return &AWSCloud{ ec2: &FakeEC2{ @@ -238,25 +237,37 @@ func mockAvailabilityZone(region string, availabilityZone string) *AWSCloud { return &AWSCloud{ ec2: &FakeEC2{}, availabilityZone: availabilityZone, - region: aws.Regions[region], + region: region, } } func TestList(t *testing.T) { - instances := make([]ec2.Instance, 4) - instances[0].Tags = []ec2.Tag{{"Name", "foo"}} - instances[0].PrivateDNSName = "instance1" - instances[0].State.Name = "running" - instances[1].Tags = []ec2.Tag{{"Name", "bar"}} - instances[1].PrivateDNSName = "instance2" - instances[1].State.Name = "running" - instances[2].Tags = []ec2.Tag{{"Name", "baz"}} - instances[2].PrivateDNSName = "instance3" - instances[2].State.Name = "running" - instances[3].Tags = []ec2.Tag{{"Name", "quux"}} - instances[3].PrivateDNSName = "instance4" - instances[3].State.Name = "running" + instances := make([]*ec2.Instance, 4) + instances[0].Tags = []ec2.Tag{{ + Key: aws.String("Name"), + Value: aws.String("foo"), + }} + instances[0].PrivateDNSName = aws.String("instance1") + instances[0].State.Name = aws.String("running") + instances[1].Tags = []ec2.Tag{{ + Key: aws.String("Name"), + Value: aws.String("bar"), + }} + instances[1].PrivateDNSName = aws.String("instance2") + instances[1].State.Name = aws.String("running") + instances[2].Tags = []ec2.Tag{{ + Key: aws.String("Name"), + Value: aws.String("baz"), + }} + instances[2].PrivateDNSName = aws.String("instance3") + instances[2].State.Name = aws.String("running") + instances[3].Tags = []ec2.Tag{{ + Key: aws.String("Name"), + Value: aws.String("quux"), + }} + instances[3].PrivateDNSName = aws.String("instance4") + instances[3].State.Name = aws.String("running") aws := mockInstancesResp(instances) @@ -292,16 +303,16 @@ func testHasNodeAddress(t *testing.T, addrs []api.NodeAddress, addressType api.N func TestNodeAddresses(t *testing.T) { // Note these instances have the same name // (we test that this produces an error) - instances := make([]ec2.Instance, 2) - instances[0].PrivateDNSName = "instance1" - instances[0].PrivateIpAddress = "192.168.0.1" - instances[0].PublicIpAddress = "1.2.3.4" - instances[0].State.Name = "running" - instances[1].PrivateDNSName = "instance1" - instances[1].PrivateIpAddress = "192.168.0.2" - instances[1].State.Name = "running" + instances := make([]*ec2.Instance, 2) + instances[0].PrivateDNSName = aws.String("instance1") + instances[0].PrivateIpAddress = aws.String("192.168.0.1") + instances[0].PublicIpAddress = aws.String("1.2.3.4") + instances[0].State.Name = aws.String("running") + instances[1].PrivateDNSName = aws.String("instance1") + instances[1].PrivateIpAddress = aws.String("192.168.0.2") + instances[1].State.Name = aws.String("running") - aws1 := mockInstancesResp([]ec2.Instance{}) + aws1 := mockInstancesResp([]*ec2.Instance{}) _, err1 := aws1.NodeAddresses("instance") if err1 == nil { t.Errorf("Should error when no instance found") @@ -345,16 +356,16 @@ func TestGetRegion(t *testing.T) { } func TestGetResources(t *testing.T) { - instances := make([]ec2.Instance, 3) - instances[0].PrivateDNSName = "m3.medium" - instances[0].InstanceType = "m3.medium" - instances[0].State.Name = "running" - instances[1].PrivateDNSName = "r3.8xlarge" - instances[1].InstanceType = "r3.8xlarge" - instances[1].State.Name = "running" - instances[2].PrivateDNSName = "unknown.type" - instances[2].InstanceType = "unknown.type" - instances[2].State.Name = "running" + instances := make([]*ec2.Instance, 3) + instances[0].PrivateDNSName = aws.String("m3.medium") + instances[0].InstanceType = aws.String("m3.medium") + instances[0].State.Name = aws.String("running") + instances[1].PrivateDNSName = aws.String("r3.8xlarge") + instances[1].InstanceType = aws.String("r3.8xlarge") + instances[1].State.Name = aws.String("running") + instances[2].PrivateDNSName = aws.String("unknown.type") + instances[2].InstanceType = aws.String("unknown.type") + instances[2].State.Name = aws.String("running") aws1 := mockInstancesResp(instances)