mirror of https://github.com/hashicorp/consul
This isn't used in OSS but CE needs it. It'll be easier to manage common deps in OSS and take them to CE.
pull/2508/head
James Phillips
8 years ago
18 changed files with 16576 additions and 0 deletions
@@ -0,0 +1,69 @@
// Package restxml provides RESTful XML serialization of AWS
// requests and responses.
package restxml

//go:generate go run -tags codegen ../../../models/protocol_tests/generate.go ../../../models/protocol_tests/input/rest-xml.json build_test.go
//go:generate go run -tags codegen ../../../models/protocol_tests/generate.go ../../../models/protocol_tests/output/rest-xml.json unmarshal_test.go

import (
	"bytes"
	"encoding/xml"

	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/aws/request"
	"github.com/aws/aws-sdk-go/private/protocol/query"
	"github.com/aws/aws-sdk-go/private/protocol/rest"
	"github.com/aws/aws-sdk-go/private/protocol/xml/xmlutil"
)

// BuildHandler is a named request handler for building restxml protocol requests
var BuildHandler = request.NamedHandler{Name: "awssdk.restxml.Build", Fn: Build}

// UnmarshalHandler is a named request handler for unmarshaling restxml protocol requests
var UnmarshalHandler = request.NamedHandler{Name: "awssdk.restxml.Unmarshal", Fn: Unmarshal}

// UnmarshalMetaHandler is a named request handler for unmarshaling restxml protocol request metadata
var UnmarshalMetaHandler = request.NamedHandler{Name: "awssdk.restxml.UnmarshalMeta", Fn: UnmarshalMeta}

// UnmarshalErrorHandler is a named request handler for unmarshaling restxml protocol request errors
var UnmarshalErrorHandler = request.NamedHandler{Name: "awssdk.restxml.UnmarshalError", Fn: UnmarshalError}

// Build builds a request payload for the REST XML protocol.
func Build(r *request.Request) {
	rest.Build(r)

	if t := rest.PayloadType(r.Params); t == "structure" || t == "" {
		var buf bytes.Buffer
		err := xmlutil.BuildXML(r.Params, xml.NewEncoder(&buf))
		if err != nil {
			r.Error = awserr.New("SerializationError", "failed to encode rest XML request", err)
			return
		}
		r.SetBufferBody(buf.Bytes())
	}
}

// Unmarshal unmarshals a payload response for the REST XML protocol.
func Unmarshal(r *request.Request) {
	if t := rest.PayloadType(r.Data); t == "structure" || t == "" {
		defer r.HTTPResponse.Body.Close()
		decoder := xml.NewDecoder(r.HTTPResponse.Body)
		err := xmlutil.UnmarshalXML(r.Data, decoder, "")
		if err != nil {
			r.Error = awserr.New("SerializationError", "failed to decode REST XML response", err)
			return
		}
	} else {
		rest.Unmarshal(r)
	}
}

// UnmarshalMeta unmarshals response headers for the REST XML protocol.
func UnmarshalMeta(r *request.Request) {
	rest.UnmarshalMeta(r)
}

// UnmarshalError unmarshals a response error for the REST XML protocol.
func UnmarshalError(r *request.Request) {
	query.UnmarshalError(r)
}
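For context, a minimal sketch of how these named handlers are typically attached to a client's handler lists; this mirrors the wiring the generated S3 service client below performs (the wireRestXML helper is hypothetical, not part of the diff):

package example

import (
	"github.com/aws/aws-sdk-go/aws/client"
	"github.com/aws/aws-sdk-go/private/protocol/restxml"
)

// wireRestXML attaches the restxml protocol handlers to a client so that
// request payloads are built as, and responses decoded from, REST XML.
func wireRestXML(c *client.Client) {
	c.Handlers.Build.PushBackNamed(restxml.BuildHandler)
	c.Handlers.Unmarshal.PushBackNamed(restxml.UnmarshalHandler)
	c.Handlers.UnmarshalMeta.PushBackNamed(restxml.UnmarshalMetaHandler)
	c.Handlers.UnmarshalError.PushBackNamed(restxml.UnmarshalErrorHandler)
}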
File diff suppressed because it is too large
@@ -0,0 +1,43 @@
package s3

import (
	"io/ioutil"
	"regexp"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/aws/awsutil"
	"github.com/aws/aws-sdk-go/aws/request"
)

var reBucketLocation = regexp.MustCompile(`>([^<>]+)<\/Location`)

// buildGetBucketLocation parses the GetBucketLocation response body with a
// regular expression, since the response is not a typical wrapped XML
// document.
func buildGetBucketLocation(r *request.Request) {
	if r.DataFilled() {
		out := r.Data.(*GetBucketLocationOutput)
		b, err := ioutil.ReadAll(r.HTTPResponse.Body)
		if err != nil {
			r.Error = awserr.New("SerializationError", "failed reading response body", err)
			return
		}

		match := reBucketLocation.FindSubmatch(b)
		if len(match) > 1 {
			loc := string(match[1])
			out.LocationConstraint = &loc
		}
	}
}

// populateLocationConstraint defaults CreateBucketConfiguration's
// LocationConstraint to the client's configured region for buckets created
// outside us-east-1.
func populateLocationConstraint(r *request.Request) {
	if r.ParamsFilled() && aws.StringValue(r.Config.Region) != "us-east-1" {
		in := r.Params.(*CreateBucketInput)
		if in.CreateBucketConfiguration == nil {
			r.Params = awsutil.CopyOf(r.Params)
			in = r.Params.(*CreateBucketInput)
			in.CreateBucketConfiguration = &CreateBucketConfiguration{
				LocationConstraint: r.Config.Region,
			}
		}
	}
}
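A standalone sketch of what the reBucketLocation regex extracts from a typical GetBucketLocation body (the body string is illustrative):

package example

import (
	"fmt"
	"regexp"
)

// ExampleBucketLocation shows the region being pulled out of a raw
// LocationConstraint document with the same pattern used above.
func ExampleBucketLocation() {
	re := regexp.MustCompile(`>([^<>]+)<\/Location`)
	body := `<?xml version="1.0" encoding="UTF-8"?>` +
		`<LocationConstraint xmlns="http://s3.amazonaws.com/doc/2006-03-01/">eu-west-1</LocationConstraint>`
	if m := re.FindStringSubmatch(body); len(m) > 1 {
		fmt.Println(m[1])
	}
	// Output: eu-west-1
}

Note that a bucket in the classic us-east-1 region returns an empty LocationConstraint element, which this regex intentionally does not match, leaving the output field nil.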
@@ -0,0 +1,36 @@
package s3

import (
	"crypto/md5"
	"encoding/base64"
	"io"

	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/aws/request"
)

// contentMD5 computes and sets the HTTP Content-MD5 header for requests that
// require it.
func contentMD5(r *request.Request) {
	h := md5.New()

	// hash the body. seek back to the first position after reading to reset
	// the body for transmission. copy errors may be assumed to be from the
	// body.
	_, err := io.Copy(h, r.Body)
	if err != nil {
		r.Error = awserr.New("ContentMD5", "failed to read body", err)
		return
	}
	_, err = r.Body.Seek(0, 0)
	if err != nil {
		r.Error = awserr.New("ContentMD5", "failed to seek body", err)
		return
	}

	// encode the md5 checksum in base64 and set the request header.
	sum := h.Sum(nil)
	sum64 := make([]byte, base64.StdEncoding.EncodedLen(len(sum)))
	base64.StdEncoding.Encode(sum64, sum)
	r.HTTPRequest.Header.Set("Content-MD5", string(sum64))
}
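The same computation for a plain byte slice, as a minimal sketch: MD5 the payload, then base64-encode the raw 16-byte digest (not its hex form), which is what S3 expects in Content-MD5.

package example

import (
	"crypto/md5"
	"encoding/base64"
	"fmt"
)

// ExampleContentMD5 computes a Content-MD5 header value for a fixed payload.
func ExampleContentMD5() {
	payload := []byte("hello world")
	sum := md5.Sum(payload)
	fmt.Println(base64.StdEncoding.EncodeToString(sum[:]))
	// Output: XrY7u+Ae7tCTyyK7j1rNww==
}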
@@ -0,0 +1,46 @@
package s3

import (
	"github.com/aws/aws-sdk-go/aws/client"
	"github.com/aws/aws-sdk-go/aws/request"
)

func init() {
	initClient = defaultInitClientFn
	initRequest = defaultInitRequestFn
}

func defaultInitClientFn(c *client.Client) {
	// Support building custom endpoints based on config
	c.Handlers.Build.PushFront(updateEndpointForS3Config)

	// Require SSL when using SSE keys
	c.Handlers.Validate.PushBack(validateSSERequiresSSL)
	c.Handlers.Build.PushBack(computeSSEKeys)

	// S3 uses custom error unmarshaling logic
	c.Handlers.UnmarshalError.Clear()
	c.Handlers.UnmarshalError.PushBack(unmarshalError)
}

func defaultInitRequestFn(r *request.Request) {
	// Add request handlers for specific platforms,
	// e.g. 100-continue support for PUT requests using Go 1.6.
	platformRequestHandlers(r)

	switch r.Operation.Name {
	case opPutBucketCors, opPutBucketLifecycle, opPutBucketPolicy,
		opPutBucketTagging, opDeleteObjects, opPutBucketLifecycleConfiguration,
		opPutBucketReplication:
		// These S3 operations require Content-MD5 to be set
		r.Handlers.Build.PushBack(contentMD5)
	case opGetBucketLocation:
		// GetBucketLocation has custom parsing logic
		r.Handlers.Unmarshal.PushFront(buildGetBucketLocation)
	case opCreateBucket:
		// Auto-populate LocationConstraint with current region
		r.Handlers.Validate.PushFront(populateLocationConstraint)
	case opCopyObject, opUploadPartCopy, opCompleteMultipartUpload:
		r.Handlers.Unmarshal.PushFront(copyMultipartStatusOKUnmarhsalError)
	}
}
@@ -0,0 +1,186 @@
package s3

import (
	"bytes"
	"fmt"
	"net/url"
	"regexp"
	"strings"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/aws/awsutil"
	"github.com/aws/aws-sdk-go/aws/request"
)

// An operationBlacklist is a list of operation names that a request
// handler should not be executed with.
type operationBlacklist []string

// Continue will return true if the Request's operation name is not
// in the blacklist. False otherwise.
func (b operationBlacklist) Continue(r *request.Request) bool {
	for i := 0; i < len(b); i++ {
		if b[i] == r.Operation.Name {
			return false
		}
	}
	return true
}

var accelerateOpBlacklist = operationBlacklist{
	opListBuckets, opCreateBucket, opDeleteBucket,
}

// Request handler to automatically add the bucket name to the endpoint domain
// if possible. This style of bucket is valid for all bucket names which are
// DNS compatible and do not contain "."
func updateEndpointForS3Config(r *request.Request) {
	forceHostStyle := aws.BoolValue(r.Config.S3ForcePathStyle)
	accelerate := aws.BoolValue(r.Config.S3UseAccelerate)

	if accelerate && accelerateOpBlacklist.Continue(r) {
		if forceHostStyle {
			if r.Config.Logger != nil {
				r.Config.Logger.Log("ERROR: aws.Config.S3UseAccelerate is not compatible with aws.Config.S3ForcePathStyle, ignoring S3ForcePathStyle.")
			}
		}
		updateEndpointForAccelerate(r)
	} else if !forceHostStyle && r.Operation.Name != opGetBucketLocation {
		updateEndpointForHostStyle(r)
	}
}

func updateEndpointForHostStyle(r *request.Request) {
	bucket, ok := bucketNameFromReqParams(r.Params)
	if !ok {
		// Ignore operation requests if the bucket name was not provided;
		// if this is an input validation error the validation handler
		// will report it.
		return
	}

	if !hostCompatibleBucketName(r.HTTPRequest.URL, bucket) {
		// bucket name must be valid to put into the host
		return
	}

	moveBucketToHost(r.HTTPRequest.URL, bucket)
}

var (
	accelElem = []byte("s3-accelerate.dualstack.")
)

func updateEndpointForAccelerate(r *request.Request) {
	bucket, ok := bucketNameFromReqParams(r.Params)
	if !ok {
		// Ignore operation requests if the bucket name was not provided;
		// if this is an input validation error the validation handler
		// will report it.
		return
	}

	if !hostCompatibleBucketName(r.HTTPRequest.URL, bucket) {
		r.Error = awserr.New("InvalidParameterException",
			fmt.Sprintf("bucket name %s is not compatible with S3 Accelerate", bucket),
			nil)
		return
	}

	// Change endpoint from s3(-[a-z0-9-]+)?.amazonaws.com to s3-accelerate.amazonaws.com
	r.HTTPRequest.URL.Host = replaceHostRegion(r.HTTPRequest.URL.Host, "accelerate")

	if aws.BoolValue(r.Config.UseDualStack) {
		host := []byte(r.HTTPRequest.URL.Host)

		// Strip region from hostname
		if idx := bytes.Index(host, accelElem); idx >= 0 {
			start := idx + len(accelElem)
			if end := bytes.IndexByte(host[start:], '.'); end >= 0 {
				end += start + 1
				copy(host[start:], host[end:])
				host = host[:len(host)-(end-start)]
				r.HTTPRequest.URL.Host = string(host)
			}
		}
	}

	moveBucketToHost(r.HTTPRequest.URL, bucket)
}

// bucketNameFromReqParams attempts to retrieve the bucket name from the
// request input parameters.
// If no bucket is found, or the field is empty "", false will be returned.
func bucketNameFromReqParams(params interface{}) (string, bool) {
	b, _ := awsutil.ValuesAtPath(params, "Bucket")
	if len(b) == 0 {
		return "", false
	}

	if bucket, ok := b[0].(*string); ok {
		if bucketStr := aws.StringValue(bucket); bucketStr != "" {
			return bucketStr, true
		}
	}

	return "", false
}

// hostCompatibleBucketName returns true if the request should
// put the bucket in the host. This is false if S3ForcePathStyle is
// explicitly set or if the bucket is not DNS compatible.
func hostCompatibleBucketName(u *url.URL, bucket string) bool {
	// Bucket might be DNS compatible but dots in the hostname will fail
	// certificate validation, so do not use host-style.
	if u.Scheme == "https" && strings.Contains(bucket, ".") {
		return false
	}

	// if the bucket is DNS compatible
	return dnsCompatibleBucketName(bucket)
}

var reDomain = regexp.MustCompile(`^[a-z0-9][a-z0-9\.\-]{1,61}[a-z0-9]$`)
var reIPAddress = regexp.MustCompile(`^(\d+\.){3}\d+$`)

// dnsCompatibleBucketName returns true if the bucket name is DNS compatible.
// Buckets created outside of the classic region MUST be DNS compatible.
func dnsCompatibleBucketName(bucket string) bool {
	return reDomain.MatchString(bucket) &&
		!reIPAddress.MatchString(bucket) &&
		!strings.Contains(bucket, "..")
}

// moveBucketToHost moves the bucket name from the URI path to URL host.
func moveBucketToHost(u *url.URL, bucket string) {
	u.Host = bucket + "." + u.Host
	u.Path = strings.Replace(u.Path, "/{Bucket}", "", -1)
	if u.Path == "" {
		u.Path = "/"
	}
}

const s3HostPrefix = "s3"

// replaceHostRegion replaces the S3 region string in the host with the
// value provided. If v is empty the host prefix returned will be s3.
func replaceHostRegion(host, v string) string {
	if !strings.HasPrefix(host, s3HostPrefix) {
		return host
	}

	suffix := host[len(s3HostPrefix):]
	for i := len(s3HostPrefix); i < len(host); i++ {
		if host[i] == '.' {
			// Trim up to the '.', leaving it in place.
			suffix = host[i:]
			break
		}
	}

	if len(v) == 0 {
		return fmt.Sprintf("s3%s", suffix)
	}

	return fmt.Sprintf("s3-%s%s", v, suffix)
}
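A worked example of the two host rewrites above, restated with plain string operations so the before/after shapes are visible (host, path, and bucket values are illustrative):

package example

import (
	"fmt"
	"strings"
)

// ExampleEndpointRewrites mirrors moveBucketToHost and replaceHostRegion.
func ExampleEndpointRewrites() {
	// moveBucketToHost: the bucket moves from the URI path into the host.
	host := "s3-us-west-2.amazonaws.com"
	path := "/{Bucket}/my/key"
	host = "mybucket" + "." + host
	path = strings.Replace(path, "/{Bucket}", "", -1)
	fmt.Println(host + path)

	// replaceHostRegion("s3-us-west-2.amazonaws.com", "accelerate"):
	// the region element after the "s3" prefix is swapped out.
	fmt.Println(strings.Replace("s3-us-west-2.amazonaws.com",
		"-us-west-2", "-accelerate", 1))
	// Output:
	// mybucket.s3-us-west-2.amazonaws.com/my/key
	// s3-accelerate.amazonaws.com
}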
@@ -0,0 +1,8 @@
// +build !go1.6

package s3

import "github.com/aws/aws-sdk-go/aws/request"

func platformRequestHandlers(r *request.Request) {
}
@@ -0,0 +1,28 @@
// +build go1.6

package s3

import (
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/request"
)

func platformRequestHandlers(r *request.Request) {
	if r.Operation.HTTPMethod == "PUT" {
		// 100-Continue should only be used on put requests.
		r.Handlers.Sign.PushBack(add100Continue)
	}
}

func add100Continue(r *request.Request) {
	if aws.BoolValue(r.Config.S3Disable100Continue) {
		return
	}
	if r.HTTPRequest.ContentLength < 1024*1024*2 {
		// Ignore requests smaller than 2MB. This helps prevent delaying
		// requests unnecessarily.
		return
	}

	r.HTTPRequest.Header.Set("Expect", "100-Continue")
}
@@ -0,0 +1,317 @@
// THIS FILE IS AUTOMATICALLY GENERATED. DO NOT EDIT.

// Package s3iface provides an interface to enable mocking the Amazon Simple Storage Service service client
// for testing your code.
//
// It is important to note that this interface will have breaking changes
// when the service model is updated and adds new API operations, paginators,
// and waiters.
package s3iface

import (
	"github.com/aws/aws-sdk-go/aws/request"
	"github.com/aws/aws-sdk-go/service/s3"
)

// S3API provides an interface to enable mocking the
// s3.S3 service client's API operations,
// paginators, and waiters. This makes unit testing your code that calls out
// to the SDK's service client's calls easier.
//
// The best way to use this interface is so the SDK's service client's calls
// can be stubbed out for unit testing your code with the SDK without needing
// to inject custom request handlers into the SDK's request pipeline.
//
//     // myFunc uses an SDK service client to make a request to
//     // Amazon Simple Storage Service.
//     func myFunc(svc s3iface.S3API) bool {
//         // Make svc.AbortMultipartUpload request
//     }
//
//     func main() {
//         sess := session.New()
//         svc := s3.New(sess)
//
//         myFunc(svc)
//     }
//
// In your _test.go file:
//
//     // Define a mock struct to be used in your unit tests of myFunc.
//     type mockS3Client struct {
//         s3iface.S3API
//     }
//     func (m *mockS3Client) AbortMultipartUpload(input *s3.AbortMultipartUploadInput) (*s3.AbortMultipartUploadOutput, error) {
//         // mock response/functionality
//     }
//
//     func TestMyFunc(t *testing.T) {
//         // Setup Test
//         mockSvc := &mockS3Client{}
//
//         myFunc(mockSvc)
//
//         // Verify myFunc's functionality
//     }
//
// It is important to note that this interface will have breaking changes
// when the service model is updated and adds new API operations, paginators,
// and waiters. It's suggested to use the pattern above for testing, or using
// tooling to generate mocks to satisfy the interfaces.
type S3API interface {
	AbortMultipartUploadRequest(*s3.AbortMultipartUploadInput) (*request.Request, *s3.AbortMultipartUploadOutput)

	AbortMultipartUpload(*s3.AbortMultipartUploadInput) (*s3.AbortMultipartUploadOutput, error)

	CompleteMultipartUploadRequest(*s3.CompleteMultipartUploadInput) (*request.Request, *s3.CompleteMultipartUploadOutput)

	CompleteMultipartUpload(*s3.CompleteMultipartUploadInput) (*s3.CompleteMultipartUploadOutput, error)

	CopyObjectRequest(*s3.CopyObjectInput) (*request.Request, *s3.CopyObjectOutput)

	CopyObject(*s3.CopyObjectInput) (*s3.CopyObjectOutput, error)

	CreateBucketRequest(*s3.CreateBucketInput) (*request.Request, *s3.CreateBucketOutput)

	CreateBucket(*s3.CreateBucketInput) (*s3.CreateBucketOutput, error)

	CreateMultipartUploadRequest(*s3.CreateMultipartUploadInput) (*request.Request, *s3.CreateMultipartUploadOutput)

	CreateMultipartUpload(*s3.CreateMultipartUploadInput) (*s3.CreateMultipartUploadOutput, error)

	DeleteBucketRequest(*s3.DeleteBucketInput) (*request.Request, *s3.DeleteBucketOutput)

	DeleteBucket(*s3.DeleteBucketInput) (*s3.DeleteBucketOutput, error)

	DeleteBucketCorsRequest(*s3.DeleteBucketCorsInput) (*request.Request, *s3.DeleteBucketCorsOutput)

	DeleteBucketCors(*s3.DeleteBucketCorsInput) (*s3.DeleteBucketCorsOutput, error)

	DeleteBucketLifecycleRequest(*s3.DeleteBucketLifecycleInput) (*request.Request, *s3.DeleteBucketLifecycleOutput)

	DeleteBucketLifecycle(*s3.DeleteBucketLifecycleInput) (*s3.DeleteBucketLifecycleOutput, error)

	DeleteBucketPolicyRequest(*s3.DeleteBucketPolicyInput) (*request.Request, *s3.DeleteBucketPolicyOutput)

	DeleteBucketPolicy(*s3.DeleteBucketPolicyInput) (*s3.DeleteBucketPolicyOutput, error)

	DeleteBucketReplicationRequest(*s3.DeleteBucketReplicationInput) (*request.Request, *s3.DeleteBucketReplicationOutput)

	DeleteBucketReplication(*s3.DeleteBucketReplicationInput) (*s3.DeleteBucketReplicationOutput, error)

	DeleteBucketTaggingRequest(*s3.DeleteBucketTaggingInput) (*request.Request, *s3.DeleteBucketTaggingOutput)

	DeleteBucketTagging(*s3.DeleteBucketTaggingInput) (*s3.DeleteBucketTaggingOutput, error)

	DeleteBucketWebsiteRequest(*s3.DeleteBucketWebsiteInput) (*request.Request, *s3.DeleteBucketWebsiteOutput)

	DeleteBucketWebsite(*s3.DeleteBucketWebsiteInput) (*s3.DeleteBucketWebsiteOutput, error)

	DeleteObjectRequest(*s3.DeleteObjectInput) (*request.Request, *s3.DeleteObjectOutput)

	DeleteObject(*s3.DeleteObjectInput) (*s3.DeleteObjectOutput, error)

	DeleteObjectsRequest(*s3.DeleteObjectsInput) (*request.Request, *s3.DeleteObjectsOutput)

	DeleteObjects(*s3.DeleteObjectsInput) (*s3.DeleteObjectsOutput, error)

	GetBucketAccelerateConfigurationRequest(*s3.GetBucketAccelerateConfigurationInput) (*request.Request, *s3.GetBucketAccelerateConfigurationOutput)

	GetBucketAccelerateConfiguration(*s3.GetBucketAccelerateConfigurationInput) (*s3.GetBucketAccelerateConfigurationOutput, error)

	GetBucketAclRequest(*s3.GetBucketAclInput) (*request.Request, *s3.GetBucketAclOutput)

	GetBucketAcl(*s3.GetBucketAclInput) (*s3.GetBucketAclOutput, error)

	GetBucketCorsRequest(*s3.GetBucketCorsInput) (*request.Request, *s3.GetBucketCorsOutput)

	GetBucketCors(*s3.GetBucketCorsInput) (*s3.GetBucketCorsOutput, error)

	GetBucketLifecycleRequest(*s3.GetBucketLifecycleInput) (*request.Request, *s3.GetBucketLifecycleOutput)

	GetBucketLifecycle(*s3.GetBucketLifecycleInput) (*s3.GetBucketLifecycleOutput, error)

	GetBucketLifecycleConfigurationRequest(*s3.GetBucketLifecycleConfigurationInput) (*request.Request, *s3.GetBucketLifecycleConfigurationOutput)

	GetBucketLifecycleConfiguration(*s3.GetBucketLifecycleConfigurationInput) (*s3.GetBucketLifecycleConfigurationOutput, error)

	GetBucketLocationRequest(*s3.GetBucketLocationInput) (*request.Request, *s3.GetBucketLocationOutput)

	GetBucketLocation(*s3.GetBucketLocationInput) (*s3.GetBucketLocationOutput, error)

	GetBucketLoggingRequest(*s3.GetBucketLoggingInput) (*request.Request, *s3.GetBucketLoggingOutput)

	GetBucketLogging(*s3.GetBucketLoggingInput) (*s3.GetBucketLoggingOutput, error)

	GetBucketNotificationRequest(*s3.GetBucketNotificationConfigurationRequest) (*request.Request, *s3.NotificationConfigurationDeprecated)

	GetBucketNotification(*s3.GetBucketNotificationConfigurationRequest) (*s3.NotificationConfigurationDeprecated, error)

	GetBucketNotificationConfigurationRequest(*s3.GetBucketNotificationConfigurationRequest) (*request.Request, *s3.NotificationConfiguration)

	GetBucketNotificationConfiguration(*s3.GetBucketNotificationConfigurationRequest) (*s3.NotificationConfiguration, error)

	GetBucketPolicyRequest(*s3.GetBucketPolicyInput) (*request.Request, *s3.GetBucketPolicyOutput)

	GetBucketPolicy(*s3.GetBucketPolicyInput) (*s3.GetBucketPolicyOutput, error)

	GetBucketReplicationRequest(*s3.GetBucketReplicationInput) (*request.Request, *s3.GetBucketReplicationOutput)

	GetBucketReplication(*s3.GetBucketReplicationInput) (*s3.GetBucketReplicationOutput, error)

	GetBucketRequestPaymentRequest(*s3.GetBucketRequestPaymentInput) (*request.Request, *s3.GetBucketRequestPaymentOutput)

	GetBucketRequestPayment(*s3.GetBucketRequestPaymentInput) (*s3.GetBucketRequestPaymentOutput, error)

	GetBucketTaggingRequest(*s3.GetBucketTaggingInput) (*request.Request, *s3.GetBucketTaggingOutput)

	GetBucketTagging(*s3.GetBucketTaggingInput) (*s3.GetBucketTaggingOutput, error)

	GetBucketVersioningRequest(*s3.GetBucketVersioningInput) (*request.Request, *s3.GetBucketVersioningOutput)

	GetBucketVersioning(*s3.GetBucketVersioningInput) (*s3.GetBucketVersioningOutput, error)

	GetBucketWebsiteRequest(*s3.GetBucketWebsiteInput) (*request.Request, *s3.GetBucketWebsiteOutput)

	GetBucketWebsite(*s3.GetBucketWebsiteInput) (*s3.GetBucketWebsiteOutput, error)

	GetObjectRequest(*s3.GetObjectInput) (*request.Request, *s3.GetObjectOutput)

	GetObject(*s3.GetObjectInput) (*s3.GetObjectOutput, error)

	GetObjectAclRequest(*s3.GetObjectAclInput) (*request.Request, *s3.GetObjectAclOutput)

	GetObjectAcl(*s3.GetObjectAclInput) (*s3.GetObjectAclOutput, error)

	GetObjectTorrentRequest(*s3.GetObjectTorrentInput) (*request.Request, *s3.GetObjectTorrentOutput)

	GetObjectTorrent(*s3.GetObjectTorrentInput) (*s3.GetObjectTorrentOutput, error)

	HeadBucketRequest(*s3.HeadBucketInput) (*request.Request, *s3.HeadBucketOutput)

	HeadBucket(*s3.HeadBucketInput) (*s3.HeadBucketOutput, error)

	HeadObjectRequest(*s3.HeadObjectInput) (*request.Request, *s3.HeadObjectOutput)

	HeadObject(*s3.HeadObjectInput) (*s3.HeadObjectOutput, error)

	ListBucketsRequest(*s3.ListBucketsInput) (*request.Request, *s3.ListBucketsOutput)

	ListBuckets(*s3.ListBucketsInput) (*s3.ListBucketsOutput, error)

	ListMultipartUploadsRequest(*s3.ListMultipartUploadsInput) (*request.Request, *s3.ListMultipartUploadsOutput)

	ListMultipartUploads(*s3.ListMultipartUploadsInput) (*s3.ListMultipartUploadsOutput, error)

	ListMultipartUploadsPages(*s3.ListMultipartUploadsInput, func(*s3.ListMultipartUploadsOutput, bool) bool) error

	ListObjectVersionsRequest(*s3.ListObjectVersionsInput) (*request.Request, *s3.ListObjectVersionsOutput)

	ListObjectVersions(*s3.ListObjectVersionsInput) (*s3.ListObjectVersionsOutput, error)

	ListObjectVersionsPages(*s3.ListObjectVersionsInput, func(*s3.ListObjectVersionsOutput, bool) bool) error

	ListObjectsRequest(*s3.ListObjectsInput) (*request.Request, *s3.ListObjectsOutput)

	ListObjects(*s3.ListObjectsInput) (*s3.ListObjectsOutput, error)

	ListObjectsPages(*s3.ListObjectsInput, func(*s3.ListObjectsOutput, bool) bool) error

	ListObjectsV2Request(*s3.ListObjectsV2Input) (*request.Request, *s3.ListObjectsV2Output)

	ListObjectsV2(*s3.ListObjectsV2Input) (*s3.ListObjectsV2Output, error)

	ListObjectsV2Pages(*s3.ListObjectsV2Input, func(*s3.ListObjectsV2Output, bool) bool) error

	ListPartsRequest(*s3.ListPartsInput) (*request.Request, *s3.ListPartsOutput)

	ListParts(*s3.ListPartsInput) (*s3.ListPartsOutput, error)

	ListPartsPages(*s3.ListPartsInput, func(*s3.ListPartsOutput, bool) bool) error

	PutBucketAccelerateConfigurationRequest(*s3.PutBucketAccelerateConfigurationInput) (*request.Request, *s3.PutBucketAccelerateConfigurationOutput)

	PutBucketAccelerateConfiguration(*s3.PutBucketAccelerateConfigurationInput) (*s3.PutBucketAccelerateConfigurationOutput, error)

	PutBucketAclRequest(*s3.PutBucketAclInput) (*request.Request, *s3.PutBucketAclOutput)

	PutBucketAcl(*s3.PutBucketAclInput) (*s3.PutBucketAclOutput, error)

	PutBucketCorsRequest(*s3.PutBucketCorsInput) (*request.Request, *s3.PutBucketCorsOutput)

	PutBucketCors(*s3.PutBucketCorsInput) (*s3.PutBucketCorsOutput, error)

	PutBucketLifecycleRequest(*s3.PutBucketLifecycleInput) (*request.Request, *s3.PutBucketLifecycleOutput)

	PutBucketLifecycle(*s3.PutBucketLifecycleInput) (*s3.PutBucketLifecycleOutput, error)

	PutBucketLifecycleConfigurationRequest(*s3.PutBucketLifecycleConfigurationInput) (*request.Request, *s3.PutBucketLifecycleConfigurationOutput)

	PutBucketLifecycleConfiguration(*s3.PutBucketLifecycleConfigurationInput) (*s3.PutBucketLifecycleConfigurationOutput, error)

	PutBucketLoggingRequest(*s3.PutBucketLoggingInput) (*request.Request, *s3.PutBucketLoggingOutput)

	PutBucketLogging(*s3.PutBucketLoggingInput) (*s3.PutBucketLoggingOutput, error)

	PutBucketNotificationRequest(*s3.PutBucketNotificationInput) (*request.Request, *s3.PutBucketNotificationOutput)

	PutBucketNotification(*s3.PutBucketNotificationInput) (*s3.PutBucketNotificationOutput, error)

	PutBucketNotificationConfigurationRequest(*s3.PutBucketNotificationConfigurationInput) (*request.Request, *s3.PutBucketNotificationConfigurationOutput)

	PutBucketNotificationConfiguration(*s3.PutBucketNotificationConfigurationInput) (*s3.PutBucketNotificationConfigurationOutput, error)

	PutBucketPolicyRequest(*s3.PutBucketPolicyInput) (*request.Request, *s3.PutBucketPolicyOutput)

	PutBucketPolicy(*s3.PutBucketPolicyInput) (*s3.PutBucketPolicyOutput, error)

	PutBucketReplicationRequest(*s3.PutBucketReplicationInput) (*request.Request, *s3.PutBucketReplicationOutput)

	PutBucketReplication(*s3.PutBucketReplicationInput) (*s3.PutBucketReplicationOutput, error)

	PutBucketRequestPaymentRequest(*s3.PutBucketRequestPaymentInput) (*request.Request, *s3.PutBucketRequestPaymentOutput)

	PutBucketRequestPayment(*s3.PutBucketRequestPaymentInput) (*s3.PutBucketRequestPaymentOutput, error)

	PutBucketTaggingRequest(*s3.PutBucketTaggingInput) (*request.Request, *s3.PutBucketTaggingOutput)

	PutBucketTagging(*s3.PutBucketTaggingInput) (*s3.PutBucketTaggingOutput, error)

	PutBucketVersioningRequest(*s3.PutBucketVersioningInput) (*request.Request, *s3.PutBucketVersioningOutput)

	PutBucketVersioning(*s3.PutBucketVersioningInput) (*s3.PutBucketVersioningOutput, error)

	PutBucketWebsiteRequest(*s3.PutBucketWebsiteInput) (*request.Request, *s3.PutBucketWebsiteOutput)

	PutBucketWebsite(*s3.PutBucketWebsiteInput) (*s3.PutBucketWebsiteOutput, error)

	PutObjectRequest(*s3.PutObjectInput) (*request.Request, *s3.PutObjectOutput)

	PutObject(*s3.PutObjectInput) (*s3.PutObjectOutput, error)

	PutObjectAclRequest(*s3.PutObjectAclInput) (*request.Request, *s3.PutObjectAclOutput)

	PutObjectAcl(*s3.PutObjectAclInput) (*s3.PutObjectAclOutput, error)

	RestoreObjectRequest(*s3.RestoreObjectInput) (*request.Request, *s3.RestoreObjectOutput)

	RestoreObject(*s3.RestoreObjectInput) (*s3.RestoreObjectOutput, error)

	UploadPartRequest(*s3.UploadPartInput) (*request.Request, *s3.UploadPartOutput)

	UploadPart(*s3.UploadPartInput) (*s3.UploadPartOutput, error)

	UploadPartCopyRequest(*s3.UploadPartCopyInput) (*request.Request, *s3.UploadPartCopyOutput)

	UploadPartCopy(*s3.UploadPartCopyInput) (*s3.UploadPartCopyOutput, error)

	WaitUntilBucketExists(*s3.HeadBucketInput) error

	WaitUntilBucketNotExists(*s3.HeadBucketInput) error

	WaitUntilObjectExists(*s3.HeadObjectInput) error

	WaitUntilObjectNotExists(*s3.HeadObjectInput) error
}

var _ S3API = (*s3.S3)(nil)
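A compilable version of the mocking pattern from the doc comment above, as a minimal sketch (the mock's canned response is illustrative): embedding s3iface.S3API satisfies the whole interface, so only the methods a test actually exercises need real implementations.

package example

import (
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/aws/aws-sdk-go/service/s3/s3iface"
)

// mockS3Client satisfies s3iface.S3API by embedding it; unimplemented
// methods panic with a nil-pointer if called, which surfaces untested paths.
type mockS3Client struct {
	s3iface.S3API
}

// ListBuckets returns a fixed, fake bucket listing for tests.
func (m *mockS3Client) ListBuckets(in *s3.ListBucketsInput) (*s3.ListBucketsOutput, error) {
	name := "test-bucket"
	return &s3.ListBucketsOutput{
		Buckets: []*s3.Bucket{{Name: &name}},
	}, nil
}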
@@ -0,0 +1,3 @@
// Package s3manager provides utilities to upload and download objects from
// S3 concurrently. Helpful when working with large objects.
package s3manager
@@ -0,0 +1,395 @@
package s3manager

import (
	"fmt"
	"io"
	"net/http"
	"strconv"
	"strings"
	"sync"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/aws/awsutil"
	"github.com/aws/aws-sdk-go/aws/client"
	"github.com/aws/aws-sdk-go/aws/request"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/aws/aws-sdk-go/service/s3/s3iface"
)

// DefaultDownloadPartSize is the default range of bytes to get at a time when
// using Download().
const DefaultDownloadPartSize = 1024 * 1024 * 5

// DefaultDownloadConcurrency is the default number of goroutines to spin up
// when using Download().
const DefaultDownloadConcurrency = 5

// Downloader is the structure that calls Download(). It is safe to call Download()
// on this structure for multiple objects and across concurrent goroutines.
// Mutating the Downloader's properties is not safe to be done concurrently.
type Downloader struct {
	// The buffer size (in bytes) to use when buffering data into chunks and
	// sending them as parts to S3. The minimum allowed part size is 5MB, and
	// if this value is set to zero, the DefaultDownloadPartSize value will be used.
	PartSize int64

	// The number of goroutines to spin up in parallel when sending parts.
	// If this is set to zero, the DefaultDownloadConcurrency value will be used.
	Concurrency int

	// An S3 client to use when performing downloads.
	S3 s3iface.S3API
}

// NewDownloader creates a new Downloader instance to download objects from
// S3 in concurrent chunks. Pass in additional functional options to customize
// the downloader behavior. Requires a client.ConfigProvider in order to create
// an S3 service client. The session.Session satisfies the client.ConfigProvider
// interface.
//
// Example:
//     // The session the S3 Downloader will use
//     sess, err := session.NewSession()
//
//     // Create a downloader with the session and default options
//     downloader := s3manager.NewDownloader(sess)
//
//     // Create a downloader with the session and custom options
//     downloader := s3manager.NewDownloader(sess, func(d *s3manager.Downloader) {
//         d.PartSize = 64 * 1024 * 1024 // 64MB per part
//     })
func NewDownloader(c client.ConfigProvider, options ...func(*Downloader)) *Downloader {
	d := &Downloader{
		S3:          s3.New(c),
		PartSize:    DefaultDownloadPartSize,
		Concurrency: DefaultDownloadConcurrency,
	}
	for _, option := range options {
		option(d)
	}

	return d
}

// NewDownloaderWithClient creates a new Downloader instance to download
// objects from S3 in concurrent chunks. Pass in additional functional
// options to customize the downloader behavior. Requires an S3 service client
// to make S3 API calls.
//
// Example:
//     // The session the S3 Downloader will use
//     sess, err := session.NewSession()
//
//     // The S3 client the S3 Downloader will use
//     s3Svc := s3.New(sess)
//
//     // Create a downloader with the s3 client and default options
//     downloader := s3manager.NewDownloaderWithClient(s3Svc)
//
//     // Create a downloader with the s3 client and custom options
//     downloader := s3manager.NewDownloaderWithClient(s3Svc, func(d *s3manager.Downloader) {
//         d.PartSize = 64 * 1024 * 1024 // 64MB per part
//     })
func NewDownloaderWithClient(svc s3iface.S3API, options ...func(*Downloader)) *Downloader {
	d := &Downloader{
		S3:          svc,
		PartSize:    DefaultDownloadPartSize,
		Concurrency: DefaultDownloadConcurrency,
	}
	for _, option := range options {
		option(d)
	}

	return d
}

type maxRetrier interface {
	MaxRetries() int
}

// Download downloads an object in S3 and writes the payload into w using
// concurrent GET requests.
//
// Additional functional options can be provided to configure the individual
// download. These options are copies of the Downloader instance Download is called from.
// Modifying the options will not impact the original Downloader instance.
//
// It is safe to call this method concurrently across goroutines.
//
// The w io.WriterAt can be satisfied by an os.File to do multipart concurrent
// downloads, or an in-memory []byte wrapper using aws.WriteAtBuffer.
func (d Downloader) Download(w io.WriterAt, input *s3.GetObjectInput, options ...func(*Downloader)) (n int64, err error) {
	impl := downloader{w: w, in: input, ctx: d}

	for _, option := range options {
		option(&impl.ctx)
	}

	if s, ok := d.S3.(maxRetrier); ok {
		impl.partBodyMaxRetries = s.MaxRetries()
	}

	impl.totalBytes = -1
	if impl.ctx.Concurrency == 0 {
		impl.ctx.Concurrency = DefaultDownloadConcurrency
	}

	if impl.ctx.PartSize == 0 {
		impl.ctx.PartSize = DefaultDownloadPartSize
	}

	return impl.download()
}

// downloader is the implementation structure used internally by Downloader.
type downloader struct {
	ctx Downloader

	in *s3.GetObjectInput
	w  io.WriterAt

	wg sync.WaitGroup
	m  sync.Mutex

	pos        int64
	totalBytes int64
	written    int64
	err        error

	partBodyMaxRetries int
}

// download performs the implementation of the object download across ranged
// GETs.
func (d *downloader) download() (n int64, err error) {
	// Spin off first worker to check additional header information
	d.getChunk()

	if total := d.getTotalBytes(); total >= 0 {
		// Spin up workers
		ch := make(chan dlchunk, d.ctx.Concurrency)

		for i := 0; i < d.ctx.Concurrency; i++ {
			d.wg.Add(1)
			go d.downloadPart(ch)
		}

		// Assign work
		for d.getErr() == nil {
			if d.pos >= total {
				break // We're finished queuing chunks
			}

			// Queue the next range of bytes to read.
			ch <- dlchunk{w: d.w, start: d.pos, size: d.ctx.PartSize}
			d.pos += d.ctx.PartSize
		}

		// Wait for completion
		close(ch)
		d.wg.Wait()
	} else {
		// Checking if we read anything new
		for d.err == nil {
			d.getChunk()
		}

		// We expect a 416 error letting us know we are done downloading the
		// total bytes. Since we do not know the content's length, this will
		// keep grabbing chunks of data until the range of bytes specified in
		// the request is out of range of the content. Once this happens, a
		// 416 should occur.
		e, ok := d.err.(awserr.RequestFailure)
		if ok && e.StatusCode() == http.StatusRequestedRangeNotSatisfiable {
			d.err = nil
		}
	}

	// Return error
	return d.written, d.err
}

// downloadPart is an individual goroutine worker reading from the ch channel
// and performing a GetObject request on the data with a given byte range.
//
// If this is the first worker, this operation also resolves the total number
// of bytes to be read so that the worker manager knows when it is finished.
func (d *downloader) downloadPart(ch chan dlchunk) {
	defer d.wg.Done()
	for {
		chunk, ok := <-ch
		if !ok || d.getErr() != nil {
			break
		}

		if err := d.downloadChunk(chunk); err != nil {
			d.setErr(err)
			break
		}
	}
}

// getChunk grabs a chunk of data from the body.
// Not thread safe. Should only be used when grabbing data on a single thread.
func (d *downloader) getChunk() {
	if d.getErr() != nil {
		return
	}

	chunk := dlchunk{w: d.w, start: d.pos, size: d.ctx.PartSize}
	d.pos += d.ctx.PartSize

	if err := d.downloadChunk(chunk); err != nil {
		d.setErr(err)
	}
}

// downloadChunk downloads the chunk from S3
func (d *downloader) downloadChunk(chunk dlchunk) error {
	in := &s3.GetObjectInput{}
	awsutil.Copy(in, d.in)

	// Get the next byte range of data
	rng := fmt.Sprintf("bytes=%d-%d", chunk.start, chunk.start+chunk.size-1)
	in.Range = &rng

	var n int64
	var err error
	for retry := 0; retry <= d.partBodyMaxRetries; retry++ {
		req, resp := d.ctx.S3.GetObjectRequest(in)
		req.Handlers.Build.PushBack(request.MakeAddToUserAgentFreeFormHandler("S3Manager"))

		err = req.Send()
		if err != nil {
			return err
		}
		d.setTotalBytes(resp) // Set total if not yet set.

		n, err = io.Copy(&chunk, resp.Body)
		resp.Body.Close()
		if err == nil {
			break
		}

		chunk.cur = 0
		logMessage(d.ctx.S3, aws.LogDebugWithRequestRetries,
			fmt.Sprintf("DEBUG: object part body download interrupted %s, err, %v, retrying attempt %d",
				aws.StringValue(in.Key), err, retry))
	}

	d.incrWritten(n)

	return err
}

func logMessage(svc s3iface.S3API, level aws.LogLevelType, msg string) {
	s, ok := svc.(*s3.S3)
	if !ok {
		return
	}

	if s.Config.Logger == nil {
		return
	}

	if s.Config.LogLevel.Matches(level) {
		s.Config.Logger.Log(msg)
	}
}

// getTotalBytes is a thread-safe getter for retrieving the total byte status.
func (d *downloader) getTotalBytes() int64 {
	d.m.Lock()
	defer d.m.Unlock()

	return d.totalBytes
}

// setTotalBytes is a thread-safe setter for setting the total byte status.
// Will extract the object's total bytes from the Content-Range if the file
// will be chunked, or Content-Length. Content-Length is used when the response
// does not include a Content-Range, meaning the object was not chunked. This
// occurs when the full file fits within the PartSize directive.
func (d *downloader) setTotalBytes(resp *s3.GetObjectOutput) {
	d.m.Lock()
	defer d.m.Unlock()

	if d.totalBytes >= 0 {
		return
	}

	if resp.ContentRange == nil {
		// ContentRange is nil when the full file's contents are provided
		// and the response is not chunked. Use ContentLength instead.
		if resp.ContentLength != nil {
			d.totalBytes = *resp.ContentLength
			return
		}
	} else {
		parts := strings.Split(*resp.ContentRange, "/")

		total := int64(-1)
		var err error
		// Check whether a numeric total exists.
		// If one does not exist, assume the total to be -1, undefined,
		// and sequentially download each chunk until hitting a 416 error.
		totalStr := parts[len(parts)-1]
		if totalStr != "*" {
			total, err = strconv.ParseInt(totalStr, 10, 64)
			if err != nil {
				d.err = err
				return
			}
		}

		d.totalBytes = total
	}
}

func (d *downloader) incrWritten(n int64) {
	d.m.Lock()
	defer d.m.Unlock()

	d.written += n
}

// getErr is a thread-safe getter for the error object
func (d *downloader) getErr() error {
	d.m.Lock()
	defer d.m.Unlock()

	return d.err
}

// setErr is a thread-safe setter for the error object
func (d *downloader) setErr(e error) {
	d.m.Lock()
	defer d.m.Unlock()

	d.err = e
}

// dlchunk represents a single chunk of data to write by the worker routine.
// This structure also implements an io.SectionReader style interface for
// io.WriterAt, effectively making it an io.SectionWriter (which does not
// exist).
type dlchunk struct {
	w     io.WriterAt
	start int64
	size  int64
	cur   int64
}

// Write wraps io.WriterAt for the dlchunk, writing from the dlchunk's start
// position to its end (or EOF).
func (c *dlchunk) Write(p []byte) (n int, err error) {
	if c.cur >= c.size {
		return 0, io.EOF
	}

	n, err = c.w.WriteAt(p, c.start+c.cur)
	c.cur += int64(n)

	return
}
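A minimal end-to-end sketch of using the Downloader (bucket, key, and file names are placeholders): an os.File satisfies io.WriterAt, so parts fetched by concurrent workers land at the correct offsets without coordination in the caller.

package example

import (
	"os"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/s3"
	"github.com/aws/aws-sdk-go/service/s3/s3manager"
)

// downloadToFile fetches an object into a local file in concurrent chunks.
func downloadToFile() error {
	sess, err := session.NewSession()
	if err != nil {
		return err
	}

	f, err := os.Create("local-file")
	if err != nil {
		return err
	}
	defer f.Close()

	downloader := s3manager.NewDownloader(sess)
	_, err = downloader.Download(f, &s3.GetObjectInput{
		Bucket: aws.String("my-bucket"),
		Key:    aws.String("my-key"),
	})
	return err
}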
@@ -0,0 +1,86 @@
// THIS FILE IS AUTOMATICALLY GENERATED. DO NOT EDIT.

package s3

import (
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/client"
	"github.com/aws/aws-sdk-go/aws/client/metadata"
	"github.com/aws/aws-sdk-go/aws/request"
	"github.com/aws/aws-sdk-go/aws/signer/v4"
	"github.com/aws/aws-sdk-go/private/protocol/restxml"
)

// S3 is a client for Amazon S3.
// The service client's operations are safe to be used concurrently.
// It is not safe to mutate any of the client's properties though.
type S3 struct {
	*client.Client
}

// Used for custom client initialization logic
var initClient func(*client.Client)

// Used for custom request initialization logic
var initRequest func(*request.Request)

// A ServiceName is the name of the service the client will make API calls to.
const ServiceName = "s3"

// New creates a new instance of the S3 client with a session.
// If additional configuration is needed for the client instance use the optional
// aws.Config parameter to add your extra config.
//
// Example:
//     // Create a S3 client from just a session.
//     svc := s3.New(mySession)
//
//     // Create a S3 client with additional configuration
//     svc := s3.New(mySession, aws.NewConfig().WithRegion("us-west-2"))
func New(p client.ConfigProvider, cfgs ...*aws.Config) *S3 {
	c := p.ClientConfig(ServiceName, cfgs...)
	return newClient(*c.Config, c.Handlers, c.Endpoint, c.SigningRegion)
}

// newClient creates, initializes and returns a new service client instance.
func newClient(cfg aws.Config, handlers request.Handlers, endpoint, signingRegion string) *S3 {
	svc := &S3{
		Client: client.New(
			cfg,
			metadata.ClientInfo{
				ServiceName:   ServiceName,
				SigningRegion: signingRegion,
				Endpoint:      endpoint,
				APIVersion:    "2006-03-01",
			},
			handlers,
		),
	}

	// Handlers
	svc.Handlers.Sign.PushBackNamed(v4.SignRequestHandler)
	svc.Handlers.Build.PushBackNamed(restxml.BuildHandler)
	svc.Handlers.Unmarshal.PushBackNamed(restxml.UnmarshalHandler)
	svc.Handlers.UnmarshalMeta.PushBackNamed(restxml.UnmarshalMetaHandler)
	svc.Handlers.UnmarshalError.PushBackNamed(restxml.UnmarshalErrorHandler)

	// Run custom client initialization if present
	if initClient != nil {
		initClient(svc.Client)
	}

	return svc
}

// newRequest creates a new request for a S3 operation and runs any
// custom request initialization.
func (c *S3) newRequest(op *request.Operation, params, data interface{}) *request.Request {
	req := c.NewRequest(op, params, data)

	// Run custom request initialization if present
	if initRequest != nil {
		initRequest(req)
	}

	return req
}
@@ -0,0 +1,44 @@
package s3

import (
	"crypto/md5"
	"encoding/base64"

	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/aws/awsutil"
	"github.com/aws/aws-sdk-go/aws/request"
)

var errSSERequiresSSL = awserr.New("ConfigError", "cannot send SSE keys over HTTP.", nil)

func validateSSERequiresSSL(r *request.Request) {
	if r.HTTPRequest.URL.Scheme != "https" {
		p, _ := awsutil.ValuesAtPath(r.Params, "SSECustomerKey||CopySourceSSECustomerKey")
		if len(p) > 0 {
			r.Error = errSSERequiresSSL
		}
	}
}

func computeSSEKeys(r *request.Request) {
	headers := []string{
		"x-amz-server-side-encryption-customer-key",
		"x-amz-copy-source-server-side-encryption-customer-key",
	}

	for _, h := range headers {
		md5h := h + "-md5"
		if key := r.HTTPRequest.Header.Get(h); key != "" {
			// Base64-encode the value
			b64v := base64.StdEncoding.EncodeToString([]byte(key))
			r.HTTPRequest.Header.Set(h, b64v)

			// Add MD5 if it wasn't computed
			if r.HTTPRequest.Header.Get(md5h) == "" {
				sum := md5.Sum([]byte(key))
				b64sum := base64.StdEncoding.EncodeToString(sum[:])
				r.HTTPRequest.Header.Set(md5h, b64sum)
			}
		}
	}
}
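A sketch of the header values computeSSEKeys ends up producing for an SSE-C request (the key here is an illustrative all-zero value; never use one in practice): the raw customer key is sent base64-encoded, alongside a base64-encoded MD5 of the raw key so S3 can verify it arrived intact.

package example

import (
	"crypto/md5"
	"encoding/base64"
	"fmt"
)

// sseHeaders prints the two SSE-C header values derived from a raw key.
func sseHeaders() {
	key := make([]byte, 32) // 256-bit customer key (all zeros, for illustration only)
	sum := md5.Sum(key)     // MD5 of the raw key, not of its base64 form
	fmt.Println("x-amz-server-side-encryption-customer-key:",
		base64.StdEncoding.EncodeToString(key))
	fmt.Println("x-amz-server-side-encryption-customer-key-md5:",
		base64.StdEncoding.EncodeToString(sum[:]))
}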
@@ -0,0 +1,36 @@
package s3

import (
	"bytes"
	"io/ioutil"
	"net/http"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/aws/request"
)

// copyMultipartStatusOKUnmarhsalError checks a 200 OK response for an error
// body, which S3 can return for copy and multipart-completion operations.
func copyMultipartStatusOKUnmarhsalError(r *request.Request) {
	b, err := ioutil.ReadAll(r.HTTPResponse.Body)
	if err != nil {
		r.Error = awserr.New("SerializationError", "unable to read response body", err)
		return
	}
	body := bytes.NewReader(b)
	r.HTTPResponse.Body = aws.ReadSeekCloser(body)
	defer r.HTTPResponse.Body.(aws.ReaderSeekerCloser).Seek(0, 0)

	if body.Len() == 0 {
		// If there is no body don't attempt to parse the body.
		return
	}

	unmarshalError(r)
	if err, ok := r.Error.(awserr.Error); ok && err != nil {
		if err.Code() == "SerializationError" {
			r.Error = nil
			return
		}
		r.HTTPResponse.StatusCode = http.StatusServiceUnavailable
	}
}
@@ -0,0 +1,65 @@
package s3

import (
	"encoding/xml"
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
	"strings"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/awserr"
	"github.com/aws/aws-sdk-go/aws/request"
)

type xmlErrorResponse struct {
	XMLName xml.Name `xml:"Error"`
	Code    string   `xml:"Code"`
	Message string   `xml:"Message"`
}

func unmarshalError(r *request.Request) {
	defer r.HTTPResponse.Body.Close()
	defer io.Copy(ioutil.Discard, r.HTTPResponse.Body)

	// Bucket exists in a different region, and request needs
	// to be made to the correct region.
	if r.HTTPResponse.StatusCode == http.StatusMovedPermanently {
		r.Error = awserr.NewRequestFailure(
			awserr.New("BucketRegionError",
				fmt.Sprintf("incorrect region, the bucket is not in '%s' region",
					aws.StringValue(r.Config.Region)),
				nil),
			r.HTTPResponse.StatusCode,
			r.RequestID,
		)
		return
	}

	var errCode, errMsg string

	// Attempt to parse error from body if it is known
	resp := &xmlErrorResponse{}
	err := xml.NewDecoder(r.HTTPResponse.Body).Decode(resp)
	if err != nil && err != io.EOF {
		errCode = "SerializationError"
		errMsg = "failed to decode S3 XML error response"
	} else {
		errCode = resp.Code
		errMsg = resp.Message
	}

	// Fallback to status code converted to message if still no error code
	if len(errCode) == 0 {
		statusText := http.StatusText(r.HTTPResponse.StatusCode)
		errCode = strings.Replace(statusText, " ", "", -1)
		errMsg = statusText
	}

	r.Error = awserr.NewRequestFailure(
		awserr.New(errCode, errMsg, nil),
		r.HTTPResponse.StatusCode,
		r.RequestID,
	)
}
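A sketch of the error body shape unmarshalError expects (the NoSuchKey document is an illustrative sample): S3 returns a bare Error element whose Code and Message get lifted into an awserr.RequestFailure by the handler above.

package example

import (
	"encoding/xml"
	"fmt"
	"strings"
)

// ExampleS3ErrorBody decodes a representative S3 error document using the
// same struct shape as xmlErrorResponse.
func ExampleS3ErrorBody() {
	body := `<?xml version="1.0" encoding="UTF-8"?>
<Error>
  <Code>NoSuchKey</Code>
  <Message>The specified key does not exist.</Message>
</Error>`

	var e struct {
		XMLName xml.Name `xml:"Error"`
		Code    string   `xml:"Code"`
		Message string   `xml:"Message"`
	}
	if err := xml.NewDecoder(strings.NewReader(body)).Decode(&e); err == nil {
		fmt.Println(e.Code, "-", e.Message)
	}
	// Output: NoSuchKey - The specified key does not exist.
}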
@@ -0,0 +1,139 @@
// THIS FILE IS AUTOMATICALLY GENERATED. DO NOT EDIT.

package s3

import (
	"github.com/aws/aws-sdk-go/private/waiter"
)

// WaitUntilBucketExists uses the Amazon S3 API operation
// HeadBucket to wait for a condition to be met before returning.
// If the condition is not met within the max attempt window an error will
// be returned.
func (c *S3) WaitUntilBucketExists(input *HeadBucketInput) error {
	waiterCfg := waiter.Config{
		Operation:   "HeadBucket",
		Delay:       5,
		MaxAttempts: 20,
		Acceptors: []waiter.WaitAcceptor{
			{
				State:    "success",
				Matcher:  "status",
				Argument: "",
				Expected: 200,
			},
			{
				State:    "success",
				Matcher:  "status",
				Argument: "",
				Expected: 301,
			},
			{
				State:    "success",
				Matcher:  "status",
				Argument: "",
				Expected: 403,
			},
			{
				State:    "retry",
				Matcher:  "status",
				Argument: "",
				Expected: 404,
			},
		},
	}

	w := waiter.Waiter{
		Client: c,
		Input:  input,
		Config: waiterCfg,
	}
	return w.Wait()
}

// WaitUntilBucketNotExists uses the Amazon S3 API operation
// HeadBucket to wait for a condition to be met before returning.
// If the condition is not met within the max attempt window an error will
// be returned.
func (c *S3) WaitUntilBucketNotExists(input *HeadBucketInput) error {
	waiterCfg := waiter.Config{
		Operation:   "HeadBucket",
		Delay:       5,
		MaxAttempts: 20,
		Acceptors: []waiter.WaitAcceptor{
			{
				State:    "success",
				Matcher:  "status",
				Argument: "",
				Expected: 404,
			},
		},
	}

	w := waiter.Waiter{
		Client: c,
		Input:  input,
		Config: waiterCfg,
	}
	return w.Wait()
}

// WaitUntilObjectExists uses the Amazon S3 API operation
// HeadObject to wait for a condition to be met before returning.
// If the condition is not met within the max attempt window an error will
// be returned.
func (c *S3) WaitUntilObjectExists(input *HeadObjectInput) error {
	waiterCfg := waiter.Config{
		Operation:   "HeadObject",
		Delay:       5,
		MaxAttempts: 20,
		Acceptors: []waiter.WaitAcceptor{
			{
				State:    "success",
				Matcher:  "status",
				Argument: "",
				Expected: 200,
			},
			{
				State:    "retry",
				Matcher:  "status",
				Argument: "",
				Expected: 404,
			},
		},
	}

	w := waiter.Waiter{
		Client: c,
		Input:  input,
		Config: waiterCfg,
	}
	return w.Wait()
}

// WaitUntilObjectNotExists uses the Amazon S3 API operation
// HeadObject to wait for a condition to be met before returning.
// If the condition is not met within the max attempt window an error will
// be returned.
func (c *S3) WaitUntilObjectNotExists(input *HeadObjectInput) error {
	waiterCfg := waiter.Config{
		Operation:   "HeadObject",
		Delay:       5,
		MaxAttempts: 20,
		Acceptors: []waiter.WaitAcceptor{
			{
				State:    "success",
				Matcher:  "status",
				Argument: "",
				Expected: 404,
			},
		},
	}

	w := waiter.Waiter{
		Client: c,
		Input:  input,
		Config: waiterCfg,
	}
	return w.Wait()
}
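A sketch of the usual waiter pattern (the bucket name is a placeholder): CreateBucket can return before the bucket is fully visible, so callers block on WaitUntilBucketExists before issuing further requests against it.

package example

import (
	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/s3"
)

// createAndWait creates a bucket and blocks until HeadBucket confirms it.
func createAndWait(svc *s3.S3) error {
	bucket := aws.String("my-new-bucket")

	if _, err := svc.CreateBucket(&s3.CreateBucketInput{Bucket: bucket}); err != nil {
		return err
	}

	// Polls HeadBucket every 5 seconds, up to 20 attempts, per the
	// waiter configuration above.
	return svc.WaitUntilBucketExists(&s3.HeadBucketInput{Bucket: bucket})
}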