diff --git a/pkg/cloudprovider/providers/gce/cloud/BUILD b/pkg/cloudprovider/providers/gce/cloud/BUILD index b4c51d476e..04cd70a811 100644 --- a/pkg/cloudprovider/providers/gce/cloud/BUILD +++ b/pkg/cloudprovider/providers/gce/cloud/BUILD @@ -6,6 +6,7 @@ go_library( "constants.go", "context.go", "doc.go", + "errors.go", "gce_projects.go", "gen.go", "op.go", @@ -32,6 +33,8 @@ go_test( srcs = [ "gen_test.go", "mock_test.go", + "ratelimit_test.go", + "service_test.go", "utils_test.go", ], embed = [":go_default_library"], diff --git a/pkg/cloudprovider/providers/gce/cloud/errors.go b/pkg/cloudprovider/providers/gce/cloud/errors.go new file mode 100644 index 0000000000..1896a6d9e0 --- /dev/null +++ b/pkg/cloudprovider/providers/gce/cloud/errors.go @@ -0,0 +1,48 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cloud + +import "fmt" + +// OperationPollingError occurs when the GCE Operation cannot be retrieved for a prolonged period. +type OperationPollingError struct { + LastPollError error +} + +// Error returns a string representation including the last poll error encountered. +func (e *OperationPollingError) Error() string { + return fmt.Sprintf("GCE operation polling error: %v", e.LastPollError) +} + +// GCEOperationError occurs when the GCE Operation finishes with an error. +type GCEOperationError struct { + // HTTPStatusCode is the HTTP status code of the final error. + // For example, a failed operation may have 400 - BadRequest. + HTTPStatusCode int + // Code is GCE's code of what went wrong. + // For example, RESOURCE_IN_USE_BY_ANOTHER_RESOURCE + Code string + // Message is a human readable message. + // For example, "The network resource 'xxx' is already being used by 'xxx'" + Message string +} + +// Error returns a string representation including the HTTP Status code, GCE's error code +// and a human readable message. +func (e *GCEOperationError) Error() string { + return fmt.Sprintf("GCE %v - %v: %v", e.HTTPStatusCode, e.Code, e.Message) +} diff --git a/pkg/cloudprovider/providers/gce/cloud/op.go b/pkg/cloudprovider/providers/gce/cloud/op.go index 1bd2e0c484..358598776f 100644 --- a/pkg/cloudprovider/providers/gce/cloud/op.go +++ b/pkg/cloudprovider/providers/gce/cloud/op.go @@ -29,10 +29,17 @@ import ( "k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/meta" ) +const ( + operationStatusDone = "DONE" +) + // operation is a GCE operation that can be watied on. type operation interface { // isDone queries GCE for the done status. This call can block. isDone(ctx context.Context) (bool, error) + // error returns the resulting error of the operation. This may be nil if the operations + // was successful. + error() error // rateLimitKey returns the rate limit key to use for the given operation. // This rate limit will govern how fast the server will be polled for // operation completion status. @@ -43,6 +50,7 @@ type gaOperation struct { s *Service projectID string key *meta.Key + err error } func (o *gaOperation) String() string { @@ -71,7 +79,15 @@ func (o *gaOperation) isDone(ctx context.Context) (bool, error) { if err != nil { return false, err } - return op != nil && op.Status == "DONE", nil + if op == nil || op.Status != operationStatusDone { + return false, nil + } + + if op.Error != nil && len(op.Error.Errors) > 0 && op.Error.Errors[0] != nil { + e := op.Error.Errors[0] + o.err = &GCEOperationError{HTTPStatusCode: op.HTTPStatusCode, Code: e.Code, Message: e.Message} + } + return true, nil } func (o *gaOperation) rateLimitKey() *RateLimitKey { @@ -83,10 +99,15 @@ func (o *gaOperation) rateLimitKey() *RateLimitKey { } } +func (o *gaOperation) error() error { + return o.err +} + type alphaOperation struct { s *Service projectID string key *meta.Key + err error } func (o *alphaOperation) String() string { @@ -115,7 +136,15 @@ func (o *alphaOperation) isDone(ctx context.Context) (bool, error) { if err != nil { return false, err } - return op != nil && op.Status == "DONE", nil + if op == nil || op.Status != operationStatusDone { + return false, nil + } + + if op.Error != nil && len(op.Error.Errors) > 0 && op.Error.Errors[0] != nil { + e := op.Error.Errors[0] + o.err = &GCEOperationError{HTTPStatusCode: op.HTTPStatusCode, Code: e.Code, Message: e.Message} + } + return true, nil } func (o *alphaOperation) rateLimitKey() *RateLimitKey { @@ -127,10 +156,15 @@ func (o *alphaOperation) rateLimitKey() *RateLimitKey { } } +func (o *alphaOperation) error() error { + return o.err +} + type betaOperation struct { s *Service projectID string key *meta.Key + err error } func (o *betaOperation) String() string { @@ -159,7 +193,15 @@ func (o *betaOperation) isDone(ctx context.Context) (bool, error) { if err != nil { return false, err } - return op != nil && op.Status == "DONE", nil + if op == nil || op.Status != operationStatusDone { + return false, nil + } + + if op.Error != nil && len(op.Error.Errors) > 0 && op.Error.Errors[0] != nil { + e := op.Error.Errors[0] + o.err = &GCEOperationError{HTTPStatusCode: op.HTTPStatusCode, Code: e.Code, Message: e.Message} + } + return true, nil } func (o *betaOperation) rateLimitKey() *RateLimitKey { @@ -170,3 +212,7 @@ func (o *betaOperation) rateLimitKey() *RateLimitKey { Version: meta.VersionBeta, } } + +func (o *betaOperation) error() error { + return o.err +} diff --git a/pkg/cloudprovider/providers/gce/cloud/ratelimit.go b/pkg/cloudprovider/providers/gce/cloud/ratelimit.go index e38b8f7de3..ca1278a008 100644 --- a/pkg/cloudprovider/providers/gce/cloud/ratelimit.go +++ b/pkg/cloudprovider/providers/gce/cloud/ratelimit.go @@ -47,22 +47,60 @@ type RateLimiter interface { Accept(ctx context.Context, key *RateLimitKey) error } +// acceptor is an object which blocks within Accept until a call is allowed to run. +// Accept is a behavior of the flowcontrol.RateLimiter interface. +type acceptor interface { + // Accept blocks until a call is allowed to run. + Accept() +} + +// AcceptRateLimiter wraps an Acceptor with RateLimiter parameters. +type AcceptRateLimiter struct { + // Acceptor is the underlying rate limiter. + Acceptor acceptor +} + +// Accept wraps an Acceptor and blocks on Accept or context.Done(). Key is ignored. +func (rl *AcceptRateLimiter) Accept(ctx context.Context, key *RateLimitKey) error { + ch := make(chan struct{}) + go func() { + rl.Acceptor.Accept() + close(ch) + }() + select { + case <-ch: + break + case <-ctx.Done(): + return ctx.Err() + } + return nil +} + // NopRateLimiter is a rate limiter that performs no rate limiting. type NopRateLimiter struct { } -// Accept the operation to be rate limited. +// Accept everything immediately. func (*NopRateLimiter) Accept(ctx context.Context, key *RateLimitKey) error { - // Rate limit polling of the Operation status to avoid hammering GCE - // for the status of an operation. - const pollTime = time.Duration(1) * time.Second - if key.Operation == "Get" && key.Service == "Operations" { - select { - case <-time.NewTimer(pollTime).C: - break - case <-ctx.Done(): - return ctx.Err() - } - } return nil } + +// MinimumRateLimiter wraps a RateLimiter and will only call its Accept until the minimum +// duration has been met or the context is cancelled. +type MinimumRateLimiter struct { + // RateLimiter is the underlying ratelimiter which is called after the mininum time is reacehd. + RateLimiter RateLimiter + // Minimum is the minimum wait time before the underlying ratelimiter is called. + Minimum time.Duration +} + +// Accept blocks on the minimum duration and context. Once the minimum duration is met, +// the func is blocked on the underlying ratelimiter. +func (m *MinimumRateLimiter) Accept(ctx context.Context, key *RateLimitKey) error { + select { + case <-time.After(m.Minimum): + return m.RateLimiter.Accept(ctx, key) + case <-ctx.Done(): + return ctx.Err() + } +} diff --git a/pkg/cloudprovider/providers/gce/cloud/ratelimit_test.go b/pkg/cloudprovider/providers/gce/cloud/ratelimit_test.go new file mode 100644 index 0000000000..4bb4512e13 --- /dev/null +++ b/pkg/cloudprovider/providers/gce/cloud/ratelimit_test.go @@ -0,0 +1,80 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cloud + +import ( + "context" + "testing" + "time" +) + +type FakeAcceptor struct{ accept func() } + +func (f *FakeAcceptor) Accept() { + f.accept() +} + +func TestAcceptRateLimiter(t *testing.T) { + fa := &FakeAcceptor{accept: func() {}} + arl := &AcceptRateLimiter{fa} + err := arl.Accept(context.Background(), nil) + if err != nil { + t.Errorf("AcceptRateLimiter.Accept() = %v, want nil", err) + } + + // Use context that has been cancelled and expect a context error returned. + ctxCancelled, cancelled := context.WithCancel(context.Background()) + cancelled() + // Verify context is cancelled by now. + <-ctxCancelled.Done() + + fa.accept = func() { time.Sleep(1 * time.Second) } + err = arl.Accept(ctxCancelled, nil) + if err != ctxCancelled.Err() { + t.Errorf("AcceptRateLimiter.Accept() = %v, want %v", err, ctxCancelled.Err()) + } +} + +func TestMinimumRateLimiter(t *testing.T) { + fa := &FakeAcceptor{accept: func() {}} + arl := &AcceptRateLimiter{fa} + var called bool + fa.accept = func() { called = true } + m := &MinimumRateLimiter{RateLimiter: arl, Minimum: 10 * time.Millisecond} + + err := m.Accept(context.Background(), nil) + if err != nil { + t.Errorf("MinimumRateLimiter.Accept = %v, want nil", err) + } + if !called { + t.Errorf("`called` = false, want true") + } + + // Use context that has been cancelled and expect a context error returned. + ctxCancelled, cancelled := context.WithCancel(context.Background()) + cancelled() + // Verify context is cancelled by now. + <-ctxCancelled.Done() + called = false + err = m.Accept(ctxCancelled, nil) + if err != ctxCancelled.Err() { + t.Errorf("AcceptRateLimiter.Accept() = %v, want %v", err, ctxCancelled.Err()) + } + if called { + t.Errorf("`called` = true, want false") + } +} diff --git a/pkg/cloudprovider/providers/gce/cloud/service.go b/pkg/cloudprovider/providers/gce/cloud/service.go index 99ed7d226b..2f332dfff8 100644 --- a/pkg/cloudprovider/providers/gce/cloud/service.go +++ b/pkg/cloudprovider/providers/gce/cloud/service.go @@ -45,19 +45,19 @@ func (s *Service) wrapOperation(anyOp interface{}) (operation, error) { if err != nil { return nil, err } - return &gaOperation{s, r.ProjectID, r.Key}, nil + return &gaOperation{s: s, projectID: r.ProjectID, key: r.Key}, nil case *alpha.Operation: r, err := ParseResourceURL(o.SelfLink) if err != nil { return nil, err } - return &alphaOperation{s, r.ProjectID, r.Key}, nil + return &alphaOperation{s: s, projectID: r.ProjectID, key: r.Key}, nil case *beta.Operation: r, err := ParseResourceURL(o.SelfLink) if err != nil { return nil, err } - return &betaOperation{s, r.ProjectID, r.Key}, nil + return &betaOperation{s: s, projectID: r.ProjectID, key: r.Key}, nil default: return nil, fmt.Errorf("invalid type %T", anyOp) } @@ -72,14 +72,39 @@ func (s *Service) WaitForCompletion(ctx context.Context, genericOp interface{}) glog.Errorf("wrapOperation(%+v) error: %v", genericOp, err) return err } - for done, err := op.isDone(ctx); !done; done, err = op.isDone(ctx) { - if err != nil { - glog.V(4).Infof("op.isDone(%v) error; op = %v, err = %v", ctx, op, err) - return err - } - glog.V(5).Infof("op.isDone(%v) waiting; op = %v", ctx, op) - s.RateLimiter.Accept(ctx, op.rateLimitKey()) - } - glog.V(5).Infof("op.isDone(%v) complete; op = %v", ctx, op) - return nil + + return s.pollOperation(ctx, op) +} + +// pollOperation calls operations.isDone until the function comes back true or context is Done. +// If an error occurs retrieving the operation, the loop will continue until the context is done. +// This is to prevent a transient error from bubbling up to controller-level logic. +func (s *Service) pollOperation(ctx context.Context, op operation) error { + var pollCount int + for { + // Check if context has been cancelled. Note that ctx.Done() must be checked before + // returning ctx.Err(). + select { + case <-ctx.Done(): + glog.V(5).Infof("op.pollOperation(%v, %v) not completed, poll count = %d, ctx.Err = %v", ctx, op, pollCount, ctx.Err()) + return ctx.Err() + default: + // ctx is not canceled, continue immediately + } + + pollCount++ + glog.V(5).Infof("op.isDone(%v) waiting; op = %v, poll count = %d", ctx, op, pollCount) + s.RateLimiter.Accept(ctx, op.rateLimitKey()) + done, err := op.isDone(ctx) + if err != nil { + glog.V(5).Infof("op.isDone(%v) error; op = %v, poll count = %d, err = %v, retrying", ctx, op, pollCount, err) + } + + if done { + break + } + } + + glog.V(5).Infof("op.isDone(%v) complete; op = %v, poll count = %d, op.err = %v", ctx, op, pollCount, op.error()) + return op.error() } diff --git a/pkg/cloudprovider/providers/gce/cloud/service_test.go b/pkg/cloudprovider/providers/gce/cloud/service_test.go new file mode 100644 index 0000000000..fc6fbb51d8 --- /dev/null +++ b/pkg/cloudprovider/providers/gce/cloud/service_test.go @@ -0,0 +1,84 @@ +/* +Copyright 2018 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cloud + +import ( + "context" + "fmt" + "testing" +) + +func TestPollOperation(t *testing.T) { + const totalAttempts = 10 + var attempts int + fo := &fakeOperation{isDoneFunc: func(ctx context.Context) (bool, error) { + attempts++ + if attempts < totalAttempts { + return false, nil + } + return true, nil + }} + s := Service{RateLimiter: &NopRateLimiter{}} + // Check that pollOperation will retry the operation multiple times. + err := s.pollOperation(context.Background(), fo) + if err != nil { + t.Errorf("pollOperation() = %v, want nil", err) + } + if attempts != totalAttempts { + t.Errorf("`attempts` = %d, want %d", attempts, totalAttempts) + } + + // Check that the operation's error is returned. + fo.err = fmt.Errorf("test operation failed") + err = s.pollOperation(context.Background(), fo) + if err != fo.err { + t.Errorf("pollOperation() = %v, want %v", err, fo.err) + } + fo.err = nil + + fo.isDoneFunc = func(ctx context.Context) (bool, error) { + return false, nil + } + // Use context that has been cancelled and expect a context error returned. + ctxCancelled, cancelled := context.WithCancel(context.Background()) + cancelled() + // Verify context is cancelled by now. + <-ctxCancelled.Done() + // Check that pollOperation returns because the context is cancelled. + err = s.pollOperation(ctxCancelled, fo) + if err == nil { + t.Errorf("pollOperation() = nil, want: %v", ctxCancelled.Err()) + } +} + +type fakeOperation struct { + isDoneFunc func(ctx context.Context) (bool, error) + err error + rateKey *RateLimitKey +} + +func (f *fakeOperation) isDone(ctx context.Context) (bool, error) { + return f.isDoneFunc(ctx) +} + +func (f *fakeOperation) error() error { + return f.err +} + +func (f *fakeOperation) rateLimitKey() *RateLimitKey { + return f.rateKey +} diff --git a/pkg/cloudprovider/providers/gce/gce.go b/pkg/cloudprovider/providers/gce/gce.go index 383744f23d..c2defec389 100644 --- a/pkg/cloudprovider/providers/gce/gce.go +++ b/pkg/cloudprovider/providers/gce/gce.go @@ -68,7 +68,7 @@ const ( // AffinityTypeClientIPProto - affinity based on Client IP and port. gceAffinityTypeClientIPProto = "CLIENT_IP_PROTO" - operationPollInterval = 3 * time.Second + operationPollInterval = time.Second // Creating Route in very large clusters, may take more than half an hour. operationPollTimeoutDuration = time.Hour @@ -484,7 +484,7 @@ func CreateGCECloud(config *CloudConfig) (*GCECloud, error) { glog.Infof("managing multiple zones: %v", config.ManagedZones) } - operationPollRateLimiter := flowcontrol.NewTokenBucketRateLimiter(10, 100) // 10 qps, 100 bucket size. + operationPollRateLimiter := flowcontrol.NewTokenBucketRateLimiter(5, 5) // 5 qps, 5 burst. gce := &GCECloud{ service: service, diff --git a/pkg/cloudprovider/providers/gce/gce_addresses.go b/pkg/cloudprovider/providers/gce/gce_addresses.go index 37e2f5e9a7..2bf3e20b05 100644 --- a/pkg/cloudprovider/providers/gce/gce_addresses.go +++ b/pkg/cloudprovider/providers/gce/gce_addresses.go @@ -17,7 +17,6 @@ limitations under the License. package gce import ( - "context" "fmt" "github.com/golang/glog" @@ -44,72 +43,105 @@ func newAddressMetricContextWithVersion(request, region, version string) *metric // ipAddress is specified, it must belong to the current project, eg: an // ephemeral IP associated with a global forwarding rule. func (gce *GCECloud) ReserveGlobalAddress(addr *compute.Address) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newAddressMetricContext("reserve", "") - return mc.Observe(gce.c.GlobalAddresses().Insert(context.Background(), meta.GlobalKey(addr.Name), addr)) + return mc.Observe(gce.c.GlobalAddresses().Insert(ctx, meta.GlobalKey(addr.Name), addr)) } // DeleteGlobalAddress deletes a global address by name. func (gce *GCECloud) DeleteGlobalAddress(name string) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newAddressMetricContext("delete", "") - return mc.Observe(gce.c.GlobalAddresses().Delete(context.Background(), meta.GlobalKey(name))) + return mc.Observe(gce.c.GlobalAddresses().Delete(ctx, meta.GlobalKey(name))) } // GetGlobalAddress returns the global address by name. func (gce *GCECloud) GetGlobalAddress(name string) (*compute.Address, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newAddressMetricContext("get", "") - v, err := gce.c.GlobalAddresses().Get(context.Background(), meta.GlobalKey(name)) + v, err := gce.c.GlobalAddresses().Get(ctx, meta.GlobalKey(name)) return v, mc.Observe(err) } // ReserveRegionAddress creates a region address func (gce *GCECloud) ReserveRegionAddress(addr *compute.Address, region string) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newAddressMetricContext("reserve", region) - return mc.Observe(gce.c.Addresses().Insert(context.Background(), meta.RegionalKey(addr.Name, region), addr)) + return mc.Observe(gce.c.Addresses().Insert(ctx, meta.RegionalKey(addr.Name, region), addr)) } // ReserveAlphaRegionAddress creates an Alpha, regional address. func (gce *GCECloud) ReserveAlphaRegionAddress(addr *computealpha.Address, region string) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newAddressMetricContext("reserve", region) - return mc.Observe(gce.c.AlphaAddresses().Insert(context.Background(), meta.RegionalKey(addr.Name, region), addr)) + return mc.Observe(gce.c.AlphaAddresses().Insert(ctx, meta.RegionalKey(addr.Name, region), addr)) } // ReserveBetaRegionAddress creates a beta region address func (gce *GCECloud) ReserveBetaRegionAddress(addr *computebeta.Address, region string) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newAddressMetricContext("reserve", region) - return mc.Observe(gce.c.BetaAddresses().Insert(context.Background(), meta.RegionalKey(addr.Name, region), addr)) + return mc.Observe(gce.c.BetaAddresses().Insert(ctx, meta.RegionalKey(addr.Name, region), addr)) } // DeleteRegionAddress deletes a region address by name. func (gce *GCECloud) DeleteRegionAddress(name, region string) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newAddressMetricContext("delete", region) - return mc.Observe(gce.c.Addresses().Delete(context.Background(), meta.RegionalKey(name, region))) + return mc.Observe(gce.c.Addresses().Delete(ctx, meta.RegionalKey(name, region))) } // GetRegionAddress returns the region address by name func (gce *GCECloud) GetRegionAddress(name, region string) (*compute.Address, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newAddressMetricContext("get", region) - v, err := gce.c.Addresses().Get(context.Background(), meta.RegionalKey(name, region)) + v, err := gce.c.Addresses().Get(ctx, meta.RegionalKey(name, region)) return v, mc.Observe(err) } // GetAlphaRegionAddress returns the Alpha, regional address by name. func (gce *GCECloud) GetAlphaRegionAddress(name, region string) (*computealpha.Address, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newAddressMetricContext("get", region) - v, err := gce.c.AlphaAddresses().Get(context.Background(), meta.RegionalKey(name, region)) + v, err := gce.c.AlphaAddresses().Get(ctx, meta.RegionalKey(name, region)) return v, mc.Observe(err) } // GetBetaRegionAddress returns the beta region address by name func (gce *GCECloud) GetBetaRegionAddress(name, region string) (*computebeta.Address, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newAddressMetricContext("get", region) - v, err := gce.c.BetaAddresses().Get(context.Background(), meta.RegionalKey(name, region)) + v, err := gce.c.BetaAddresses().Get(ctx, meta.RegionalKey(name, region)) return v, mc.Observe(err) } // GetRegionAddressByIP returns the regional address matching the given IP address. func (gce *GCECloud) GetRegionAddressByIP(region, ipAddress string) (*compute.Address, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newAddressMetricContext("list", region) - addrs, err := gce.c.Addresses().List(context.Background(), region, filter.Regexp("address", ipAddress)) + addrs, err := gce.c.Addresses().List(ctx, region, filter.Regexp("address", ipAddress)) mc.Observe(err) if err != nil { @@ -129,8 +161,11 @@ func (gce *GCECloud) GetRegionAddressByIP(region, ipAddress string) (*compute.Ad // GetBetaRegionAddressByIP returns the beta regional address matching the given IP address. func (gce *GCECloud) GetBetaRegionAddressByIP(region, ipAddress string) (*computebeta.Address, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newAddressMetricContext("list", region) - addrs, err := gce.c.BetaAddresses().List(context.Background(), region, filter.Regexp("address", ipAddress)) + addrs, err := gce.c.BetaAddresses().List(ctx, region, filter.Regexp("address", ipAddress)) mc.Observe(err) if err != nil { diff --git a/pkg/cloudprovider/providers/gce/gce_backendservice.go b/pkg/cloudprovider/providers/gce/gce_backendservice.go index d63728038e..4b330a49ed 100644 --- a/pkg/cloudprovider/providers/gce/gce_backendservice.go +++ b/pkg/cloudprovider/providers/gce/gce_backendservice.go @@ -17,12 +17,11 @@ limitations under the License. package gce import ( - "context" - computealpha "google.golang.org/api/compute/v0.alpha" computebeta "google.golang.org/api/compute/v0.beta" compute "google.golang.org/api/compute/v1" + "k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud" "k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/filter" "k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/meta" ) @@ -37,61 +36,88 @@ func newBackendServiceMetricContextWithVersion(request, region, version string) // GetGlobalBackendService retrieves a backend by name. func (gce *GCECloud) GetGlobalBackendService(name string) (*compute.BackendService, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newBackendServiceMetricContext("get", "") - v, err := gce.c.BackendServices().Get(context.Background(), meta.GlobalKey(name)) + v, err := gce.c.BackendServices().Get(ctx, meta.GlobalKey(name)) return v, mc.Observe(err) } // GetBetaGlobalBackendService retrieves beta backend by name. func (gce *GCECloud) GetBetaGlobalBackendService(name string) (*computebeta.BackendService, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newBackendServiceMetricContextWithVersion("get", "", computeBetaVersion) - v, err := gce.c.BetaBackendServices().Get(context.Background(), meta.GlobalKey(name)) + v, err := gce.c.BetaBackendServices().Get(ctx, meta.GlobalKey(name)) return v, mc.Observe(err) } // GetAlphaGlobalBackendService retrieves alpha backend by name. func (gce *GCECloud) GetAlphaGlobalBackendService(name string) (*computealpha.BackendService, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newBackendServiceMetricContextWithVersion("get", "", computeAlphaVersion) - v, err := gce.c.AlphaBackendServices().Get(context.Background(), meta.GlobalKey(name)) + v, err := gce.c.AlphaBackendServices().Get(ctx, meta.GlobalKey(name)) return v, mc.Observe(err) } // UpdateGlobalBackendService applies the given BackendService as an update to // an existing service. func (gce *GCECloud) UpdateGlobalBackendService(bg *compute.BackendService) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newBackendServiceMetricContext("update", "") - return mc.Observe(gce.c.BackendServices().Update(context.Background(), meta.GlobalKey(bg.Name), bg)) + return mc.Observe(gce.c.BackendServices().Update(ctx, meta.GlobalKey(bg.Name), bg)) } // UpdateAlphaGlobalBackendService applies the given alpha BackendService as an // update to an existing service. func (gce *GCECloud) UpdateAlphaGlobalBackendService(bg *computealpha.BackendService) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newBackendServiceMetricContext("update", "") - return mc.Observe(gce.c.AlphaBackendServices().Update(context.Background(), meta.GlobalKey(bg.Name), bg)) + return mc.Observe(gce.c.AlphaBackendServices().Update(ctx, meta.GlobalKey(bg.Name), bg)) } // DeleteGlobalBackendService deletes the given BackendService by name. func (gce *GCECloud) DeleteGlobalBackendService(name string) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newBackendServiceMetricContext("delete", "") - return mc.Observe(gce.c.BackendServices().Delete(context.Background(), meta.GlobalKey(name))) + return mc.Observe(gce.c.BackendServices().Delete(ctx, meta.GlobalKey(name))) } // CreateGlobalBackendService creates the given BackendService. func (gce *GCECloud) CreateGlobalBackendService(bg *compute.BackendService) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newBackendServiceMetricContext("create", "") - return mc.Observe(gce.c.BackendServices().Insert(context.Background(), meta.GlobalKey(bg.Name), bg)) + return mc.Observe(gce.c.BackendServices().Insert(ctx, meta.GlobalKey(bg.Name), bg)) } // CreateAlphaGlobalBackendService creates the given alpha BackendService. func (gce *GCECloud) CreateAlphaGlobalBackendService(bg *computealpha.BackendService) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newBackendServiceMetricContext("create", "") - return mc.Observe(gce.c.AlphaBackendServices().Insert(context.Background(), meta.GlobalKey(bg.Name), bg)) + return mc.Observe(gce.c.AlphaBackendServices().Insert(ctx, meta.GlobalKey(bg.Name), bg)) } // ListGlobalBackendServices lists all backend services in the project. func (gce *GCECloud) ListGlobalBackendServices() ([]*compute.BackendService, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newBackendServiceMetricContext("list", "") - v, err := gce.c.BackendServices().List(context.Background(), filter.None) + v, err := gce.c.BackendServices().List(ctx, filter.None) return v, mc.Observe(err) } @@ -99,42 +125,60 @@ func (gce *GCECloud) ListGlobalBackendServices() ([]*compute.BackendService, err // identified by the given name, in the given instanceGroup. The // instanceGroupLink is the fully qualified self link of an instance group. func (gce *GCECloud) GetGlobalBackendServiceHealth(name string, instanceGroupLink string) (*compute.BackendServiceGroupHealth, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newBackendServiceMetricContext("get_health", "") groupRef := &compute.ResourceGroupReference{Group: instanceGroupLink} - v, err := gce.c.BackendServices().GetHealth(context.Background(), meta.GlobalKey(name), groupRef) + v, err := gce.c.BackendServices().GetHealth(ctx, meta.GlobalKey(name), groupRef) return v, mc.Observe(err) } // GetRegionBackendService retrieves a backend by name. func (gce *GCECloud) GetRegionBackendService(name, region string) (*compute.BackendService, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newBackendServiceMetricContext("get", region) - v, err := gce.c.RegionBackendServices().Get(context.Background(), meta.RegionalKey(name, region)) + v, err := gce.c.RegionBackendServices().Get(ctx, meta.RegionalKey(name, region)) return v, mc.Observe(err) } // UpdateRegionBackendService applies the given BackendService as an update to // an existing service. func (gce *GCECloud) UpdateRegionBackendService(bg *compute.BackendService, region string) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newBackendServiceMetricContext("update", region) - return mc.Observe(gce.c.RegionBackendServices().Update(context.Background(), meta.RegionalKey(bg.Name, region), bg)) + return mc.Observe(gce.c.RegionBackendServices().Update(ctx, meta.RegionalKey(bg.Name, region), bg)) } // DeleteRegionBackendService deletes the given BackendService by name. func (gce *GCECloud) DeleteRegionBackendService(name, region string) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newBackendServiceMetricContext("delete", region) - return mc.Observe(gce.c.RegionBackendServices().Delete(context.Background(), meta.RegionalKey(name, region))) + return mc.Observe(gce.c.RegionBackendServices().Delete(ctx, meta.RegionalKey(name, region))) } // CreateRegionBackendService creates the given BackendService. func (gce *GCECloud) CreateRegionBackendService(bg *compute.BackendService, region string) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newBackendServiceMetricContext("create", region) - return mc.Observe(gce.c.RegionBackendServices().Insert(context.Background(), meta.RegionalKey(bg.Name, region), bg)) + return mc.Observe(gce.c.RegionBackendServices().Insert(ctx, meta.RegionalKey(bg.Name, region), bg)) } // ListRegionBackendServices lists all backend services in the project. func (gce *GCECloud) ListRegionBackendServices(region string) ([]*compute.BackendService, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newBackendServiceMetricContext("list", region) - v, err := gce.c.RegionBackendServices().List(context.Background(), region, filter.None) + v, err := gce.c.RegionBackendServices().List(ctx, region, filter.None) return v, mc.Observe(err) } @@ -142,22 +186,31 @@ func (gce *GCECloud) ListRegionBackendServices(region string) ([]*compute.Backen // identified by the given name, in the given instanceGroup. The // instanceGroupLink is the fully qualified self link of an instance group. func (gce *GCECloud) GetRegionalBackendServiceHealth(name, region string, instanceGroupLink string) (*compute.BackendServiceGroupHealth, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newBackendServiceMetricContext("get_health", region) ref := &compute.ResourceGroupReference{Group: instanceGroupLink} - v, err := gce.c.RegionBackendServices().GetHealth(context.Background(), meta.RegionalKey(name, region), ref) + v, err := gce.c.RegionBackendServices().GetHealth(ctx, meta.RegionalKey(name, region), ref) return v, mc.Observe(err) } // SetSecurityPolicyForBetaGlobalBackendService sets the given // SecurityPolicyReference for the BackendService identified by the given name. func (gce *GCECloud) SetSecurityPolicyForBetaGlobalBackendService(backendServiceName string, securityPolicyReference *computebeta.SecurityPolicyReference) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newBackendServiceMetricContextWithVersion("set_security_policy", "", computeBetaVersion) - return mc.Observe(gce.c.BetaBackendServices().SetSecurityPolicy(context.Background(), meta.GlobalKey(backendServiceName), securityPolicyReference)) + return mc.Observe(gce.c.BetaBackendServices().SetSecurityPolicy(ctx, meta.GlobalKey(backendServiceName), securityPolicyReference)) } // SetSecurityPolicyForAlphaGlobalBackendService sets the given // SecurityPolicyReference for the BackendService identified by the given name. func (gce *GCECloud) SetSecurityPolicyForAlphaGlobalBackendService(backendServiceName string, securityPolicyReference *computealpha.SecurityPolicyReference) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newBackendServiceMetricContextWithVersion("set_security_policy", "", computeAlphaVersion) - return mc.Observe(gce.c.AlphaBackendServices().SetSecurityPolicy(context.Background(), meta.GlobalKey(backendServiceName), securityPolicyReference)) + return mc.Observe(gce.c.AlphaBackendServices().SetSecurityPolicy(ctx, meta.GlobalKey(backendServiceName), securityPolicyReference)) } diff --git a/pkg/cloudprovider/providers/gce/gce_cert.go b/pkg/cloudprovider/providers/gce/gce_cert.go index f91ce879e3..3b6614f816 100644 --- a/pkg/cloudprovider/providers/gce/gce_cert.go +++ b/pkg/cloudprovider/providers/gce/gce_cert.go @@ -17,10 +17,9 @@ limitations under the License. package gce import ( - "context" - compute "google.golang.org/api/compute/v1" + "k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud" "k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/filter" "k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/meta" ) @@ -31,15 +30,21 @@ func newCertMetricContext(request string) *metricContext { // GetSslCertificate returns the SslCertificate by name. func (gce *GCECloud) GetSslCertificate(name string) (*compute.SslCertificate, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newCertMetricContext("get") - v, err := gce.c.SslCertificates().Get(context.Background(), meta.GlobalKey(name)) + v, err := gce.c.SslCertificates().Get(ctx, meta.GlobalKey(name)) return v, mc.Observe(err) } // CreateSslCertificate creates and returns a SslCertificate. func (gce *GCECloud) CreateSslCertificate(sslCerts *compute.SslCertificate) (*compute.SslCertificate, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newCertMetricContext("create") - err := gce.c.SslCertificates().Insert(context.Background(), meta.GlobalKey(sslCerts.Name), sslCerts) + err := gce.c.SslCertificates().Insert(ctx, meta.GlobalKey(sslCerts.Name), sslCerts) if err != nil { return nil, mc.Observe(err) } @@ -48,13 +53,19 @@ func (gce *GCECloud) CreateSslCertificate(sslCerts *compute.SslCertificate) (*co // DeleteSslCertificate deletes the SslCertificate by name. func (gce *GCECloud) DeleteSslCertificate(name string) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newCertMetricContext("delete") - return mc.Observe(gce.c.SslCertificates().Delete(context.Background(), meta.GlobalKey(name))) + return mc.Observe(gce.c.SslCertificates().Delete(ctx, meta.GlobalKey(name))) } // ListSslCertificates lists all SslCertificates in the project. func (gce *GCECloud) ListSslCertificates() ([]*compute.SslCertificate, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newCertMetricContext("list") - v, err := gce.c.SslCertificates().List(context.Background(), filter.None) + v, err := gce.c.SslCertificates().List(ctx, filter.None) return v, mc.Observe(err) } diff --git a/pkg/cloudprovider/providers/gce/gce_firewall.go b/pkg/cloudprovider/providers/gce/gce_firewall.go index 88db0ebd3b..e138df8747 100644 --- a/pkg/cloudprovider/providers/gce/gce_firewall.go +++ b/pkg/cloudprovider/providers/gce/gce_firewall.go @@ -17,10 +17,9 @@ limitations under the License. package gce import ( - "context" - compute "google.golang.org/api/compute/v1" + "k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud" "k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/meta" ) @@ -30,25 +29,37 @@ func newFirewallMetricContext(request string) *metricContext { // GetFirewall returns the Firewall by name. func (gce *GCECloud) GetFirewall(name string) (*compute.Firewall, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newFirewallMetricContext("get") - v, err := gce.c.Firewalls().Get(context.Background(), meta.GlobalKey(name)) + v, err := gce.c.Firewalls().Get(ctx, meta.GlobalKey(name)) return v, mc.Observe(err) } // CreateFirewall creates the passed firewall func (gce *GCECloud) CreateFirewall(f *compute.Firewall) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newFirewallMetricContext("create") - return mc.Observe(gce.c.Firewalls().Insert(context.Background(), meta.GlobalKey(f.Name), f)) + return mc.Observe(gce.c.Firewalls().Insert(ctx, meta.GlobalKey(f.Name), f)) } // DeleteFirewall deletes the given firewall rule. func (gce *GCECloud) DeleteFirewall(name string) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newFirewallMetricContext("delete") - return mc.Observe(gce.c.Firewalls().Delete(context.Background(), meta.GlobalKey(name))) + return mc.Observe(gce.c.Firewalls().Delete(ctx, meta.GlobalKey(name))) } // UpdateFirewall applies the given firewall as an update to an existing service. func (gce *GCECloud) UpdateFirewall(f *compute.Firewall) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newFirewallMetricContext("update") - return mc.Observe(gce.c.Firewalls().Update(context.Background(), meta.GlobalKey(f.Name), f)) + return mc.Observe(gce.c.Firewalls().Update(ctx, meta.GlobalKey(f.Name), f)) } diff --git a/pkg/cloudprovider/providers/gce/gce_forwardingrule.go b/pkg/cloudprovider/providers/gce/gce_forwardingrule.go index 9fe0da9f7c..b40652c98e 100644 --- a/pkg/cloudprovider/providers/gce/gce_forwardingrule.go +++ b/pkg/cloudprovider/providers/gce/gce_forwardingrule.go @@ -17,8 +17,6 @@ limitations under the License. package gce import ( - "context" - computealpha "google.golang.org/api/compute/v0.alpha" compute "google.golang.org/api/compute/v1" "k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud" @@ -35,84 +33,120 @@ func newForwardingRuleMetricContextWithVersion(request, region, version string) // CreateGlobalForwardingRule creates the passed GlobalForwardingRule func (gce *GCECloud) CreateGlobalForwardingRule(rule *compute.ForwardingRule) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newForwardingRuleMetricContext("create", "") - return mc.Observe(gce.c.GlobalForwardingRules().Insert(context.Background(), meta.GlobalKey(rule.Name), rule)) + return mc.Observe(gce.c.GlobalForwardingRules().Insert(ctx, meta.GlobalKey(rule.Name), rule)) } // SetProxyForGlobalForwardingRule links the given TargetHttp(s)Proxy with the given GlobalForwardingRule. // targetProxyLink is the SelfLink of a TargetHttp(s)Proxy. func (gce *GCECloud) SetProxyForGlobalForwardingRule(forwardingRuleName, targetProxyLink string) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newForwardingRuleMetricContext("set_proxy", "") target := &compute.TargetReference{Target: targetProxyLink} - return mc.Observe(gce.c.GlobalForwardingRules().SetTarget(context.Background(), meta.GlobalKey(forwardingRuleName), target)) + return mc.Observe(gce.c.GlobalForwardingRules().SetTarget(ctx, meta.GlobalKey(forwardingRuleName), target)) } // DeleteGlobalForwardingRule deletes the GlobalForwardingRule by name. func (gce *GCECloud) DeleteGlobalForwardingRule(name string) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newForwardingRuleMetricContext("delete", "") - return mc.Observe(gce.c.GlobalForwardingRules().Delete(context.Background(), meta.GlobalKey(name))) + return mc.Observe(gce.c.GlobalForwardingRules().Delete(ctx, meta.GlobalKey(name))) } // GetGlobalForwardingRule returns the GlobalForwardingRule by name. func (gce *GCECloud) GetGlobalForwardingRule(name string) (*compute.ForwardingRule, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newForwardingRuleMetricContext("get", "") - v, err := gce.c.GlobalForwardingRules().Get(context.Background(), meta.GlobalKey(name)) + v, err := gce.c.GlobalForwardingRules().Get(ctx, meta.GlobalKey(name)) return v, mc.Observe(err) } // ListGlobalForwardingRules lists all GlobalForwardingRules in the project. func (gce *GCECloud) ListGlobalForwardingRules() ([]*compute.ForwardingRule, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newForwardingRuleMetricContext("list", "") - v, err := gce.c.GlobalForwardingRules().List(context.Background(), filter.None) + v, err := gce.c.GlobalForwardingRules().List(ctx, filter.None) return v, mc.Observe(err) } // GetRegionForwardingRule returns the RegionalForwardingRule by name & region. func (gce *GCECloud) GetRegionForwardingRule(name, region string) (*compute.ForwardingRule, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newForwardingRuleMetricContext("get", region) - v, err := gce.c.ForwardingRules().Get(context.Background(), meta.RegionalKey(name, region)) + v, err := gce.c.ForwardingRules().Get(ctx, meta.RegionalKey(name, region)) return v, mc.Observe(err) } // GetAlphaRegionForwardingRule returns the Alpha forwarding rule by name & region. func (gce *GCECloud) GetAlphaRegionForwardingRule(name, region string) (*computealpha.ForwardingRule, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newForwardingRuleMetricContextWithVersion("get", region, computeAlphaVersion) - v, err := gce.c.AlphaForwardingRules().Get(context.Background(), meta.RegionalKey(name, region)) + v, err := gce.c.AlphaForwardingRules().Get(ctx, meta.RegionalKey(name, region)) return v, mc.Observe(err) } // ListRegionForwardingRules lists all RegionalForwardingRules in the project & region. func (gce *GCECloud) ListRegionForwardingRules(region string) ([]*compute.ForwardingRule, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newForwardingRuleMetricContext("list", region) - v, err := gce.c.ForwardingRules().List(context.Background(), region, filter.None) + v, err := gce.c.ForwardingRules().List(ctx, region, filter.None) return v, mc.Observe(err) } // ListAlphaRegionForwardingRules lists all RegionalForwardingRules in the project & region. func (gce *GCECloud) ListAlphaRegionForwardingRules(region string) ([]*computealpha.ForwardingRule, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newForwardingRuleMetricContextWithVersion("list", region, computeAlphaVersion) - v, err := gce.c.AlphaForwardingRules().List(context.Background(), region, filter.None) + v, err := gce.c.AlphaForwardingRules().List(ctx, region, filter.None) return v, mc.Observe(err) } // CreateRegionForwardingRule creates and returns a // RegionalForwardingRule that points to the given BackendService func (gce *GCECloud) CreateRegionForwardingRule(rule *compute.ForwardingRule, region string) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newForwardingRuleMetricContext("create", region) - return mc.Observe(gce.c.ForwardingRules().Insert(context.Background(), meta.RegionalKey(rule.Name, region), rule)) + return mc.Observe(gce.c.ForwardingRules().Insert(ctx, meta.RegionalKey(rule.Name, region), rule)) } // CreateAlphaRegionForwardingRule creates and returns an Alpha // forwarding fule in the given region. func (gce *GCECloud) CreateAlphaRegionForwardingRule(rule *computealpha.ForwardingRule, region string) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newForwardingRuleMetricContextWithVersion("create", region, computeAlphaVersion) - return mc.Observe(gce.c.AlphaForwardingRules().Insert(context.Background(), meta.RegionalKey(rule.Name, region), rule)) + return mc.Observe(gce.c.AlphaForwardingRules().Insert(ctx, meta.RegionalKey(rule.Name, region), rule)) } // DeleteRegionForwardingRule deletes the RegionalForwardingRule by name & region. func (gce *GCECloud) DeleteRegionForwardingRule(name, region string) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newForwardingRuleMetricContext("delete", region) - return mc.Observe(gce.c.ForwardingRules().Delete(context.Background(), meta.RegionalKey(name, region))) + return mc.Observe(gce.c.ForwardingRules().Delete(ctx, meta.RegionalKey(name, region))) } // TODO(#51665): retire this function once Network Tiers becomes Beta in GCP. diff --git a/pkg/cloudprovider/providers/gce/gce_healthchecks.go b/pkg/cloudprovider/providers/gce/gce_healthchecks.go index e4c0829e10..937ba29507 100644 --- a/pkg/cloudprovider/providers/gce/gce_healthchecks.go +++ b/pkg/cloudprovider/providers/gce/gce_healthchecks.go @@ -17,14 +17,13 @@ limitations under the License. package gce import ( - "context" - "github.com/golang/glog" computealpha "google.golang.org/api/compute/v0.alpha" compute "google.golang.org/api/compute/v1" "k8s.io/api/core/v1" + "k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud" "k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/filter" "k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/meta" "k8s.io/kubernetes/pkg/master/ports" @@ -58,33 +57,48 @@ func newHealthcheckMetricContextWithVersion(request, version string) *metricCont // GetHttpHealthCheck returns the given HttpHealthCheck by name. func (gce *GCECloud) GetHttpHealthCheck(name string) (*compute.HttpHealthCheck, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newHealthcheckMetricContext("get_legacy") - v, err := gce.c.HttpHealthChecks().Get(context.Background(), meta.GlobalKey(name)) + v, err := gce.c.HttpHealthChecks().Get(ctx, meta.GlobalKey(name)) return v, mc.Observe(err) } // UpdateHttpHealthCheck applies the given HttpHealthCheck as an update. func (gce *GCECloud) UpdateHttpHealthCheck(hc *compute.HttpHealthCheck) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newHealthcheckMetricContext("update_legacy") - return mc.Observe(gce.c.HttpHealthChecks().Update(context.Background(), meta.GlobalKey(hc.Name), hc)) + return mc.Observe(gce.c.HttpHealthChecks().Update(ctx, meta.GlobalKey(hc.Name), hc)) } // DeleteHttpHealthCheck deletes the given HttpHealthCheck by name. func (gce *GCECloud) DeleteHttpHealthCheck(name string) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newHealthcheckMetricContext("delete_legacy") - return mc.Observe(gce.c.HttpHealthChecks().Delete(context.Background(), meta.GlobalKey(name))) + return mc.Observe(gce.c.HttpHealthChecks().Delete(ctx, meta.GlobalKey(name))) } // CreateHttpHealthCheck creates the given HttpHealthCheck. func (gce *GCECloud) CreateHttpHealthCheck(hc *compute.HttpHealthCheck) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newHealthcheckMetricContext("create_legacy") - return mc.Observe(gce.c.HttpHealthChecks().Insert(context.Background(), meta.GlobalKey(hc.Name), hc)) + return mc.Observe(gce.c.HttpHealthChecks().Insert(ctx, meta.GlobalKey(hc.Name), hc)) } // ListHttpHealthChecks lists all HttpHealthChecks in the project. func (gce *GCECloud) ListHttpHealthChecks() ([]*compute.HttpHealthCheck, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newHealthcheckMetricContext("list_legacy") - v, err := gce.c.HttpHealthChecks().List(context.Background(), filter.None) + v, err := gce.c.HttpHealthChecks().List(ctx, filter.None) return v, mc.Observe(err) } @@ -92,33 +106,48 @@ func (gce *GCECloud) ListHttpHealthChecks() ([]*compute.HttpHealthCheck, error) // GetHttpsHealthCheck returns the given HttpsHealthCheck by name. func (gce *GCECloud) GetHttpsHealthCheck(name string) (*compute.HttpsHealthCheck, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newHealthcheckMetricContext("get_legacy") - v, err := gce.c.HttpsHealthChecks().Get(context.Background(), meta.GlobalKey(name)) + v, err := gce.c.HttpsHealthChecks().Get(ctx, meta.GlobalKey(name)) return v, mc.Observe(err) } // UpdateHttpsHealthCheck applies the given HttpsHealthCheck as an update. func (gce *GCECloud) UpdateHttpsHealthCheck(hc *compute.HttpsHealthCheck) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newHealthcheckMetricContext("update_legacy") - return mc.Observe(gce.c.HttpsHealthChecks().Update(context.Background(), meta.GlobalKey(hc.Name), hc)) + return mc.Observe(gce.c.HttpsHealthChecks().Update(ctx, meta.GlobalKey(hc.Name), hc)) } // DeleteHttpsHealthCheck deletes the given HttpsHealthCheck by name. func (gce *GCECloud) DeleteHttpsHealthCheck(name string) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newHealthcheckMetricContext("delete_legacy") - return mc.Observe(gce.c.HttpsHealthChecks().Delete(context.Background(), meta.GlobalKey(name))) + return mc.Observe(gce.c.HttpsHealthChecks().Delete(ctx, meta.GlobalKey(name))) } // CreateHttpsHealthCheck creates the given HttpsHealthCheck. func (gce *GCECloud) CreateHttpsHealthCheck(hc *compute.HttpsHealthCheck) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newHealthcheckMetricContext("create_legacy") - return mc.Observe(gce.c.HttpsHealthChecks().Insert(context.Background(), meta.GlobalKey(hc.Name), hc)) + return mc.Observe(gce.c.HttpsHealthChecks().Insert(ctx, meta.GlobalKey(hc.Name), hc)) } // ListHttpsHealthChecks lists all HttpsHealthChecks in the project. func (gce *GCECloud) ListHttpsHealthChecks() ([]*compute.HttpsHealthCheck, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newHealthcheckMetricContext("list_legacy") - v, err := gce.c.HttpsHealthChecks().List(context.Background(), filter.None) + v, err := gce.c.HttpsHealthChecks().List(ctx, filter.None) return v, mc.Observe(err) } @@ -126,52 +155,76 @@ func (gce *GCECloud) ListHttpsHealthChecks() ([]*compute.HttpsHealthCheck, error // GetHealthCheck returns the given HealthCheck by name. func (gce *GCECloud) GetHealthCheck(name string) (*compute.HealthCheck, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newHealthcheckMetricContext("get") - v, err := gce.c.HealthChecks().Get(context.Background(), meta.GlobalKey(name)) + v, err := gce.c.HealthChecks().Get(ctx, meta.GlobalKey(name)) return v, mc.Observe(err) } // GetAlphaHealthCheck returns the given alpha HealthCheck by name. func (gce *GCECloud) GetAlphaHealthCheck(name string) (*computealpha.HealthCheck, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newHealthcheckMetricContextWithVersion("get", computeAlphaVersion) - v, err := gce.c.AlphaHealthChecks().Get(context.Background(), meta.GlobalKey(name)) + v, err := gce.c.AlphaHealthChecks().Get(ctx, meta.GlobalKey(name)) return v, mc.Observe(err) } // UpdateHealthCheck applies the given HealthCheck as an update. func (gce *GCECloud) UpdateHealthCheck(hc *compute.HealthCheck) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newHealthcheckMetricContext("update") - return mc.Observe(gce.c.HealthChecks().Update(context.Background(), meta.GlobalKey(hc.Name), hc)) + return mc.Observe(gce.c.HealthChecks().Update(ctx, meta.GlobalKey(hc.Name), hc)) } // UpdateAlphaHealthCheck applies the given alpha HealthCheck as an update. func (gce *GCECloud) UpdateAlphaHealthCheck(hc *computealpha.HealthCheck) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newHealthcheckMetricContextWithVersion("update", computeAlphaVersion) - return mc.Observe(gce.c.AlphaHealthChecks().Update(context.Background(), meta.GlobalKey(hc.Name), hc)) + return mc.Observe(gce.c.AlphaHealthChecks().Update(ctx, meta.GlobalKey(hc.Name), hc)) } // DeleteHealthCheck deletes the given HealthCheck by name. func (gce *GCECloud) DeleteHealthCheck(name string) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newHealthcheckMetricContext("delete") - return mc.Observe(gce.c.HealthChecks().Delete(context.Background(), meta.GlobalKey(name))) + return mc.Observe(gce.c.HealthChecks().Delete(ctx, meta.GlobalKey(name))) } // CreateHealthCheck creates the given HealthCheck. func (gce *GCECloud) CreateHealthCheck(hc *compute.HealthCheck) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newHealthcheckMetricContext("create") - return mc.Observe(gce.c.HealthChecks().Insert(context.Background(), meta.GlobalKey(hc.Name), hc)) + return mc.Observe(gce.c.HealthChecks().Insert(ctx, meta.GlobalKey(hc.Name), hc)) } // CreateAlphaHealthCheck creates the given alpha HealthCheck. func (gce *GCECloud) CreateAlphaHealthCheck(hc *computealpha.HealthCheck) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newHealthcheckMetricContextWithVersion("create", computeAlphaVersion) - return mc.Observe(gce.c.AlphaHealthChecks().Insert(context.Background(), meta.GlobalKey(hc.Name), hc)) + return mc.Observe(gce.c.AlphaHealthChecks().Insert(ctx, meta.GlobalKey(hc.Name), hc)) } // ListHealthChecks lists all HealthCheck in the project. func (gce *GCECloud) ListHealthChecks() ([]*compute.HealthCheck, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newHealthcheckMetricContext("list") - v, err := gce.c.HealthChecks().List(context.Background(), filter.None) + v, err := gce.c.HealthChecks().List(ctx, filter.None) return v, mc.Observe(err) } diff --git a/pkg/cloudprovider/providers/gce/gce_instancegroup.go b/pkg/cloudprovider/providers/gce/gce_instancegroup.go index 84b9724bfa..13b2c51e50 100644 --- a/pkg/cloudprovider/providers/gce/gce_instancegroup.go +++ b/pkg/cloudprovider/providers/gce/gce_instancegroup.go @@ -17,10 +17,9 @@ limitations under the License. package gce import ( - "context" - compute "google.golang.org/api/compute/v1" + "k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud" "k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/filter" "k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/meta" ) @@ -32,36 +31,51 @@ func newInstanceGroupMetricContext(request string, zone string) *metricContext { // CreateInstanceGroup creates an instance group with the given // instances. It is the callers responsibility to add named ports. func (gce *GCECloud) CreateInstanceGroup(ig *compute.InstanceGroup, zone string) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newInstanceGroupMetricContext("create", zone) - return mc.Observe(gce.c.InstanceGroups().Insert(context.Background(), meta.ZonalKey(ig.Name, zone), ig)) + return mc.Observe(gce.c.InstanceGroups().Insert(ctx, meta.ZonalKey(ig.Name, zone), ig)) } // DeleteInstanceGroup deletes an instance group. func (gce *GCECloud) DeleteInstanceGroup(name string, zone string) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newInstanceGroupMetricContext("delete", zone) - return mc.Observe(gce.c.InstanceGroups().Delete(context.Background(), meta.ZonalKey(name, zone))) + return mc.Observe(gce.c.InstanceGroups().Delete(ctx, meta.ZonalKey(name, zone))) } // ListInstanceGroups lists all InstanceGroups in the project and // zone. func (gce *GCECloud) ListInstanceGroups(zone string) ([]*compute.InstanceGroup, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newInstanceGroupMetricContext("list", zone) - v, err := gce.c.InstanceGroups().List(context.Background(), zone, filter.None) + v, err := gce.c.InstanceGroups().List(ctx, zone, filter.None) return v, mc.Observe(err) } // ListInstancesInInstanceGroup lists all the instances in a given // instance group and state. func (gce *GCECloud) ListInstancesInInstanceGroup(name string, zone string, state string) ([]*compute.InstanceWithNamedPorts, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newInstanceGroupMetricContext("list_instances", zone) req := &compute.InstanceGroupsListInstancesRequest{InstanceState: state} - v, err := gce.c.InstanceGroups().ListInstances(context.Background(), meta.ZonalKey(name, zone), req, filter.None) + v, err := gce.c.InstanceGroups().ListInstances(ctx, meta.ZonalKey(name, zone), req, filter.None) return v, mc.Observe(err) } // AddInstancesToInstanceGroup adds the given instances to the given // instance group. func (gce *GCECloud) AddInstancesToInstanceGroup(name string, zone string, instanceRefs []*compute.InstanceReference) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newInstanceGroupMetricContext("add_instances", zone) // TODO: should cull operation above this layer. if len(instanceRefs) == 0 { @@ -70,12 +84,15 @@ func (gce *GCECloud) AddInstancesToInstanceGroup(name string, zone string, insta req := &compute.InstanceGroupsAddInstancesRequest{ Instances: instanceRefs, } - return mc.Observe(gce.c.InstanceGroups().AddInstances(context.Background(), meta.ZonalKey(name, zone), req)) + return mc.Observe(gce.c.InstanceGroups().AddInstances(ctx, meta.ZonalKey(name, zone), req)) } // RemoveInstancesFromInstanceGroup removes the given instances from // the instance group. func (gce *GCECloud) RemoveInstancesFromInstanceGroup(name string, zone string, instanceRefs []*compute.InstanceReference) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newInstanceGroupMetricContext("remove_instances", zone) // TODO: should cull operation above this layer. if len(instanceRefs) == 0 { @@ -84,19 +101,25 @@ func (gce *GCECloud) RemoveInstancesFromInstanceGroup(name string, zone string, req := &compute.InstanceGroupsRemoveInstancesRequest{ Instances: instanceRefs, } - return mc.Observe(gce.c.InstanceGroups().RemoveInstances(context.Background(), meta.ZonalKey(name, zone), req)) + return mc.Observe(gce.c.InstanceGroups().RemoveInstances(ctx, meta.ZonalKey(name, zone), req)) } // SetNamedPortsOfInstanceGroup sets the list of named ports on a given instance group func (gce *GCECloud) SetNamedPortsOfInstanceGroup(igName, zone string, namedPorts []*compute.NamedPort) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newInstanceGroupMetricContext("set_namedports", zone) req := &compute.InstanceGroupsSetNamedPortsRequest{NamedPorts: namedPorts} - return mc.Observe(gce.c.InstanceGroups().SetNamedPorts(context.Background(), meta.ZonalKey(igName, zone), req)) + return mc.Observe(gce.c.InstanceGroups().SetNamedPorts(ctx, meta.ZonalKey(igName, zone), req)) } // GetInstanceGroup returns an instance group by name. func (gce *GCECloud) GetInstanceGroup(name string, zone string) (*compute.InstanceGroup, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newInstanceGroupMetricContext("get", zone) - v, err := gce.c.InstanceGroups().Get(context.Background(), meta.ZonalKey(name, zone)) + v, err := gce.c.InstanceGroups().Get(ctx, meta.ZonalKey(name, zone)) return v, mc.Observe(err) } diff --git a/pkg/cloudprovider/providers/gce/gce_instances.go b/pkg/cloudprovider/providers/gce/gce_instances.go index 69c487ecfc..0d4dc63149 100644 --- a/pkg/cloudprovider/providers/gce/gce_instances.go +++ b/pkg/cloudprovider/providers/gce/gce_instances.go @@ -34,6 +34,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/kubernetes/pkg/cloudprovider" + "k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud" "k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/filter" "k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/meta" kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis" @@ -99,12 +100,15 @@ func (gce *GCECloud) NodeAddresses(_ context.Context, _ types.NodeName) ([]v1.No // NodeAddressesByProviderID will not be called from the node that is requesting this ID. // i.e. metadata service and other local methods cannot be used here func (gce *GCECloud) NodeAddressesByProviderID(ctx context.Context, providerID string) ([]v1.NodeAddress, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + _, zone, name, err := splitProviderID(providerID) if err != nil { return []v1.NodeAddress{}, err } - instance, err := gce.c.Instances().Get(context.Background(), meta.ZonalKey(canonicalizeInstanceName(name), zone)) + instance, err := gce.c.Instances().Get(ctx, meta.ZonalKey(canonicalizeInstanceName(name), zone)) if err != nil { return []v1.NodeAddress{}, fmt.Errorf("error while querying for providerID %q: %v", providerID, err) } @@ -212,8 +216,11 @@ func (gce *GCECloud) InstanceType(ctx context.Context, nodeName types.NodeName) } func (gce *GCECloud) AddSSHKeyToAllInstances(ctx context.Context, user string, keyData []byte) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + return wait.Poll(2*time.Second, 30*time.Second, func() (bool, error) { - project, err := gce.c.Projects().Get(context.Background(), gce.projectID) + project, err := gce.c.Projects().Get(ctx, gce.projectID) if err != nil { glog.Errorf("Could not get project: %v", err) return false, nil @@ -244,7 +251,7 @@ func (gce *GCECloud) AddSSHKeyToAllInstances(ctx context.Context, user string, k } mc := newInstancesMetricContext("add_ssh_key", "") - err = gce.c.Projects().SetCommonInstanceMetadata(context.Background(), gce.projectID, project.CommonInstanceMetadata) + err = gce.c.Projects().SetCommonInstanceMetadata(ctx, gce.projectID, project.CommonInstanceMetadata) mc.Observe(err) if err != nil { @@ -284,9 +291,12 @@ func (gce *GCECloud) GetAllCurrentZones() (sets.String, error) { // // TODO: this should be removed from the cloud provider. func (gce *GCECloud) GetAllZonesFromCloudProvider() (sets.String, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + zones := sets.NewString() for _, zone := range gce.managedZones { - instances, err := gce.c.Instances().List(context.Background(), zone, filter.None) + instances, err := gce.c.Instances().List(ctx, zone, filter.None) if err != nil { return sets.NewString(), err } @@ -299,15 +309,21 @@ func (gce *GCECloud) GetAllZonesFromCloudProvider() (sets.String, error) { // InsertInstance creates a new instance on GCP func (gce *GCECloud) InsertInstance(project string, zone string, i *compute.Instance) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newInstancesMetricContext("create", zone) - return mc.Observe(gce.c.Instances().Insert(context.Background(), meta.ZonalKey(i.Name, zone), i)) + return mc.Observe(gce.c.Instances().Insert(ctx, meta.ZonalKey(i.Name, zone), i)) } // ListInstanceNames returns a string of instance names separated by spaces. // This method should only be used for e2e testing. // TODO: remove this method. func (gce *GCECloud) ListInstanceNames(project, zone string) (string, error) { - l, err := gce.c.Instances().List(context.Background(), zone, filter.None) + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + + l, err := gce.c.Instances().List(ctx, zone, filter.None) if err != nil { return "", err } @@ -320,7 +336,10 @@ func (gce *GCECloud) ListInstanceNames(project, zone string) (string, error) { // DeleteInstance deletes an instance specified by project, zone, and name func (gce *GCECloud) DeleteInstance(project, zone, name string) error { - return gce.c.Instances().Delete(context.Background(), meta.ZonalKey(name, zone)) + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + + return gce.c.Instances().Delete(ctx, meta.ZonalKey(name, zone)) } // Implementation of Instances.CurrentNodeName @@ -332,6 +351,9 @@ func (gce *GCECloud) CurrentNodeName(ctx context.Context, hostname string) (type // `node` for allocation to pods. Returns a list of the form // "/". func (gce *GCECloud) AliasRanges(nodeName types.NodeName) (cidrs []string, err error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + var instance *gceInstance instance, err = gce.getInstanceByName(mapNodeNameToInstanceName(nodeName)) if err != nil { @@ -339,7 +361,7 @@ func (gce *GCECloud) AliasRanges(nodeName types.NodeName) (cidrs []string, err e } var res *computebeta.Instance - res, err = gce.c.BetaInstances().Get(context.Background(), meta.ZonalKey(instance.Name, lastComponent(instance.Zone))) + res, err = gce.c.BetaInstances().Get(ctx, meta.ZonalKey(instance.Name, lastComponent(instance.Zone))) if err != nil { return } @@ -355,12 +377,14 @@ func (gce *GCECloud) AliasRanges(nodeName types.NodeName) (cidrs []string, err e // AddAliasToInstance adds an alias to the given instance from the named // secondary range. func (gce *GCECloud) AddAliasToInstance(nodeName types.NodeName, alias *net.IPNet) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() v1instance, err := gce.getInstanceByName(mapNodeNameToInstanceName(nodeName)) if err != nil { return err } - instance, err := gce.c.BetaInstances().Get(context.Background(), meta.ZonalKey(v1instance.Name, lastComponent(v1instance.Zone))) + instance, err := gce.c.BetaInstances().Get(ctx, meta.ZonalKey(v1instance.Name, lastComponent(v1instance.Zone))) if err != nil { return err } @@ -383,13 +407,16 @@ func (gce *GCECloud) AddAliasToInstance(nodeName types.NodeName, alias *net.IPNe }) mc := newInstancesMetricContext("add_alias", v1instance.Zone) - err = gce.c.BetaInstances().UpdateNetworkInterface(context.Background(), meta.ZonalKey(instance.Name, lastComponent(instance.Zone)), iface.Name, iface) + err = gce.c.BetaInstances().UpdateNetworkInterface(ctx, meta.ZonalKey(instance.Name, lastComponent(instance.Zone)), iface.Name, iface) return mc.Observe(err) } // Gets the named instances, returning cloudprovider.InstanceNotFound if any // instance is not found func (gce *GCECloud) getInstancesByNames(names []string) ([]*gceInstance, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + found := map[string]*gceInstance{} remaining := len(names) @@ -407,7 +434,7 @@ func (gce *GCECloud) getInstancesByNames(names []string) ([]*gceInstance, error) if remaining == 0 { break } - instances, err := gce.c.Instances().List(context.Background(), zone, filter.Regexp("name", nodeInstancePrefix+".*")) + instances, err := gce.c.Instances().List(ctx, zone, filter.Regexp("name", nodeInstancePrefix+".*")) if err != nil { return nil, err } @@ -471,9 +498,12 @@ func (gce *GCECloud) getInstanceByName(name string) (*gceInstance, error) { } func (gce *GCECloud) getInstanceFromProjectInZoneByName(project, zone, name string) (*gceInstance, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + name = canonicalizeInstanceName(name) mc := newInstancesMetricContext("get", zone) - res, err := gce.c.Instances().Get(context.Background(), meta.ZonalKey(name, zone)) + res, err := gce.c.Instances().Get(ctx, meta.ZonalKey(name, zone)) mc.Observe(err) if err != nil { return nil, err @@ -532,6 +562,9 @@ func (gce *GCECloud) isCurrentInstance(instanceID string) bool { // format of the host names in the cluster. Only use it as a fallback if // gce.nodeTags is unspecified func (gce *GCECloud) computeHostTags(hosts []*gceInstance) ([]string, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + // TODO: We could store the tags in gceInstance, so we could have already fetched it hostNamesByZone := make(map[string]map[string]bool) // map of zones -> map of names -> bool (for easy lookup) nodeInstancePrefix := gce.nodeInstancePrefix @@ -556,7 +589,7 @@ func (gce *GCECloud) computeHostTags(hosts []*gceInstance) ([]string, error) { filt = filter.Regexp("name", nodeInstancePrefix+".*") } for zone, hostNames := range hostNamesByZone { - instances, err := gce.c.Instances().List(context.Background(), zone, filt) + instances, err := gce.c.Instances().List(ctx, zone, filt) if err != nil { return nil, err } diff --git a/pkg/cloudprovider/providers/gce/gce_networkendpointgroup.go b/pkg/cloudprovider/providers/gce/gce_networkendpointgroup.go index f0e3fb538d..efa4dcf4be 100644 --- a/pkg/cloudprovider/providers/gce/gce_networkendpointgroup.go +++ b/pkg/cloudprovider/providers/gce/gce_networkendpointgroup.go @@ -17,12 +17,12 @@ limitations under the License. package gce import ( - "context" "fmt" "strings" computealpha "google.golang.org/api/compute/v0.alpha" + "k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud" "k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/filter" "k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/meta" ) @@ -37,31 +37,40 @@ func newNetworkEndpointGroupMetricContext(request string, zone string) *metricCo } func (gce *GCECloud) GetNetworkEndpointGroup(name string, zone string) (*computealpha.NetworkEndpointGroup, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newNetworkEndpointGroupMetricContext("get", zone) if err := gce.alphaFeatureEnabled(AlphaFeatureNetworkEndpointGroup); err != nil { return nil, mc.Observe(err) } - v, err := gce.c.AlphaNetworkEndpointGroups().Get(context.Background(), meta.ZonalKey(name, zone)) + v, err := gce.c.AlphaNetworkEndpointGroups().Get(ctx, meta.ZonalKey(name, zone)) return v, mc.Observe(err) } func (gce *GCECloud) ListNetworkEndpointGroup(zone string) ([]*computealpha.NetworkEndpointGroup, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newNetworkEndpointGroupMetricContext("list", zone) if err := gce.alphaFeatureEnabled(AlphaFeatureNetworkEndpointGroup); err != nil { return nil, mc.Observe(err) } - negs, err := gce.c.AlphaNetworkEndpointGroups().List(context.Background(), zone, filter.None) + negs, err := gce.c.AlphaNetworkEndpointGroups().List(ctx, zone, filter.None) return negs, mc.Observe(err) } // AggregatedListNetworkEndpointGroup returns a map of zone -> endpoint group. func (gce *GCECloud) AggregatedListNetworkEndpointGroup() (map[string][]*computealpha.NetworkEndpointGroup, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newNetworkEndpointGroupMetricContext("aggregated_list", "") if err := gce.alphaFeatureEnabled(AlphaFeatureNetworkEndpointGroup); err != nil { return nil, mc.Observe(err) } // TODO: filter for the region the cluster is in. - all, err := gce.c.AlphaNetworkEndpointGroups().AggregatedList(context.Background(), filter.None) + all, err := gce.c.AlphaNetworkEndpointGroups().AggregatedList(ctx, filter.None) if err != nil { return nil, mc.Observe(err) } @@ -79,22 +88,31 @@ func (gce *GCECloud) AggregatedListNetworkEndpointGroup() (map[string][]*compute } func (gce *GCECloud) CreateNetworkEndpointGroup(neg *computealpha.NetworkEndpointGroup, zone string) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + if err := gce.alphaFeatureEnabled(AlphaFeatureNetworkEndpointGroup); err != nil { return err } mc := newNetworkEndpointGroupMetricContext("create", zone) - return mc.Observe(gce.c.AlphaNetworkEndpointGroups().Insert(context.Background(), meta.ZonalKey(neg.Name, zone), neg)) + return mc.Observe(gce.c.AlphaNetworkEndpointGroups().Insert(ctx, meta.ZonalKey(neg.Name, zone), neg)) } func (gce *GCECloud) DeleteNetworkEndpointGroup(name string, zone string) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + if err := gce.alphaFeatureEnabled(AlphaFeatureNetworkEndpointGroup); err != nil { return err } mc := newNetworkEndpointGroupMetricContext("delete", zone) - return mc.Observe(gce.c.AlphaNetworkEndpointGroups().Delete(context.Background(), meta.ZonalKey(name, zone))) + return mc.Observe(gce.c.AlphaNetworkEndpointGroups().Delete(ctx, meta.ZonalKey(name, zone))) } func (gce *GCECloud) AttachNetworkEndpoints(name, zone string, endpoints []*computealpha.NetworkEndpoint) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newNetworkEndpointGroupMetricContext("attach", zone) if err := gce.alphaFeatureEnabled(AlphaFeatureNetworkEndpointGroup); err != nil { return mc.Observe(err) @@ -102,10 +120,13 @@ func (gce *GCECloud) AttachNetworkEndpoints(name, zone string, endpoints []*comp req := &computealpha.NetworkEndpointGroupsAttachEndpointsRequest{ NetworkEndpoints: endpoints, } - return mc.Observe(gce.c.AlphaNetworkEndpointGroups().AttachNetworkEndpoints(context.Background(), meta.ZonalKey(name, zone), req)) + return mc.Observe(gce.c.AlphaNetworkEndpointGroups().AttachNetworkEndpoints(ctx, meta.ZonalKey(name, zone), req)) } func (gce *GCECloud) DetachNetworkEndpoints(name, zone string, endpoints []*computealpha.NetworkEndpoint) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newNetworkEndpointGroupMetricContext("detach", zone) if err := gce.alphaFeatureEnabled(AlphaFeatureNetworkEndpointGroup); err != nil { return mc.Observe(err) @@ -113,10 +134,13 @@ func (gce *GCECloud) DetachNetworkEndpoints(name, zone string, endpoints []*comp req := &computealpha.NetworkEndpointGroupsDetachEndpointsRequest{ NetworkEndpoints: endpoints, } - return mc.Observe(gce.c.AlphaNetworkEndpointGroups().DetachNetworkEndpoints(context.Background(), meta.ZonalKey(name, zone), req)) + return mc.Observe(gce.c.AlphaNetworkEndpointGroups().DetachNetworkEndpoints(ctx, meta.ZonalKey(name, zone), req)) } func (gce *GCECloud) ListNetworkEndpoints(name, zone string, showHealthStatus bool) ([]*computealpha.NetworkEndpointWithHealthStatus, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newNetworkEndpointGroupMetricContext("list_networkendpoints", zone) if err := gce.alphaFeatureEnabled(AlphaFeatureNetworkEndpointGroup); err != nil { return nil, mc.Observe(err) @@ -128,6 +152,6 @@ func (gce *GCECloud) ListNetworkEndpoints(name, zone string, showHealthStatus bo req := &computealpha.NetworkEndpointGroupsListEndpointsRequest{ HealthStatus: healthStatus, } - l, err := gce.c.AlphaNetworkEndpointGroups().ListNetworkEndpoints(context.Background(), meta.ZonalKey(name, zone), req, filter.None) + l, err := gce.c.AlphaNetworkEndpointGroups().ListNetworkEndpoints(ctx, meta.ZonalKey(name, zone), req, filter.None) return l, mc.Observe(err) } diff --git a/pkg/cloudprovider/providers/gce/gce_routes.go b/pkg/cloudprovider/providers/gce/gce_routes.go index 4a7cea5b38..624b581cbd 100644 --- a/pkg/cloudprovider/providers/gce/gce_routes.go +++ b/pkg/cloudprovider/providers/gce/gce_routes.go @@ -27,6 +27,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/kubernetes/pkg/cloudprovider" + "k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud" "k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/filter" "k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/meta" ) @@ -37,10 +38,13 @@ func newRoutesMetricContext(request string) *metricContext { // ListRoutes in the cloud environment. func (gce *GCECloud) ListRoutes(ctx context.Context, clusterName string) ([]*cloudprovider.Route, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newRoutesMetricContext("list") prefix := truncateClusterName(clusterName) f := filter.Regexp("name", prefix+"-.*").AndRegexp("network", gce.NetworkURL()).AndRegexp("description", k8sNodeRouteTag) - routes, err := gce.c.Routes().List(context.Background(), f) + routes, err := gce.c.Routes().List(ctx, f) if err != nil { return nil, mc.Observe(err) } @@ -60,6 +64,9 @@ func (gce *GCECloud) ListRoutes(ctx context.Context, clusterName string) ([]*clo // CreateRoute in the cloud environment. func (gce *GCECloud) CreateRoute(ctx context.Context, clusterName string, nameHint string, route *cloudprovider.Route) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newRoutesMetricContext("create") targetInstance, err := gce.getInstanceByName(mapNodeNameToInstanceName(route.TargetNode)) @@ -74,7 +81,7 @@ func (gce *GCECloud) CreateRoute(ctx context.Context, clusterName string, nameHi Priority: 1000, Description: k8sNodeRouteTag, } - err = gce.c.Routes().Insert(context.Background(), meta.GlobalKey(cr.Name), cr) + err = gce.c.Routes().Insert(ctx, meta.GlobalKey(cr.Name), cr) if isHTTPErrorCode(err, http.StatusConflict) { glog.Infof("Route %q already exists.", cr.Name) err = nil @@ -84,8 +91,11 @@ func (gce *GCECloud) CreateRoute(ctx context.Context, clusterName string, nameHi // DeleteRoute from the cloud environment. func (gce *GCECloud) DeleteRoute(ctx context.Context, clusterName string, route *cloudprovider.Route) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newRoutesMetricContext("delete") - return mc.Observe(gce.c.Routes().Delete(context.Background(), meta.GlobalKey(route.Name))) + return mc.Observe(gce.c.Routes().Delete(ctx, meta.GlobalKey(route.Name))) } func truncateClusterName(clusterName string) string { diff --git a/pkg/cloudprovider/providers/gce/gce_securitypolicy.go b/pkg/cloudprovider/providers/gce/gce_securitypolicy.go index bec23a644f..293946590b 100644 --- a/pkg/cloudprovider/providers/gce/gce_securitypolicy.go +++ b/pkg/cloudprovider/providers/gce/gce_securitypolicy.go @@ -17,10 +17,9 @@ limitations under the License. package gce import ( - "context" - computebeta "google.golang.org/api/compute/v0.beta" + "k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud" "k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/filter" "k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/meta" ) @@ -31,60 +30,87 @@ func newSecurityPolicyMetricContextWithVersion(request, version string) *metricC // GetBetaSecurityPolicy retrieves a security policy. func (gce *GCECloud) GetBetaSecurityPolicy(name string) (*computebeta.SecurityPolicy, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newSecurityPolicyMetricContextWithVersion("get", computeBetaVersion) - v, err := gce.c.BetaSecurityPolicies().Get(context.Background(), meta.GlobalKey(name)) + v, err := gce.c.BetaSecurityPolicies().Get(ctx, meta.GlobalKey(name)) return v, mc.Observe(err) } // ListBetaSecurityPolicy lists all security policies in the project. func (gce *GCECloud) ListBetaSecurityPolicy() ([]*computebeta.SecurityPolicy, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newSecurityPolicyMetricContextWithVersion("list", computeBetaVersion) - v, err := gce.c.BetaSecurityPolicies().List(context.Background(), filter.None) + v, err := gce.c.BetaSecurityPolicies().List(ctx, filter.None) return v, mc.Observe(err) } // CreateBetaSecurityPolicy creates the given security policy. func (gce *GCECloud) CreateBetaSecurityPolicy(sp *computebeta.SecurityPolicy) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newSecurityPolicyMetricContextWithVersion("create", computeBetaVersion) - return mc.Observe(gce.c.BetaSecurityPolicies().Insert(context.Background(), meta.GlobalKey(sp.Name), sp)) + return mc.Observe(gce.c.BetaSecurityPolicies().Insert(ctx, meta.GlobalKey(sp.Name), sp)) } // DeleteBetaSecurityPolicy deletes the given security policy. func (gce *GCECloud) DeleteBetaSecurityPolicy(name string) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newSecurityPolicyMetricContextWithVersion("delete", computeBetaVersion) - return mc.Observe(gce.c.BetaSecurityPolicies().Delete(context.Background(), meta.GlobalKey(name))) + return mc.Observe(gce.c.BetaSecurityPolicies().Delete(ctx, meta.GlobalKey(name))) } // PatchBetaSecurityPolicy applies the given security policy as a // patch to an existing security policy. func (gce *GCECloud) PatchBetaSecurityPolicy(sp *computebeta.SecurityPolicy) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newSecurityPolicyMetricContextWithVersion("patch", computeBetaVersion) - return mc.Observe(gce.c.BetaSecurityPolicies().Patch(context.Background(), meta.GlobalKey(sp.Name), sp)) + return mc.Observe(gce.c.BetaSecurityPolicies().Patch(ctx, meta.GlobalKey(sp.Name), sp)) } // GetRuleForBetaSecurityPolicy gets rule from a security policy. func (gce *GCECloud) GetRuleForBetaSecurityPolicy(name string) (*computebeta.SecurityPolicyRule, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newSecurityPolicyMetricContextWithVersion("get_rule", computeBetaVersion) - v, err := gce.c.BetaSecurityPolicies().GetRule(context.Background(), meta.GlobalKey(name)) + v, err := gce.c.BetaSecurityPolicies().GetRule(ctx, meta.GlobalKey(name)) return v, mc.Observe(err) } // AddRuletoBetaSecurityPolicy adds the given security policy rule to // a security policy. func (gce *GCECloud) AddRuletoBetaSecurityPolicy(name string, spr *computebeta.SecurityPolicyRule) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newSecurityPolicyMetricContextWithVersion("add_rule", computeBetaVersion) - return mc.Observe(gce.c.BetaSecurityPolicies().AddRule(context.Background(), meta.GlobalKey(name), spr)) + return mc.Observe(gce.c.BetaSecurityPolicies().AddRule(ctx, meta.GlobalKey(name), spr)) } // PatchRuleForBetaSecurityPolicy patches the given security policy // rule to a security policy. func (gce *GCECloud) PatchRuleForBetaSecurityPolicy(name string, spr *computebeta.SecurityPolicyRule) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newSecurityPolicyMetricContextWithVersion("patch_rule", computeBetaVersion) - return mc.Observe(gce.c.BetaSecurityPolicies().PatchRule(context.Background(), meta.GlobalKey(name), spr)) + return mc.Observe(gce.c.BetaSecurityPolicies().PatchRule(ctx, meta.GlobalKey(name), spr)) } // RemoveRuleFromBetaSecurityPolicy removes rule from a security policy. func (gce *GCECloud) RemoveRuleFromBetaSecurityPolicy(name string) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newSecurityPolicyMetricContextWithVersion("remove_rule", computeBetaVersion) - return mc.Observe(gce.c.BetaSecurityPolicies().RemoveRule(context.Background(), meta.GlobalKey(name))) + return mc.Observe(gce.c.BetaSecurityPolicies().RemoveRule(ctx, meta.GlobalKey(name))) } diff --git a/pkg/cloudprovider/providers/gce/gce_targetpool.go b/pkg/cloudprovider/providers/gce/gce_targetpool.go index 8fd3b3704e..8c1127e749 100644 --- a/pkg/cloudprovider/providers/gce/gce_targetpool.go +++ b/pkg/cloudprovider/providers/gce/gce_targetpool.go @@ -17,10 +17,9 @@ limitations under the License. package gce import ( - "context" - compute "google.golang.org/api/compute/v1" + "k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud" "k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/meta" ) @@ -30,37 +29,52 @@ func newTargetPoolMetricContext(request, region string) *metricContext { // GetTargetPool returns the TargetPool by name. func (gce *GCECloud) GetTargetPool(name, region string) (*compute.TargetPool, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newTargetPoolMetricContext("get", region) - v, err := gce.c.TargetPools().Get(context.Background(), meta.RegionalKey(name, region)) + v, err := gce.c.TargetPools().Get(ctx, meta.RegionalKey(name, region)) return v, mc.Observe(err) } // CreateTargetPool creates the passed TargetPool func (gce *GCECloud) CreateTargetPool(tp *compute.TargetPool, region string) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newTargetPoolMetricContext("create", region) - return mc.Observe(gce.c.TargetPools().Insert(context.Background(), meta.RegionalKey(tp.Name, region), tp)) + return mc.Observe(gce.c.TargetPools().Insert(ctx, meta.RegionalKey(tp.Name, region), tp)) } // DeleteTargetPool deletes TargetPool by name. func (gce *GCECloud) DeleteTargetPool(name, region string) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newTargetPoolMetricContext("delete", region) - return mc.Observe(gce.c.TargetPools().Delete(context.Background(), meta.RegionalKey(name, region))) + return mc.Observe(gce.c.TargetPools().Delete(ctx, meta.RegionalKey(name, region))) } // AddInstancesToTargetPool adds instances by link to the TargetPool func (gce *GCECloud) AddInstancesToTargetPool(name, region string, instanceRefs []*compute.InstanceReference) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + req := &compute.TargetPoolsAddInstanceRequest{ Instances: instanceRefs, } mc := newTargetPoolMetricContext("add_instances", region) - return mc.Observe(gce.c.TargetPools().AddInstance(context.Background(), meta.RegionalKey(name, region), req)) + return mc.Observe(gce.c.TargetPools().AddInstance(ctx, meta.RegionalKey(name, region), req)) } // RemoveInstancesFromTargetPool removes instances by link to the TargetPool func (gce *GCECloud) RemoveInstancesFromTargetPool(name, region string, instanceRefs []*compute.InstanceReference) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + req := &compute.TargetPoolsRemoveInstanceRequest{ Instances: instanceRefs, } mc := newTargetPoolMetricContext("remove_instances", region) - return mc.Observe(gce.c.TargetPools().RemoveInstance(context.Background(), meta.RegionalKey(name, region), req)) + return mc.Observe(gce.c.TargetPools().RemoveInstance(ctx, meta.RegionalKey(name, region), req)) } diff --git a/pkg/cloudprovider/providers/gce/gce_targetproxy.go b/pkg/cloudprovider/providers/gce/gce_targetproxy.go index 36d21bcff6..f266d13e61 100644 --- a/pkg/cloudprovider/providers/gce/gce_targetproxy.go +++ b/pkg/cloudprovider/providers/gce/gce_targetproxy.go @@ -17,10 +17,9 @@ limitations under the License. package gce import ( - "context" - compute "google.golang.org/api/compute/v1" + "k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud" "k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/filter" "k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/meta" ) @@ -31,34 +30,49 @@ func newTargetProxyMetricContext(request string) *metricContext { // GetTargetHttpProxy returns the UrlMap by name. func (gce *GCECloud) GetTargetHttpProxy(name string) (*compute.TargetHttpProxy, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newTargetProxyMetricContext("get") - v, err := gce.c.TargetHttpProxies().Get(context.Background(), meta.GlobalKey(name)) + v, err := gce.c.TargetHttpProxies().Get(ctx, meta.GlobalKey(name)) return v, mc.Observe(err) } // CreateTargetHttpProxy creates a TargetHttpProxy func (gce *GCECloud) CreateTargetHttpProxy(proxy *compute.TargetHttpProxy) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newTargetProxyMetricContext("create") - return mc.Observe(gce.c.TargetHttpProxies().Insert(context.Background(), meta.GlobalKey(proxy.Name), proxy)) + return mc.Observe(gce.c.TargetHttpProxies().Insert(ctx, meta.GlobalKey(proxy.Name), proxy)) } // SetUrlMapForTargetHttpProxy sets the given UrlMap for the given TargetHttpProxy. func (gce *GCECloud) SetUrlMapForTargetHttpProxy(proxy *compute.TargetHttpProxy, urlMap *compute.UrlMap) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + ref := &compute.UrlMapReference{UrlMap: urlMap.SelfLink} mc := newTargetProxyMetricContext("set_url_map") - return mc.Observe(gce.c.TargetHttpProxies().SetUrlMap(context.Background(), meta.GlobalKey(proxy.Name), ref)) + return mc.Observe(gce.c.TargetHttpProxies().SetUrlMap(ctx, meta.GlobalKey(proxy.Name), ref)) } // DeleteTargetHttpProxy deletes the TargetHttpProxy by name. func (gce *GCECloud) DeleteTargetHttpProxy(name string) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newTargetProxyMetricContext("delete") - return mc.Observe(gce.c.TargetHttpProxies().Delete(context.Background(), meta.GlobalKey(name))) + return mc.Observe(gce.c.TargetHttpProxies().Delete(ctx, meta.GlobalKey(name))) } // ListTargetHttpProxies lists all TargetHttpProxies in the project. func (gce *GCECloud) ListTargetHttpProxies() ([]*compute.TargetHttpProxy, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newTargetProxyMetricContext("list") - v, err := gce.c.TargetHttpProxies().List(context.Background(), filter.None) + v, err := gce.c.TargetHttpProxies().List(ctx, filter.None) return v, mc.Observe(err) } @@ -66,42 +80,60 @@ func (gce *GCECloud) ListTargetHttpProxies() ([]*compute.TargetHttpProxy, error) // GetTargetHttpsProxy returns the UrlMap by name. func (gce *GCECloud) GetTargetHttpsProxy(name string) (*compute.TargetHttpsProxy, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newTargetProxyMetricContext("get") - v, err := gce.c.TargetHttpsProxies().Get(context.Background(), meta.GlobalKey(name)) + v, err := gce.c.TargetHttpsProxies().Get(ctx, meta.GlobalKey(name)) return v, mc.Observe(err) } // CreateTargetHttpsProxy creates a TargetHttpsProxy func (gce *GCECloud) CreateTargetHttpsProxy(proxy *compute.TargetHttpsProxy) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newTargetProxyMetricContext("create") - return mc.Observe(gce.c.TargetHttpsProxies().Insert(context.Background(), meta.GlobalKey(proxy.Name), proxy)) + return mc.Observe(gce.c.TargetHttpsProxies().Insert(ctx, meta.GlobalKey(proxy.Name), proxy)) } // SetUrlMapForTargetHttpsProxy sets the given UrlMap for the given TargetHttpsProxy. func (gce *GCECloud) SetUrlMapForTargetHttpsProxy(proxy *compute.TargetHttpsProxy, urlMap *compute.UrlMap) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newTargetProxyMetricContext("set_url_map") ref := &compute.UrlMapReference{UrlMap: urlMap.SelfLink} - return mc.Observe(gce.c.TargetHttpsProxies().SetUrlMap(context.Background(), meta.GlobalKey(proxy.Name), ref)) + return mc.Observe(gce.c.TargetHttpsProxies().SetUrlMap(ctx, meta.GlobalKey(proxy.Name), ref)) } // SetSslCertificateForTargetHttpsProxy sets the given SslCertificate for the given TargetHttpsProxy. func (gce *GCECloud) SetSslCertificateForTargetHttpsProxy(proxy *compute.TargetHttpsProxy, sslCertURLs []string) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newTargetProxyMetricContext("set_ssl_cert") req := &compute.TargetHttpsProxiesSetSslCertificatesRequest{ SslCertificates: sslCertURLs, } - return mc.Observe(gce.c.TargetHttpsProxies().SetSslCertificates(context.Background(), meta.GlobalKey(proxy.Name), req)) + return mc.Observe(gce.c.TargetHttpsProxies().SetSslCertificates(ctx, meta.GlobalKey(proxy.Name), req)) } // DeleteTargetHttpsProxy deletes the TargetHttpsProxy by name. func (gce *GCECloud) DeleteTargetHttpsProxy(name string) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newTargetProxyMetricContext("delete") - return mc.Observe(gce.c.TargetHttpsProxies().Delete(context.Background(), meta.GlobalKey(name))) + return mc.Observe(gce.c.TargetHttpsProxies().Delete(ctx, meta.GlobalKey(name))) } // ListTargetHttpsProxies lists all TargetHttpsProxies in the project. func (gce *GCECloud) ListTargetHttpsProxies() ([]*compute.TargetHttpsProxy, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newTargetProxyMetricContext("list") - v, err := gce.c.TargetHttpsProxies().List(context.Background(), filter.None) + v, err := gce.c.TargetHttpsProxies().List(ctx, filter.None) return v, mc.Observe(err) } diff --git a/pkg/cloudprovider/providers/gce/gce_urlmap.go b/pkg/cloudprovider/providers/gce/gce_urlmap.go index 54ceccbf54..b0e60093c1 100644 --- a/pkg/cloudprovider/providers/gce/gce_urlmap.go +++ b/pkg/cloudprovider/providers/gce/gce_urlmap.go @@ -17,10 +17,9 @@ limitations under the License. package gce import ( - "context" - compute "google.golang.org/api/compute/v1" + "k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud" "k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/filter" "k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/meta" ) @@ -31,32 +30,47 @@ func newUrlMapMetricContext(request string) *metricContext { // GetUrlMap returns the UrlMap by name. func (gce *GCECloud) GetUrlMap(name string) (*compute.UrlMap, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newUrlMapMetricContext("get") - v, err := gce.c.UrlMaps().Get(context.Background(), meta.GlobalKey(name)) + v, err := gce.c.UrlMaps().Get(ctx, meta.GlobalKey(name)) return v, mc.Observe(err) } // CreateUrlMap creates a url map func (gce *GCECloud) CreateUrlMap(urlMap *compute.UrlMap) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newUrlMapMetricContext("create") - return mc.Observe(gce.c.UrlMaps().Insert(context.Background(), meta.GlobalKey(urlMap.Name), urlMap)) + return mc.Observe(gce.c.UrlMaps().Insert(ctx, meta.GlobalKey(urlMap.Name), urlMap)) } // UpdateUrlMap applies the given UrlMap as an update func (gce *GCECloud) UpdateUrlMap(urlMap *compute.UrlMap) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newUrlMapMetricContext("update") - return mc.Observe(gce.c.UrlMaps().Update(context.Background(), meta.GlobalKey(urlMap.Name), urlMap)) + return mc.Observe(gce.c.UrlMaps().Update(ctx, meta.GlobalKey(urlMap.Name), urlMap)) } // DeleteUrlMap deletes a url map by name. func (gce *GCECloud) DeleteUrlMap(name string) error { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newUrlMapMetricContext("delete") - return mc.Observe(gce.c.UrlMaps().Delete(context.Background(), meta.GlobalKey(name))) + return mc.Observe(gce.c.UrlMaps().Delete(ctx, meta.GlobalKey(name))) } // ListUrlMaps lists all UrlMaps in the project. func (gce *GCECloud) ListUrlMaps() ([]*compute.UrlMap, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newUrlMapMetricContext("list") - v, err := gce.c.UrlMaps().List(context.Background(), filter.None) + v, err := gce.c.UrlMaps().List(ctx, filter.None) return v, mc.Observe(err) } diff --git a/pkg/cloudprovider/providers/gce/gce_zones.go b/pkg/cloudprovider/providers/gce/gce_zones.go index 6c745e34c2..503ca348f4 100644 --- a/pkg/cloudprovider/providers/gce/gce_zones.go +++ b/pkg/cloudprovider/providers/gce/gce_zones.go @@ -24,6 +24,7 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/kubernetes/pkg/cloudprovider" + "k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud" "k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/filter" ) @@ -72,8 +73,11 @@ func (gce *GCECloud) GetZoneByNodeName(ctx context.Context, nodeName types.NodeN // ListZonesInRegion returns all zones in a GCP region func (gce *GCECloud) ListZonesInRegion(region string) ([]*compute.Zone, error) { + ctx, cancel := cloud.ContextWithCallTimeout() + defer cancel() + mc := newZonesMetricContext("list", region) - list, err := gce.c.Zones().List(context.Background(), filter.Regexp("region", gce.getRegionLink(region))) + list, err := gce.c.Zones().List(ctx, filter.Regexp("region", gce.getRegionLink(region))) if err != nil { return nil, mc.Observe(err) } diff --git a/pkg/cloudprovider/providers/gce/support.go b/pkg/cloudprovider/providers/gce/support.go index 37dc75b0b0..7861e08acb 100644 --- a/pkg/cloudprovider/providers/gce/support.go +++ b/pkg/cloudprovider/providers/gce/support.go @@ -50,17 +50,15 @@ type gceRateLimiter struct { // operations. func (l *gceRateLimiter) Accept(ctx context.Context, key *cloud.RateLimitKey) error { if key.Operation == "Get" && key.Service == "Operations" { - ch := make(chan struct{}) - go func() { - l.gce.operationPollRateLimiter.Accept() - close(ch) - }() - select { - case <-ch: - break - case <-ctx.Done(): - return ctx.Err() + // Wait a minimum amount of time regardless of rate limiter. + rl := &cloud.MinimumRateLimiter{ + // Convert flowcontrol.RateLimiter into cloud.RateLimiter + RateLimiter: &cloud.AcceptRateLimiter{ + Acceptor: l.gce.operationPollRateLimiter, + }, + Minimum: operationPollInterval, } + return rl.Accept(ctx, key) } return nil }