Merge pull request #64630 from nicksardo/fix-op-rate

Automatic merge from submit-queue (batch tested with PRs 64272, 64630). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

GCE: Fix operation polling and error handling

Cloud functions using the generated API are bursting operation GET calls because we don't wait a minimum amount of time.

Fixes #64712
Fixes #64858

**Changes**
- `operationPollInterval` is now 1.5 seconds instead of 3 seconds.
-  `operationPollRateLimiter` is now configured with 5 QPS / 5 burst instead of 10 QPS / 10 burst.
- `gceRateLimiter` is now configured with a `MinimumRateLimiter` to wait the above `operationPollInterval` duration _before_ waiting on the token rate limiter.
- Operations are now rate limited on the very first GET call.
- Operations are polled until `DONE` or context times out (even if operations.get fails continuously).
- Compute operations are checked for errors when they're recognized as `DONE`. 
- All "wrapper" funcs now generate a context with an hour timeout.


`ingress-gce` will need to update its vendor and utilize the `MinimumRateLimiter` as well. Since ingress creates rate limiters based off flags, we'll need to check the resource type and operation while parsing the flags and wrap the appropriate one.

**Special notes for your reviewer**:
/assign bowei
/cc bowei

**Fix Example**
Creating an external load balancer

without fix:  https://pastebin.com/raw/NNkeNWS3  
with fix: https://pastebin.com/raw/x2iMLW5S (a difference of about 200 GET calls)

**Release note**:
```release-note
GCE: Fixes operation polling to adhere to the specified interval. Furthermore, operation errors are now returned instead of ignored.
```
pull/8/head
Kubernetes Submit Queue 2018-06-14 14:11:15 -07:00 committed by GitHub
commit 0f87069384
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
24 changed files with 892 additions and 193 deletions

View File

@ -6,6 +6,7 @@ go_library(
"constants.go",
"context.go",
"doc.go",
"errors.go",
"gce_projects.go",
"gen.go",
"op.go",
@ -32,6 +33,8 @@ go_test(
srcs = [
"gen_test.go",
"mock_test.go",
"ratelimit_test.go",
"service_test.go",
"utils_test.go",
],
embed = [":go_default_library"],

View File

@ -0,0 +1,48 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cloud
import "fmt"
// OperationPollingError occurs when the GCE Operation cannot be retrieved for a prolonged period.
type OperationPollingError struct {
LastPollError error
}
// Error returns a string representation including the last poll error encountered.
func (e *OperationPollingError) Error() string {
return fmt.Sprintf("GCE operation polling error: %v", e.LastPollError)
}
// GCEOperationError occurs when the GCE Operation finishes with an error.
type GCEOperationError struct {
// HTTPStatusCode is the HTTP status code of the final error.
// For example, a failed operation may have 400 - BadRequest.
HTTPStatusCode int
// Code is GCE's code of what went wrong.
// For example, RESOURCE_IN_USE_BY_ANOTHER_RESOURCE
Code string
// Message is a human readable message.
// For example, "The network resource 'xxx' is already being used by 'xxx'"
Message string
}
// Error returns a string representation including the HTTP Status code, GCE's error code
// and a human readable message.
func (e *GCEOperationError) Error() string {
return fmt.Sprintf("GCE %v - %v: %v", e.HTTPStatusCode, e.Code, e.Message)
}

View File

@ -29,10 +29,17 @@ import (
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/meta"
)
const (
operationStatusDone = "DONE"
)
// operation is a GCE operation that can be watied on.
type operation interface {
// isDone queries GCE for the done status. This call can block.
isDone(ctx context.Context) (bool, error)
// error returns the resulting error of the operation. This may be nil if the operations
// was successful.
error() error
// rateLimitKey returns the rate limit key to use for the given operation.
// This rate limit will govern how fast the server will be polled for
// operation completion status.
@ -43,6 +50,7 @@ type gaOperation struct {
s *Service
projectID string
key *meta.Key
err error
}
func (o *gaOperation) String() string {
@ -71,7 +79,15 @@ func (o *gaOperation) isDone(ctx context.Context) (bool, error) {
if err != nil {
return false, err
}
return op != nil && op.Status == "DONE", nil
if op == nil || op.Status != operationStatusDone {
return false, nil
}
if op.Error != nil && len(op.Error.Errors) > 0 && op.Error.Errors[0] != nil {
e := op.Error.Errors[0]
o.err = &GCEOperationError{HTTPStatusCode: op.HTTPStatusCode, Code: e.Code, Message: e.Message}
}
return true, nil
}
func (o *gaOperation) rateLimitKey() *RateLimitKey {
@ -83,10 +99,15 @@ func (o *gaOperation) rateLimitKey() *RateLimitKey {
}
}
func (o *gaOperation) error() error {
return o.err
}
type alphaOperation struct {
s *Service
projectID string
key *meta.Key
err error
}
func (o *alphaOperation) String() string {
@ -115,7 +136,15 @@ func (o *alphaOperation) isDone(ctx context.Context) (bool, error) {
if err != nil {
return false, err
}
return op != nil && op.Status == "DONE", nil
if op == nil || op.Status != operationStatusDone {
return false, nil
}
if op.Error != nil && len(op.Error.Errors) > 0 && op.Error.Errors[0] != nil {
e := op.Error.Errors[0]
o.err = &GCEOperationError{HTTPStatusCode: op.HTTPStatusCode, Code: e.Code, Message: e.Message}
}
return true, nil
}
func (o *alphaOperation) rateLimitKey() *RateLimitKey {
@ -127,10 +156,15 @@ func (o *alphaOperation) rateLimitKey() *RateLimitKey {
}
}
func (o *alphaOperation) error() error {
return o.err
}
type betaOperation struct {
s *Service
projectID string
key *meta.Key
err error
}
func (o *betaOperation) String() string {
@ -159,7 +193,15 @@ func (o *betaOperation) isDone(ctx context.Context) (bool, error) {
if err != nil {
return false, err
}
return op != nil && op.Status == "DONE", nil
if op == nil || op.Status != operationStatusDone {
return false, nil
}
if op.Error != nil && len(op.Error.Errors) > 0 && op.Error.Errors[0] != nil {
e := op.Error.Errors[0]
o.err = &GCEOperationError{HTTPStatusCode: op.HTTPStatusCode, Code: e.Code, Message: e.Message}
}
return true, nil
}
func (o *betaOperation) rateLimitKey() *RateLimitKey {
@ -170,3 +212,7 @@ func (o *betaOperation) rateLimitKey() *RateLimitKey {
Version: meta.VersionBeta,
}
}
func (o *betaOperation) error() error {
return o.err
}

View File

@ -47,22 +47,60 @@ type RateLimiter interface {
Accept(ctx context.Context, key *RateLimitKey) error
}
// acceptor is an object which blocks within Accept until a call is allowed to run.
// Accept is a behavior of the flowcontrol.RateLimiter interface.
type acceptor interface {
// Accept blocks until a call is allowed to run.
Accept()
}
// AcceptRateLimiter wraps an Acceptor with RateLimiter parameters.
type AcceptRateLimiter struct {
// Acceptor is the underlying rate limiter.
Acceptor acceptor
}
// Accept wraps an Acceptor and blocks on Accept or context.Done(). Key is ignored.
func (rl *AcceptRateLimiter) Accept(ctx context.Context, key *RateLimitKey) error {
ch := make(chan struct{})
go func() {
rl.Acceptor.Accept()
close(ch)
}()
select {
case <-ch:
break
case <-ctx.Done():
return ctx.Err()
}
return nil
}
// NopRateLimiter is a rate limiter that performs no rate limiting.
type NopRateLimiter struct {
}
// Accept the operation to be rate limited.
// Accept everything immediately.
func (*NopRateLimiter) Accept(ctx context.Context, key *RateLimitKey) error {
// Rate limit polling of the Operation status to avoid hammering GCE
// for the status of an operation.
const pollTime = time.Duration(1) * time.Second
if key.Operation == "Get" && key.Service == "Operations" {
select {
case <-time.NewTimer(pollTime).C:
break
case <-ctx.Done():
return ctx.Err()
}
}
return nil
}
// MinimumRateLimiter wraps a RateLimiter and will only call its Accept until the minimum
// duration has been met or the context is cancelled.
type MinimumRateLimiter struct {
// RateLimiter is the underlying ratelimiter which is called after the mininum time is reacehd.
RateLimiter RateLimiter
// Minimum is the minimum wait time before the underlying ratelimiter is called.
Minimum time.Duration
}
// Accept blocks on the minimum duration and context. Once the minimum duration is met,
// the func is blocked on the underlying ratelimiter.
func (m *MinimumRateLimiter) Accept(ctx context.Context, key *RateLimitKey) error {
select {
case <-time.After(m.Minimum):
return m.RateLimiter.Accept(ctx, key)
case <-ctx.Done():
return ctx.Err()
}
}

View File

@ -0,0 +1,80 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cloud
import (
"context"
"testing"
"time"
)
type FakeAcceptor struct{ accept func() }
func (f *FakeAcceptor) Accept() {
f.accept()
}
func TestAcceptRateLimiter(t *testing.T) {
fa := &FakeAcceptor{accept: func() {}}
arl := &AcceptRateLimiter{fa}
err := arl.Accept(context.Background(), nil)
if err != nil {
t.Errorf("AcceptRateLimiter.Accept() = %v, want nil", err)
}
// Use context that has been cancelled and expect a context error returned.
ctxCancelled, cancelled := context.WithCancel(context.Background())
cancelled()
// Verify context is cancelled by now.
<-ctxCancelled.Done()
fa.accept = func() { time.Sleep(1 * time.Second) }
err = arl.Accept(ctxCancelled, nil)
if err != ctxCancelled.Err() {
t.Errorf("AcceptRateLimiter.Accept() = %v, want %v", err, ctxCancelled.Err())
}
}
func TestMinimumRateLimiter(t *testing.T) {
fa := &FakeAcceptor{accept: func() {}}
arl := &AcceptRateLimiter{fa}
var called bool
fa.accept = func() { called = true }
m := &MinimumRateLimiter{RateLimiter: arl, Minimum: 10 * time.Millisecond}
err := m.Accept(context.Background(), nil)
if err != nil {
t.Errorf("MinimumRateLimiter.Accept = %v, want nil", err)
}
if !called {
t.Errorf("`called` = false, want true")
}
// Use context that has been cancelled and expect a context error returned.
ctxCancelled, cancelled := context.WithCancel(context.Background())
cancelled()
// Verify context is cancelled by now.
<-ctxCancelled.Done()
called = false
err = m.Accept(ctxCancelled, nil)
if err != ctxCancelled.Err() {
t.Errorf("AcceptRateLimiter.Accept() = %v, want %v", err, ctxCancelled.Err())
}
if called {
t.Errorf("`called` = true, want false")
}
}

View File

@ -45,19 +45,19 @@ func (s *Service) wrapOperation(anyOp interface{}) (operation, error) {
if err != nil {
return nil, err
}
return &gaOperation{s, r.ProjectID, r.Key}, nil
return &gaOperation{s: s, projectID: r.ProjectID, key: r.Key}, nil
case *alpha.Operation:
r, err := ParseResourceURL(o.SelfLink)
if err != nil {
return nil, err
}
return &alphaOperation{s, r.ProjectID, r.Key}, nil
return &alphaOperation{s: s, projectID: r.ProjectID, key: r.Key}, nil
case *beta.Operation:
r, err := ParseResourceURL(o.SelfLink)
if err != nil {
return nil, err
}
return &betaOperation{s, r.ProjectID, r.Key}, nil
return &betaOperation{s: s, projectID: r.ProjectID, key: r.Key}, nil
default:
return nil, fmt.Errorf("invalid type %T", anyOp)
}
@ -72,14 +72,39 @@ func (s *Service) WaitForCompletion(ctx context.Context, genericOp interface{})
glog.Errorf("wrapOperation(%+v) error: %v", genericOp, err)
return err
}
for done, err := op.isDone(ctx); !done; done, err = op.isDone(ctx) {
if err != nil {
glog.V(4).Infof("op.isDone(%v) error; op = %v, err = %v", ctx, op, err)
return err
}
glog.V(5).Infof("op.isDone(%v) waiting; op = %v", ctx, op)
s.RateLimiter.Accept(ctx, op.rateLimitKey())
}
glog.V(5).Infof("op.isDone(%v) complete; op = %v", ctx, op)
return nil
return s.pollOperation(ctx, op)
}
// pollOperation calls operations.isDone until the function comes back true or context is Done.
// If an error occurs retrieving the operation, the loop will continue until the context is done.
// This is to prevent a transient error from bubbling up to controller-level logic.
func (s *Service) pollOperation(ctx context.Context, op operation) error {
var pollCount int
for {
// Check if context has been cancelled. Note that ctx.Done() must be checked before
// returning ctx.Err().
select {
case <-ctx.Done():
glog.V(5).Infof("op.pollOperation(%v, %v) not completed, poll count = %d, ctx.Err = %v", ctx, op, pollCount, ctx.Err())
return ctx.Err()
default:
// ctx is not canceled, continue immediately
}
pollCount++
glog.V(5).Infof("op.isDone(%v) waiting; op = %v, poll count = %d", ctx, op, pollCount)
s.RateLimiter.Accept(ctx, op.rateLimitKey())
done, err := op.isDone(ctx)
if err != nil {
glog.V(5).Infof("op.isDone(%v) error; op = %v, poll count = %d, err = %v, retrying", ctx, op, pollCount, err)
}
if done {
break
}
}
glog.V(5).Infof("op.isDone(%v) complete; op = %v, poll count = %d, op.err = %v", ctx, op, pollCount, op.error())
return op.error()
}

View File

@ -0,0 +1,84 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cloud
import (
"context"
"fmt"
"testing"
)
func TestPollOperation(t *testing.T) {
const totalAttempts = 10
var attempts int
fo := &fakeOperation{isDoneFunc: func(ctx context.Context) (bool, error) {
attempts++
if attempts < totalAttempts {
return false, nil
}
return true, nil
}}
s := Service{RateLimiter: &NopRateLimiter{}}
// Check that pollOperation will retry the operation multiple times.
err := s.pollOperation(context.Background(), fo)
if err != nil {
t.Errorf("pollOperation() = %v, want nil", err)
}
if attempts != totalAttempts {
t.Errorf("`attempts` = %d, want %d", attempts, totalAttempts)
}
// Check that the operation's error is returned.
fo.err = fmt.Errorf("test operation failed")
err = s.pollOperation(context.Background(), fo)
if err != fo.err {
t.Errorf("pollOperation() = %v, want %v", err, fo.err)
}
fo.err = nil
fo.isDoneFunc = func(ctx context.Context) (bool, error) {
return false, nil
}
// Use context that has been cancelled and expect a context error returned.
ctxCancelled, cancelled := context.WithCancel(context.Background())
cancelled()
// Verify context is cancelled by now.
<-ctxCancelled.Done()
// Check that pollOperation returns because the context is cancelled.
err = s.pollOperation(ctxCancelled, fo)
if err == nil {
t.Errorf("pollOperation() = nil, want: %v", ctxCancelled.Err())
}
}
type fakeOperation struct {
isDoneFunc func(ctx context.Context) (bool, error)
err error
rateKey *RateLimitKey
}
func (f *fakeOperation) isDone(ctx context.Context) (bool, error) {
return f.isDoneFunc(ctx)
}
func (f *fakeOperation) error() error {
return f.err
}
func (f *fakeOperation) rateLimitKey() *RateLimitKey {
return f.rateKey
}

View File

@ -68,7 +68,7 @@ const (
// AffinityTypeClientIPProto - affinity based on Client IP and port.
gceAffinityTypeClientIPProto = "CLIENT_IP_PROTO"
operationPollInterval = 3 * time.Second
operationPollInterval = time.Second
// Creating Route in very large clusters, may take more than half an hour.
operationPollTimeoutDuration = time.Hour
@ -484,7 +484,7 @@ func CreateGCECloud(config *CloudConfig) (*GCECloud, error) {
glog.Infof("managing multiple zones: %v", config.ManagedZones)
}
operationPollRateLimiter := flowcontrol.NewTokenBucketRateLimiter(10, 100) // 10 qps, 100 bucket size.
operationPollRateLimiter := flowcontrol.NewTokenBucketRateLimiter(5, 5) // 5 qps, 5 burst.
gce := &GCECloud{
service: service,

View File

@ -17,7 +17,6 @@ limitations under the License.
package gce
import (
"context"
"fmt"
"github.com/golang/glog"
@ -44,72 +43,105 @@ func newAddressMetricContextWithVersion(request, region, version string) *metric
// ipAddress is specified, it must belong to the current project, eg: an
// ephemeral IP associated with a global forwarding rule.
func (gce *GCECloud) ReserveGlobalAddress(addr *compute.Address) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newAddressMetricContext("reserve", "")
return mc.Observe(gce.c.GlobalAddresses().Insert(context.Background(), meta.GlobalKey(addr.Name), addr))
return mc.Observe(gce.c.GlobalAddresses().Insert(ctx, meta.GlobalKey(addr.Name), addr))
}
// DeleteGlobalAddress deletes a global address by name.
func (gce *GCECloud) DeleteGlobalAddress(name string) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newAddressMetricContext("delete", "")
return mc.Observe(gce.c.GlobalAddresses().Delete(context.Background(), meta.GlobalKey(name)))
return mc.Observe(gce.c.GlobalAddresses().Delete(ctx, meta.GlobalKey(name)))
}
// GetGlobalAddress returns the global address by name.
func (gce *GCECloud) GetGlobalAddress(name string) (*compute.Address, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newAddressMetricContext("get", "")
v, err := gce.c.GlobalAddresses().Get(context.Background(), meta.GlobalKey(name))
v, err := gce.c.GlobalAddresses().Get(ctx, meta.GlobalKey(name))
return v, mc.Observe(err)
}
// ReserveRegionAddress creates a region address
func (gce *GCECloud) ReserveRegionAddress(addr *compute.Address, region string) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newAddressMetricContext("reserve", region)
return mc.Observe(gce.c.Addresses().Insert(context.Background(), meta.RegionalKey(addr.Name, region), addr))
return mc.Observe(gce.c.Addresses().Insert(ctx, meta.RegionalKey(addr.Name, region), addr))
}
// ReserveAlphaRegionAddress creates an Alpha, regional address.
func (gce *GCECloud) ReserveAlphaRegionAddress(addr *computealpha.Address, region string) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newAddressMetricContext("reserve", region)
return mc.Observe(gce.c.AlphaAddresses().Insert(context.Background(), meta.RegionalKey(addr.Name, region), addr))
return mc.Observe(gce.c.AlphaAddresses().Insert(ctx, meta.RegionalKey(addr.Name, region), addr))
}
// ReserveBetaRegionAddress creates a beta region address
func (gce *GCECloud) ReserveBetaRegionAddress(addr *computebeta.Address, region string) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newAddressMetricContext("reserve", region)
return mc.Observe(gce.c.BetaAddresses().Insert(context.Background(), meta.RegionalKey(addr.Name, region), addr))
return mc.Observe(gce.c.BetaAddresses().Insert(ctx, meta.RegionalKey(addr.Name, region), addr))
}
// DeleteRegionAddress deletes a region address by name.
func (gce *GCECloud) DeleteRegionAddress(name, region string) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newAddressMetricContext("delete", region)
return mc.Observe(gce.c.Addresses().Delete(context.Background(), meta.RegionalKey(name, region)))
return mc.Observe(gce.c.Addresses().Delete(ctx, meta.RegionalKey(name, region)))
}
// GetRegionAddress returns the region address by name
func (gce *GCECloud) GetRegionAddress(name, region string) (*compute.Address, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newAddressMetricContext("get", region)
v, err := gce.c.Addresses().Get(context.Background(), meta.RegionalKey(name, region))
v, err := gce.c.Addresses().Get(ctx, meta.RegionalKey(name, region))
return v, mc.Observe(err)
}
// GetAlphaRegionAddress returns the Alpha, regional address by name.
func (gce *GCECloud) GetAlphaRegionAddress(name, region string) (*computealpha.Address, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newAddressMetricContext("get", region)
v, err := gce.c.AlphaAddresses().Get(context.Background(), meta.RegionalKey(name, region))
v, err := gce.c.AlphaAddresses().Get(ctx, meta.RegionalKey(name, region))
return v, mc.Observe(err)
}
// GetBetaRegionAddress returns the beta region address by name
func (gce *GCECloud) GetBetaRegionAddress(name, region string) (*computebeta.Address, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newAddressMetricContext("get", region)
v, err := gce.c.BetaAddresses().Get(context.Background(), meta.RegionalKey(name, region))
v, err := gce.c.BetaAddresses().Get(ctx, meta.RegionalKey(name, region))
return v, mc.Observe(err)
}
// GetRegionAddressByIP returns the regional address matching the given IP address.
func (gce *GCECloud) GetRegionAddressByIP(region, ipAddress string) (*compute.Address, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newAddressMetricContext("list", region)
addrs, err := gce.c.Addresses().List(context.Background(), region, filter.Regexp("address", ipAddress))
addrs, err := gce.c.Addresses().List(ctx, region, filter.Regexp("address", ipAddress))
mc.Observe(err)
if err != nil {
@ -129,8 +161,11 @@ func (gce *GCECloud) GetRegionAddressByIP(region, ipAddress string) (*compute.Ad
// GetBetaRegionAddressByIP returns the beta regional address matching the given IP address.
func (gce *GCECloud) GetBetaRegionAddressByIP(region, ipAddress string) (*computebeta.Address, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newAddressMetricContext("list", region)
addrs, err := gce.c.BetaAddresses().List(context.Background(), region, filter.Regexp("address", ipAddress))
addrs, err := gce.c.BetaAddresses().List(ctx, region, filter.Regexp("address", ipAddress))
mc.Observe(err)
if err != nil {

View File

@ -17,12 +17,11 @@ limitations under the License.
package gce
import (
"context"
computealpha "google.golang.org/api/compute/v0.alpha"
computebeta "google.golang.org/api/compute/v0.beta"
compute "google.golang.org/api/compute/v1"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/filter"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/meta"
)
@ -37,61 +36,88 @@ func newBackendServiceMetricContextWithVersion(request, region, version string)
// GetGlobalBackendService retrieves a backend by name.
func (gce *GCECloud) GetGlobalBackendService(name string) (*compute.BackendService, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newBackendServiceMetricContext("get", "")
v, err := gce.c.BackendServices().Get(context.Background(), meta.GlobalKey(name))
v, err := gce.c.BackendServices().Get(ctx, meta.GlobalKey(name))
return v, mc.Observe(err)
}
// GetBetaGlobalBackendService retrieves beta backend by name.
func (gce *GCECloud) GetBetaGlobalBackendService(name string) (*computebeta.BackendService, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newBackendServiceMetricContextWithVersion("get", "", computeBetaVersion)
v, err := gce.c.BetaBackendServices().Get(context.Background(), meta.GlobalKey(name))
v, err := gce.c.BetaBackendServices().Get(ctx, meta.GlobalKey(name))
return v, mc.Observe(err)
}
// GetAlphaGlobalBackendService retrieves alpha backend by name.
func (gce *GCECloud) GetAlphaGlobalBackendService(name string) (*computealpha.BackendService, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newBackendServiceMetricContextWithVersion("get", "", computeAlphaVersion)
v, err := gce.c.AlphaBackendServices().Get(context.Background(), meta.GlobalKey(name))
v, err := gce.c.AlphaBackendServices().Get(ctx, meta.GlobalKey(name))
return v, mc.Observe(err)
}
// UpdateGlobalBackendService applies the given BackendService as an update to
// an existing service.
func (gce *GCECloud) UpdateGlobalBackendService(bg *compute.BackendService) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newBackendServiceMetricContext("update", "")
return mc.Observe(gce.c.BackendServices().Update(context.Background(), meta.GlobalKey(bg.Name), bg))
return mc.Observe(gce.c.BackendServices().Update(ctx, meta.GlobalKey(bg.Name), bg))
}
// UpdateAlphaGlobalBackendService applies the given alpha BackendService as an
// update to an existing service.
func (gce *GCECloud) UpdateAlphaGlobalBackendService(bg *computealpha.BackendService) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newBackendServiceMetricContext("update", "")
return mc.Observe(gce.c.AlphaBackendServices().Update(context.Background(), meta.GlobalKey(bg.Name), bg))
return mc.Observe(gce.c.AlphaBackendServices().Update(ctx, meta.GlobalKey(bg.Name), bg))
}
// DeleteGlobalBackendService deletes the given BackendService by name.
func (gce *GCECloud) DeleteGlobalBackendService(name string) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newBackendServiceMetricContext("delete", "")
return mc.Observe(gce.c.BackendServices().Delete(context.Background(), meta.GlobalKey(name)))
return mc.Observe(gce.c.BackendServices().Delete(ctx, meta.GlobalKey(name)))
}
// CreateGlobalBackendService creates the given BackendService.
func (gce *GCECloud) CreateGlobalBackendService(bg *compute.BackendService) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newBackendServiceMetricContext("create", "")
return mc.Observe(gce.c.BackendServices().Insert(context.Background(), meta.GlobalKey(bg.Name), bg))
return mc.Observe(gce.c.BackendServices().Insert(ctx, meta.GlobalKey(bg.Name), bg))
}
// CreateAlphaGlobalBackendService creates the given alpha BackendService.
func (gce *GCECloud) CreateAlphaGlobalBackendService(bg *computealpha.BackendService) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newBackendServiceMetricContext("create", "")
return mc.Observe(gce.c.AlphaBackendServices().Insert(context.Background(), meta.GlobalKey(bg.Name), bg))
return mc.Observe(gce.c.AlphaBackendServices().Insert(ctx, meta.GlobalKey(bg.Name), bg))
}
// ListGlobalBackendServices lists all backend services in the project.
func (gce *GCECloud) ListGlobalBackendServices() ([]*compute.BackendService, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newBackendServiceMetricContext("list", "")
v, err := gce.c.BackendServices().List(context.Background(), filter.None)
v, err := gce.c.BackendServices().List(ctx, filter.None)
return v, mc.Observe(err)
}
@ -99,42 +125,60 @@ func (gce *GCECloud) ListGlobalBackendServices() ([]*compute.BackendService, err
// identified by the given name, in the given instanceGroup. The
// instanceGroupLink is the fully qualified self link of an instance group.
func (gce *GCECloud) GetGlobalBackendServiceHealth(name string, instanceGroupLink string) (*compute.BackendServiceGroupHealth, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newBackendServiceMetricContext("get_health", "")
groupRef := &compute.ResourceGroupReference{Group: instanceGroupLink}
v, err := gce.c.BackendServices().GetHealth(context.Background(), meta.GlobalKey(name), groupRef)
v, err := gce.c.BackendServices().GetHealth(ctx, meta.GlobalKey(name), groupRef)
return v, mc.Observe(err)
}
// GetRegionBackendService retrieves a backend by name.
func (gce *GCECloud) GetRegionBackendService(name, region string) (*compute.BackendService, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newBackendServiceMetricContext("get", region)
v, err := gce.c.RegionBackendServices().Get(context.Background(), meta.RegionalKey(name, region))
v, err := gce.c.RegionBackendServices().Get(ctx, meta.RegionalKey(name, region))
return v, mc.Observe(err)
}
// UpdateRegionBackendService applies the given BackendService as an update to
// an existing service.
func (gce *GCECloud) UpdateRegionBackendService(bg *compute.BackendService, region string) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newBackendServiceMetricContext("update", region)
return mc.Observe(gce.c.RegionBackendServices().Update(context.Background(), meta.RegionalKey(bg.Name, region), bg))
return mc.Observe(gce.c.RegionBackendServices().Update(ctx, meta.RegionalKey(bg.Name, region), bg))
}
// DeleteRegionBackendService deletes the given BackendService by name.
func (gce *GCECloud) DeleteRegionBackendService(name, region string) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newBackendServiceMetricContext("delete", region)
return mc.Observe(gce.c.RegionBackendServices().Delete(context.Background(), meta.RegionalKey(name, region)))
return mc.Observe(gce.c.RegionBackendServices().Delete(ctx, meta.RegionalKey(name, region)))
}
// CreateRegionBackendService creates the given BackendService.
func (gce *GCECloud) CreateRegionBackendService(bg *compute.BackendService, region string) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newBackendServiceMetricContext("create", region)
return mc.Observe(gce.c.RegionBackendServices().Insert(context.Background(), meta.RegionalKey(bg.Name, region), bg))
return mc.Observe(gce.c.RegionBackendServices().Insert(ctx, meta.RegionalKey(bg.Name, region), bg))
}
// ListRegionBackendServices lists all backend services in the project.
func (gce *GCECloud) ListRegionBackendServices(region string) ([]*compute.BackendService, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newBackendServiceMetricContext("list", region)
v, err := gce.c.RegionBackendServices().List(context.Background(), region, filter.None)
v, err := gce.c.RegionBackendServices().List(ctx, region, filter.None)
return v, mc.Observe(err)
}
@ -142,22 +186,31 @@ func (gce *GCECloud) ListRegionBackendServices(region string) ([]*compute.Backen
// identified by the given name, in the given instanceGroup. The
// instanceGroupLink is the fully qualified self link of an instance group.
func (gce *GCECloud) GetRegionalBackendServiceHealth(name, region string, instanceGroupLink string) (*compute.BackendServiceGroupHealth, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newBackendServiceMetricContext("get_health", region)
ref := &compute.ResourceGroupReference{Group: instanceGroupLink}
v, err := gce.c.RegionBackendServices().GetHealth(context.Background(), meta.RegionalKey(name, region), ref)
v, err := gce.c.RegionBackendServices().GetHealth(ctx, meta.RegionalKey(name, region), ref)
return v, mc.Observe(err)
}
// SetSecurityPolicyForBetaGlobalBackendService sets the given
// SecurityPolicyReference for the BackendService identified by the given name.
func (gce *GCECloud) SetSecurityPolicyForBetaGlobalBackendService(backendServiceName string, securityPolicyReference *computebeta.SecurityPolicyReference) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newBackendServiceMetricContextWithVersion("set_security_policy", "", computeBetaVersion)
return mc.Observe(gce.c.BetaBackendServices().SetSecurityPolicy(context.Background(), meta.GlobalKey(backendServiceName), securityPolicyReference))
return mc.Observe(gce.c.BetaBackendServices().SetSecurityPolicy(ctx, meta.GlobalKey(backendServiceName), securityPolicyReference))
}
// SetSecurityPolicyForAlphaGlobalBackendService sets the given
// SecurityPolicyReference for the BackendService identified by the given name.
func (gce *GCECloud) SetSecurityPolicyForAlphaGlobalBackendService(backendServiceName string, securityPolicyReference *computealpha.SecurityPolicyReference) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newBackendServiceMetricContextWithVersion("set_security_policy", "", computeAlphaVersion)
return mc.Observe(gce.c.AlphaBackendServices().SetSecurityPolicy(context.Background(), meta.GlobalKey(backendServiceName), securityPolicyReference))
return mc.Observe(gce.c.AlphaBackendServices().SetSecurityPolicy(ctx, meta.GlobalKey(backendServiceName), securityPolicyReference))
}

View File

@ -17,10 +17,9 @@ limitations under the License.
package gce
import (
"context"
compute "google.golang.org/api/compute/v1"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/filter"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/meta"
)
@ -31,15 +30,21 @@ func newCertMetricContext(request string) *metricContext {
// GetSslCertificate returns the SslCertificate by name.
func (gce *GCECloud) GetSslCertificate(name string) (*compute.SslCertificate, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newCertMetricContext("get")
v, err := gce.c.SslCertificates().Get(context.Background(), meta.GlobalKey(name))
v, err := gce.c.SslCertificates().Get(ctx, meta.GlobalKey(name))
return v, mc.Observe(err)
}
// CreateSslCertificate creates and returns a SslCertificate.
func (gce *GCECloud) CreateSslCertificate(sslCerts *compute.SslCertificate) (*compute.SslCertificate, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newCertMetricContext("create")
err := gce.c.SslCertificates().Insert(context.Background(), meta.GlobalKey(sslCerts.Name), sslCerts)
err := gce.c.SslCertificates().Insert(ctx, meta.GlobalKey(sslCerts.Name), sslCerts)
if err != nil {
return nil, mc.Observe(err)
}
@ -48,13 +53,19 @@ func (gce *GCECloud) CreateSslCertificate(sslCerts *compute.SslCertificate) (*co
// DeleteSslCertificate deletes the SslCertificate by name.
func (gce *GCECloud) DeleteSslCertificate(name string) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newCertMetricContext("delete")
return mc.Observe(gce.c.SslCertificates().Delete(context.Background(), meta.GlobalKey(name)))
return mc.Observe(gce.c.SslCertificates().Delete(ctx, meta.GlobalKey(name)))
}
// ListSslCertificates lists all SslCertificates in the project.
func (gce *GCECloud) ListSslCertificates() ([]*compute.SslCertificate, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newCertMetricContext("list")
v, err := gce.c.SslCertificates().List(context.Background(), filter.None)
v, err := gce.c.SslCertificates().List(ctx, filter.None)
return v, mc.Observe(err)
}

View File

@ -17,10 +17,9 @@ limitations under the License.
package gce
import (
"context"
compute "google.golang.org/api/compute/v1"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/meta"
)
@ -30,25 +29,37 @@ func newFirewallMetricContext(request string) *metricContext {
// GetFirewall returns the Firewall by name.
func (gce *GCECloud) GetFirewall(name string) (*compute.Firewall, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newFirewallMetricContext("get")
v, err := gce.c.Firewalls().Get(context.Background(), meta.GlobalKey(name))
v, err := gce.c.Firewalls().Get(ctx, meta.GlobalKey(name))
return v, mc.Observe(err)
}
// CreateFirewall creates the passed firewall
func (gce *GCECloud) CreateFirewall(f *compute.Firewall) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newFirewallMetricContext("create")
return mc.Observe(gce.c.Firewalls().Insert(context.Background(), meta.GlobalKey(f.Name), f))
return mc.Observe(gce.c.Firewalls().Insert(ctx, meta.GlobalKey(f.Name), f))
}
// DeleteFirewall deletes the given firewall rule.
func (gce *GCECloud) DeleteFirewall(name string) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newFirewallMetricContext("delete")
return mc.Observe(gce.c.Firewalls().Delete(context.Background(), meta.GlobalKey(name)))
return mc.Observe(gce.c.Firewalls().Delete(ctx, meta.GlobalKey(name)))
}
// UpdateFirewall applies the given firewall as an update to an existing service.
func (gce *GCECloud) UpdateFirewall(f *compute.Firewall) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newFirewallMetricContext("update")
return mc.Observe(gce.c.Firewalls().Update(context.Background(), meta.GlobalKey(f.Name), f))
return mc.Observe(gce.c.Firewalls().Update(ctx, meta.GlobalKey(f.Name), f))
}

View File

@ -17,8 +17,6 @@ limitations under the License.
package gce
import (
"context"
computealpha "google.golang.org/api/compute/v0.alpha"
compute "google.golang.org/api/compute/v1"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud"
@ -35,84 +33,120 @@ func newForwardingRuleMetricContextWithVersion(request, region, version string)
// CreateGlobalForwardingRule creates the passed GlobalForwardingRule
func (gce *GCECloud) CreateGlobalForwardingRule(rule *compute.ForwardingRule) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newForwardingRuleMetricContext("create", "")
return mc.Observe(gce.c.GlobalForwardingRules().Insert(context.Background(), meta.GlobalKey(rule.Name), rule))
return mc.Observe(gce.c.GlobalForwardingRules().Insert(ctx, meta.GlobalKey(rule.Name), rule))
}
// SetProxyForGlobalForwardingRule links the given TargetHttp(s)Proxy with the given GlobalForwardingRule.
// targetProxyLink is the SelfLink of a TargetHttp(s)Proxy.
func (gce *GCECloud) SetProxyForGlobalForwardingRule(forwardingRuleName, targetProxyLink string) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newForwardingRuleMetricContext("set_proxy", "")
target := &compute.TargetReference{Target: targetProxyLink}
return mc.Observe(gce.c.GlobalForwardingRules().SetTarget(context.Background(), meta.GlobalKey(forwardingRuleName), target))
return mc.Observe(gce.c.GlobalForwardingRules().SetTarget(ctx, meta.GlobalKey(forwardingRuleName), target))
}
// DeleteGlobalForwardingRule deletes the GlobalForwardingRule by name.
func (gce *GCECloud) DeleteGlobalForwardingRule(name string) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newForwardingRuleMetricContext("delete", "")
return mc.Observe(gce.c.GlobalForwardingRules().Delete(context.Background(), meta.GlobalKey(name)))
return mc.Observe(gce.c.GlobalForwardingRules().Delete(ctx, meta.GlobalKey(name)))
}
// GetGlobalForwardingRule returns the GlobalForwardingRule by name.
func (gce *GCECloud) GetGlobalForwardingRule(name string) (*compute.ForwardingRule, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newForwardingRuleMetricContext("get", "")
v, err := gce.c.GlobalForwardingRules().Get(context.Background(), meta.GlobalKey(name))
v, err := gce.c.GlobalForwardingRules().Get(ctx, meta.GlobalKey(name))
return v, mc.Observe(err)
}
// ListGlobalForwardingRules lists all GlobalForwardingRules in the project.
func (gce *GCECloud) ListGlobalForwardingRules() ([]*compute.ForwardingRule, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newForwardingRuleMetricContext("list", "")
v, err := gce.c.GlobalForwardingRules().List(context.Background(), filter.None)
v, err := gce.c.GlobalForwardingRules().List(ctx, filter.None)
return v, mc.Observe(err)
}
// GetRegionForwardingRule returns the RegionalForwardingRule by name & region.
func (gce *GCECloud) GetRegionForwardingRule(name, region string) (*compute.ForwardingRule, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newForwardingRuleMetricContext("get", region)
v, err := gce.c.ForwardingRules().Get(context.Background(), meta.RegionalKey(name, region))
v, err := gce.c.ForwardingRules().Get(ctx, meta.RegionalKey(name, region))
return v, mc.Observe(err)
}
// GetAlphaRegionForwardingRule returns the Alpha forwarding rule by name & region.
func (gce *GCECloud) GetAlphaRegionForwardingRule(name, region string) (*computealpha.ForwardingRule, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newForwardingRuleMetricContextWithVersion("get", region, computeAlphaVersion)
v, err := gce.c.AlphaForwardingRules().Get(context.Background(), meta.RegionalKey(name, region))
v, err := gce.c.AlphaForwardingRules().Get(ctx, meta.RegionalKey(name, region))
return v, mc.Observe(err)
}
// ListRegionForwardingRules lists all RegionalForwardingRules in the project & region.
func (gce *GCECloud) ListRegionForwardingRules(region string) ([]*compute.ForwardingRule, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newForwardingRuleMetricContext("list", region)
v, err := gce.c.ForwardingRules().List(context.Background(), region, filter.None)
v, err := gce.c.ForwardingRules().List(ctx, region, filter.None)
return v, mc.Observe(err)
}
// ListAlphaRegionForwardingRules lists all RegionalForwardingRules in the project & region.
func (gce *GCECloud) ListAlphaRegionForwardingRules(region string) ([]*computealpha.ForwardingRule, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newForwardingRuleMetricContextWithVersion("list", region, computeAlphaVersion)
v, err := gce.c.AlphaForwardingRules().List(context.Background(), region, filter.None)
v, err := gce.c.AlphaForwardingRules().List(ctx, region, filter.None)
return v, mc.Observe(err)
}
// CreateRegionForwardingRule creates and returns a
// RegionalForwardingRule that points to the given BackendService
func (gce *GCECloud) CreateRegionForwardingRule(rule *compute.ForwardingRule, region string) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newForwardingRuleMetricContext("create", region)
return mc.Observe(gce.c.ForwardingRules().Insert(context.Background(), meta.RegionalKey(rule.Name, region), rule))
return mc.Observe(gce.c.ForwardingRules().Insert(ctx, meta.RegionalKey(rule.Name, region), rule))
}
// CreateAlphaRegionForwardingRule creates and returns an Alpha
// forwarding fule in the given region.
func (gce *GCECloud) CreateAlphaRegionForwardingRule(rule *computealpha.ForwardingRule, region string) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newForwardingRuleMetricContextWithVersion("create", region, computeAlphaVersion)
return mc.Observe(gce.c.AlphaForwardingRules().Insert(context.Background(), meta.RegionalKey(rule.Name, region), rule))
return mc.Observe(gce.c.AlphaForwardingRules().Insert(ctx, meta.RegionalKey(rule.Name, region), rule))
}
// DeleteRegionForwardingRule deletes the RegionalForwardingRule by name & region.
func (gce *GCECloud) DeleteRegionForwardingRule(name, region string) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newForwardingRuleMetricContext("delete", region)
return mc.Observe(gce.c.ForwardingRules().Delete(context.Background(), meta.RegionalKey(name, region)))
return mc.Observe(gce.c.ForwardingRules().Delete(ctx, meta.RegionalKey(name, region)))
}
// TODO(#51665): retire this function once Network Tiers becomes Beta in GCP.

View File

@ -17,14 +17,13 @@ limitations under the License.
package gce
import (
"context"
"github.com/golang/glog"
computealpha "google.golang.org/api/compute/v0.alpha"
compute "google.golang.org/api/compute/v1"
"k8s.io/api/core/v1"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/filter"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/meta"
"k8s.io/kubernetes/pkg/master/ports"
@ -58,33 +57,48 @@ func newHealthcheckMetricContextWithVersion(request, version string) *metricCont
// GetHttpHealthCheck returns the given HttpHealthCheck by name.
func (gce *GCECloud) GetHttpHealthCheck(name string) (*compute.HttpHealthCheck, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newHealthcheckMetricContext("get_legacy")
v, err := gce.c.HttpHealthChecks().Get(context.Background(), meta.GlobalKey(name))
v, err := gce.c.HttpHealthChecks().Get(ctx, meta.GlobalKey(name))
return v, mc.Observe(err)
}
// UpdateHttpHealthCheck applies the given HttpHealthCheck as an update.
func (gce *GCECloud) UpdateHttpHealthCheck(hc *compute.HttpHealthCheck) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newHealthcheckMetricContext("update_legacy")
return mc.Observe(gce.c.HttpHealthChecks().Update(context.Background(), meta.GlobalKey(hc.Name), hc))
return mc.Observe(gce.c.HttpHealthChecks().Update(ctx, meta.GlobalKey(hc.Name), hc))
}
// DeleteHttpHealthCheck deletes the given HttpHealthCheck by name.
func (gce *GCECloud) DeleteHttpHealthCheck(name string) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newHealthcheckMetricContext("delete_legacy")
return mc.Observe(gce.c.HttpHealthChecks().Delete(context.Background(), meta.GlobalKey(name)))
return mc.Observe(gce.c.HttpHealthChecks().Delete(ctx, meta.GlobalKey(name)))
}
// CreateHttpHealthCheck creates the given HttpHealthCheck.
func (gce *GCECloud) CreateHttpHealthCheck(hc *compute.HttpHealthCheck) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newHealthcheckMetricContext("create_legacy")
return mc.Observe(gce.c.HttpHealthChecks().Insert(context.Background(), meta.GlobalKey(hc.Name), hc))
return mc.Observe(gce.c.HttpHealthChecks().Insert(ctx, meta.GlobalKey(hc.Name), hc))
}
// ListHttpHealthChecks lists all HttpHealthChecks in the project.
func (gce *GCECloud) ListHttpHealthChecks() ([]*compute.HttpHealthCheck, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newHealthcheckMetricContext("list_legacy")
v, err := gce.c.HttpHealthChecks().List(context.Background(), filter.None)
v, err := gce.c.HttpHealthChecks().List(ctx, filter.None)
return v, mc.Observe(err)
}
@ -92,33 +106,48 @@ func (gce *GCECloud) ListHttpHealthChecks() ([]*compute.HttpHealthCheck, error)
// GetHttpsHealthCheck returns the given HttpsHealthCheck by name.
func (gce *GCECloud) GetHttpsHealthCheck(name string) (*compute.HttpsHealthCheck, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newHealthcheckMetricContext("get_legacy")
v, err := gce.c.HttpsHealthChecks().Get(context.Background(), meta.GlobalKey(name))
v, err := gce.c.HttpsHealthChecks().Get(ctx, meta.GlobalKey(name))
return v, mc.Observe(err)
}
// UpdateHttpsHealthCheck applies the given HttpsHealthCheck as an update.
func (gce *GCECloud) UpdateHttpsHealthCheck(hc *compute.HttpsHealthCheck) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newHealthcheckMetricContext("update_legacy")
return mc.Observe(gce.c.HttpsHealthChecks().Update(context.Background(), meta.GlobalKey(hc.Name), hc))
return mc.Observe(gce.c.HttpsHealthChecks().Update(ctx, meta.GlobalKey(hc.Name), hc))
}
// DeleteHttpsHealthCheck deletes the given HttpsHealthCheck by name.
func (gce *GCECloud) DeleteHttpsHealthCheck(name string) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newHealthcheckMetricContext("delete_legacy")
return mc.Observe(gce.c.HttpsHealthChecks().Delete(context.Background(), meta.GlobalKey(name)))
return mc.Observe(gce.c.HttpsHealthChecks().Delete(ctx, meta.GlobalKey(name)))
}
// CreateHttpsHealthCheck creates the given HttpsHealthCheck.
func (gce *GCECloud) CreateHttpsHealthCheck(hc *compute.HttpsHealthCheck) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newHealthcheckMetricContext("create_legacy")
return mc.Observe(gce.c.HttpsHealthChecks().Insert(context.Background(), meta.GlobalKey(hc.Name), hc))
return mc.Observe(gce.c.HttpsHealthChecks().Insert(ctx, meta.GlobalKey(hc.Name), hc))
}
// ListHttpsHealthChecks lists all HttpsHealthChecks in the project.
func (gce *GCECloud) ListHttpsHealthChecks() ([]*compute.HttpsHealthCheck, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newHealthcheckMetricContext("list_legacy")
v, err := gce.c.HttpsHealthChecks().List(context.Background(), filter.None)
v, err := gce.c.HttpsHealthChecks().List(ctx, filter.None)
return v, mc.Observe(err)
}
@ -126,52 +155,76 @@ func (gce *GCECloud) ListHttpsHealthChecks() ([]*compute.HttpsHealthCheck, error
// GetHealthCheck returns the given HealthCheck by name.
func (gce *GCECloud) GetHealthCheck(name string) (*compute.HealthCheck, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newHealthcheckMetricContext("get")
v, err := gce.c.HealthChecks().Get(context.Background(), meta.GlobalKey(name))
v, err := gce.c.HealthChecks().Get(ctx, meta.GlobalKey(name))
return v, mc.Observe(err)
}
// GetAlphaHealthCheck returns the given alpha HealthCheck by name.
func (gce *GCECloud) GetAlphaHealthCheck(name string) (*computealpha.HealthCheck, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newHealthcheckMetricContextWithVersion("get", computeAlphaVersion)
v, err := gce.c.AlphaHealthChecks().Get(context.Background(), meta.GlobalKey(name))
v, err := gce.c.AlphaHealthChecks().Get(ctx, meta.GlobalKey(name))
return v, mc.Observe(err)
}
// UpdateHealthCheck applies the given HealthCheck as an update.
func (gce *GCECloud) UpdateHealthCheck(hc *compute.HealthCheck) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newHealthcheckMetricContext("update")
return mc.Observe(gce.c.HealthChecks().Update(context.Background(), meta.GlobalKey(hc.Name), hc))
return mc.Observe(gce.c.HealthChecks().Update(ctx, meta.GlobalKey(hc.Name), hc))
}
// UpdateAlphaHealthCheck applies the given alpha HealthCheck as an update.
func (gce *GCECloud) UpdateAlphaHealthCheck(hc *computealpha.HealthCheck) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newHealthcheckMetricContextWithVersion("update", computeAlphaVersion)
return mc.Observe(gce.c.AlphaHealthChecks().Update(context.Background(), meta.GlobalKey(hc.Name), hc))
return mc.Observe(gce.c.AlphaHealthChecks().Update(ctx, meta.GlobalKey(hc.Name), hc))
}
// DeleteHealthCheck deletes the given HealthCheck by name.
func (gce *GCECloud) DeleteHealthCheck(name string) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newHealthcheckMetricContext("delete")
return mc.Observe(gce.c.HealthChecks().Delete(context.Background(), meta.GlobalKey(name)))
return mc.Observe(gce.c.HealthChecks().Delete(ctx, meta.GlobalKey(name)))
}
// CreateHealthCheck creates the given HealthCheck.
func (gce *GCECloud) CreateHealthCheck(hc *compute.HealthCheck) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newHealthcheckMetricContext("create")
return mc.Observe(gce.c.HealthChecks().Insert(context.Background(), meta.GlobalKey(hc.Name), hc))
return mc.Observe(gce.c.HealthChecks().Insert(ctx, meta.GlobalKey(hc.Name), hc))
}
// CreateAlphaHealthCheck creates the given alpha HealthCheck.
func (gce *GCECloud) CreateAlphaHealthCheck(hc *computealpha.HealthCheck) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newHealthcheckMetricContextWithVersion("create", computeAlphaVersion)
return mc.Observe(gce.c.AlphaHealthChecks().Insert(context.Background(), meta.GlobalKey(hc.Name), hc))
return mc.Observe(gce.c.AlphaHealthChecks().Insert(ctx, meta.GlobalKey(hc.Name), hc))
}
// ListHealthChecks lists all HealthCheck in the project.
func (gce *GCECloud) ListHealthChecks() ([]*compute.HealthCheck, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newHealthcheckMetricContext("list")
v, err := gce.c.HealthChecks().List(context.Background(), filter.None)
v, err := gce.c.HealthChecks().List(ctx, filter.None)
return v, mc.Observe(err)
}

View File

@ -17,10 +17,9 @@ limitations under the License.
package gce
import (
"context"
compute "google.golang.org/api/compute/v1"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/filter"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/meta"
)
@ -32,36 +31,51 @@ func newInstanceGroupMetricContext(request string, zone string) *metricContext {
// CreateInstanceGroup creates an instance group with the given
// instances. It is the callers responsibility to add named ports.
func (gce *GCECloud) CreateInstanceGroup(ig *compute.InstanceGroup, zone string) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newInstanceGroupMetricContext("create", zone)
return mc.Observe(gce.c.InstanceGroups().Insert(context.Background(), meta.ZonalKey(ig.Name, zone), ig))
return mc.Observe(gce.c.InstanceGroups().Insert(ctx, meta.ZonalKey(ig.Name, zone), ig))
}
// DeleteInstanceGroup deletes an instance group.
func (gce *GCECloud) DeleteInstanceGroup(name string, zone string) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newInstanceGroupMetricContext("delete", zone)
return mc.Observe(gce.c.InstanceGroups().Delete(context.Background(), meta.ZonalKey(name, zone)))
return mc.Observe(gce.c.InstanceGroups().Delete(ctx, meta.ZonalKey(name, zone)))
}
// ListInstanceGroups lists all InstanceGroups in the project and
// zone.
func (gce *GCECloud) ListInstanceGroups(zone string) ([]*compute.InstanceGroup, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newInstanceGroupMetricContext("list", zone)
v, err := gce.c.InstanceGroups().List(context.Background(), zone, filter.None)
v, err := gce.c.InstanceGroups().List(ctx, zone, filter.None)
return v, mc.Observe(err)
}
// ListInstancesInInstanceGroup lists all the instances in a given
// instance group and state.
func (gce *GCECloud) ListInstancesInInstanceGroup(name string, zone string, state string) ([]*compute.InstanceWithNamedPorts, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newInstanceGroupMetricContext("list_instances", zone)
req := &compute.InstanceGroupsListInstancesRequest{InstanceState: state}
v, err := gce.c.InstanceGroups().ListInstances(context.Background(), meta.ZonalKey(name, zone), req, filter.None)
v, err := gce.c.InstanceGroups().ListInstances(ctx, meta.ZonalKey(name, zone), req, filter.None)
return v, mc.Observe(err)
}
// AddInstancesToInstanceGroup adds the given instances to the given
// instance group.
func (gce *GCECloud) AddInstancesToInstanceGroup(name string, zone string, instanceRefs []*compute.InstanceReference) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newInstanceGroupMetricContext("add_instances", zone)
// TODO: should cull operation above this layer.
if len(instanceRefs) == 0 {
@ -70,12 +84,15 @@ func (gce *GCECloud) AddInstancesToInstanceGroup(name string, zone string, insta
req := &compute.InstanceGroupsAddInstancesRequest{
Instances: instanceRefs,
}
return mc.Observe(gce.c.InstanceGroups().AddInstances(context.Background(), meta.ZonalKey(name, zone), req))
return mc.Observe(gce.c.InstanceGroups().AddInstances(ctx, meta.ZonalKey(name, zone), req))
}
// RemoveInstancesFromInstanceGroup removes the given instances from
// the instance group.
func (gce *GCECloud) RemoveInstancesFromInstanceGroup(name string, zone string, instanceRefs []*compute.InstanceReference) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newInstanceGroupMetricContext("remove_instances", zone)
// TODO: should cull operation above this layer.
if len(instanceRefs) == 0 {
@ -84,19 +101,25 @@ func (gce *GCECloud) RemoveInstancesFromInstanceGroup(name string, zone string,
req := &compute.InstanceGroupsRemoveInstancesRequest{
Instances: instanceRefs,
}
return mc.Observe(gce.c.InstanceGroups().RemoveInstances(context.Background(), meta.ZonalKey(name, zone), req))
return mc.Observe(gce.c.InstanceGroups().RemoveInstances(ctx, meta.ZonalKey(name, zone), req))
}
// SetNamedPortsOfInstanceGroup sets the list of named ports on a given instance group
func (gce *GCECloud) SetNamedPortsOfInstanceGroup(igName, zone string, namedPorts []*compute.NamedPort) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newInstanceGroupMetricContext("set_namedports", zone)
req := &compute.InstanceGroupsSetNamedPortsRequest{NamedPorts: namedPorts}
return mc.Observe(gce.c.InstanceGroups().SetNamedPorts(context.Background(), meta.ZonalKey(igName, zone), req))
return mc.Observe(gce.c.InstanceGroups().SetNamedPorts(ctx, meta.ZonalKey(igName, zone), req))
}
// GetInstanceGroup returns an instance group by name.
func (gce *GCECloud) GetInstanceGroup(name string, zone string) (*compute.InstanceGroup, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newInstanceGroupMetricContext("get", zone)
v, err := gce.c.InstanceGroups().Get(context.Background(), meta.ZonalKey(name, zone))
v, err := gce.c.InstanceGroups().Get(ctx, meta.ZonalKey(name, zone))
return v, mc.Observe(err)
}

View File

@ -34,6 +34,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/filter"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/meta"
kubeletapis "k8s.io/kubernetes/pkg/kubelet/apis"
@ -99,12 +100,15 @@ func (gce *GCECloud) NodeAddresses(_ context.Context, _ types.NodeName) ([]v1.No
// NodeAddressesByProviderID will not be called from the node that is requesting this ID.
// i.e. metadata service and other local methods cannot be used here
func (gce *GCECloud) NodeAddressesByProviderID(ctx context.Context, providerID string) ([]v1.NodeAddress, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
_, zone, name, err := splitProviderID(providerID)
if err != nil {
return []v1.NodeAddress{}, err
}
instance, err := gce.c.Instances().Get(context.Background(), meta.ZonalKey(canonicalizeInstanceName(name), zone))
instance, err := gce.c.Instances().Get(ctx, meta.ZonalKey(canonicalizeInstanceName(name), zone))
if err != nil {
return []v1.NodeAddress{}, fmt.Errorf("error while querying for providerID %q: %v", providerID, err)
}
@ -212,8 +216,11 @@ func (gce *GCECloud) InstanceType(ctx context.Context, nodeName types.NodeName)
}
func (gce *GCECloud) AddSSHKeyToAllInstances(ctx context.Context, user string, keyData []byte) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
return wait.Poll(2*time.Second, 30*time.Second, func() (bool, error) {
project, err := gce.c.Projects().Get(context.Background(), gce.projectID)
project, err := gce.c.Projects().Get(ctx, gce.projectID)
if err != nil {
glog.Errorf("Could not get project: %v", err)
return false, nil
@ -244,7 +251,7 @@ func (gce *GCECloud) AddSSHKeyToAllInstances(ctx context.Context, user string, k
}
mc := newInstancesMetricContext("add_ssh_key", "")
err = gce.c.Projects().SetCommonInstanceMetadata(context.Background(), gce.projectID, project.CommonInstanceMetadata)
err = gce.c.Projects().SetCommonInstanceMetadata(ctx, gce.projectID, project.CommonInstanceMetadata)
mc.Observe(err)
if err != nil {
@ -284,9 +291,12 @@ func (gce *GCECloud) GetAllCurrentZones() (sets.String, error) {
//
// TODO: this should be removed from the cloud provider.
func (gce *GCECloud) GetAllZonesFromCloudProvider() (sets.String, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
zones := sets.NewString()
for _, zone := range gce.managedZones {
instances, err := gce.c.Instances().List(context.Background(), zone, filter.None)
instances, err := gce.c.Instances().List(ctx, zone, filter.None)
if err != nil {
return sets.NewString(), err
}
@ -299,15 +309,21 @@ func (gce *GCECloud) GetAllZonesFromCloudProvider() (sets.String, error) {
// InsertInstance creates a new instance on GCP
func (gce *GCECloud) InsertInstance(project string, zone string, i *compute.Instance) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newInstancesMetricContext("create", zone)
return mc.Observe(gce.c.Instances().Insert(context.Background(), meta.ZonalKey(i.Name, zone), i))
return mc.Observe(gce.c.Instances().Insert(ctx, meta.ZonalKey(i.Name, zone), i))
}
// ListInstanceNames returns a string of instance names separated by spaces.
// This method should only be used for e2e testing.
// TODO: remove this method.
func (gce *GCECloud) ListInstanceNames(project, zone string) (string, error) {
l, err := gce.c.Instances().List(context.Background(), zone, filter.None)
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
l, err := gce.c.Instances().List(ctx, zone, filter.None)
if err != nil {
return "", err
}
@ -320,7 +336,10 @@ func (gce *GCECloud) ListInstanceNames(project, zone string) (string, error) {
// DeleteInstance deletes an instance specified by project, zone, and name
func (gce *GCECloud) DeleteInstance(project, zone, name string) error {
return gce.c.Instances().Delete(context.Background(), meta.ZonalKey(name, zone))
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
return gce.c.Instances().Delete(ctx, meta.ZonalKey(name, zone))
}
// Implementation of Instances.CurrentNodeName
@ -332,6 +351,9 @@ func (gce *GCECloud) CurrentNodeName(ctx context.Context, hostname string) (type
// `node` for allocation to pods. Returns a list of the form
// "<ip>/<netmask>".
func (gce *GCECloud) AliasRanges(nodeName types.NodeName) (cidrs []string, err error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
var instance *gceInstance
instance, err = gce.getInstanceByName(mapNodeNameToInstanceName(nodeName))
if err != nil {
@ -339,7 +361,7 @@ func (gce *GCECloud) AliasRanges(nodeName types.NodeName) (cidrs []string, err e
}
var res *computebeta.Instance
res, err = gce.c.BetaInstances().Get(context.Background(), meta.ZonalKey(instance.Name, lastComponent(instance.Zone)))
res, err = gce.c.BetaInstances().Get(ctx, meta.ZonalKey(instance.Name, lastComponent(instance.Zone)))
if err != nil {
return
}
@ -355,12 +377,14 @@ func (gce *GCECloud) AliasRanges(nodeName types.NodeName) (cidrs []string, err e
// AddAliasToInstance adds an alias to the given instance from the named
// secondary range.
func (gce *GCECloud) AddAliasToInstance(nodeName types.NodeName, alias *net.IPNet) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
v1instance, err := gce.getInstanceByName(mapNodeNameToInstanceName(nodeName))
if err != nil {
return err
}
instance, err := gce.c.BetaInstances().Get(context.Background(), meta.ZonalKey(v1instance.Name, lastComponent(v1instance.Zone)))
instance, err := gce.c.BetaInstances().Get(ctx, meta.ZonalKey(v1instance.Name, lastComponent(v1instance.Zone)))
if err != nil {
return err
}
@ -383,13 +407,16 @@ func (gce *GCECloud) AddAliasToInstance(nodeName types.NodeName, alias *net.IPNe
})
mc := newInstancesMetricContext("add_alias", v1instance.Zone)
err = gce.c.BetaInstances().UpdateNetworkInterface(context.Background(), meta.ZonalKey(instance.Name, lastComponent(instance.Zone)), iface.Name, iface)
err = gce.c.BetaInstances().UpdateNetworkInterface(ctx, meta.ZonalKey(instance.Name, lastComponent(instance.Zone)), iface.Name, iface)
return mc.Observe(err)
}
// Gets the named instances, returning cloudprovider.InstanceNotFound if any
// instance is not found
func (gce *GCECloud) getInstancesByNames(names []string) ([]*gceInstance, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
found := map[string]*gceInstance{}
remaining := len(names)
@ -407,7 +434,7 @@ func (gce *GCECloud) getInstancesByNames(names []string) ([]*gceInstance, error)
if remaining == 0 {
break
}
instances, err := gce.c.Instances().List(context.Background(), zone, filter.Regexp("name", nodeInstancePrefix+".*"))
instances, err := gce.c.Instances().List(ctx, zone, filter.Regexp("name", nodeInstancePrefix+".*"))
if err != nil {
return nil, err
}
@ -471,9 +498,12 @@ func (gce *GCECloud) getInstanceByName(name string) (*gceInstance, error) {
}
func (gce *GCECloud) getInstanceFromProjectInZoneByName(project, zone, name string) (*gceInstance, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
name = canonicalizeInstanceName(name)
mc := newInstancesMetricContext("get", zone)
res, err := gce.c.Instances().Get(context.Background(), meta.ZonalKey(name, zone))
res, err := gce.c.Instances().Get(ctx, meta.ZonalKey(name, zone))
mc.Observe(err)
if err != nil {
return nil, err
@ -532,6 +562,9 @@ func (gce *GCECloud) isCurrentInstance(instanceID string) bool {
// format of the host names in the cluster. Only use it as a fallback if
// gce.nodeTags is unspecified
func (gce *GCECloud) computeHostTags(hosts []*gceInstance) ([]string, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
// TODO: We could store the tags in gceInstance, so we could have already fetched it
hostNamesByZone := make(map[string]map[string]bool) // map of zones -> map of names -> bool (for easy lookup)
nodeInstancePrefix := gce.nodeInstancePrefix
@ -556,7 +589,7 @@ func (gce *GCECloud) computeHostTags(hosts []*gceInstance) ([]string, error) {
filt = filter.Regexp("name", nodeInstancePrefix+".*")
}
for zone, hostNames := range hostNamesByZone {
instances, err := gce.c.Instances().List(context.Background(), zone, filt)
instances, err := gce.c.Instances().List(ctx, zone, filt)
if err != nil {
return nil, err
}

View File

@ -17,12 +17,12 @@ limitations under the License.
package gce
import (
"context"
"fmt"
"strings"
computealpha "google.golang.org/api/compute/v0.alpha"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/filter"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/meta"
)
@ -37,31 +37,40 @@ func newNetworkEndpointGroupMetricContext(request string, zone string) *metricCo
}
func (gce *GCECloud) GetNetworkEndpointGroup(name string, zone string) (*computealpha.NetworkEndpointGroup, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newNetworkEndpointGroupMetricContext("get", zone)
if err := gce.alphaFeatureEnabled(AlphaFeatureNetworkEndpointGroup); err != nil {
return nil, mc.Observe(err)
}
v, err := gce.c.AlphaNetworkEndpointGroups().Get(context.Background(), meta.ZonalKey(name, zone))
v, err := gce.c.AlphaNetworkEndpointGroups().Get(ctx, meta.ZonalKey(name, zone))
return v, mc.Observe(err)
}
func (gce *GCECloud) ListNetworkEndpointGroup(zone string) ([]*computealpha.NetworkEndpointGroup, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newNetworkEndpointGroupMetricContext("list", zone)
if err := gce.alphaFeatureEnabled(AlphaFeatureNetworkEndpointGroup); err != nil {
return nil, mc.Observe(err)
}
negs, err := gce.c.AlphaNetworkEndpointGroups().List(context.Background(), zone, filter.None)
negs, err := gce.c.AlphaNetworkEndpointGroups().List(ctx, zone, filter.None)
return negs, mc.Observe(err)
}
// AggregatedListNetworkEndpointGroup returns a map of zone -> endpoint group.
func (gce *GCECloud) AggregatedListNetworkEndpointGroup() (map[string][]*computealpha.NetworkEndpointGroup, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newNetworkEndpointGroupMetricContext("aggregated_list", "")
if err := gce.alphaFeatureEnabled(AlphaFeatureNetworkEndpointGroup); err != nil {
return nil, mc.Observe(err)
}
// TODO: filter for the region the cluster is in.
all, err := gce.c.AlphaNetworkEndpointGroups().AggregatedList(context.Background(), filter.None)
all, err := gce.c.AlphaNetworkEndpointGroups().AggregatedList(ctx, filter.None)
if err != nil {
return nil, mc.Observe(err)
}
@ -79,22 +88,31 @@ func (gce *GCECloud) AggregatedListNetworkEndpointGroup() (map[string][]*compute
}
func (gce *GCECloud) CreateNetworkEndpointGroup(neg *computealpha.NetworkEndpointGroup, zone string) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
if err := gce.alphaFeatureEnabled(AlphaFeatureNetworkEndpointGroup); err != nil {
return err
}
mc := newNetworkEndpointGroupMetricContext("create", zone)
return mc.Observe(gce.c.AlphaNetworkEndpointGroups().Insert(context.Background(), meta.ZonalKey(neg.Name, zone), neg))
return mc.Observe(gce.c.AlphaNetworkEndpointGroups().Insert(ctx, meta.ZonalKey(neg.Name, zone), neg))
}
func (gce *GCECloud) DeleteNetworkEndpointGroup(name string, zone string) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
if err := gce.alphaFeatureEnabled(AlphaFeatureNetworkEndpointGroup); err != nil {
return err
}
mc := newNetworkEndpointGroupMetricContext("delete", zone)
return mc.Observe(gce.c.AlphaNetworkEndpointGroups().Delete(context.Background(), meta.ZonalKey(name, zone)))
return mc.Observe(gce.c.AlphaNetworkEndpointGroups().Delete(ctx, meta.ZonalKey(name, zone)))
}
func (gce *GCECloud) AttachNetworkEndpoints(name, zone string, endpoints []*computealpha.NetworkEndpoint) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newNetworkEndpointGroupMetricContext("attach", zone)
if err := gce.alphaFeatureEnabled(AlphaFeatureNetworkEndpointGroup); err != nil {
return mc.Observe(err)
@ -102,10 +120,13 @@ func (gce *GCECloud) AttachNetworkEndpoints(name, zone string, endpoints []*comp
req := &computealpha.NetworkEndpointGroupsAttachEndpointsRequest{
NetworkEndpoints: endpoints,
}
return mc.Observe(gce.c.AlphaNetworkEndpointGroups().AttachNetworkEndpoints(context.Background(), meta.ZonalKey(name, zone), req))
return mc.Observe(gce.c.AlphaNetworkEndpointGroups().AttachNetworkEndpoints(ctx, meta.ZonalKey(name, zone), req))
}
func (gce *GCECloud) DetachNetworkEndpoints(name, zone string, endpoints []*computealpha.NetworkEndpoint) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newNetworkEndpointGroupMetricContext("detach", zone)
if err := gce.alphaFeatureEnabled(AlphaFeatureNetworkEndpointGroup); err != nil {
return mc.Observe(err)
@ -113,10 +134,13 @@ func (gce *GCECloud) DetachNetworkEndpoints(name, zone string, endpoints []*comp
req := &computealpha.NetworkEndpointGroupsDetachEndpointsRequest{
NetworkEndpoints: endpoints,
}
return mc.Observe(gce.c.AlphaNetworkEndpointGroups().DetachNetworkEndpoints(context.Background(), meta.ZonalKey(name, zone), req))
return mc.Observe(gce.c.AlphaNetworkEndpointGroups().DetachNetworkEndpoints(ctx, meta.ZonalKey(name, zone), req))
}
func (gce *GCECloud) ListNetworkEndpoints(name, zone string, showHealthStatus bool) ([]*computealpha.NetworkEndpointWithHealthStatus, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newNetworkEndpointGroupMetricContext("list_networkendpoints", zone)
if err := gce.alphaFeatureEnabled(AlphaFeatureNetworkEndpointGroup); err != nil {
return nil, mc.Observe(err)
@ -128,6 +152,6 @@ func (gce *GCECloud) ListNetworkEndpoints(name, zone string, showHealthStatus bo
req := &computealpha.NetworkEndpointGroupsListEndpointsRequest{
HealthStatus: healthStatus,
}
l, err := gce.c.AlphaNetworkEndpointGroups().ListNetworkEndpoints(context.Background(), meta.ZonalKey(name, zone), req, filter.None)
l, err := gce.c.AlphaNetworkEndpointGroups().ListNetworkEndpoints(ctx, meta.ZonalKey(name, zone), req, filter.None)
return l, mc.Observe(err)
}

View File

@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/filter"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/meta"
)
@ -37,10 +38,13 @@ func newRoutesMetricContext(request string) *metricContext {
// ListRoutes in the cloud environment.
func (gce *GCECloud) ListRoutes(ctx context.Context, clusterName string) ([]*cloudprovider.Route, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newRoutesMetricContext("list")
prefix := truncateClusterName(clusterName)
f := filter.Regexp("name", prefix+"-.*").AndRegexp("network", gce.NetworkURL()).AndRegexp("description", k8sNodeRouteTag)
routes, err := gce.c.Routes().List(context.Background(), f)
routes, err := gce.c.Routes().List(ctx, f)
if err != nil {
return nil, mc.Observe(err)
}
@ -60,6 +64,9 @@ func (gce *GCECloud) ListRoutes(ctx context.Context, clusterName string) ([]*clo
// CreateRoute in the cloud environment.
func (gce *GCECloud) CreateRoute(ctx context.Context, clusterName string, nameHint string, route *cloudprovider.Route) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newRoutesMetricContext("create")
targetInstance, err := gce.getInstanceByName(mapNodeNameToInstanceName(route.TargetNode))
@ -74,7 +81,7 @@ func (gce *GCECloud) CreateRoute(ctx context.Context, clusterName string, nameHi
Priority: 1000,
Description: k8sNodeRouteTag,
}
err = gce.c.Routes().Insert(context.Background(), meta.GlobalKey(cr.Name), cr)
err = gce.c.Routes().Insert(ctx, meta.GlobalKey(cr.Name), cr)
if isHTTPErrorCode(err, http.StatusConflict) {
glog.Infof("Route %q already exists.", cr.Name)
err = nil
@ -84,8 +91,11 @@ func (gce *GCECloud) CreateRoute(ctx context.Context, clusterName string, nameHi
// DeleteRoute from the cloud environment.
func (gce *GCECloud) DeleteRoute(ctx context.Context, clusterName string, route *cloudprovider.Route) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newRoutesMetricContext("delete")
return mc.Observe(gce.c.Routes().Delete(context.Background(), meta.GlobalKey(route.Name)))
return mc.Observe(gce.c.Routes().Delete(ctx, meta.GlobalKey(route.Name)))
}
func truncateClusterName(clusterName string) string {

View File

@ -17,10 +17,9 @@ limitations under the License.
package gce
import (
"context"
computebeta "google.golang.org/api/compute/v0.beta"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/filter"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/meta"
)
@ -31,60 +30,87 @@ func newSecurityPolicyMetricContextWithVersion(request, version string) *metricC
// GetBetaSecurityPolicy retrieves a security policy.
func (gce *GCECloud) GetBetaSecurityPolicy(name string) (*computebeta.SecurityPolicy, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newSecurityPolicyMetricContextWithVersion("get", computeBetaVersion)
v, err := gce.c.BetaSecurityPolicies().Get(context.Background(), meta.GlobalKey(name))
v, err := gce.c.BetaSecurityPolicies().Get(ctx, meta.GlobalKey(name))
return v, mc.Observe(err)
}
// ListBetaSecurityPolicy lists all security policies in the project.
func (gce *GCECloud) ListBetaSecurityPolicy() ([]*computebeta.SecurityPolicy, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newSecurityPolicyMetricContextWithVersion("list", computeBetaVersion)
v, err := gce.c.BetaSecurityPolicies().List(context.Background(), filter.None)
v, err := gce.c.BetaSecurityPolicies().List(ctx, filter.None)
return v, mc.Observe(err)
}
// CreateBetaSecurityPolicy creates the given security policy.
func (gce *GCECloud) CreateBetaSecurityPolicy(sp *computebeta.SecurityPolicy) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newSecurityPolicyMetricContextWithVersion("create", computeBetaVersion)
return mc.Observe(gce.c.BetaSecurityPolicies().Insert(context.Background(), meta.GlobalKey(sp.Name), sp))
return mc.Observe(gce.c.BetaSecurityPolicies().Insert(ctx, meta.GlobalKey(sp.Name), sp))
}
// DeleteBetaSecurityPolicy deletes the given security policy.
func (gce *GCECloud) DeleteBetaSecurityPolicy(name string) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newSecurityPolicyMetricContextWithVersion("delete", computeBetaVersion)
return mc.Observe(gce.c.BetaSecurityPolicies().Delete(context.Background(), meta.GlobalKey(name)))
return mc.Observe(gce.c.BetaSecurityPolicies().Delete(ctx, meta.GlobalKey(name)))
}
// PatchBetaSecurityPolicy applies the given security policy as a
// patch to an existing security policy.
func (gce *GCECloud) PatchBetaSecurityPolicy(sp *computebeta.SecurityPolicy) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newSecurityPolicyMetricContextWithVersion("patch", computeBetaVersion)
return mc.Observe(gce.c.BetaSecurityPolicies().Patch(context.Background(), meta.GlobalKey(sp.Name), sp))
return mc.Observe(gce.c.BetaSecurityPolicies().Patch(ctx, meta.GlobalKey(sp.Name), sp))
}
// GetRuleForBetaSecurityPolicy gets rule from a security policy.
func (gce *GCECloud) GetRuleForBetaSecurityPolicy(name string) (*computebeta.SecurityPolicyRule, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newSecurityPolicyMetricContextWithVersion("get_rule", computeBetaVersion)
v, err := gce.c.BetaSecurityPolicies().GetRule(context.Background(), meta.GlobalKey(name))
v, err := gce.c.BetaSecurityPolicies().GetRule(ctx, meta.GlobalKey(name))
return v, mc.Observe(err)
}
// AddRuletoBetaSecurityPolicy adds the given security policy rule to
// a security policy.
func (gce *GCECloud) AddRuletoBetaSecurityPolicy(name string, spr *computebeta.SecurityPolicyRule) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newSecurityPolicyMetricContextWithVersion("add_rule", computeBetaVersion)
return mc.Observe(gce.c.BetaSecurityPolicies().AddRule(context.Background(), meta.GlobalKey(name), spr))
return mc.Observe(gce.c.BetaSecurityPolicies().AddRule(ctx, meta.GlobalKey(name), spr))
}
// PatchRuleForBetaSecurityPolicy patches the given security policy
// rule to a security policy.
func (gce *GCECloud) PatchRuleForBetaSecurityPolicy(name string, spr *computebeta.SecurityPolicyRule) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newSecurityPolicyMetricContextWithVersion("patch_rule", computeBetaVersion)
return mc.Observe(gce.c.BetaSecurityPolicies().PatchRule(context.Background(), meta.GlobalKey(name), spr))
return mc.Observe(gce.c.BetaSecurityPolicies().PatchRule(ctx, meta.GlobalKey(name), spr))
}
// RemoveRuleFromBetaSecurityPolicy removes rule from a security policy.
func (gce *GCECloud) RemoveRuleFromBetaSecurityPolicy(name string) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newSecurityPolicyMetricContextWithVersion("remove_rule", computeBetaVersion)
return mc.Observe(gce.c.BetaSecurityPolicies().RemoveRule(context.Background(), meta.GlobalKey(name)))
return mc.Observe(gce.c.BetaSecurityPolicies().RemoveRule(ctx, meta.GlobalKey(name)))
}

View File

@ -17,10 +17,9 @@ limitations under the License.
package gce
import (
"context"
compute "google.golang.org/api/compute/v1"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/meta"
)
@ -30,37 +29,52 @@ func newTargetPoolMetricContext(request, region string) *metricContext {
// GetTargetPool returns the TargetPool by name.
func (gce *GCECloud) GetTargetPool(name, region string) (*compute.TargetPool, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newTargetPoolMetricContext("get", region)
v, err := gce.c.TargetPools().Get(context.Background(), meta.RegionalKey(name, region))
v, err := gce.c.TargetPools().Get(ctx, meta.RegionalKey(name, region))
return v, mc.Observe(err)
}
// CreateTargetPool creates the passed TargetPool
func (gce *GCECloud) CreateTargetPool(tp *compute.TargetPool, region string) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newTargetPoolMetricContext("create", region)
return mc.Observe(gce.c.TargetPools().Insert(context.Background(), meta.RegionalKey(tp.Name, region), tp))
return mc.Observe(gce.c.TargetPools().Insert(ctx, meta.RegionalKey(tp.Name, region), tp))
}
// DeleteTargetPool deletes TargetPool by name.
func (gce *GCECloud) DeleteTargetPool(name, region string) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newTargetPoolMetricContext("delete", region)
return mc.Observe(gce.c.TargetPools().Delete(context.Background(), meta.RegionalKey(name, region)))
return mc.Observe(gce.c.TargetPools().Delete(ctx, meta.RegionalKey(name, region)))
}
// AddInstancesToTargetPool adds instances by link to the TargetPool
func (gce *GCECloud) AddInstancesToTargetPool(name, region string, instanceRefs []*compute.InstanceReference) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
req := &compute.TargetPoolsAddInstanceRequest{
Instances: instanceRefs,
}
mc := newTargetPoolMetricContext("add_instances", region)
return mc.Observe(gce.c.TargetPools().AddInstance(context.Background(), meta.RegionalKey(name, region), req))
return mc.Observe(gce.c.TargetPools().AddInstance(ctx, meta.RegionalKey(name, region), req))
}
// RemoveInstancesFromTargetPool removes instances by link to the TargetPool
func (gce *GCECloud) RemoveInstancesFromTargetPool(name, region string, instanceRefs []*compute.InstanceReference) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
req := &compute.TargetPoolsRemoveInstanceRequest{
Instances: instanceRefs,
}
mc := newTargetPoolMetricContext("remove_instances", region)
return mc.Observe(gce.c.TargetPools().RemoveInstance(context.Background(), meta.RegionalKey(name, region), req))
return mc.Observe(gce.c.TargetPools().RemoveInstance(ctx, meta.RegionalKey(name, region), req))
}

View File

@ -17,10 +17,9 @@ limitations under the License.
package gce
import (
"context"
compute "google.golang.org/api/compute/v1"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/filter"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/meta"
)
@ -31,34 +30,49 @@ func newTargetProxyMetricContext(request string) *metricContext {
// GetTargetHttpProxy returns the UrlMap by name.
func (gce *GCECloud) GetTargetHttpProxy(name string) (*compute.TargetHttpProxy, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newTargetProxyMetricContext("get")
v, err := gce.c.TargetHttpProxies().Get(context.Background(), meta.GlobalKey(name))
v, err := gce.c.TargetHttpProxies().Get(ctx, meta.GlobalKey(name))
return v, mc.Observe(err)
}
// CreateTargetHttpProxy creates a TargetHttpProxy
func (gce *GCECloud) CreateTargetHttpProxy(proxy *compute.TargetHttpProxy) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newTargetProxyMetricContext("create")
return mc.Observe(gce.c.TargetHttpProxies().Insert(context.Background(), meta.GlobalKey(proxy.Name), proxy))
return mc.Observe(gce.c.TargetHttpProxies().Insert(ctx, meta.GlobalKey(proxy.Name), proxy))
}
// SetUrlMapForTargetHttpProxy sets the given UrlMap for the given TargetHttpProxy.
func (gce *GCECloud) SetUrlMapForTargetHttpProxy(proxy *compute.TargetHttpProxy, urlMap *compute.UrlMap) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
ref := &compute.UrlMapReference{UrlMap: urlMap.SelfLink}
mc := newTargetProxyMetricContext("set_url_map")
return mc.Observe(gce.c.TargetHttpProxies().SetUrlMap(context.Background(), meta.GlobalKey(proxy.Name), ref))
return mc.Observe(gce.c.TargetHttpProxies().SetUrlMap(ctx, meta.GlobalKey(proxy.Name), ref))
}
// DeleteTargetHttpProxy deletes the TargetHttpProxy by name.
func (gce *GCECloud) DeleteTargetHttpProxy(name string) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newTargetProxyMetricContext("delete")
return mc.Observe(gce.c.TargetHttpProxies().Delete(context.Background(), meta.GlobalKey(name)))
return mc.Observe(gce.c.TargetHttpProxies().Delete(ctx, meta.GlobalKey(name)))
}
// ListTargetHttpProxies lists all TargetHttpProxies in the project.
func (gce *GCECloud) ListTargetHttpProxies() ([]*compute.TargetHttpProxy, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newTargetProxyMetricContext("list")
v, err := gce.c.TargetHttpProxies().List(context.Background(), filter.None)
v, err := gce.c.TargetHttpProxies().List(ctx, filter.None)
return v, mc.Observe(err)
}
@ -66,42 +80,60 @@ func (gce *GCECloud) ListTargetHttpProxies() ([]*compute.TargetHttpProxy, error)
// GetTargetHttpsProxy returns the UrlMap by name.
func (gce *GCECloud) GetTargetHttpsProxy(name string) (*compute.TargetHttpsProxy, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newTargetProxyMetricContext("get")
v, err := gce.c.TargetHttpsProxies().Get(context.Background(), meta.GlobalKey(name))
v, err := gce.c.TargetHttpsProxies().Get(ctx, meta.GlobalKey(name))
return v, mc.Observe(err)
}
// CreateTargetHttpsProxy creates a TargetHttpsProxy
func (gce *GCECloud) CreateTargetHttpsProxy(proxy *compute.TargetHttpsProxy) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newTargetProxyMetricContext("create")
return mc.Observe(gce.c.TargetHttpsProxies().Insert(context.Background(), meta.GlobalKey(proxy.Name), proxy))
return mc.Observe(gce.c.TargetHttpsProxies().Insert(ctx, meta.GlobalKey(proxy.Name), proxy))
}
// SetUrlMapForTargetHttpsProxy sets the given UrlMap for the given TargetHttpsProxy.
func (gce *GCECloud) SetUrlMapForTargetHttpsProxy(proxy *compute.TargetHttpsProxy, urlMap *compute.UrlMap) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newTargetProxyMetricContext("set_url_map")
ref := &compute.UrlMapReference{UrlMap: urlMap.SelfLink}
return mc.Observe(gce.c.TargetHttpsProxies().SetUrlMap(context.Background(), meta.GlobalKey(proxy.Name), ref))
return mc.Observe(gce.c.TargetHttpsProxies().SetUrlMap(ctx, meta.GlobalKey(proxy.Name), ref))
}
// SetSslCertificateForTargetHttpsProxy sets the given SslCertificate for the given TargetHttpsProxy.
func (gce *GCECloud) SetSslCertificateForTargetHttpsProxy(proxy *compute.TargetHttpsProxy, sslCertURLs []string) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newTargetProxyMetricContext("set_ssl_cert")
req := &compute.TargetHttpsProxiesSetSslCertificatesRequest{
SslCertificates: sslCertURLs,
}
return mc.Observe(gce.c.TargetHttpsProxies().SetSslCertificates(context.Background(), meta.GlobalKey(proxy.Name), req))
return mc.Observe(gce.c.TargetHttpsProxies().SetSslCertificates(ctx, meta.GlobalKey(proxy.Name), req))
}
// DeleteTargetHttpsProxy deletes the TargetHttpsProxy by name.
func (gce *GCECloud) DeleteTargetHttpsProxy(name string) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newTargetProxyMetricContext("delete")
return mc.Observe(gce.c.TargetHttpsProxies().Delete(context.Background(), meta.GlobalKey(name)))
return mc.Observe(gce.c.TargetHttpsProxies().Delete(ctx, meta.GlobalKey(name)))
}
// ListTargetHttpsProxies lists all TargetHttpsProxies in the project.
func (gce *GCECloud) ListTargetHttpsProxies() ([]*compute.TargetHttpsProxy, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newTargetProxyMetricContext("list")
v, err := gce.c.TargetHttpsProxies().List(context.Background(), filter.None)
v, err := gce.c.TargetHttpsProxies().List(ctx, filter.None)
return v, mc.Observe(err)
}

View File

@ -17,10 +17,9 @@ limitations under the License.
package gce
import (
"context"
compute "google.golang.org/api/compute/v1"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/filter"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/meta"
)
@ -31,32 +30,47 @@ func newUrlMapMetricContext(request string) *metricContext {
// GetUrlMap returns the UrlMap by name.
func (gce *GCECloud) GetUrlMap(name string) (*compute.UrlMap, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newUrlMapMetricContext("get")
v, err := gce.c.UrlMaps().Get(context.Background(), meta.GlobalKey(name))
v, err := gce.c.UrlMaps().Get(ctx, meta.GlobalKey(name))
return v, mc.Observe(err)
}
// CreateUrlMap creates a url map
func (gce *GCECloud) CreateUrlMap(urlMap *compute.UrlMap) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newUrlMapMetricContext("create")
return mc.Observe(gce.c.UrlMaps().Insert(context.Background(), meta.GlobalKey(urlMap.Name), urlMap))
return mc.Observe(gce.c.UrlMaps().Insert(ctx, meta.GlobalKey(urlMap.Name), urlMap))
}
// UpdateUrlMap applies the given UrlMap as an update
func (gce *GCECloud) UpdateUrlMap(urlMap *compute.UrlMap) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newUrlMapMetricContext("update")
return mc.Observe(gce.c.UrlMaps().Update(context.Background(), meta.GlobalKey(urlMap.Name), urlMap))
return mc.Observe(gce.c.UrlMaps().Update(ctx, meta.GlobalKey(urlMap.Name), urlMap))
}
// DeleteUrlMap deletes a url map by name.
func (gce *GCECloud) DeleteUrlMap(name string) error {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newUrlMapMetricContext("delete")
return mc.Observe(gce.c.UrlMaps().Delete(context.Background(), meta.GlobalKey(name)))
return mc.Observe(gce.c.UrlMaps().Delete(ctx, meta.GlobalKey(name)))
}
// ListUrlMaps lists all UrlMaps in the project.
func (gce *GCECloud) ListUrlMaps() ([]*compute.UrlMap, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newUrlMapMetricContext("list")
v, err := gce.c.UrlMaps().List(context.Background(), filter.None)
v, err := gce.c.UrlMaps().List(ctx, filter.None)
return v, mc.Observe(err)
}

View File

@ -24,6 +24,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud"
"k8s.io/kubernetes/pkg/cloudprovider/providers/gce/cloud/filter"
)
@ -72,8 +73,11 @@ func (gce *GCECloud) GetZoneByNodeName(ctx context.Context, nodeName types.NodeN
// ListZonesInRegion returns all zones in a GCP region
func (gce *GCECloud) ListZonesInRegion(region string) ([]*compute.Zone, error) {
ctx, cancel := cloud.ContextWithCallTimeout()
defer cancel()
mc := newZonesMetricContext("list", region)
list, err := gce.c.Zones().List(context.Background(), filter.Regexp("region", gce.getRegionLink(region)))
list, err := gce.c.Zones().List(ctx, filter.Regexp("region", gce.getRegionLink(region)))
if err != nil {
return nil, mc.Observe(err)
}

View File

@ -50,17 +50,15 @@ type gceRateLimiter struct {
// operations.
func (l *gceRateLimiter) Accept(ctx context.Context, key *cloud.RateLimitKey) error {
if key.Operation == "Get" && key.Service == "Operations" {
ch := make(chan struct{})
go func() {
l.gce.operationPollRateLimiter.Accept()
close(ch)
}()
select {
case <-ch:
break
case <-ctx.Done():
return ctx.Err()
// Wait a minimum amount of time regardless of rate limiter.
rl := &cloud.MinimumRateLimiter{
// Convert flowcontrol.RateLimiter into cloud.RateLimiter
RateLimiter: &cloud.AcceptRateLimiter{
Acceptor: l.gce.operationPollRateLimiter,
},
Minimum: operationPollInterval,
}
return rl.Accept(ctx, key)
}
return nil
}