Add context to all relevant cloud APIs

This adds context to all the relevant cloud provider interface signatures.
Callers of those APIs are currently satisfied using context.TODO().
There will be follow on PRs to push the context through the stack.
For an idea of the full scope of this change please look at PR #58532.
pull/6/head
Walter Fender 2018-02-02 13:12:07 -08:00
parent b13092554c
commit e18e8ec3c0
49 changed files with 350 additions and 305 deletions

View File

@ -18,6 +18,7 @@ limitations under the License.
package app
import (
"context"
"crypto/tls"
"errors"
"fmt"
@ -712,7 +713,7 @@ func getNodeName(cloud cloudprovider.Interface, hostname string) (types.NodeName
return "", fmt.Errorf("failed to get instances from cloud provider")
}
nodeName, err := instances.CurrentNodeName(hostname)
nodeName, err := instances.CurrentNodeName(context.TODO(), hostname)
if err != nil {
return "", fmt.Errorf("error fetching current node name from cloud provider: %v", err)
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package cloudprovider
import (
"context"
"errors"
"fmt"
"strings"
@ -56,9 +57,9 @@ type InformerUser interface {
// Clusters is an abstract, pluggable interface for clusters of containers.
type Clusters interface {
// ListClusters lists the names of the available clusters.
ListClusters() ([]string, error)
ListClusters(ctx context.Context) ([]string, error)
// Master gets back the address (either DNS name or IP address) of the master node for the cluster.
Master(clusterName string) (string, error)
Master(ctx context.Context, clusterName string) (string, error)
}
// TODO(#6812): Use a shorter name that's less likely to be longer than cloud
@ -75,12 +76,12 @@ func GetLoadBalancerName(service *v1.Service) string {
}
// GetInstanceProviderID builds a ProviderID for a node in a cloud.
func GetInstanceProviderID(cloud Interface, nodeName types.NodeName) (string, error) {
func GetInstanceProviderID(ctx context.Context, cloud Interface, nodeName types.NodeName) (string, error) {
instances, ok := cloud.Instances()
if !ok {
return "", fmt.Errorf("failed to get instances from cloud provider")
}
instanceID, err := instances.InstanceID(nodeName)
instanceID, err := instances.InstanceID(ctx, nodeName)
if err != nil {
return "", fmt.Errorf("failed to get instance ID from cloud provider: %v", err)
}
@ -94,17 +95,17 @@ type LoadBalancer interface {
// if so, what its status is.
// Implementations must treat the *v1.Service parameter as read-only and not modify it.
// Parameter 'clusterName' is the name of the cluster as presented to kube-controller-manager
GetLoadBalancer(clusterName string, service *v1.Service) (status *v1.LoadBalancerStatus, exists bool, err error)
GetLoadBalancer(ctx context.Context, clusterName string, service *v1.Service) (status *v1.LoadBalancerStatus, exists bool, err error)
// EnsureLoadBalancer creates a new load balancer 'name', or updates the existing one. Returns the status of the balancer
// Implementations must treat the *v1.Service and *v1.Node
// parameters as read-only and not modify them.
// Parameter 'clusterName' is the name of the cluster as presented to kube-controller-manager
EnsureLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error)
EnsureLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error)
// UpdateLoadBalancer updates hosts under the specified load balancer.
// Implementations must treat the *v1.Service and *v1.Node
// parameters as read-only and not modify them.
// Parameter 'clusterName' is the name of the cluster as presented to kube-controller-manager
UpdateLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node) error
UpdateLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) error
// EnsureLoadBalancerDeleted deletes the specified load balancer if it
// exists, returning nil if the load balancer specified either didn't exist or
// was successfully deleted.
@ -113,7 +114,7 @@ type LoadBalancer interface {
// doesn't exist even if some part of it is still laying around.
// Implementations must treat the *v1.Service parameter as read-only and not modify it.
// Parameter 'clusterName' is the name of the cluster as presented to kube-controller-manager
EnsureLoadBalancerDeleted(clusterName string, service *v1.Service) error
EnsureLoadBalancerDeleted(ctx context.Context, clusterName string, service *v1.Service) error
}
// Instances is an abstract, pluggable interface for sets of instances.
@ -122,31 +123,31 @@ type Instances interface {
// TODO(roberthbailey): This currently is only used in such a way that it
// returns the address of the calling instance. We should do a rename to
// make this clearer.
NodeAddresses(name types.NodeName) ([]v1.NodeAddress, error)
NodeAddresses(ctx context.Context, name types.NodeName) ([]v1.NodeAddress, error)
// NodeAddressesByProviderID returns the addresses of the specified instance.
// The instance is specified using the providerID of the node. The
// ProviderID is a unique identifier of the node. This will not be called
// from the node whose nodeaddresses are being queried. i.e. local metadata
// services cannot be used in this method to obtain nodeaddresses
NodeAddressesByProviderID(providerID string) ([]v1.NodeAddress, error)
NodeAddressesByProviderID(ctx context.Context, providerID string) ([]v1.NodeAddress, error)
// ExternalID returns the cloud provider ID of the node with the specified NodeName.
// Note that if the instance does not exist or is no longer running, we must return ("", cloudprovider.InstanceNotFound)
ExternalID(nodeName types.NodeName) (string, error)
ExternalID(ctx context.Context, nodeName types.NodeName) (string, error)
// InstanceID returns the cloud provider ID of the node with the specified NodeName.
InstanceID(nodeName types.NodeName) (string, error)
InstanceID(ctx context.Context, nodeName types.NodeName) (string, error)
// InstanceType returns the type of the specified instance.
InstanceType(name types.NodeName) (string, error)
InstanceType(ctx context.Context, name types.NodeName) (string, error)
// InstanceTypeByProviderID returns the type of the specified instance.
InstanceTypeByProviderID(providerID string) (string, error)
InstanceTypeByProviderID(ctx context.Context, providerID string) (string, error)
// AddSSHKeyToAllInstances adds an SSH public key as a legal identity for all instances
// expected format for the key is standard ssh-keygen format: <protocol> <blob>
AddSSHKeyToAllInstances(user string, keyData []byte) error
AddSSHKeyToAllInstances(ctx context.Context, user string, keyData []byte) error
// CurrentNodeName returns the name of the node we are currently running on
// On most clouds (e.g. GCE) this is the hostname, so we provide the hostname
CurrentNodeName(hostname string) (types.NodeName, error)
CurrentNodeName(ctx context.Context, hostname string) (types.NodeName, error)
// InstanceExistsByProviderID returns true if the instance for the given provider id still is running.
// If false is returned with no error, the instance will be immediately deleted by the cloud controller manager.
InstanceExistsByProviderID(providerID string) (bool, error)
InstanceExistsByProviderID(ctx context.Context, providerID string) (bool, error)
}
// Route is a representation of an advanced routing rule.
@ -167,14 +168,14 @@ type Route struct {
// Routes is an abstract, pluggable interface for advanced routing rules.
type Routes interface {
// ListRoutes lists all managed routes that belong to the specified clusterName
ListRoutes(clusterName string) ([]*Route, error)
ListRoutes(ctx context.Context, clusterName string) ([]*Route, error)
// CreateRoute creates the described managed route
// route.Name will be ignored, although the cloud-provider may use nameHint
// to create a more user-meaningful name.
CreateRoute(clusterName string, nameHint string, route *Route) error
CreateRoute(ctx context.Context, clusterName string, nameHint string, route *Route) error
// DeleteRoute deletes the specified managed route
// Route should be as returned by ListRoutes
DeleteRoute(clusterName string, route *Route) error
DeleteRoute(ctx context.Context, clusterName string, route *Route) error
}
var (
@ -195,20 +196,20 @@ type Zones interface {
// In most cases, this method is called from the kubelet querying a local metadata service to aquire its zone.
// For the case of external cloud providers, use GetZoneByProviderID or GetZoneByNodeName since GetZone
// can no longer be called from the kubelets.
GetZone() (Zone, error)
GetZone(ctx context.Context) (Zone, error)
// GetZoneByProviderID returns the Zone containing the current zone and locality region of the node specified by providerId
// This method is particularly used in the context of external cloud providers where node initialization must be down
// outside the kubelets.
GetZoneByProviderID(providerID string) (Zone, error)
GetZoneByProviderID(ctx context.Context, providerID string) (Zone, error)
// GetZoneByNodeName returns the Zone containing the current zone and locality region of the node specified by node name
// This method is particularly used in the context of external cloud providers where node initialization must be down
// outside the kubelets.
GetZoneByNodeName(nodeName types.NodeName) (Zone, error)
GetZoneByNodeName(ctx context.Context, nodeName types.NodeName) (Zone, error)
}
// PVLabeler is an abstract, pluggable interface for fetching labels for volumes
type PVLabeler interface {
GetLabelsForVolume(pv *v1.PersistentVolume) (map[string]string, error)
GetLabelsForVolume(ctx context.Context, pv *v1.PersistentVolume) (map[string]string, error)
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package aws
import (
"context"
"errors"
"fmt"
"io"
@ -731,12 +732,12 @@ func newEc2Filter(name string, values ...string) *ec2.Filter {
}
// AddSSHKeyToAllInstances is currently not implemented.
func (c *Cloud) AddSSHKeyToAllInstances(user string, keyData []byte) error {
func (c *Cloud) AddSSHKeyToAllInstances(ctx context.Context, user string, keyData []byte) error {
return cloudprovider.NotImplemented
}
// CurrentNodeName returns the name of the current node
func (c *Cloud) CurrentNodeName(hostname string) (types.NodeName, error) {
func (c *Cloud) CurrentNodeName(ctx context.Context, hostname string) (types.NodeName, error) {
return c.selfAWSInstance.nodeName, nil
}
@ -1156,7 +1157,7 @@ func (c *Cloud) HasClusterID() bool {
}
// NodeAddresses is an implementation of Instances.NodeAddresses.
func (c *Cloud) NodeAddresses(name types.NodeName) ([]v1.NodeAddress, error) {
func (c *Cloud) NodeAddresses(ctx context.Context, name types.NodeName) ([]v1.NodeAddress, error) {
if c.selfAWSInstance.nodeName == name || len(name) == 0 {
addresses := []v1.NodeAddress{}
@ -1273,7 +1274,7 @@ func extractNodeAddresses(instance *ec2.Instance) ([]v1.NodeAddress, error) {
// NodeAddressesByProviderID returns the node addresses of an instances with the specified unique providerID
// This method will not be called from the node that is requesting this ID. i.e. metadata service
// and other local methods cannot be used here
func (c *Cloud) NodeAddressesByProviderID(providerID string) ([]v1.NodeAddress, error) {
func (c *Cloud) NodeAddressesByProviderID(ctx context.Context, providerID string) ([]v1.NodeAddress, error) {
instanceID, err := kubernetesInstanceID(providerID).mapToAWSInstanceID()
if err != nil {
return nil, err
@ -1288,7 +1289,7 @@ func (c *Cloud) NodeAddressesByProviderID(providerID string) ([]v1.NodeAddress,
}
// ExternalID returns the cloud provider ID of the node with the specified nodeName (deprecated).
func (c *Cloud) ExternalID(nodeName types.NodeName) (string, error) {
func (c *Cloud) ExternalID(ctx context.Context, nodeName types.NodeName) (string, error) {
if c.selfAWSInstance.nodeName == nodeName {
// We assume that if this is run on the instance itself, the instance exists and is alive
return c.selfAWSInstance.awsID, nil
@ -1307,7 +1308,7 @@ func (c *Cloud) ExternalID(nodeName types.NodeName) (string, error) {
// InstanceExistsByProviderID returns true if the instance with the given provider id still exists and is running.
// If false is returned with no error, the instance will be immediately deleted by the cloud controller manager.
func (c *Cloud) InstanceExistsByProviderID(providerID string) (bool, error) {
func (c *Cloud) InstanceExistsByProviderID(ctx context.Context, providerID string) (bool, error) {
instanceID, err := kubernetesInstanceID(providerID).mapToAWSInstanceID()
if err != nil {
return false, err
@ -1338,7 +1339,7 @@ func (c *Cloud) InstanceExistsByProviderID(providerID string) (bool, error) {
}
// InstanceID returns the cloud provider ID of the node with the specified nodeName.
func (c *Cloud) InstanceID(nodeName types.NodeName) (string, error) {
func (c *Cloud) InstanceID(ctx context.Context, nodeName types.NodeName) (string, error) {
// In the future it is possible to also return an endpoint as:
// <endpoint>/<zone>/<instanceid>
if c.selfAWSInstance.nodeName == nodeName {
@ -1354,7 +1355,7 @@ func (c *Cloud) InstanceID(nodeName types.NodeName) (string, error) {
// InstanceTypeByProviderID returns the cloudprovider instance type of the node with the specified unique providerID
// This method will not be called from the node that is requesting this ID. i.e. metadata service
// and other local methods cannot be used here
func (c *Cloud) InstanceTypeByProviderID(providerID string) (string, error) {
func (c *Cloud) InstanceTypeByProviderID(ctx context.Context, providerID string) (string, error) {
instanceID, err := kubernetesInstanceID(providerID).mapToAWSInstanceID()
if err != nil {
return "", err
@ -1369,7 +1370,7 @@ func (c *Cloud) InstanceTypeByProviderID(providerID string) (string, error) {
}
// InstanceType returns the type of the node with the specified nodeName.
func (c *Cloud) InstanceType(nodeName types.NodeName) (string, error) {
func (c *Cloud) InstanceType(ctx context.Context, nodeName types.NodeName) (string, error) {
if c.selfAWSInstance.nodeName == nodeName {
return c.selfAWSInstance.instanceType, nil
}
@ -1430,7 +1431,7 @@ func (c *Cloud) getCandidateZonesForDynamicVolume() (sets.String, error) {
}
// GetZone implements Zones.GetZone
func (c *Cloud) GetZone() (cloudprovider.Zone, error) {
func (c *Cloud) GetZone(ctx context.Context) (cloudprovider.Zone, error) {
return cloudprovider.Zone{
FailureDomain: c.selfAWSInstance.availabilityZone,
Region: c.region,
@ -1440,7 +1441,7 @@ func (c *Cloud) GetZone() (cloudprovider.Zone, error) {
// GetZoneByProviderID implements Zones.GetZoneByProviderID
// This is particularly useful in external cloud providers where the kubelet
// does not initialize node data.
func (c *Cloud) GetZoneByProviderID(providerID string) (cloudprovider.Zone, error) {
func (c *Cloud) GetZoneByProviderID(ctx context.Context, providerID string) (cloudprovider.Zone, error) {
instanceID, err := kubernetesInstanceID(providerID).mapToAWSInstanceID()
if err != nil {
return cloudprovider.Zone{}, err
@ -1461,7 +1462,7 @@ func (c *Cloud) GetZoneByProviderID(providerID string) (cloudprovider.Zone, erro
// GetZoneByNodeName implements Zones.GetZoneByNodeName
// This is particularly useful in external cloud providers where the kubelet
// does not initialize node data.
func (c *Cloud) GetZoneByNodeName(nodeName types.NodeName) (cloudprovider.Zone, error) {
func (c *Cloud) GetZoneByNodeName(ctx context.Context, nodeName types.NodeName) (cloudprovider.Zone, error) {
instance, err := c.getInstanceByNodeName(nodeName)
if err != nil {
return cloudprovider.Zone{}, err
@ -2249,7 +2250,7 @@ func (c *Cloud) checkIfAvailable(disk *awsDisk, opName string, instance string)
return true, nil
}
func (c *Cloud) GetLabelsForVolume(pv *v1.PersistentVolume) (map[string]string, error) {
func (c *Cloud) GetLabelsForVolume(ctx context.Context, pv *v1.PersistentVolume) (map[string]string, error) {
// Ignore any volumes that are being provisioned
if pv.Spec.AWSElasticBlockStore.VolumeID == volume.ProvisionedVolumeName {
return nil, nil
@ -3162,7 +3163,7 @@ func buildListener(port v1.ServicePort, annotations map[string]string, sslPorts
}
// EnsureLoadBalancer implements LoadBalancer.EnsureLoadBalancer
func (c *Cloud) EnsureLoadBalancer(clusterName string, apiService *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) {
func (c *Cloud) EnsureLoadBalancer(ctx context.Context, clusterName string, apiService *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) {
annotations := apiService.Annotations
glog.V(2).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v)",
clusterName, apiService.Namespace, apiService.Name, c.region, apiService.Spec.LoadBalancerIP, apiService.Spec.Ports, annotations)
@ -3533,7 +3534,7 @@ func (c *Cloud) EnsureLoadBalancer(clusterName string, apiService *v1.Service, n
}
// GetLoadBalancer is an implementation of LoadBalancer.GetLoadBalancer
func (c *Cloud) GetLoadBalancer(clusterName string, service *v1.Service) (*v1.LoadBalancerStatus, bool, error) {
func (c *Cloud) GetLoadBalancer(ctx context.Context, clusterName string, service *v1.Service) (*v1.LoadBalancerStatus, bool, error) {
loadBalancerName := cloudprovider.GetLoadBalancerName(service)
if isNLB(service.Annotations) {
@ -3793,7 +3794,7 @@ func (c *Cloud) updateInstanceSecurityGroupsForLoadBalancer(lb *elb.LoadBalancer
}
// EnsureLoadBalancerDeleted implements LoadBalancer.EnsureLoadBalancerDeleted.
func (c *Cloud) EnsureLoadBalancerDeleted(clusterName string, service *v1.Service) error {
func (c *Cloud) EnsureLoadBalancerDeleted(ctx context.Context, clusterName string, service *v1.Service) error {
loadBalancerName := cloudprovider.GetLoadBalancerName(service)
if isNLB(service.Annotations) {
@ -4026,7 +4027,7 @@ func (c *Cloud) EnsureLoadBalancerDeleted(clusterName string, service *v1.Servic
}
// UpdateLoadBalancer implements LoadBalancer.UpdateLoadBalancer
func (c *Cloud) UpdateLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node) error {
func (c *Cloud) UpdateLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) error {
instances, err := c.findInstancesForELB(nodes)
if err != nil {
return err
@ -4041,7 +4042,7 @@ func (c *Cloud) UpdateLoadBalancer(clusterName string, service *v1.Service, node
if lb == nil {
return fmt.Errorf("Load balancer not found")
}
_, err = c.EnsureLoadBalancer(clusterName, service, nodes)
_, err = c.EnsureLoadBalancer(ctx, clusterName, service, nodes)
return err
}
lb, err := c.describeLoadBalancer(loadBalancerName)

View File

@ -17,6 +17,7 @@ limitations under the License.
package aws
import (
"context"
"fmt"
"github.com/aws/aws-sdk-go/aws"
@ -65,7 +66,7 @@ func (c *Cloud) findRouteTable(clusterName string) (*ec2.RouteTable, error) {
// ListRoutes implements Routes.ListRoutes
// List all routes that match the filter
func (c *Cloud) ListRoutes(clusterName string) ([]*cloudprovider.Route, error) {
func (c *Cloud) ListRoutes(ctx context.Context, clusterName string) ([]*cloudprovider.Route, error) {
table, err := c.findRouteTable(clusterName)
if err != nil {
return nil, err
@ -138,7 +139,7 @@ func (c *Cloud) configureInstanceSourceDestCheck(instanceID string, sourceDestCh
// CreateRoute implements Routes.CreateRoute
// Create the described route
func (c *Cloud) CreateRoute(clusterName string, nameHint string, route *cloudprovider.Route) error {
func (c *Cloud) CreateRoute(ctx context.Context, clusterName string, nameHint string, route *cloudprovider.Route) error {
instance, err := c.getInstanceByNodeName(route.TargetNode)
if err != nil {
return err
@ -198,7 +199,7 @@ func (c *Cloud) CreateRoute(clusterName string, nameHint string, route *cloudpro
// DeleteRoute implements Routes.DeleteRoute
// Delete the specified route
func (c *Cloud) DeleteRoute(clusterName string, route *cloudprovider.Route) error {
func (c *Cloud) DeleteRoute(ctx context.Context, clusterName string, route *cloudprovider.Route) error {
table, err := c.findRouteTable(clusterName)
if err != nil {
return err

View File

@ -17,6 +17,7 @@ limitations under the License.
package aws
import (
"context"
"fmt"
"io"
"reflect"
@ -313,13 +314,13 @@ func TestNodeAddresses(t *testing.T) {
instances := []*ec2.Instance{&instance0, &instance1, &instance2}
aws1, _ := mockInstancesResp(&instance0, []*ec2.Instance{&instance0})
_, err1 := aws1.NodeAddresses("instance-mismatch.ec2.internal")
_, err1 := aws1.NodeAddresses(context.TODO(), "instance-mismatch.ec2.internal")
if err1 == nil {
t.Errorf("Should error when no instance found")
}
aws2, _ := mockInstancesResp(&instance2, instances)
_, err2 := aws2.NodeAddresses("instance-same.ec2.internal")
_, err2 := aws2.NodeAddresses(context.TODO(), "instance-same.ec2.internal")
if err2 == nil {
t.Errorf("Should error when multiple instances found")
}
@ -327,7 +328,7 @@ func TestNodeAddresses(t *testing.T) {
aws3, _ := mockInstancesResp(&instance0, instances[0:1])
// change node name so it uses the instance instead of metadata
aws3.selfAWSInstance.nodeName = "foo"
addrs3, err3 := aws3.NodeAddresses("instance-same.ec2.internal")
addrs3, err3 := aws3.NodeAddresses(context.TODO(), "instance-same.ec2.internal")
if err3 != nil {
t.Errorf("Should not error when instance found")
}
@ -359,7 +360,7 @@ func TestNodeAddressesWithMetadata(t *testing.T) {
awsServices.networkInterfacesMacs = []string{"0a:26:89:f3:9c:f6", "0a:77:64:c4:6a:48"}
awsServices.networkInterfacesPrivateIPs = [][]string{{"192.168.0.1"}, {"192.168.0.2"}}
addrs, err := awsCloud.NodeAddresses("")
addrs, err := awsCloud.NodeAddresses(context.TODO(), "")
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@ -374,7 +375,7 @@ func TestGetRegion(t *testing.T) {
if !ok {
t.Fatalf("Unexpected missing zones impl")
}
zone, err := zones.GetZone()
zone, err := zones.GetZone(context.TODO())
if err != nil {
t.Fatalf("unexpected error %v", err)
}
@ -869,7 +870,7 @@ func TestDescribeLoadBalancerOnDelete(t *testing.T) {
c, _ := newAWSCloud(strings.NewReader("[global]"), awsServices)
awsServices.elb.(*MockedFakeELB).expectDescribeLoadBalancers("aid")
c.EnsureLoadBalancerDeleted(TestClusterName, &v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "myservice", UID: "id"}})
c.EnsureLoadBalancerDeleted(context.TODO(), TestClusterName, &v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "myservice", UID: "id"}})
}
func TestDescribeLoadBalancerOnUpdate(t *testing.T) {
@ -877,7 +878,7 @@ func TestDescribeLoadBalancerOnUpdate(t *testing.T) {
c, _ := newAWSCloud(strings.NewReader("[global]"), awsServices)
awsServices.elb.(*MockedFakeELB).expectDescribeLoadBalancers("aid")
c.UpdateLoadBalancer(TestClusterName, &v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "myservice", UID: "id"}}, []*v1.Node{})
c.UpdateLoadBalancer(context.TODO(), TestClusterName, &v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "myservice", UID: "id"}}, []*v1.Node{})
}
func TestDescribeLoadBalancerOnGet(t *testing.T) {
@ -885,7 +886,7 @@ func TestDescribeLoadBalancerOnGet(t *testing.T) {
c, _ := newAWSCloud(strings.NewReader("[global]"), awsServices)
awsServices.elb.(*MockedFakeELB).expectDescribeLoadBalancers("aid")
c.GetLoadBalancer(TestClusterName, &v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "myservice", UID: "id"}})
c.GetLoadBalancer(context.TODO(), TestClusterName, &v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "myservice", UID: "id"}})
}
func TestDescribeLoadBalancerOnEnsure(t *testing.T) {
@ -893,7 +894,7 @@ func TestDescribeLoadBalancerOnEnsure(t *testing.T) {
c, _ := newAWSCloud(strings.NewReader("[global]"), awsServices)
awsServices.elb.(*MockedFakeELB).expectDescribeLoadBalancers("aid")
c.EnsureLoadBalancer(TestClusterName, &v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "myservice", UID: "id"}}, []*v1.Node{})
c.EnsureLoadBalancer(context.TODO(), TestClusterName, &v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "myservice", UID: "id"}}, []*v1.Node{})
}
func TestBuildListener(t *testing.T) {

View File

@ -17,6 +17,7 @@ limitations under the License.
package azure
import (
"context"
"fmt"
"k8s.io/api/core/v1"
@ -27,7 +28,7 @@ import (
)
// NodeAddresses returns the addresses of the specified instance.
func (az *Cloud) NodeAddresses(name types.NodeName) ([]v1.NodeAddress, error) {
func (az *Cloud) NodeAddresses(ctx context.Context, name types.NodeName) ([]v1.NodeAddress, error) {
ip, err := az.GetIPForMachineWithRetry(name)
if err != nil {
glog.V(2).Infof("NodeAddresses(%s) abort backoff", name)
@ -43,29 +44,29 @@ func (az *Cloud) NodeAddresses(name types.NodeName) ([]v1.NodeAddress, error) {
// NodeAddressesByProviderID returns the node addresses of an instances with the specified unique providerID
// This method will not be called from the node that is requesting this ID. i.e. metadata service
// and other local methods cannot be used here
func (az *Cloud) NodeAddressesByProviderID(providerID string) ([]v1.NodeAddress, error) {
func (az *Cloud) NodeAddressesByProviderID(ctx context.Context, providerID string) ([]v1.NodeAddress, error) {
name, err := az.vmSet.GetNodeNameByProviderID(providerID)
if err != nil {
return nil, err
}
return az.NodeAddresses(name)
return az.NodeAddresses(ctx, name)
}
// ExternalID returns the cloud provider ID of the specified instance (deprecated).
func (az *Cloud) ExternalID(name types.NodeName) (string, error) {
return az.InstanceID(name)
func (az *Cloud) ExternalID(ctx context.Context, name types.NodeName) (string, error) {
return az.InstanceID(ctx, name)
}
// InstanceExistsByProviderID returns true if the instance with the given provider id still exists and is running.
// If false is returned with no error, the instance will be immediately deleted by the cloud controller manager.
func (az *Cloud) InstanceExistsByProviderID(providerID string) (bool, error) {
func (az *Cloud) InstanceExistsByProviderID(ctx context.Context, providerID string) (bool, error) {
name, err := az.vmSet.GetNodeNameByProviderID(providerID)
if err != nil {
return false, err
}
_, err = az.InstanceID(name)
_, err = az.InstanceID(ctx, name)
if err != nil {
if err == cloudprovider.InstanceNotFound {
return false, nil
@ -78,39 +79,39 @@ func (az *Cloud) InstanceExistsByProviderID(providerID string) (bool, error) {
// InstanceID returns the cloud provider ID of the specified instance.
// Note that if the instance does not exist or is no longer running, we must return ("", cloudprovider.InstanceNotFound)
func (az *Cloud) InstanceID(name types.NodeName) (string, error) {
func (az *Cloud) InstanceID(ctx context.Context, name types.NodeName) (string, error) {
return az.vmSet.GetInstanceIDByNodeName(string(name))
}
// InstanceTypeByProviderID returns the cloudprovider instance type of the node with the specified unique providerID
// This method will not be called from the node that is requesting this ID. i.e. metadata service
// and other local methods cannot be used here
func (az *Cloud) InstanceTypeByProviderID(providerID string) (string, error) {
func (az *Cloud) InstanceTypeByProviderID(ctx context.Context, providerID string) (string, error) {
name, err := az.vmSet.GetNodeNameByProviderID(providerID)
if err != nil {
return "", err
}
return az.InstanceType(name)
return az.InstanceType(ctx, name)
}
// InstanceType returns the type of the specified instance.
// Note that if the instance does not exist or is no longer running, we must return ("", cloudprovider.InstanceNotFound)
// (Implementer Note): This is used by kubelet. Kubelet will label the node. Real log from kubelet:
// Adding node label from cloud provider: beta.kubernetes.io/instance-type=[value]
func (az *Cloud) InstanceType(name types.NodeName) (string, error) {
func (az *Cloud) InstanceType(ctx context.Context, name types.NodeName) (string, error) {
return az.vmSet.GetInstanceTypeByNodeName(string(name))
}
// AddSSHKeyToAllInstances adds an SSH public key as a legal identity for all instances
// expected format for the key is standard ssh-keygen format: <protocol> <blob>
func (az *Cloud) AddSSHKeyToAllInstances(user string, keyData []byte) error {
func (az *Cloud) AddSSHKeyToAllInstances(ctx context.Context, user string, keyData []byte) error {
return fmt.Errorf("not supported")
}
// CurrentNodeName returns the name of the node we are currently running on.
// On Azure this is the hostname, so we just return the hostname.
func (az *Cloud) CurrentNodeName(hostname string) (types.NodeName, error) {
func (az *Cloud) CurrentNodeName(ctx context.Context, hostname string) (types.NodeName, error) {
return types.NodeName(hostname), nil
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package azure
import (
"context"
"fmt"
"math"
"strconv"
@ -73,7 +74,7 @@ const ServiceAnnotationLoadBalancerResourceGroup = "service.beta.kubernetes.io/a
// GetLoadBalancer returns whether the specified load balancer exists, and
// if so, what its status is.
func (az *Cloud) GetLoadBalancer(clusterName string, service *v1.Service) (status *v1.LoadBalancerStatus, exists bool, err error) {
func (az *Cloud) GetLoadBalancer(ctx context.Context, clusterName string, service *v1.Service) (status *v1.LoadBalancerStatus, exists bool, err error) {
_, status, exists, err = az.getServiceLoadBalancer(service, clusterName, nil, false)
if err != nil {
return nil, false, err
@ -94,7 +95,7 @@ func getPublicIPDomainNameLabel(service *v1.Service) string {
}
// EnsureLoadBalancer creates a new load balancer 'name', or updates the existing one. Returns the status of the balancer
func (az *Cloud) EnsureLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) {
func (az *Cloud) EnsureLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) {
// When a client updates the internal load balancer annotation,
// the service may be switched from an internal LB to a public one, or vise versa.
// Here we'll firstly ensure service do not lie in the opposite LB.
@ -132,8 +133,8 @@ func (az *Cloud) EnsureLoadBalancer(clusterName string, service *v1.Service, nod
}
// UpdateLoadBalancer updates hosts under the specified load balancer.
func (az *Cloud) UpdateLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node) error {
_, err := az.EnsureLoadBalancer(clusterName, service, nodes)
func (az *Cloud) UpdateLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) error {
_, err := az.EnsureLoadBalancer(ctx, clusterName, service, nodes)
return err
}
@ -143,12 +144,12 @@ func (az *Cloud) UpdateLoadBalancer(clusterName string, service *v1.Service, nod
// This construction is useful because many cloud providers' load balancers
// have multiple underlying components, meaning a Get could say that the LB
// doesn't exist even if some part of it is still laying around.
func (az *Cloud) EnsureLoadBalancerDeleted(clusterName string, service *v1.Service) error {
func (az *Cloud) EnsureLoadBalancerDeleted(ctx context.Context, clusterName string, service *v1.Service) error {
isInternal := requiresInternalLoadBalancer(service)
serviceName := getServiceName(service)
glog.V(5).Infof("delete(%s): START clusterName=%q", serviceName, clusterName)
serviceIPToCleanup, err := az.findServiceIPAddress(clusterName, service, isInternal)
serviceIPToCleanup, err := az.findServiceIPAddress(ctx, clusterName, service, isInternal)
if err != nil {
return err
}
@ -373,12 +374,12 @@ func flipServiceInternalAnnotation(service *v1.Service) *v1.Service {
return copyService
}
func (az *Cloud) findServiceIPAddress(clusterName string, service *v1.Service, isInternalLb bool) (string, error) {
func (az *Cloud) findServiceIPAddress(ctx context.Context, clusterName string, service *v1.Service, isInternalLb bool) (string, error) {
if len(service.Spec.LoadBalancerIP) > 0 {
return service.Spec.LoadBalancerIP, nil
}
lbStatus, existsLb, err := az.GetLoadBalancer(clusterName, service)
lbStatus, existsLb, err := az.GetLoadBalancer(ctx, clusterName, service)
if err != nil {
return "", err
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package azure
import (
"context"
"fmt"
"k8s.io/kubernetes/pkg/cloudprovider"
@ -28,7 +29,7 @@ import (
)
// ListRoutes lists all managed routes that belong to the specified clusterName
func (az *Cloud) ListRoutes(clusterName string) ([]*cloudprovider.Route, error) {
func (az *Cloud) ListRoutes(ctx context.Context, clusterName string) ([]*cloudprovider.Route, error) {
glog.V(10).Infof("list: START clusterName=%q", clusterName)
routeTable, existsRouteTable, err := az.getRouteTable()
return processRoutes(routeTable, existsRouteTable, err)
@ -106,7 +107,7 @@ func (az *Cloud) createRouteTable() error {
// CreateRoute creates the described managed route
// route.Name will be ignored, although the cloud-provider may use nameHint
// to create a more user-meaningful name.
func (az *Cloud) CreateRoute(clusterName string, nameHint string, kubeRoute *cloudprovider.Route) error {
func (az *Cloud) CreateRoute(ctx context.Context, clusterName string, nameHint string, kubeRoute *cloudprovider.Route) error {
glog.V(2).Infof("create: creating route. clusterName=%q instance=%q cidr=%q", clusterName, kubeRoute.TargetNode, kubeRoute.DestinationCIDR)
if err := az.createRouteTableIfNotExists(clusterName, kubeRoute); err != nil {
return err
@ -149,7 +150,7 @@ func (az *Cloud) CreateRoute(clusterName string, nameHint string, kubeRoute *clo
// DeleteRoute deletes the specified managed route
// Route should be as returned by ListRoutes
func (az *Cloud) DeleteRoute(clusterName string, kubeRoute *cloudprovider.Route) error {
func (az *Cloud) DeleteRoute(ctx context.Context, clusterName string, kubeRoute *cloudprovider.Route) error {
glog.V(2).Infof("delete: deleting route. clusterName=%q instance=%q cidr=%q", clusterName, kubeRoute.TargetNode, kubeRoute.DestinationCIDR)
routeName := mapNodeNameToRouteName(kubeRoute.TargetNode)

View File

@ -17,6 +17,7 @@ limitations under the License.
package azure
import (
"context"
"fmt"
"math"
"net/http"
@ -123,7 +124,7 @@ func testLoadBalancerServiceDefaultModeSelection(t *testing.T, isInternal bool)
svc = getTestService(svcName, v1.ProtocolTCP, 8081)
}
lbStatus, err := az.EnsureLoadBalancer(testClusterName, &svc, clusterResources.nodes)
lbStatus, err := az.EnsureLoadBalancer(context.TODO(), testClusterName, &svc, clusterResources.nodes)
if err != nil {
t.Errorf("Unexpected error: %q", err)
}
@ -176,7 +177,7 @@ func testLoadBalancerServiceAutoModeSelection(t *testing.T, isInternal bool) {
svc = getTestService(svcName, v1.ProtocolTCP, 8081)
}
setLoadBalancerAutoModeAnnotation(&svc)
lbStatus, err := az.EnsureLoadBalancer(testClusterName, &svc, clusterResources.nodes)
lbStatus, err := az.EnsureLoadBalancer(context.TODO(), testClusterName, &svc, clusterResources.nodes)
if err != nil {
t.Errorf("Unexpected error: %q", err)
}
@ -239,7 +240,7 @@ func testLoadBalancerServicesSpecifiedSelection(t *testing.T, isInternal bool) {
lbMode := fmt.Sprintf("%s,%s", selectedAvailabilitySetName1, selectedAvailabilitySetName2)
setLoadBalancerModeAnnotation(&svc, lbMode)
lbStatus, err := az.EnsureLoadBalancer(testClusterName, &svc, clusterResources.nodes)
lbStatus, err := az.EnsureLoadBalancer(context.TODO(), testClusterName, &svc, clusterResources.nodes)
if err != nil {
t.Errorf("Unexpected error: %q", err)
}
@ -277,7 +278,7 @@ func testLoadBalancerMaxRulesServices(t *testing.T, isInternal bool) {
svc = getTestService(svcName, v1.ProtocolTCP, 8081)
}
lbStatus, err := az.EnsureLoadBalancer(testClusterName, &svc, clusterResources.nodes)
lbStatus, err := az.EnsureLoadBalancer(context.TODO(), testClusterName, &svc, clusterResources.nodes)
if err != nil {
t.Errorf("Unexpected error: %q", err)
}
@ -303,7 +304,7 @@ func testLoadBalancerMaxRulesServices(t *testing.T, isInternal bool) {
} else {
svc = getTestService(svcName, v1.ProtocolTCP, 8081)
}
_, err := az.EnsureLoadBalancer(testClusterName, &svc, clusterResources.nodes)
_, err := az.EnsureLoadBalancer(context.TODO(), testClusterName, &svc, clusterResources.nodes)
if err == nil {
t.Errorf("Expect any new service to fail as max limit in lb has reached")
} else {
@ -334,7 +335,7 @@ func testLoadBalancerServiceAutoModeDeleteSelection(t *testing.T, isInternal boo
svc = getTestService(svcName, v1.ProtocolTCP, 8081)
}
setLoadBalancerAutoModeAnnotation(&svc)
lbStatus, err := az.EnsureLoadBalancer(testClusterName, &svc, clusterResources.nodes)
lbStatus, err := az.EnsureLoadBalancer(context.TODO(), testClusterName, &svc, clusterResources.nodes)
if err != nil {
t.Errorf("Unexpected error: %q", err)
}
@ -363,7 +364,7 @@ func testLoadBalancerServiceAutoModeDeleteSelection(t *testing.T, isInternal boo
t.Errorf("Unexpected number of LB's: Expected (%d) Found (%d)", expectedNumOfLB, lbCount)
}
err := az.EnsureLoadBalancerDeleted(testClusterName, &svc)
err := az.EnsureLoadBalancerDeleted(context.TODO(), testClusterName, &svc)
if err != nil {
t.Errorf("Unexpected error: %q", err)
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package azure
import (
"context"
"encoding/json"
"io"
"io/ioutil"
@ -39,7 +40,7 @@ type instanceInfo struct {
}
// GetZone returns the Zone containing the current failure zone and locality region that the program is running in
func (az *Cloud) GetZone() (cloudprovider.Zone, error) {
func (az *Cloud) GetZone(ctx context.Context) (cloudprovider.Zone, error) {
return az.getZoneFromURL(instanceInfoURL)
}
@ -64,19 +65,19 @@ func (az *Cloud) getZoneFromURL(url string) (cloudprovider.Zone, error) {
// GetZoneByProviderID implements Zones.GetZoneByProviderID
// This is particularly useful in external cloud providers where the kubelet
// does not initialize node data.
func (az *Cloud) GetZoneByProviderID(providerID string) (cloudprovider.Zone, error) {
func (az *Cloud) GetZoneByProviderID(ctx context.Context, providerID string) (cloudprovider.Zone, error) {
nodeName, err := az.vmSet.GetNodeNameByProviderID(providerID)
if err != nil {
return cloudprovider.Zone{}, err
}
return az.GetZoneByNodeName(nodeName)
return az.GetZoneByNodeName(ctx, nodeName)
}
// GetZoneByNodeName implements Zones.GetZoneByNodeName
// This is particularly useful in external cloud providers where the kubelet
// does not initialize node data.
func (az *Cloud) GetZoneByNodeName(nodeName types.NodeName) (cloudprovider.Zone, error) {
func (az *Cloud) GetZoneByNodeName(ctx context.Context, nodeName types.NodeName) (cloudprovider.Zone, error) {
return az.vmSet.GetZoneByNodeName(string(nodeName))
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package cloudstack
import (
"context"
"errors"
"fmt"
"io"
@ -187,7 +188,7 @@ func (cs *CSCloud) HasClusterID() bool {
}
// GetZone returns the Zone containing the region that the program is running in.
func (cs *CSCloud) GetZone() (cloudprovider.Zone, error) {
func (cs *CSCloud) GetZone(ctx context.Context) (cloudprovider.Zone, error) {
zone := cloudprovider.Zone{}
if cs.zone == "" {
@ -215,7 +216,7 @@ func (cs *CSCloud) GetZone() (cloudprovider.Zone, error) {
}
// GetZoneByProviderID returns the Zone, found by using the provider ID.
func (cs *CSCloud) GetZoneByProviderID(providerID string) (cloudprovider.Zone, error) {
func (cs *CSCloud) GetZoneByProviderID(ctx context.Context, providerID string) (cloudprovider.Zone, error) {
zone := cloudprovider.Zone{}
instance, count, err := cs.client.VirtualMachine.GetVirtualMachineByID(
@ -237,7 +238,7 @@ func (cs *CSCloud) GetZoneByProviderID(providerID string) (cloudprovider.Zone, e
}
// GetZoneByNodeName returns the Zone, found by using the node name.
func (cs *CSCloud) GetZoneByNodeName(nodeName types.NodeName) (cloudprovider.Zone, error) {
func (cs *CSCloud) GetZoneByNodeName(ctx context.Context, nodeName types.NodeName) (cloudprovider.Zone, error) {
zone := cloudprovider.Zone{}
instance, count, err := cs.client.VirtualMachine.GetVirtualMachineByName(

View File

@ -17,6 +17,7 @@ limitations under the License.
package cloudstack
import (
"context"
"errors"
"fmt"
@ -28,7 +29,7 @@ import (
)
// NodeAddresses returns the addresses of the specified instance.
func (cs *CSCloud) NodeAddresses(name types.NodeName) ([]v1.NodeAddress, error) {
func (cs *CSCloud) NodeAddresses(ctx context.Context, name types.NodeName) ([]v1.NodeAddress, error) {
instance, count, err := cs.client.VirtualMachine.GetVirtualMachineByName(
string(name),
cloudstack.WithProject(cs.projectID),
@ -44,7 +45,7 @@ func (cs *CSCloud) NodeAddresses(name types.NodeName) ([]v1.NodeAddress, error)
}
// NodeAddressesByProviderID returns the addresses of the specified instance.
func (cs *CSCloud) NodeAddressesByProviderID(providerID string) ([]v1.NodeAddress, error) {
func (cs *CSCloud) NodeAddressesByProviderID(ctx context.Context, providerID string) ([]v1.NodeAddress, error) {
instance, count, err := cs.client.VirtualMachine.GetVirtualMachineByID(
providerID,
cloudstack.WithProject(cs.projectID),
@ -80,12 +81,12 @@ func (cs *CSCloud) nodeAddresses(instance *cloudstack.VirtualMachine) ([]v1.Node
}
// ExternalID returns the cloud provider ID of the specified instance (deprecated).
func (cs *CSCloud) ExternalID(name types.NodeName) (string, error) {
return cs.InstanceID(name)
func (cs *CSCloud) ExternalID(ctx context.Context, name types.NodeName) (string, error) {
return cs.InstanceID(ctx, name)
}
// InstanceID returns the cloud provider ID of the specified instance.
func (cs *CSCloud) InstanceID(name types.NodeName) (string, error) {
func (cs *CSCloud) InstanceID(ctx context.Context, name types.NodeName) (string, error) {
instance, count, err := cs.client.VirtualMachine.GetVirtualMachineByName(
string(name),
cloudstack.WithProject(cs.projectID),
@ -101,7 +102,7 @@ func (cs *CSCloud) InstanceID(name types.NodeName) (string, error) {
}
// InstanceType returns the type of the specified instance.
func (cs *CSCloud) InstanceType(name types.NodeName) (string, error) {
func (cs *CSCloud) InstanceType(ctx context.Context, name types.NodeName) (string, error) {
instance, count, err := cs.client.VirtualMachine.GetVirtualMachineByName(
string(name),
cloudstack.WithProject(cs.projectID),
@ -117,7 +118,7 @@ func (cs *CSCloud) InstanceType(name types.NodeName) (string, error) {
}
// InstanceTypeByProviderID returns the type of the specified instance.
func (cs *CSCloud) InstanceTypeByProviderID(providerID string) (string, error) {
func (cs *CSCloud) InstanceTypeByProviderID(ctx context.Context, providerID string) (string, error) {
instance, count, err := cs.client.VirtualMachine.GetVirtualMachineByID(
providerID,
cloudstack.WithProject(cs.projectID),
@ -133,17 +134,17 @@ func (cs *CSCloud) InstanceTypeByProviderID(providerID string) (string, error) {
}
// AddSSHKeyToAllInstances is currently not implemented.
func (cs *CSCloud) AddSSHKeyToAllInstances(user string, keyData []byte) error {
func (cs *CSCloud) AddSSHKeyToAllInstances(ctx context.Context, user string, keyData []byte) error {
return errors.New("AddSSHKeyToAllInstances not implemented")
}
// CurrentNodeName returns the name of the node we are currently running on.
func (cs *CSCloud) CurrentNodeName(hostname string) (types.NodeName, error) {
func (cs *CSCloud) CurrentNodeName(ctx context.Context, hostname string) (types.NodeName, error) {
return types.NodeName(hostname), nil
}
// InstanceExistsByProviderID returns if the instance still exists.
func (cs *CSCloud) InstanceExistsByProviderID(providerID string) (bool, error) {
func (cs *CSCloud) InstanceExistsByProviderID(ctx context.Context, providerID string) (bool, error) {
_, count, err := cs.client.VirtualMachine.GetVirtualMachineByID(
providerID,
cloudstack.WithProject(cs.projectID),

View File

@ -17,6 +17,7 @@ limitations under the License.
package cloudstack
import (
"context"
"fmt"
"strconv"
@ -40,7 +41,7 @@ type loadBalancer struct {
}
// GetLoadBalancer returns whether the specified load balancer exists, and if so, what its status is.
func (cs *CSCloud) GetLoadBalancer(clusterName string, service *v1.Service) (*v1.LoadBalancerStatus, bool, error) {
func (cs *CSCloud) GetLoadBalancer(ctx context.Context, clusterName string, service *v1.Service) (*v1.LoadBalancerStatus, bool, error) {
glog.V(4).Infof("GetLoadBalancer(%v, %v, %v)", clusterName, service.Namespace, service.Name)
// Get the load balancer details and existing rules.
@ -63,7 +64,7 @@ func (cs *CSCloud) GetLoadBalancer(clusterName string, service *v1.Service) (*v1
}
// EnsureLoadBalancer creates a new load balancer, or updates the existing one. Returns the status of the balancer.
func (cs *CSCloud) EnsureLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node) (status *v1.LoadBalancerStatus, err error) {
func (cs *CSCloud) EnsureLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) (status *v1.LoadBalancerStatus, err error) {
glog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v)", clusterName, service.Namespace, service.Name, service.Spec.LoadBalancerIP, service.Spec.Ports, nodes)
if len(service.Spec.Ports) == 0 {
@ -165,7 +166,7 @@ func (cs *CSCloud) EnsureLoadBalancer(clusterName string, service *v1.Service, n
}
// UpdateLoadBalancer updates hosts under the specified load balancer.
func (cs *CSCloud) UpdateLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node) error {
func (cs *CSCloud) UpdateLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) error {
glog.V(4).Infof("UpdateLoadBalancer(%v, %v, %v, %v)", clusterName, service.Namespace, service.Name, nodes)
// Get the load balancer details and existing rules.
@ -211,7 +212,7 @@ func (cs *CSCloud) UpdateLoadBalancer(clusterName string, service *v1.Service, n
// EnsureLoadBalancerDeleted deletes the specified load balancer if it exists, returning
// nil if the load balancer specified either didn't exist or was successfully deleted.
func (cs *CSCloud) EnsureLoadBalancerDeleted(clusterName string, service *v1.Service) error {
func (cs *CSCloud) EnsureLoadBalancerDeleted(ctx context.Context, clusterName string, service *v1.Service) error {
glog.V(4).Infof("EnsureLoadBalancerDeleted(%v, %v, %v)", clusterName, service.Namespace, service.Name)
// Get the load balancer details and existing rules.

View File

@ -17,6 +17,7 @@ limitations under the License.
package cloudstack
import (
"context"
"os"
"strconv"
"strings"
@ -107,7 +108,7 @@ func TestLoadBalancer(t *testing.T) {
t.Fatalf("LoadBalancer() returned false")
}
_, exists, err := lb.GetLoadBalancer(testClusterName, &v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "noexist"}})
_, exists, err := lb.GetLoadBalancer(context.TODO(), testClusterName, &v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "noexist"}})
if err != nil {
t.Fatalf("GetLoadBalancer(\"noexist\") returned error: %s", err)
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package cloudstack
import (
"context"
"errors"
"fmt"
"io/ioutil"
@ -46,7 +47,7 @@ const (
)
// NodeAddresses returns the addresses of the specified instance.
func (m *metadata) NodeAddresses(name types.NodeName) ([]v1.NodeAddress, error) {
func (m *metadata) NodeAddresses(ctx context.Context, name types.NodeName) ([]v1.NodeAddress, error) {
externalIP, err := m.get(metadataTypeExternalIP)
if err != nil {
return nil, fmt.Errorf("could not get external IP: %v", err)
@ -64,17 +65,17 @@ func (m *metadata) NodeAddresses(name types.NodeName) ([]v1.NodeAddress, error)
}
// NodeAddressesByProviderID returns the addresses of the specified instance.
func (m *metadata) NodeAddressesByProviderID(providerID string) ([]v1.NodeAddress, error) {
func (m *metadata) NodeAddressesByProviderID(ctx context.Context, providerID string) ([]v1.NodeAddress, error) {
return nil, errors.New("NodeAddressesByProviderID not implemented")
}
// ExternalID returns the cloud provider ID of the specified instance (deprecated).
func (m *metadata) ExternalID(name types.NodeName) (string, error) {
return m.InstanceID(name)
func (m *metadata) ExternalID(ctx context.Context, name types.NodeName) (string, error) {
return m.InstanceID(ctx, name)
}
// InstanceID returns the cloud provider ID of the specified instance.
func (m *metadata) InstanceID(name types.NodeName) (string, error) {
func (m *metadata) InstanceID(ctx context.Context, name types.NodeName) (string, error) {
instanceID, err := m.get(metadataTypeInstanceID)
if err != nil {
return "", fmt.Errorf("could not get instance ID: %v", err)
@ -89,7 +90,7 @@ func (m *metadata) InstanceID(name types.NodeName) (string, error) {
}
// InstanceType returns the type of the specified instance.
func (m *metadata) InstanceType(name types.NodeName) (string, error) {
func (m *metadata) InstanceType(ctx context.Context, name types.NodeName) (string, error) {
instanceType, err := m.get(metadataTypeInstanceType)
if err != nil {
return "", fmt.Errorf("could not get instance type: %v", err)
@ -99,27 +100,27 @@ func (m *metadata) InstanceType(name types.NodeName) (string, error) {
}
// InstanceTypeByProviderID returns the type of the specified instance.
func (m *metadata) InstanceTypeByProviderID(providerID string) (string, error) {
func (m *metadata) InstanceTypeByProviderID(ctx context.Context, providerID string) (string, error) {
return "", errors.New("InstanceTypeByProviderID not implemented")
}
// AddSSHKeyToAllInstances is currently not implemented.
func (m *metadata) AddSSHKeyToAllInstances(user string, keyData []byte) error {
func (m *metadata) AddSSHKeyToAllInstances(ctx context.Context, user string, keyData []byte) error {
return errors.New("AddSSHKeyToAllInstances not implemented")
}
// CurrentNodeName returns the name of the node we are currently running on.
func (m *metadata) CurrentNodeName(hostname string) (types.NodeName, error) {
func (m *metadata) CurrentNodeName(ctx context.Context, hostname string) (types.NodeName, error) {
return types.NodeName(hostname), nil
}
// InstanceExistsByProviderID returns if the instance still exists.
func (m *metadata) InstanceExistsByProviderID(providerID string) (bool, error) {
func (m *metadata) InstanceExistsByProviderID(ctx context.Context, providerID string) (bool, error) {
return false, errors.New("InstanceExistsByProviderID not implemented")
}
// GetZone returns the Zone containing the region that the program is running in.
func (m *metadata) GetZone() (cloudprovider.Zone, error) {
func (m *metadata) GetZone(ctx context.Context) (cloudprovider.Zone, error) {
zone := cloudprovider.Zone{}
if m.zone == "" {
@ -139,12 +140,12 @@ func (m *metadata) GetZone() (cloudprovider.Zone, error) {
}
// GetZoneByProviderID returns the Zone, found by using the provider ID.
func (m *metadata) GetZoneByProviderID(providerID string) (cloudprovider.Zone, error) {
func (m *metadata) GetZoneByProviderID(ctx context.Context, providerID string) (cloudprovider.Zone, error) {
return cloudprovider.Zone{}, errors.New("GetZoneByProviderID not implemented")
}
// GetZoneByNodeName returns the Zone, found by using the node name.
func (m *metadata) GetZoneByNodeName(nodeName types.NodeName) (cloudprovider.Zone, error) {
func (m *metadata) GetZoneByNodeName(ctx context.Context, nodeName types.NodeName) (cloudprovider.Zone, error) {
return cloudprovider.Zone{}, errors.New("GetZoneByNodeName not implemented")
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package fake
import (
"context"
"fmt"
"net"
"regexp"
@ -90,11 +91,11 @@ func (f *FakeCloud) ClearCalls() {
// Initialize passes a Kubernetes clientBuilder interface to the cloud provider
func (f *FakeCloud) Initialize(clientBuilder controller.ControllerClientBuilder) {}
func (f *FakeCloud) ListClusters() ([]string, error) {
func (f *FakeCloud) ListClusters(ctx context.Context) ([]string, error) {
return f.ClusterList, f.Err
}
func (f *FakeCloud) Master(name string) (string, error) {
func (f *FakeCloud) Master(ctx context.Context, name string) (string, error) {
return f.MasterName, f.Err
}
@ -137,7 +138,7 @@ func (f *FakeCloud) Routes() (cloudprovider.Routes, bool) {
}
// GetLoadBalancer is a stub implementation of LoadBalancer.GetLoadBalancer.
func (f *FakeCloud) GetLoadBalancer(clusterName string, service *v1.Service) (*v1.LoadBalancerStatus, bool, error) {
func (f *FakeCloud) GetLoadBalancer(ctx context.Context, clusterName string, service *v1.Service) (*v1.LoadBalancerStatus, bool, error) {
status := &v1.LoadBalancerStatus{}
status.Ingress = []v1.LoadBalancerIngress{{IP: f.ExternalIP.String()}}
@ -146,7 +147,7 @@ func (f *FakeCloud) GetLoadBalancer(clusterName string, service *v1.Service) (*v
// EnsureLoadBalancer is a test-spy implementation of LoadBalancer.EnsureLoadBalancer.
// It adds an entry "create" into the internal method call record.
func (f *FakeCloud) EnsureLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) {
func (f *FakeCloud) EnsureLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) {
f.addCall("create")
if f.Balancers == nil {
f.Balancers = make(map[string]FakeBalancer)
@ -155,7 +156,7 @@ func (f *FakeCloud) EnsureLoadBalancer(clusterName string, service *v1.Service,
name := cloudprovider.GetLoadBalancerName(service)
spec := service.Spec
zone, err := f.GetZone()
zone, err := f.GetZone(context.TODO())
if err != nil {
return nil, err
}
@ -171,7 +172,7 @@ func (f *FakeCloud) EnsureLoadBalancer(clusterName string, service *v1.Service,
// UpdateLoadBalancer is a test-spy implementation of LoadBalancer.UpdateLoadBalancer.
// It adds an entry "update" into the internal method call record.
func (f *FakeCloud) UpdateLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node) error {
func (f *FakeCloud) UpdateLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) error {
f.addCall("update")
f.UpdateCalls = append(f.UpdateCalls, FakeUpdateBalancerCall{service, nodes})
return f.Err
@ -179,30 +180,30 @@ func (f *FakeCloud) UpdateLoadBalancer(clusterName string, service *v1.Service,
// EnsureLoadBalancerDeleted is a test-spy implementation of LoadBalancer.EnsureLoadBalancerDeleted.
// It adds an entry "delete" into the internal method call record.
func (f *FakeCloud) EnsureLoadBalancerDeleted(clusterName string, service *v1.Service) error {
func (f *FakeCloud) EnsureLoadBalancerDeleted(ctx context.Context, clusterName string, service *v1.Service) error {
f.addCall("delete")
return f.Err
}
func (f *FakeCloud) AddSSHKeyToAllInstances(user string, keyData []byte) error {
func (f *FakeCloud) AddSSHKeyToAllInstances(ctx context.Context, user string, keyData []byte) error {
return cloudprovider.NotImplemented
}
// Implementation of Instances.CurrentNodeName
func (f *FakeCloud) CurrentNodeName(hostname string) (types.NodeName, error) {
func (f *FakeCloud) CurrentNodeName(ctx context.Context, hostname string) (types.NodeName, error) {
return types.NodeName(hostname), nil
}
// NodeAddresses is a test-spy implementation of Instances.NodeAddresses.
// It adds an entry "node-addresses" into the internal method call record.
func (f *FakeCloud) NodeAddresses(instance types.NodeName) ([]v1.NodeAddress, error) {
func (f *FakeCloud) NodeAddresses(ctx context.Context, instance types.NodeName) ([]v1.NodeAddress, error) {
f.addCall("node-addresses")
return f.Addresses, f.Err
}
// NodeAddressesByProviderID is a test-spy implementation of Instances.NodeAddressesByProviderID.
// It adds an entry "node-addresses-by-provider-id" into the internal method call record.
func (f *FakeCloud) NodeAddressesByProviderID(providerID string) ([]v1.NodeAddress, error) {
func (f *FakeCloud) NodeAddressesByProviderID(ctx context.Context, providerID string) ([]v1.NodeAddress, error) {
f.addCall("node-addresses-by-provider-id")
return f.Addresses, f.Err
}
@ -210,32 +211,32 @@ func (f *FakeCloud) NodeAddressesByProviderID(providerID string) ([]v1.NodeAddre
// ExternalID is a test-spy implementation of Instances.ExternalID.
// It adds an entry "external-id" into the internal method call record.
// It returns an external id to the mapped instance name, if not found, it will return "ext-{instance}"
func (f *FakeCloud) ExternalID(nodeName types.NodeName) (string, error) {
func (f *FakeCloud) ExternalID(ctx context.Context, nodeName types.NodeName) (string, error) {
f.addCall("external-id")
return f.ExtID[nodeName], f.Err
}
// InstanceID returns the cloud provider ID of the node with the specified Name.
func (f *FakeCloud) InstanceID(nodeName types.NodeName) (string, error) {
func (f *FakeCloud) InstanceID(ctx context.Context, nodeName types.NodeName) (string, error) {
f.addCall("instance-id")
return f.ExtID[nodeName], nil
}
// InstanceType returns the type of the specified instance.
func (f *FakeCloud) InstanceType(instance types.NodeName) (string, error) {
func (f *FakeCloud) InstanceType(ctx context.Context, instance types.NodeName) (string, error) {
f.addCall("instance-type")
return f.InstanceTypes[instance], nil
}
// InstanceTypeByProviderID returns the type of the specified instance.
func (f *FakeCloud) InstanceTypeByProviderID(providerID string) (string, error) {
func (f *FakeCloud) InstanceTypeByProviderID(ctx context.Context, providerID string) (string, error) {
f.addCall("instance-type-by-provider-id")
return f.InstanceTypes[types.NodeName(providerID)], nil
}
// InstanceExistsByProviderID returns true if the instance with the given provider id still exists and is running.
// If false is returned with no error, the instance will be immediately deleted by the cloud controller manager.
func (f *FakeCloud) InstanceExistsByProviderID(providerID string) (bool, error) {
func (f *FakeCloud) InstanceExistsByProviderID(ctx context.Context, providerID string) (bool, error) {
f.addCall("instance-exists-by-provider-id")
return f.ExistsByProviderID, f.ErrByProviderID
}
@ -253,7 +254,7 @@ func (f *FakeCloud) List(filter string) ([]types.NodeName, error) {
return result, f.Err
}
func (f *FakeCloud) GetZone() (cloudprovider.Zone, error) {
func (f *FakeCloud) GetZone(ctx context.Context) (cloudprovider.Zone, error) {
f.addCall("get-zone")
return f.Zone, f.Err
}
@ -261,7 +262,7 @@ func (f *FakeCloud) GetZone() (cloudprovider.Zone, error) {
// GetZoneByProviderID implements Zones.GetZoneByProviderID
// This is particularly useful in external cloud providers where the kubelet
// does not initialize node data.
func (f *FakeCloud) GetZoneByProviderID(providerID string) (cloudprovider.Zone, error) {
func (f *FakeCloud) GetZoneByProviderID(ctx context.Context, providerID string) (cloudprovider.Zone, error) {
f.addCall("get-zone-by-provider-id")
return f.Zone, f.Err
}
@ -269,12 +270,12 @@ func (f *FakeCloud) GetZoneByProviderID(providerID string) (cloudprovider.Zone,
// GetZoneByNodeName implements Zones.GetZoneByNodeName
// This is particularly useful in external cloud providers where the kubelet
// does not initialize node data.
func (f *FakeCloud) GetZoneByNodeName(nodeName types.NodeName) (cloudprovider.Zone, error) {
func (f *FakeCloud) GetZoneByNodeName(ctx context.Context, nodeName types.NodeName) (cloudprovider.Zone, error) {
f.addCall("get-zone-by-node-name")
return f.Zone, f.Err
}
func (f *FakeCloud) ListRoutes(clusterName string) ([]*cloudprovider.Route, error) {
func (f *FakeCloud) ListRoutes(ctx context.Context, clusterName string) ([]*cloudprovider.Route, error) {
f.Lock.Lock()
defer f.Lock.Unlock()
f.addCall("list-routes")
@ -288,7 +289,7 @@ func (f *FakeCloud) ListRoutes(clusterName string) ([]*cloudprovider.Route, erro
return routes, f.Err
}
func (f *FakeCloud) CreateRoute(clusterName string, nameHint string, route *cloudprovider.Route) error {
func (f *FakeCloud) CreateRoute(ctx context.Context, clusterName string, nameHint string, route *cloudprovider.Route) error {
f.Lock.Lock()
defer f.Lock.Unlock()
f.addCall("create-route")
@ -305,7 +306,7 @@ func (f *FakeCloud) CreateRoute(clusterName string, nameHint string, route *clou
return nil
}
func (f *FakeCloud) DeleteRoute(clusterName string, route *cloudprovider.Route) error {
func (f *FakeCloud) DeleteRoute(ctx context.Context, clusterName string, route *cloudprovider.Route) error {
f.Lock.Lock()
defer f.Lock.Unlock()
f.addCall("delete-route")
@ -318,7 +319,7 @@ func (f *FakeCloud) DeleteRoute(clusterName string, route *cloudprovider.Route)
return nil
}
func (c *FakeCloud) GetLabelsForVolume(pv *v1.PersistentVolume) (map[string]string, error) {
func (c *FakeCloud) GetLabelsForVolume(ctx context.Context, pv *v1.PersistentVolume) (map[string]string, error) {
if val, ok := c.VolumeLabelMap[pv.Name]; ok {
return val, nil
}

View File

@ -16,11 +16,13 @@ limitations under the License.
package gce
import "context"
func newClustersMetricContext(request, zone string) *metricContext {
return newGenericMetricContext("clusters", request, unusedMetricLabel, zone, computeV1Version)
}
func (gce *GCECloud) ListClusters() ([]string, error) {
func (gce *GCECloud) ListClusters(ctx context.Context) ([]string, error) {
allClusters := []string{}
for _, zone := range gce.managedZones {
@ -35,7 +37,7 @@ func (gce *GCECloud) ListClusters() ([]string, error) {
return allClusters, nil
}
func (gce *GCECloud) Master(clusterName string) (string, error) {
func (gce *GCECloud) Master(ctx context.Context, clusterName string) (string, error) {
return "k8s-" + clusterName + "-master.internal", nil
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package gce
import (
"context"
"encoding/json"
"fmt"
"net/http"
@ -577,7 +578,7 @@ func newDiskMetricContextRegional(request, region string) *metricContext {
return newGenericMetricContext("disk", request, region, unusedMetricLabel, computeV1Version)
}
func (gce *GCECloud) GetLabelsForVolume(pv *v1.PersistentVolume) (map[string]string, error) {
func (gce *GCECloud) GetLabelsForVolume(ctx context.Context, pv *v1.PersistentVolume) (map[string]string, error) {
// Ignore any volumes that are being provisioned
if pv.Spec.GCEPersistentDisk.PDName == volume.ProvisionedVolumeName {
return nil, nil

View File

@ -77,7 +77,7 @@ func (gce *GCECloud) ToInstanceReferences(zone string, instanceNames []string) (
}
// NodeAddresses is an implementation of Instances.NodeAddresses.
func (gce *GCECloud) NodeAddresses(_ types.NodeName) ([]v1.NodeAddress, error) {
func (gce *GCECloud) NodeAddresses(_ context.Context, _ types.NodeName) ([]v1.NodeAddress, error) {
internalIP, err := metadata.Get("instance/network-interfaces/0/ip")
if err != nil {
return nil, fmt.Errorf("couldn't get internal IP: %v", err)
@ -94,7 +94,7 @@ func (gce *GCECloud) NodeAddresses(_ types.NodeName) ([]v1.NodeAddress, error) {
// NodeAddressesByProviderID will not be called from the node that is requesting this ID.
// i.e. metadata service and other local methods cannot be used here
func (gce *GCECloud) NodeAddressesByProviderID(providerID string) ([]v1.NodeAddress, error) {
func (gce *GCECloud) NodeAddressesByProviderID(ctx context.Context, providerID string) ([]v1.NodeAddress, error) {
_, zone, name, err := splitProviderID(providerID)
if err != nil {
return []v1.NodeAddress{}, err
@ -141,7 +141,7 @@ func (gce *GCECloud) instanceByProviderID(providerID string) (*gceInstance, erro
// with the specified unique providerID This method will not be called from the
// node that is requesting this ID. i.e. metadata service and other local
// methods cannot be used here
func (gce *GCECloud) InstanceTypeByProviderID(providerID string) (string, error) {
func (gce *GCECloud) InstanceTypeByProviderID(ctx context.Context, providerID string) (string, error) {
instance, err := gce.instanceByProviderID(providerID)
if err != nil {
return "", err
@ -151,7 +151,7 @@ func (gce *GCECloud) InstanceTypeByProviderID(providerID string) (string, error)
}
// ExternalID returns the cloud provider ID of the node with the specified NodeName (deprecated).
func (gce *GCECloud) ExternalID(nodeName types.NodeName) (string, error) {
func (gce *GCECloud) ExternalID(ctx context.Context, nodeName types.NodeName) (string, error) {
instanceName := mapNodeNameToInstanceName(nodeName)
if gce.useMetadataServer {
// Use metadata, if possible, to fetch ID. See issue #12000
@ -173,7 +173,7 @@ func (gce *GCECloud) ExternalID(nodeName types.NodeName) (string, error) {
// InstanceExistsByProviderID returns true if the instance with the given provider id still exists and is running.
// If false is returned with no error, the instance will be immediately deleted by the cloud controller manager.
func (gce *GCECloud) InstanceExistsByProviderID(providerID string) (bool, error) {
func (gce *GCECloud) InstanceExistsByProviderID(ctx context.Context, providerID string) (bool, error) {
_, err := gce.instanceByProviderID(providerID)
if err != nil {
if err == cloudprovider.InstanceNotFound {
@ -186,7 +186,7 @@ func (gce *GCECloud) InstanceExistsByProviderID(providerID string) (bool, error)
}
// InstanceID returns the cloud provider ID of the node with the specified NodeName.
func (gce *GCECloud) InstanceID(nodeName types.NodeName) (string, error) {
func (gce *GCECloud) InstanceID(ctx context.Context, nodeName types.NodeName) (string, error) {
instanceName := mapNodeNameToInstanceName(nodeName)
if gce.useMetadataServer {
// Use metadata, if possible, to fetch ID. See issue #12000
@ -205,7 +205,7 @@ func (gce *GCECloud) InstanceID(nodeName types.NodeName) (string, error) {
}
// InstanceType returns the type of the specified node with the specified NodeName.
func (gce *GCECloud) InstanceType(nodeName types.NodeName) (string, error) {
func (gce *GCECloud) InstanceType(ctx context.Context, nodeName types.NodeName) (string, error) {
instanceName := mapNodeNameToInstanceName(nodeName)
if gce.useMetadataServer {
// Use metadata, if possible, to fetch ID. See issue #12000
@ -223,7 +223,7 @@ func (gce *GCECloud) InstanceType(nodeName types.NodeName) (string, error) {
return instance.Type, nil
}
func (gce *GCECloud) AddSSHKeyToAllInstances(user string, keyData []byte) error {
func (gce *GCECloud) AddSSHKeyToAllInstances(ctx context.Context, user string, keyData []byte) error {
return wait.Poll(2*time.Second, 30*time.Second, func() (bool, error) {
project, err := gce.c.Projects().Get(context.Background(), gce.projectID)
if err != nil {
@ -336,7 +336,7 @@ func (gce *GCECloud) DeleteInstance(project, zone, name string) error {
}
// Implementation of Instances.CurrentNodeName
func (gce *GCECloud) CurrentNodeName(hostname string) (types.NodeName, error) {
func (gce *GCECloud) CurrentNodeName(ctx context.Context, hostname string) (types.NodeName, error) {
return types.NodeName(hostname), nil
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package gce
import (
"context"
"flag"
"fmt"
"net"
@ -96,7 +97,7 @@ func LoadBalancerSrcRanges() []string {
}
// GetLoadBalancer is an implementation of LoadBalancer.GetLoadBalancer
func (gce *GCECloud) GetLoadBalancer(clusterName string, svc *v1.Service) (*v1.LoadBalancerStatus, bool, error) {
func (gce *GCECloud) GetLoadBalancer(ctx context.Context, clusterName string, svc *v1.Service) (*v1.LoadBalancerStatus, bool, error) {
loadBalancerName := cloudprovider.GetLoadBalancerName(svc)
fwd, err := gce.GetRegionForwardingRule(loadBalancerName, gce.region)
if err == nil {
@ -109,7 +110,7 @@ func (gce *GCECloud) GetLoadBalancer(clusterName string, svc *v1.Service) (*v1.L
}
// EnsureLoadBalancer is an implementation of LoadBalancer.EnsureLoadBalancer.
func (gce *GCECloud) EnsureLoadBalancer(clusterName string, svc *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) {
func (gce *GCECloud) EnsureLoadBalancer(ctx context.Context, clusterName string, svc *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) {
loadBalancerName := cloudprovider.GetLoadBalancerName(svc)
desiredScheme := getSvcScheme(svc)
clusterID, err := gce.ClusterID.GetID()
@ -158,7 +159,7 @@ func (gce *GCECloud) EnsureLoadBalancer(clusterName string, svc *v1.Service, nod
}
// UpdateLoadBalancer is an implementation of LoadBalancer.UpdateLoadBalancer.
func (gce *GCECloud) UpdateLoadBalancer(clusterName string, svc *v1.Service, nodes []*v1.Node) error {
func (gce *GCECloud) UpdateLoadBalancer(ctx context.Context, clusterName string, svc *v1.Service, nodes []*v1.Node) error {
loadBalancerName := cloudprovider.GetLoadBalancerName(svc)
scheme := getSvcScheme(svc)
clusterID, err := gce.ClusterID.GetID()
@ -179,7 +180,7 @@ func (gce *GCECloud) UpdateLoadBalancer(clusterName string, svc *v1.Service, nod
}
// EnsureLoadBalancerDeleted is an implementation of LoadBalancer.EnsureLoadBalancerDeleted.
func (gce *GCECloud) EnsureLoadBalancerDeleted(clusterName string, svc *v1.Service) error {
func (gce *GCECloud) EnsureLoadBalancerDeleted(ctx context.Context, clusterName string, svc *v1.Service) error {
loadBalancerName := cloudprovider.GetLoadBalancerName(svc)
scheme := getSvcScheme(svc)
clusterID, err := gce.ClusterID.GetID()

View File

@ -36,7 +36,7 @@ func newRoutesMetricContext(request string) *metricContext {
}
// ListRoutes in the cloud environment.
func (gce *GCECloud) ListRoutes(clusterName string) ([]*cloudprovider.Route, error) {
func (gce *GCECloud) ListRoutes(ctx context.Context, clusterName string) ([]*cloudprovider.Route, error) {
mc := newRoutesMetricContext("list")
prefix := truncateClusterName(clusterName)
f := filter.Regexp("name", prefix+"-.*").AndRegexp("network", gce.NetworkURL()).AndRegexp("description", k8sNodeRouteTag)
@ -59,7 +59,7 @@ func (gce *GCECloud) ListRoutes(clusterName string) ([]*cloudprovider.Route, err
}
// CreateRoute in the cloud environment.
func (gce *GCECloud) CreateRoute(clusterName string, nameHint string, route *cloudprovider.Route) error {
func (gce *GCECloud) CreateRoute(ctx context.Context, clusterName string, nameHint string, route *cloudprovider.Route) error {
mc := newRoutesMetricContext("create")
targetInstance, err := gce.getInstanceByName(mapNodeNameToInstanceName(route.TargetNode))
@ -83,7 +83,7 @@ func (gce *GCECloud) CreateRoute(clusterName string, nameHint string, route *clo
}
// DeleteRoute from the cloud environment.
func (gce *GCECloud) DeleteRoute(clusterName string, route *cloudprovider.Route) error {
func (gce *GCECloud) DeleteRoute(ctx context.Context, clusterName string, route *cloudprovider.Route) error {
mc := newRoutesMetricContext("delete")
return mc.Observe(gce.c.Routes().Delete(context.Background(), meta.GlobalKey(route.Name)))
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package gce
import (
"context"
"encoding/json"
"reflect"
"strings"
@ -100,7 +101,7 @@ func TestGetRegion(t *testing.T) {
if !ok {
t.Fatalf("Unexpected missing zones impl")
}
zone, err := zones.GetZone()
zone, err := zones.GetZone(context.TODO())
if err != nil {
t.Fatalf("unexpected error %v", err)
}
@ -305,7 +306,7 @@ func TestGetZoneByProviderID(t *testing.T) {
region: "us-central1",
}
for _, test := range tests {
zone, err := gce.GetZoneByProviderID(test.providerID)
zone, err := gce.GetZoneByProviderID(context.TODO(), test.providerID)
if (err != nil) != test.fail {
t.Errorf("Expected to fail=%t, provider ID %v, tests %s", test.fail, test, test.description)
}

View File

@ -32,7 +32,7 @@ func newZonesMetricContext(request, region string) *metricContext {
}
// GetZone creates a cloudprovider.Zone of the current zone and region
func (gce *GCECloud) GetZone() (cloudprovider.Zone, error) {
func (gce *GCECloud) GetZone(ctx context.Context) (cloudprovider.Zone, error) {
return cloudprovider.Zone{
FailureDomain: gce.localZone,
Region: gce.region,
@ -42,7 +42,7 @@ func (gce *GCECloud) GetZone() (cloudprovider.Zone, error) {
// GetZoneByProviderID implements Zones.GetZoneByProviderID
// This is particularly useful in external cloud providers where the kubelet
// does not initialize node data.
func (gce *GCECloud) GetZoneByProviderID(providerID string) (cloudprovider.Zone, error) {
func (gce *GCECloud) GetZoneByProviderID(ctx context.Context, providerID string) (cloudprovider.Zone, error) {
_, zone, _, err := splitProviderID(providerID)
if err != nil {
return cloudprovider.Zone{}, err
@ -57,7 +57,7 @@ func (gce *GCECloud) GetZoneByProviderID(providerID string) (cloudprovider.Zone,
// GetZoneByNodeName implements Zones.GetZoneByNodeName
// This is particularly useful in external cloud providers where the kubelet
// does not initialize node data.
func (gce *GCECloud) GetZoneByNodeName(nodeName types.NodeName) (cloudprovider.Zone, error) {
func (gce *GCECloud) GetZoneByNodeName(ctx context.Context, nodeName types.NodeName) (cloudprovider.Zone, error) {
instanceName := mapNodeNameToInstanceName(nodeName)
instance, err := gce.getInstanceByName(instanceName)
if err != nil {

View File

@ -17,6 +17,7 @@ limitations under the License.
package openstack
import (
"context"
"crypto/tls"
"errors"
"fmt"
@ -606,7 +607,7 @@ func (os *OpenStack) Zones() (cloudprovider.Zones, bool) {
}
// GetZone returns the current zone
func (os *OpenStack) GetZone() (cloudprovider.Zone, error) {
func (os *OpenStack) GetZone(ctx context.Context) (cloudprovider.Zone, error) {
md, err := getMetadata(os.metadataOpts.SearchOrder)
if err != nil {
return cloudprovider.Zone{}, err
@ -623,7 +624,7 @@ func (os *OpenStack) GetZone() (cloudprovider.Zone, error) {
// GetZoneByProviderID implements Zones.GetZoneByProviderID
// This is particularly useful in external cloud providers where the kubelet
// does not initialize node data.
func (os *OpenStack) GetZoneByProviderID(providerID string) (cloudprovider.Zone, error) {
func (os *OpenStack) GetZoneByProviderID(ctx context.Context, providerID string) (cloudprovider.Zone, error) {
instanceID, err := instanceIDFromProviderID(providerID)
if err != nil {
return cloudprovider.Zone{}, err
@ -650,7 +651,7 @@ func (os *OpenStack) GetZoneByProviderID(providerID string) (cloudprovider.Zone,
// GetZoneByNodeName implements Zones.GetZoneByNodeName
// This is particularly useful in external cloud providers where the kubelet
// does not initialize node data.
func (os *OpenStack) GetZoneByNodeName(nodeName types.NodeName) (cloudprovider.Zone, error) {
func (os *OpenStack) GetZoneByNodeName(ctx context.Context, nodeName types.NodeName) (cloudprovider.Zone, error) {
compute, err := os.NewComputeV2()
if err != nil {
return cloudprovider.Zone{}, err

View File

@ -17,6 +17,7 @@ limitations under the License.
package openstack
import (
"context"
"fmt"
"regexp"
@ -54,7 +55,7 @@ func (os *OpenStack) Instances() (cloudprovider.Instances, bool) {
// CurrentNodeName implements Instances.CurrentNodeName
// Note this is *not* necessarily the same as hostname.
func (i *Instances) CurrentNodeName(hostname string) (types.NodeName, error) {
func (i *Instances) CurrentNodeName(ctx context.Context, hostname string) (types.NodeName, error) {
md, err := getMetadata(i.opts.SearchOrder)
if err != nil {
return "", err
@ -63,12 +64,12 @@ func (i *Instances) CurrentNodeName(hostname string) (types.NodeName, error) {
}
// AddSSHKeyToAllInstances is not implemented for OpenStack
func (i *Instances) AddSSHKeyToAllInstances(user string, keyData []byte) error {
func (i *Instances) AddSSHKeyToAllInstances(ctx context.Context, user string, keyData []byte) error {
return cloudprovider.NotImplemented
}
// NodeAddresses implements Instances.NodeAddresses
func (i *Instances) NodeAddresses(name types.NodeName) ([]v1.NodeAddress, error) {
func (i *Instances) NodeAddresses(ctx context.Context, name types.NodeName) ([]v1.NodeAddress, error) {
glog.V(4).Infof("NodeAddresses(%v) called", name)
addrs, err := getAddressesByName(i.compute, name)
@ -83,7 +84,7 @@ func (i *Instances) NodeAddresses(name types.NodeName) ([]v1.NodeAddress, error)
// NodeAddressesByProviderID returns the node addresses of an instances with the specified unique providerID
// This method will not be called from the node that is requesting this ID. i.e. metadata service
// and other local methods cannot be used here
func (i *Instances) NodeAddressesByProviderID(providerID string) ([]v1.NodeAddress, error) {
func (i *Instances) NodeAddressesByProviderID(ctx context.Context, providerID string) ([]v1.NodeAddress, error) {
instanceID, err := instanceIDFromProviderID(providerID)
if err != nil {
@ -105,7 +106,7 @@ func (i *Instances) NodeAddressesByProviderID(providerID string) ([]v1.NodeAddre
}
// ExternalID returns the cloud provider ID of the specified instance (deprecated).
func (i *Instances) ExternalID(name types.NodeName) (string, error) {
func (i *Instances) ExternalID(ctx context.Context, name types.NodeName) (string, error) {
srv, err := getServerByName(i.compute, name, true)
if err != nil {
if err == ErrNotFound {
@ -118,7 +119,7 @@ func (i *Instances) ExternalID(name types.NodeName) (string, error) {
// InstanceExistsByProviderID returns true if the instance with the given provider id still exists and is running.
// If false is returned with no error, the instance will be immediately deleted by the cloud controller manager.
func (i *Instances) InstanceExistsByProviderID(providerID string) (bool, error) {
func (i *Instances) InstanceExistsByProviderID(ctx context.Context, providerID string) (bool, error) {
instanceID, err := instanceIDFromProviderID(providerID)
if err != nil {
return false, err
@ -153,7 +154,7 @@ func (os *OpenStack) InstanceID() (string, error) {
}
// InstanceID returns the cloud provider ID of the specified instance.
func (i *Instances) InstanceID(name types.NodeName) (string, error) {
func (i *Instances) InstanceID(ctx context.Context, name types.NodeName) (string, error) {
srv, err := getServerByName(i.compute, name, true)
if err != nil {
if err == ErrNotFound {
@ -169,7 +170,7 @@ func (i *Instances) InstanceID(name types.NodeName) (string, error) {
// InstanceTypeByProviderID returns the cloudprovider instance type of the node with the specified unique providerID
// This method will not be called from the node that is requesting this ID. i.e. metadata service
// and other local methods cannot be used here
func (i *Instances) InstanceTypeByProviderID(providerID string) (string, error) {
func (i *Instances) InstanceTypeByProviderID(ctx context.Context, providerID string) (string, error) {
instanceID, err := instanceIDFromProviderID(providerID)
if err != nil {
@ -186,7 +187,7 @@ func (i *Instances) InstanceTypeByProviderID(providerID string) (string, error)
}
// InstanceType returns the type of the specified instance.
func (i *Instances) InstanceType(name types.NodeName) (string, error) {
func (i *Instances) InstanceType(ctx context.Context, name types.NodeName) (string, error) {
srv, err := getServerByName(i.compute, name, true)
if err != nil {

View File

@ -17,6 +17,7 @@ limitations under the License.
package openstack
import (
"context"
"fmt"
"net"
"reflect"
@ -457,7 +458,7 @@ func (lbaas *LbaasV2) createLoadBalancer(service *v1.Service, name string, inter
}
// GetLoadBalancer returns whether the specified load balancer exists and its status
func (lbaas *LbaasV2) GetLoadBalancer(clusterName string, service *v1.Service) (*v1.LoadBalancerStatus, bool, error) {
func (lbaas *LbaasV2) GetLoadBalancer(ctx context.Context, clusterName string, service *v1.Service) (*v1.LoadBalancerStatus, bool, error) {
loadBalancerName := cloudprovider.GetLoadBalancerName(service)
loadbalancer, err := getLoadbalancerByName(lbaas.lb, loadBalancerName)
if err == ErrNotFound {
@ -635,7 +636,7 @@ func getFloatingNetworkIDForLB(client *gophercloud.ServiceClient) (string, error
// each region.
// EnsureLoadBalancer creates a new load balancer 'name', or updates the existing one.
func (lbaas *LbaasV2) EnsureLoadBalancer(clusterName string, apiService *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) {
func (lbaas *LbaasV2) EnsureLoadBalancer(ctx context.Context, clusterName string, apiService *v1.Service, nodes []*v1.Node) (*v1.LoadBalancerStatus, error) {
glog.V(4).Infof("EnsureLoadBalancer(%v, %v, %v, %v, %v, %v, %v)", clusterName, apiService.Namespace, apiService.Name, apiService.Spec.LoadBalancerIP, apiService.Spec.Ports, nodes, apiService.Annotations)
if len(nodes) == 0 {
@ -972,7 +973,7 @@ func (lbaas *LbaasV2) EnsureLoadBalancer(clusterName string, apiService *v1.Serv
err := lbaas.ensureSecurityGroup(clusterName, apiService, nodes, loadbalancer)
if err != nil {
// cleanup what was created so far
_ = lbaas.EnsureLoadBalancerDeleted(clusterName, apiService)
_ = lbaas.EnsureLoadBalancerDeleted(ctx, clusterName, apiService)
return status, err
}
}
@ -1153,7 +1154,7 @@ func (lbaas *LbaasV2) ensureSecurityGroup(clusterName string, apiService *v1.Ser
}
// UpdateLoadBalancer updates hosts under the specified load balancer.
func (lbaas *LbaasV2) UpdateLoadBalancer(clusterName string, service *v1.Service, nodes []*v1.Node) error {
func (lbaas *LbaasV2) UpdateLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) error {
loadBalancerName := cloudprovider.GetLoadBalancerName(service)
glog.V(4).Infof("UpdateLoadBalancer(%v, %v, %v)", clusterName, loadBalancerName, nodes)
@ -1374,7 +1375,7 @@ func (lbaas *LbaasV2) updateSecurityGroup(clusterName string, apiService *v1.Ser
}
// EnsureLoadBalancerDeleted deletes the specified load balancer
func (lbaas *LbaasV2) EnsureLoadBalancerDeleted(clusterName string, service *v1.Service) error {
func (lbaas *LbaasV2) EnsureLoadBalancerDeleted(ctx context.Context, clusterName string, service *v1.Service) error {
loadBalancerName := cloudprovider.GetLoadBalancerName(service)
glog.V(4).Infof("EnsureLoadBalancerDeleted(%v, %v)", clusterName, loadBalancerName)

View File

@ -17,6 +17,7 @@ limitations under the License.
package openstack
import (
"context"
"errors"
"github.com/gophercloud/gophercloud"
@ -52,7 +53,7 @@ func NewRoutes(compute *gophercloud.ServiceClient, network *gophercloud.ServiceC
}
// ListRoutes lists all managed routes that belong to the specified clusterName
func (r *Routes) ListRoutes(clusterName string) ([]*cloudprovider.Route, error) {
func (r *Routes) ListRoutes(ctx context.Context, clusterName string) ([]*cloudprovider.Route, error) {
glog.V(4).Infof("ListRoutes(%v)", clusterName)
nodeNamesByAddr := make(map[string]types.NodeName)
@ -140,7 +141,7 @@ func updateAllowedAddressPairs(network *gophercloud.ServiceClient, port *neutron
}
// CreateRoute creates the described managed route
func (r *Routes) CreateRoute(clusterName string, nameHint string, route *cloudprovider.Route) error {
func (r *Routes) CreateRoute(ctx context.Context, clusterName string, nameHint string, route *cloudprovider.Route) error {
glog.V(4).Infof("CreateRoute(%v, %v, %v)", clusterName, nameHint, route)
onFailure := newCaller()
@ -213,7 +214,7 @@ func (r *Routes) CreateRoute(clusterName string, nameHint string, route *cloudpr
}
// DeleteRoute deletes the specified managed route
func (r *Routes) DeleteRoute(clusterName string, route *cloudprovider.Route) error {
func (r *Routes) DeleteRoute(ctx context.Context, clusterName string, route *cloudprovider.Route) error {
glog.V(4).Infof("DeleteRoute(%v, %v)", clusterName, route)
onFailure := newCaller()

View File

@ -17,6 +17,7 @@ limitations under the License.
package openstack
import (
"context"
"net"
"testing"
@ -52,12 +53,12 @@ func TestRoutes(t *testing.T) {
DestinationCIDR: "10.164.2.0/24",
TargetNode: types.NodeName(servername),
}
err = r.CreateRoute(clusterName, "myhint", &newroute)
err = r.CreateRoute(context.TODO(), clusterName, "myhint", &newroute)
if err != nil {
t.Fatalf("CreateRoute error: %v", err)
}
routelist, err := r.ListRoutes(clusterName)
routelist, err := r.ListRoutes(context.TODO(), clusterName)
if err != nil {
t.Fatalf("ListRoutes() error: %v", err)
}
@ -70,7 +71,7 @@ func TestRoutes(t *testing.T) {
t.Logf("%s via %s", cidr, route.TargetNode)
}
err = r.DeleteRoute(clusterName, &newroute)
err = r.DeleteRoute(context.TODO(), clusterName, &newroute)
if err != nil {
t.Fatalf("DeleteRoute error: %v", err)
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package openstack
import (
"context"
"fmt"
"os"
"reflect"
@ -492,7 +493,7 @@ func TestLoadBalancer(t *testing.T) {
t.Fatalf("LoadBalancer() returned false - perhaps your stack doesn't support Neutron?")
}
_, exists, err := lb.GetLoadBalancer(testClusterName, &v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "noexist"}})
_, exists, err := lb.GetLoadBalancer(context.TODO(), testClusterName, &v1.Service{ObjectMeta: metav1.ObjectMeta{Name: "noexist"}})
if err != nil {
t.Fatalf("GetLoadBalancer(\"noexist\") returned error: %s", err)
}
@ -518,7 +519,7 @@ func TestZones(t *testing.T) {
t.Fatalf("Zones() returned false")
}
zone, err := z.GetZone()
zone, err := z.GetZone(context.TODO())
if err != nil {
t.Fatalf("GetZone() returned error: %s", err)
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package openstack
import (
"context"
"errors"
"fmt"
"io/ioutil"
@ -691,7 +692,7 @@ func (os *OpenStack) ShouldTrustDevicePath() bool {
}
// GetLabelsForVolume implements PVLabeler.GetLabelsForVolume
func (os *OpenStack) GetLabelsForVolume(pv *v1.PersistentVolume) (map[string]string, error) {
func (os *OpenStack) GetLabelsForVolume(ctx context.Context, pv *v1.PersistentVolume) (map[string]string, error) {
// Ignore any volumes that are being provisioned
if pv.Spec.Cinder.VolumeID == k8s_volume.ProvisionedVolumeName {
return nil, nil

View File

@ -17,6 +17,7 @@ limitations under the License.
package ovirt
import (
"context"
"encoding/xml"
"fmt"
"io"
@ -154,7 +155,7 @@ func (v *OVirtCloud) Routes() (cloudprovider.Routes, bool) {
}
// NodeAddresses returns the NodeAddresses of the instance with the specified nodeName.
func (v *OVirtCloud) NodeAddresses(nodeName types.NodeName) ([]v1.NodeAddress, error) {
func (v *OVirtCloud) NodeAddresses(ctx context.Context, nodeName types.NodeName) ([]v1.NodeAddress, error) {
name := mapNodeNameToInstanceName(nodeName)
instance, err := v.fetchInstance(name)
if err != nil {
@ -185,7 +186,7 @@ func (v *OVirtCloud) NodeAddresses(nodeName types.NodeName) ([]v1.NodeAddress, e
// NodeAddressesByProviderID returns the node addresses of an instances with the specified unique providerID
// This method will not be called from the node that is requesting this ID. i.e. metadata service
// and other local methods cannot be used here
func (v *OVirtCloud) NodeAddressesByProviderID(providerID string) ([]v1.NodeAddress, error) {
func (v *OVirtCloud) NodeAddressesByProviderID(ctx context.Context, providerID string) ([]v1.NodeAddress, error) {
return []v1.NodeAddress{}, cloudprovider.NotImplemented
}
@ -196,7 +197,7 @@ func mapNodeNameToInstanceName(nodeName types.NodeName) string {
}
// ExternalID returns the cloud provider ID of the specified node with the specified NodeName (deprecated).
func (v *OVirtCloud) ExternalID(nodeName types.NodeName) (string, error) {
func (v *OVirtCloud) ExternalID(ctx context.Context, nodeName types.NodeName) (string, error) {
name := mapNodeNameToInstanceName(nodeName)
instance, err := v.fetchInstance(name)
if err != nil {
@ -207,12 +208,12 @@ func (v *OVirtCloud) ExternalID(nodeName types.NodeName) (string, error) {
// InstanceExistsByProviderID returns true if the instance with the given provider id still exists and is running.
// If false is returned with no error, the instance will be immediately deleted by the cloud controller manager.
func (v *OVirtCloud) InstanceExistsByProviderID(providerID string) (bool, error) {
func (v *OVirtCloud) InstanceExistsByProviderID(ctx context.Context, providerID string) (bool, error) {
return false, cloudprovider.NotImplemented
}
// InstanceID returns the cloud provider ID of the node with the specified NodeName.
func (v *OVirtCloud) InstanceID(nodeName types.NodeName) (string, error) {
func (v *OVirtCloud) InstanceID(ctx context.Context, nodeName types.NodeName) (string, error) {
name := mapNodeNameToInstanceName(nodeName)
instance, err := v.fetchInstance(name)
if err != nil {
@ -226,12 +227,12 @@ func (v *OVirtCloud) InstanceID(nodeName types.NodeName) (string, error) {
// InstanceTypeByProviderID returns the cloudprovider instance type of the node with the specified unique providerID
// This method will not be called from the node that is requesting this ID. i.e. metadata service
// and other local methods cannot be used here
func (v *OVirtCloud) InstanceTypeByProviderID(providerID string) (string, error) {
func (v *OVirtCloud) InstanceTypeByProviderID(ctx context.Context, providerID string) (string, error) {
return "", cloudprovider.NotImplemented
}
// InstanceType returns the type of the specified instance.
func (v *OVirtCloud) InstanceType(name types.NodeName) (string, error) {
func (v *OVirtCloud) InstanceType(ctx context.Context, name types.NodeName) (string, error) {
return "", nil
}
@ -310,10 +311,10 @@ func (m *OVirtInstanceMap) ListSortedNames() []string {
}
// Implementation of Instances.CurrentNodeName
func (v *OVirtCloud) CurrentNodeName(hostname string) (types.NodeName, error) {
func (v *OVirtCloud) CurrentNodeName(ctx context.Context, hostname string) (types.NodeName, error) {
return types.NodeName(hostname), nil
}
func (v *OVirtCloud) AddSSHKeyToAllInstances(user string, keyData []byte) error {
func (v *OVirtCloud) AddSSHKeyToAllInstances(ctx context.Context, user string, keyData []byte) error {
return cloudprovider.NotImplemented
}

View File

@ -25,6 +25,7 @@ package photon
import (
"bufio"
"context"
"errors"
"fmt"
"io"
@ -92,18 +93,18 @@ type PCConfig struct {
type Disks interface {
// AttachDisk attaches given disk to given node. Current node
// is used when nodeName is empty string.
AttachDisk(pdID string, nodeName k8stypes.NodeName) error
AttachDisk(ctx context.Context, pdID string, nodeName k8stypes.NodeName) error
// DetachDisk detaches given disk to given node. Current node
// is used when nodeName is empty string.
DetachDisk(pdID string, nodeName k8stypes.NodeName) error
DetachDisk(ctx context.Context, pdID string, nodeName k8stypes.NodeName) error
// DiskIsAttached checks if a disk is attached to the given node.
DiskIsAttached(pdID string, nodeName k8stypes.NodeName) (bool, error)
DiskIsAttached(ctx context.Context, pdID string, nodeName k8stypes.NodeName) (bool, error)
// DisksAreAttached is a batch function to check if a list of disks are attached
// to the node with the specified NodeName.
DisksAreAttached(pdIDs []string, nodeName k8stypes.NodeName) (map[string]bool, error)
DisksAreAttached(ctx context.Context, pdIDs []string, nodeName k8stypes.NodeName) (map[string]bool, error)
// CreateDisk creates a new PD with given properties.
CreateDisk(volumeOptions *VolumeOptions) (pdID string, err error)
@ -299,7 +300,7 @@ func (pc *PCCloud) List(filter string) ([]k8stypes.NodeName, error) {
}
// NodeAddresses is an implementation of Instances.NodeAddresses.
func (pc *PCCloud) NodeAddresses(nodeName k8stypes.NodeName) ([]v1.NodeAddress, error) {
func (pc *PCCloud) NodeAddresses(ctx context.Context, nodeName k8stypes.NodeName) ([]v1.NodeAddress, error) {
nodeAddrs := []v1.NodeAddress{}
name := string(nodeName)
@ -420,15 +421,15 @@ func (pc *PCCloud) NodeAddresses(nodeName k8stypes.NodeName) ([]v1.NodeAddress,
// NodeAddressesByProviderID returns the node addresses of an instances with the specified unique providerID
// This method will not be called from the node that is requesting this ID. i.e. metadata service
// and other local methods cannot be used here
func (pc *PCCloud) NodeAddressesByProviderID(providerID string) ([]v1.NodeAddress, error) {
func (pc *PCCloud) NodeAddressesByProviderID(ctx context.Context, providerID string) ([]v1.NodeAddress, error) {
return []v1.NodeAddress{}, cloudprovider.NotImplemented
}
func (pc *PCCloud) AddSSHKeyToAllInstances(user string, keyData []byte) error {
func (pc *PCCloud) AddSSHKeyToAllInstances(ctx context.Context, user string, keyData []byte) error {
return cloudprovider.NotImplemented
}
func (pc *PCCloud) CurrentNodeName(hostname string) (k8stypes.NodeName, error) {
func (pc *PCCloud) CurrentNodeName(ctx context.Context, hostname string) (k8stypes.NodeName, error) {
pc.localK8sHostname = hostname
return k8stypes.NodeName(hostname), nil
}
@ -454,7 +455,7 @@ func getInstanceID(pc *PCCloud, name string) (string, error) {
}
// ExternalID returns the cloud provider ID of the specified instance (deprecated).
func (pc *PCCloud) ExternalID(nodeName k8stypes.NodeName) (string, error) {
func (pc *PCCloud) ExternalID(ctx context.Context, nodeName k8stypes.NodeName) (string, error) {
name := string(nodeName)
if name == pc.localK8sHostname {
return pc.localInstanceID, nil
@ -472,12 +473,12 @@ func (pc *PCCloud) ExternalID(nodeName k8stypes.NodeName) (string, error) {
// InstanceExistsByProviderID returns true if the instance with the given provider id still exists and is running.
// If false is returned with no error, the instance will be immediately deleted by the cloud controller manager.
func (pc *PCCloud) InstanceExistsByProviderID(providerID string) (bool, error) {
func (pc *PCCloud) InstanceExistsByProviderID(ctx context.Context, providerID string) (bool, error) {
return false, cloudprovider.NotImplemented
}
// InstanceID returns the cloud provider ID of the specified instance.
func (pc *PCCloud) InstanceID(nodeName k8stypes.NodeName) (string, error) {
func (pc *PCCloud) InstanceID(ctx context.Context, nodeName k8stypes.NodeName) (string, error) {
name := string(nodeName)
if name == pc.localK8sHostname {
return pc.localInstanceID, nil
@ -496,11 +497,11 @@ func (pc *PCCloud) InstanceID(nodeName k8stypes.NodeName) (string, error) {
// InstanceTypeByProviderID returns the cloudprovider instance type of the node with the specified unique providerID
// This method will not be called from the node that is requesting this ID. i.e. metadata service
// and other local methods cannot be used here
func (pc *PCCloud) InstanceTypeByProviderID(providerID string) (string, error) {
func (pc *PCCloud) InstanceTypeByProviderID(ctx context.Context, providerID string) (string, error) {
return "", cloudprovider.NotImplemented
}
func (pc *PCCloud) InstanceType(nodeName k8stypes.NodeName) (string, error) {
func (pc *PCCloud) InstanceType(ctx context.Context, nodeName k8stypes.NodeName) (string, error) {
return "", nil
}
@ -523,21 +524,21 @@ func (pc *PCCloud) Zones() (cloudprovider.Zones, bool) {
return pc, true
}
func (pc *PCCloud) GetZone() (cloudprovider.Zone, error) {
func (pc *PCCloud) GetZone(ctx context.Context) (cloudprovider.Zone, error) {
return pc.Zone, nil
}
// GetZoneByProviderID implements Zones.GetZoneByProviderID
// This is particularly useful in external cloud providers where the kubelet
// does not initialize node data.
func (pc *PCCloud) GetZoneByProviderID(providerID string) (cloudprovider.Zone, error) {
func (pc *PCCloud) GetZoneByProviderID(ctx context.Context, providerID string) (cloudprovider.Zone, error) {
return cloudprovider.Zone{}, errors.New("GetZoneByProviderID not implemented")
}
// GetZoneByNodeName implements Zones.GetZoneByNodeName
// This is particularly useful in external cloud providers where the kubelet
// does not initialize node data.
func (pc *PCCloud) GetZoneByNodeName(nodeName k8stypes.NodeName) (cloudprovider.Zone, error) {
func (pc *PCCloud) GetZoneByNodeName(ctx context.Context, nodeName k8stypes.NodeName) (cloudprovider.Zone, error) {
return cloudprovider.Zone{}, errors.New("GetZoneByNodeName not imeplemented")
}
@ -552,7 +553,7 @@ func (pc *PCCloud) HasClusterID() bool {
}
// Attaches given virtual disk volume to the compute running kubelet.
func (pc *PCCloud) AttachDisk(pdID string, nodeName k8stypes.NodeName) error {
func (pc *PCCloud) AttachDisk(ctx context.Context, pdID string, nodeName k8stypes.NodeName) error {
photonClient, err := getPhotonClient(pc)
if err != nil {
glog.Errorf("Photon Cloud Provider: Failed to get photon client for AttachDisk, error: [%v]", err)
@ -563,7 +564,7 @@ func (pc *PCCloud) AttachDisk(pdID string, nodeName k8stypes.NodeName) error {
DiskID: pdID,
}
vmID, err := pc.InstanceID(nodeName)
vmID, err := pc.InstanceID(ctx, nodeName)
if err != nil {
glog.Errorf("Photon Cloud Provider: pc.InstanceID failed for AttachDisk. Error[%v]", err)
return err
@ -585,7 +586,7 @@ func (pc *PCCloud) AttachDisk(pdID string, nodeName k8stypes.NodeName) error {
}
// Detaches given virtual disk volume from the compute running kubelet.
func (pc *PCCloud) DetachDisk(pdID string, nodeName k8stypes.NodeName) error {
func (pc *PCCloud) DetachDisk(ctx context.Context, pdID string, nodeName k8stypes.NodeName) error {
photonClient, err := getPhotonClient(pc)
if err != nil {
glog.Errorf("Photon Cloud Provider: Failed to get photon client for DetachDisk, error: [%v]", err)
@ -596,7 +597,7 @@ func (pc *PCCloud) DetachDisk(pdID string, nodeName k8stypes.NodeName) error {
DiskID: pdID,
}
vmID, err := pc.InstanceID(nodeName)
vmID, err := pc.InstanceID(ctx, nodeName)
if err != nil {
glog.Errorf("Photon Cloud Provider: pc.InstanceID failed for DetachDisk. Error[%v]", err)
return err
@ -618,7 +619,7 @@ func (pc *PCCloud) DetachDisk(pdID string, nodeName k8stypes.NodeName) error {
}
// DiskIsAttached returns if disk is attached to the VM using controllers supported by the plugin.
func (pc *PCCloud) DiskIsAttached(pdID string, nodeName k8stypes.NodeName) (bool, error) {
func (pc *PCCloud) DiskIsAttached(ctx context.Context, pdID string, nodeName k8stypes.NodeName) (bool, error) {
photonClient, err := getPhotonClient(pc)
if err != nil {
glog.Errorf("Photon Cloud Provider: Failed to get photon client for DiskIsAttached, error: [%v]", err)
@ -631,7 +632,7 @@ func (pc *PCCloud) DiskIsAttached(pdID string, nodeName k8stypes.NodeName) (bool
return false, err
}
vmID, err := pc.InstanceID(nodeName)
vmID, err := pc.InstanceID(ctx, nodeName)
if err == cloudprovider.InstanceNotFound {
glog.Infof("Instance %q does not exist, disk %s will be detached automatically.", nodeName, pdID)
return false, nil
@ -651,7 +652,7 @@ func (pc *PCCloud) DiskIsAttached(pdID string, nodeName k8stypes.NodeName) (bool
}
// DisksAreAttached returns if disks are attached to the VM using controllers supported by the plugin.
func (pc *PCCloud) DisksAreAttached(pdIDs []string, nodeName k8stypes.NodeName) (map[string]bool, error) {
func (pc *PCCloud) DisksAreAttached(ctx context.Context, pdIDs []string, nodeName k8stypes.NodeName) (map[string]bool, error) {
attached := make(map[string]bool)
photonClient, err := getPhotonClient(pc)
if err != nil {
@ -663,7 +664,7 @@ func (pc *PCCloud) DisksAreAttached(pdIDs []string, nodeName k8stypes.NodeName)
attached[pdID] = false
}
vmID, err := pc.InstanceID(nodeName)
vmID, err := pc.InstanceID(ctx, nodeName)
if err == cloudprovider.InstanceNotFound {
glog.Infof("Instance %q does not exist, its disks will be detached automatically.", nodeName)
// make all the disks as detached.

View File

@ -17,6 +17,7 @@ limitations under the License.
package photon
import (
"context"
"log"
"os"
"strconv"
@ -132,14 +133,14 @@ func TestInstances(t *testing.T) {
t.Fatalf("Instances() returned false")
}
externalId, err := i.ExternalID(NodeName)
externalId, err := i.ExternalID(context.TODO(), NodeName)
if err != nil {
t.Fatalf("Instances.ExternalID(%s) failed: %s", testVM, err)
}
t.Logf("Found ExternalID(%s) = %s\n", testVM, externalId)
nonExistingVM := types.NodeName(rand.String(15))
externalId, err = i.ExternalID(nonExistingVM)
externalId, err = i.ExternalID(context.TODO(), nonExistingVM)
if err == cloudprovider.InstanceNotFound {
t.Logf("VM %s was not found as expected\n", nonExistingVM)
} else if err == nil {
@ -148,13 +149,13 @@ func TestInstances(t *testing.T) {
t.Fatalf("Instances.ExternalID did not fail as expected, err: %v", err)
}
instanceId, err := i.InstanceID(NodeName)
instanceId, err := i.InstanceID(context.TODO(), NodeName)
if err != nil {
t.Fatalf("Instances.InstanceID(%s) failed: %s", testVM, err)
}
t.Logf("Found InstanceID(%s) = %s\n", testVM, instanceId)
instanceId, err = i.InstanceID(nonExistingVM)
instanceId, err = i.InstanceID(context.TODO(), nonExistingVM)
if err == cloudprovider.InstanceNotFound {
t.Logf("VM %s was not found as expected\n", nonExistingVM)
} else if err == nil {
@ -163,7 +164,7 @@ func TestInstances(t *testing.T) {
t.Fatalf("Instances.InstanceID did not fail as expected, err: %v", err)
}
addrs, err := i.NodeAddresses(NodeName)
addrs, err := i.NodeAddresses(context.TODO(), NodeName)
if err != nil {
t.Fatalf("Instances.NodeAddresses(%s) failed: %s", testVM, err)
}
@ -194,17 +195,17 @@ func TestVolumes(t *testing.T) {
t.Fatalf("Cannot create a Photon persistent disk: %v", err)
}
err = pc.AttachDisk(pdID, NodeName)
err = pc.AttachDisk(context.TODO(), pdID, NodeName)
if err != nil {
t.Fatalf("Cannot attach persistent disk(%s) to VM(%s): %v", pdID, testVM, err)
}
_, err = pc.DiskIsAttached(pdID, NodeName)
_, err = pc.DiskIsAttached(context.TODO(), pdID, NodeName)
if err != nil {
t.Fatalf("Cannot attach persistent disk(%s) to VM(%s): %v", pdID, testVM, err)
}
err = pc.DetachDisk(pdID, NodeName)
err = pc.DetachDisk(context.TODO(), pdID, NodeName)
if err != nil {
t.Fatalf("Cannot detach persisten disk(%s) from VM(%s): %v", pdID, testVM, err)
}

View File

@ -487,7 +487,7 @@ func (vs *VSphere) getVMFromNodeName(ctx context.Context, nodeName k8stypes.Node
}
// NodeAddresses is an implementation of Instances.NodeAddresses.
func (vs *VSphere) NodeAddresses(nodeName k8stypes.NodeName) ([]v1.NodeAddress, error) {
func (vs *VSphere) NodeAddresses(ctx context.Context, nodeName k8stypes.NodeName) ([]v1.NodeAddress, error) {
// Get local IP addresses if node is local node
if vs.hostName == convertToString(nodeName) {
return getLocalIP()
@ -546,17 +546,17 @@ func (vs *VSphere) NodeAddresses(nodeName k8stypes.NodeName) ([]v1.NodeAddress,
// NodeAddressesByProviderID returns the node addresses of an instances with the specified unique providerID
// This method will not be called from the node that is requesting this ID. i.e. metadata service
// and other local methods cannot be used here
func (vs *VSphere) NodeAddressesByProviderID(providerID string) ([]v1.NodeAddress, error) {
return vs.NodeAddresses(convertToK8sType(providerID))
func (vs *VSphere) NodeAddressesByProviderID(ctx context.Context, providerID string) ([]v1.NodeAddress, error) {
return vs.NodeAddresses(ctx, convertToK8sType(providerID))
}
// AddSSHKeyToAllInstances add SSH key to all instances
func (vs *VSphere) AddSSHKeyToAllInstances(user string, keyData []byte) error {
func (vs *VSphere) AddSSHKeyToAllInstances(ctx context.Context, user string, keyData []byte) error {
return cloudprovider.NotImplemented
}
// CurrentNodeName gives the current node name
func (vs *VSphere) CurrentNodeName(hostname string) (k8stypes.NodeName, error) {
func (vs *VSphere) CurrentNodeName(ctx context.Context, hostname string) (k8stypes.NodeName, error) {
return convertToK8sType(vs.hostName), nil
}
@ -569,14 +569,14 @@ func convertToK8sType(vmName string) k8stypes.NodeName {
}
// ExternalID returns the cloud provider ID of the node with the specified Name (deprecated).
func (vs *VSphere) ExternalID(nodeName k8stypes.NodeName) (string, error) {
return vs.InstanceID(nodeName)
func (vs *VSphere) ExternalID(ctx context.Context, nodeName k8stypes.NodeName) (string, error) {
return vs.InstanceID(ctx, nodeName)
}
// InstanceExistsByProviderID returns true if the instance with the given provider id still exists and is running.
// If false is returned with no error, the instance will be immediately deleted by the cloud controller manager.
func (vs *VSphere) InstanceExistsByProviderID(providerID string) (bool, error) {
_, err := vs.InstanceID(convertToK8sType(providerID))
func (vs *VSphere) InstanceExistsByProviderID(ctx context.Context, providerID string) (bool, error) {
_, err := vs.InstanceID(ctx, convertToK8sType(providerID))
if err == nil {
return true, nil
}
@ -585,7 +585,7 @@ func (vs *VSphere) InstanceExistsByProviderID(providerID string) (bool, error) {
}
// InstanceID returns the cloud provider ID of the node with the specified Name.
func (vs *VSphere) InstanceID(nodeName k8stypes.NodeName) (string, error) {
func (vs *VSphere) InstanceID(ctx context.Context, nodeName k8stypes.NodeName) (string, error) {
instanceIDInternal := func() (string, error) {
if vs.hostName == convertToString(nodeName) {
@ -649,11 +649,11 @@ func (vs *VSphere) InstanceID(nodeName k8stypes.NodeName) (string, error) {
// InstanceTypeByProviderID returns the cloudprovider instance type of the node with the specified unique providerID
// This method will not be called from the node that is requesting this ID. i.e. metadata service
// and other local methods cannot be used here
func (vs *VSphere) InstanceTypeByProviderID(providerID string) (string, error) {
func (vs *VSphere) InstanceTypeByProviderID(ctx context.Context, providerID string) (string, error) {
return "", nil
}
func (vs *VSphere) InstanceType(name k8stypes.NodeName) (string, error) {
func (vs *VSphere) InstanceType(ctx context.Context, name k8stypes.NodeName) (string, error) {
return "", nil
}

View File

@ -169,19 +169,19 @@ func TestInstances(t *testing.T) {
t.Fatalf("Instances() returned false")
}
nodeName, err := vs.CurrentNodeName("")
nodeName, err := vs.CurrentNodeName(context.TODO(), "")
if err != nil {
t.Fatalf("CurrentNodeName() failed: %s", err)
}
externalID, err := i.ExternalID(nodeName)
externalID, err := i.ExternalID(context.TODO(), nodeName)
if err != nil {
t.Fatalf("Instances.ExternalID(%s) failed: %s", nodeName, err)
}
t.Logf("Found ExternalID(%s) = %s\n", nodeName, externalID)
nonExistingVM := types.NodeName(rand.String(15))
externalID, err = i.ExternalID(nonExistingVM)
externalID, err = i.ExternalID(context.TODO(), nonExistingVM)
if err == cloudprovider.InstanceNotFound {
t.Logf("VM %s was not found as expected\n", nonExistingVM)
} else if err == nil {
@ -190,13 +190,13 @@ func TestInstances(t *testing.T) {
t.Fatalf("Instances.ExternalID did not fail as expected, err: %v", err)
}
instanceID, err := i.InstanceID(nodeName)
instanceID, err := i.InstanceID(context.TODO(), nodeName)
if err != nil {
t.Fatalf("Instances.InstanceID(%s) failed: %s", nodeName, err)
}
t.Logf("Found InstanceID(%s) = %s\n", nodeName, instanceID)
instanceID, err = i.InstanceID(nonExistingVM)
instanceID, err = i.InstanceID(context.TODO(), nonExistingVM)
if err == cloudprovider.InstanceNotFound {
t.Logf("VM %s was not found as expected\n", nonExistingVM)
} else if err == nil {
@ -205,7 +205,7 @@ func TestInstances(t *testing.T) {
t.Fatalf("Instances.InstanceID did not fail as expected, err: %v", err)
}
addrs, err := i.NodeAddresses(nodeName)
addrs, err := i.NodeAddresses(context.TODO(), nodeName)
if err != nil {
t.Fatalf("Instances.NodeAddresses(%s) failed: %s", nodeName, err)
}
@ -223,7 +223,7 @@ func TestVolumes(t *testing.T) {
t.Fatalf("Failed to construct/authenticate vSphere: %s", err)
}
nodeName, err := vs.CurrentNodeName("")
nodeName, err := vs.CurrentNodeName(context.TODO(), "")
if err != nil {
t.Fatalf("CurrentNodeName() failed: %s", err)
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package cloud
import (
"context"
"fmt"
"time"
@ -299,7 +300,7 @@ func (cnc *CloudNodeController) AddCloudNode(obj interface{}) {
}
if curNode.Spec.ProviderID == "" {
providerID, err := cloudprovider.GetInstanceProviderID(cnc.cloud, types.NodeName(curNode.Name))
providerID, err := cloudprovider.GetInstanceProviderID(context.TODO(), cnc.cloud, types.NodeName(curNode.Name))
if err == nil {
curNode.Spec.ProviderID = providerID
} else {
@ -402,10 +403,10 @@ func excludeTaintFromList(taints []v1.Taint, toExclude v1.Taint) []v1.Taint {
// ensureNodeExistsByProviderIDOrExternalID first checks if the instance exists by the provider id and then by calling external id with node name
func ensureNodeExistsByProviderIDOrExternalID(instances cloudprovider.Instances, node *v1.Node) (bool, error) {
exists, err := instances.InstanceExistsByProviderID(node.Spec.ProviderID)
exists, err := instances.InstanceExistsByProviderID(context.TODO(), node.Spec.ProviderID)
if err != nil {
providerIDErr := err
_, err = instances.ExternalID(types.NodeName(node.Name))
_, err = instances.ExternalID(context.TODO(), types.NodeName(node.Name))
if err == nil {
return true, nil
}
@ -420,10 +421,10 @@ func ensureNodeExistsByProviderIDOrExternalID(instances cloudprovider.Instances,
}
func getNodeAddressesByProviderIDOrName(instances cloudprovider.Instances, node *v1.Node) ([]v1.NodeAddress, error) {
nodeAddresses, err := instances.NodeAddressesByProviderID(node.Spec.ProviderID)
nodeAddresses, err := instances.NodeAddressesByProviderID(context.TODO(), node.Spec.ProviderID)
if err != nil {
providerIDErr := err
nodeAddresses, err = instances.NodeAddresses(types.NodeName(node.Name))
nodeAddresses, err = instances.NodeAddresses(context.TODO(), types.NodeName(node.Name))
if err != nil {
return nil, fmt.Errorf("NodeAddress: Error fetching by providerID: %v Error fetching by NodeName: %v", providerIDErr, err)
}
@ -467,10 +468,10 @@ func ensureNodeProvidedIPExists(node *v1.Node, nodeAddresses []v1.NodeAddress) (
}
func getInstanceTypeByProviderIDOrName(instances cloudprovider.Instances, node *v1.Node) (string, error) {
instanceType, err := instances.InstanceTypeByProviderID(node.Spec.ProviderID)
instanceType, err := instances.InstanceTypeByProviderID(context.TODO(), node.Spec.ProviderID)
if err != nil {
providerIDErr := err
instanceType, err = instances.InstanceType(types.NodeName(node.Name))
instanceType, err = instances.InstanceType(context.TODO(), types.NodeName(node.Name))
if err != nil {
return "", fmt.Errorf("InstanceType: Error fetching by providerID: %v Error fetching by NodeName: %v", providerIDErr, err)
}
@ -481,10 +482,10 @@ func getInstanceTypeByProviderIDOrName(instances cloudprovider.Instances, node *
// getZoneByProviderIDorName will attempt to get the zone of node using its providerID
// then it's name. If both attempts fail, an error is returned
func getZoneByProviderIDOrName(zones cloudprovider.Zones, node *v1.Node) (cloudprovider.Zone, error) {
zone, err := zones.GetZoneByProviderID(node.Spec.ProviderID)
zone, err := zones.GetZoneByProviderID(context.TODO(), node.Spec.ProviderID)
if err != nil {
providerIDErr := err
zone, err = zones.GetZoneByNodeName(types.NodeName(node.Name))
zone, err = zones.GetZoneByNodeName(context.TODO(), types.NodeName(node.Name))
if err != nil {
return cloudprovider.Zone{}, fmt.Errorf("Zone: Error fetching by providerID: %v Error fetching by NodeName: %v", providerIDErr, err)
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package cloud
import (
"context"
"encoding/json"
"fmt"
"time"
@ -185,7 +186,7 @@ func (pvlc *PersistentVolumeLabelController) addLabelsToVolume(vol *v1.Persisten
// Only add labels if the next pending initializer.
if needsInitialization(vol.Initializers, initializerName) {
if labeler, ok := (pvlc.cloud).(cloudprovider.PVLabeler); ok {
labels, err := labeler.GetLabelsForVolume(vol)
labels, err := labeler.GetLabelsForVolume(context.TODO(), vol)
if err != nil {
return fmt.Errorf("error querying volume %v: %v", vol.Spec, err)
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package route
import (
"context"
"fmt"
"net"
"sync"
@ -121,7 +122,7 @@ func (rc *RouteController) Run(stopCh <-chan struct{}, syncPeriod time.Duration)
}
func (rc *RouteController) reconcileNodeRoutes() error {
routeList, err := rc.routes.ListRoutes(rc.clusterName)
routeList, err := rc.routes.ListRoutes(context.TODO(), rc.clusterName)
if err != nil {
return fmt.Errorf("error listing routes: %v", err)
}
@ -170,7 +171,7 @@ func (rc *RouteController) reconcile(nodes []*v1.Node, routes []*cloudprovider.R
// CreateRoute calls in flight.
rateLimiter <- struct{}{}
glog.Infof("Creating route for node %s %s with hint %s, throttled %v", nodeName, route.DestinationCIDR, nameHint, time.Now().Sub(startTime))
err := rc.routes.CreateRoute(rc.clusterName, nameHint, route)
err := rc.routes.CreateRoute(context.TODO(), rc.clusterName, nameHint, route)
<-rateLimiter
rc.updateNetworkingCondition(nodeName, err == nil)
@ -210,7 +211,7 @@ func (rc *RouteController) reconcile(nodes []*v1.Node, routes []*cloudprovider.R
// Delete the route.
go func(route *cloudprovider.Route, startTime time.Time) {
glog.Infof("Deleting route %s %s", route.Name, route.DestinationCIDR)
if err := rc.routes.DeleteRoute(rc.clusterName, route); err != nil {
if err := rc.routes.DeleteRoute(context.TODO(), rc.clusterName, route); err != nil {
glog.Errorf("Could not delete route %s %s after %v: %v", route.Name, route.DestinationCIDR, time.Now().Sub(startTime), err)
} else {
glog.Infof("Deleted route %s %s after %v", route.Name, route.DestinationCIDR, time.Now().Sub(startTime))

View File

@ -17,6 +17,7 @@ limitations under the License.
package route
import (
"context"
"net"
"testing"
"time"
@ -279,7 +280,7 @@ func TestReconcile(t *testing.T) {
for {
select {
case <-tick.C:
if finalRoutes, err = routes.ListRoutes(cluster); err == nil && routeListEqual(finalRoutes, testCase.expectedRoutes) {
if finalRoutes, err = routes.ListRoutes(context.TODO(), cluster); err == nil && routeListEqual(finalRoutes, testCase.expectedRoutes) {
break poll
}
case <-timeoutChan:

View File

@ -17,6 +17,7 @@ limitations under the License.
package service
import (
"context"
"fmt"
"sync"
"time"
@ -281,14 +282,14 @@ func (s *ServiceController) createLoadBalancerIfNeeded(key string, service *v1.S
var err error
if !wantsLoadBalancer(service) {
_, exists, err := s.balancer.GetLoadBalancer(s.clusterName, service)
_, exists, err := s.balancer.GetLoadBalancer(context.TODO(), s.clusterName, service)
if err != nil {
return fmt.Errorf("error getting LB for service %s: %v", key, err)
}
if exists {
glog.Infof("Deleting existing load balancer for service %s that no longer needs a load balancer.", key)
s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
if err := s.balancer.EnsureLoadBalancerDeleted(s.clusterName, service); err != nil {
if err := s.balancer.EnsureLoadBalancerDeleted(context.TODO(), s.clusterName, service); err != nil {
return err
}
s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer")
@ -370,7 +371,7 @@ func (s *ServiceController) ensureLoadBalancer(service *v1.Service) (*v1.LoadBal
// - Only one protocol supported per service
// - Not all cloud providers support all protocols and the next step is expected to return
// an error for unsupported protocols
return s.balancer.EnsureLoadBalancer(s.clusterName, service, nodes)
return s.balancer.EnsureLoadBalancer(context.TODO(), s.clusterName, service, nodes)
}
// ListKeys implements the interface required by DeltaFIFO to list the keys we
@ -672,7 +673,7 @@ func (s *ServiceController) lockedUpdateLoadBalancerHosts(service *v1.Service, h
}
// This operation doesn't normally take very long (and happens pretty often), so we only record the final event
err := s.balancer.UpdateLoadBalancer(s.clusterName, service, hosts)
err := s.balancer.UpdateLoadBalancer(context.TODO(), s.clusterName, service, hosts)
if err == nil {
// If there are no available nodes for LoadBalancer service, make a EventTypeWarning event for it.
if len(hosts) == 0 {
@ -684,7 +685,7 @@ func (s *ServiceController) lockedUpdateLoadBalancerHosts(service *v1.Service, h
}
// It's only an actual error if the load balancer still exists.
if _, exists, err := s.balancer.GetLoadBalancer(s.clusterName, service); err != nil {
if _, exists, err := s.balancer.GetLoadBalancer(context.TODO(), s.clusterName, service); err != nil {
glog.Errorf("External error while checking if load balancer %q exists: name, %v", cloudprovider.GetLoadBalancerName(service), err)
} else if !exists {
return nil
@ -753,7 +754,7 @@ func (s *ServiceController) processLoadBalancerDelete(cachedService *cachedServi
return nil
}
s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
err := s.balancer.EnsureLoadBalancerDeleted(s.clusterName, service)
err := s.balancer.EnsureLoadBalancerDeleted(context.TODO(), s.clusterName, service)
if err != nil {
s.eventRecorder.Eventf(service, v1.EventTypeWarning, "DeletingLoadBalancerFailed", "Error deleting load balancer (will retry): %v", err)
return err

View File

@ -17,6 +17,7 @@ limitations under the License.
package node
import (
"context"
"errors"
"fmt"
"strings"
@ -177,7 +178,7 @@ func ExistsInCloudProvider(cloud cloudprovider.Interface, nodeName types.NodeNam
if !ok {
return false, fmt.Errorf("%v", ErrCloudInstance)
}
if _, err := instances.ExternalID(nodeName); err != nil {
if _, err := instances.ExternalID(context.TODO(), nodeName); err != nil {
if err == cloudprovider.InstanceNotFound {
return false, nil
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package kubelet
import (
"context"
"crypto/tls"
"fmt"
"net"
@ -369,7 +370,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
return nil, fmt.Errorf("failed to get instances from cloud provider")
}
nodeName, err = instances.CurrentNodeName(hostname)
nodeName, err = instances.CurrentNodeName(context.TODO(), hostname)
if err != nil {
return nil, fmt.Errorf("error fetching current instance name from cloud provider: %v", err)
}
@ -377,7 +378,7 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
glog.V(2).Infof("cloud provider determined current node name to be %s", nodeName)
if utilfeature.DefaultFeatureGate.Enabled(features.RotateKubeletServerCertificate) {
nodeAddresses, err := instances.NodeAddresses(nodeName)
nodeAddresses, err := instances.NodeAddresses(context.TODO(), nodeName)
if err != nil {
return nil, fmt.Errorf("failed to get the addresses of the current instance from the cloud provider: %v", err)
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package kubelet
import (
"context"
"fmt"
"math"
"net"
@ -303,7 +304,7 @@ func (kl *Kubelet) initialNode() (*v1.Node, error) {
// TODO(roberthbailey): Can we do this without having credentials to talk
// to the cloud provider?
// TODO: ExternalID is deprecated, we'll have to drop this code
externalID, err := instances.ExternalID(kl.nodeName)
externalID, err := instances.ExternalID(context.TODO(), kl.nodeName)
if err != nil {
return nil, fmt.Errorf("failed to get external ID from cloud provider: %v", err)
}
@ -313,13 +314,13 @@ func (kl *Kubelet) initialNode() (*v1.Node, error) {
// cloudprovider from arbitrary nodes. At most, we should talk to a
// local metadata server here.
if node.Spec.ProviderID == "" {
node.Spec.ProviderID, err = cloudprovider.GetInstanceProviderID(kl.cloud, kl.nodeName)
node.Spec.ProviderID, err = cloudprovider.GetInstanceProviderID(context.TODO(), kl.cloud, kl.nodeName)
if err != nil {
return nil, err
}
}
instanceType, err := instances.InstanceType(kl.nodeName)
instanceType, err := instances.InstanceType(context.TODO(), kl.nodeName)
if err != nil {
return nil, err
}
@ -330,7 +331,7 @@ func (kl *Kubelet) initialNode() (*v1.Node, error) {
// If the cloud has zone information, label the node with the zone information
zones, ok := kl.cloud.Zones()
if ok {
zone, err := zones.GetZone()
zone, err := zones.GetZone(context.TODO())
if err != nil {
return nil, fmt.Errorf("failed to get zone from cloud provider: %v", err)
}
@ -453,7 +454,7 @@ func (kl *Kubelet) setNodeAddress(node *v1.Node) error {
// to the cloud provider?
// TODO(justinsb): We can if CurrentNodeName() was actually CurrentNode() and returned an interface
// TODO: If IP addresses couldn't be fetched from the cloud provider, should kubelet fallback on the other methods for getting the IP below?
nodeAddresses, err := instances.NodeAddresses(kl.nodeName)
nodeAddresses, err := instances.NodeAddresses(context.TODO(), kl.nodeName)
if err != nil {
return fmt.Errorf("failed to get node address from cloud provider: %v", err)
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package tunneler
import (
"context"
"fmt"
"io/ioutil"
"net"
@ -35,7 +36,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
)
type InstallSSHKey func(user string, data []byte) error
type InstallSSHKey func(ctx context.Context, user string, data []byte) error
type AddressFunc func() (addresses []string, err error)
@ -175,7 +176,7 @@ func (c *SSHTunneler) installSSHKeySyncLoop(user, publicKeyfile string) {
glog.Errorf("Failed to encode public key: %v", err)
return
}
if err := c.InstallSSHKey(user, keyData); err != nil {
if err := c.InstallSSHKey(context.TODO(), user, keyData); err != nil {
glog.Errorf("Failed to install ssh key: %v", err)
return
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package azure_dd
import (
"context"
"fmt"
"os"
"path"
@ -64,7 +65,7 @@ func (a *azureDiskAttacher) Attach(spec *volume.Spec, nodeName types.NodeName) (
return "", err
}
instanceid, err := a.cloud.InstanceID(nodeName)
instanceid, err := a.cloud.InstanceID(context.TODO(), nodeName)
if err != nil {
glog.Warningf("failed to get azure instance id (%v)", err)
return "", fmt.Errorf("failed to get azure instance id for node %q (%v)", nodeName, err)
@ -271,7 +272,7 @@ func (d *azureDiskDetacher) Detach(diskURI string, nodeName types.NodeName) erro
return fmt.Errorf("invalid disk to detach: %q", diskURI)
}
instanceid, err := d.cloud.InstanceID(nodeName)
instanceid, err := d.cloud.InstanceID(context.TODO(), nodeName)
if err != nil {
glog.Warningf("no instance id for node %q, skip detaching (%v)", nodeName, err)
return nil

View File

@ -17,6 +17,7 @@ limitations under the License.
package cinder
import (
"context"
"fmt"
"os"
"path"
@ -401,7 +402,7 @@ func (attacher *cinderDiskAttacher) nodeInstanceID(nodeName types.NodeName) (str
if !res {
return "", fmt.Errorf("failed to list openstack instances")
}
instanceID, err := instances.InstanceID(nodeName)
instanceID, err := instances.InstanceID(context.TODO(), nodeName)
if err != nil {
return "", err
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package cinder
import (
"context"
"errors"
"reflect"
"testing"
@ -703,31 +704,31 @@ type instances struct {
instanceID string
}
func (instances *instances) NodeAddresses(name types.NodeName) ([]v1.NodeAddress, error) {
func (instances *instances) NodeAddresses(ctx context.Context, name types.NodeName) ([]v1.NodeAddress, error) {
return []v1.NodeAddress{}, errors.New("Not implemented")
}
func (instances *instances) NodeAddressesByProviderID(providerID string) ([]v1.NodeAddress, error) {
func (instances *instances) NodeAddressesByProviderID(ctx context.Context, providerID string) ([]v1.NodeAddress, error) {
return []v1.NodeAddress{}, errors.New("Not implemented")
}
func (instances *instances) ExternalID(name types.NodeName) (string, error) {
func (instances *instances) ExternalID(ctx context.Context, name types.NodeName) (string, error) {
return "", errors.New("Not implemented")
}
func (instances *instances) InstanceID(name types.NodeName) (string, error) {
func (instances *instances) InstanceID(ctx context.Context, name types.NodeName) (string, error) {
return instances.instanceID, nil
}
func (instances *instances) InstanceType(name types.NodeName) (string, error) {
func (instances *instances) InstanceType(ctx context.Context, name types.NodeName) (string, error) {
return "", errors.New("Not implemented")
}
func (instances *instances) InstanceTypeByProviderID(providerID string) (string, error) {
func (instances *instances) InstanceTypeByProviderID(ctx context.Context, providerID string) (string, error) {
return "", errors.New("Not implemented")
}
func (instances *instances) InstanceExistsByProviderID(providerID string) (bool, error) {
func (instances *instances) InstanceExistsByProviderID(ctx context.Context, providerID string) (bool, error) {
return false, errors.New("unimplemented")
}
@ -735,10 +736,10 @@ func (instances *instances) List(filter string) ([]types.NodeName, error) {
return []types.NodeName{}, errors.New("Not implemented")
}
func (instances *instances) AddSSHKeyToAllInstances(user string, keyData []byte) error {
func (instances *instances) AddSSHKeyToAllInstances(ctx context.Context, user string, keyData []byte) error {
return errors.New("Not implemented")
}
func (instances *instances) CurrentNodeName(hostname string) (types.NodeName, error) {
func (instances *instances) CurrentNodeName(ctx context.Context, hostname string) (types.NodeName, error) {
return "", errors.New("Not implemented")
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package photon_pd
import (
"context"
"fmt"
"os"
"path"
@ -68,7 +69,7 @@ func (attacher *photonPersistentDiskAttacher) Attach(spec *volume.Spec, nodeName
glog.Errorf("Photon Controller attacher: Attach failed to get volume source")
return "", err
}
attached, err := attacher.photonDisks.DiskIsAttached(volumeSource.PdID, nodeName)
attached, err := attacher.photonDisks.DiskIsAttached(context.TODO(), volumeSource.PdID, nodeName)
if err != nil {
glog.Warningf("Photon Controller: couldn't check if disk is Attached for host %s, will try attach disk: %+v", hostName, err)
@ -78,7 +79,7 @@ func (attacher *photonPersistentDiskAttacher) Attach(spec *volume.Spec, nodeName
if !attached {
glog.V(4).Infof("Photon Controller: Attach disk called for host %s", hostName)
err = attacher.photonDisks.AttachDisk(volumeSource.PdID, nodeName)
err = attacher.photonDisks.AttachDisk(context.TODO(), volumeSource.PdID, nodeName)
if err != nil {
glog.Errorf("Error attaching volume %q to node %q: %+v", volumeSource.PdID, nodeName, err)
return "", err
@ -104,7 +105,7 @@ func (attacher *photonPersistentDiskAttacher) VolumesAreAttached(specs []*volume
volumesAttachedCheck[spec] = true
volumeSpecMap[volumeSource.PdID] = spec
}
attachedResult, err := attacher.photonDisks.DisksAreAttached(pdIDList, nodeName)
attachedResult, err := attacher.photonDisks.DisksAreAttached(context.TODO(), pdIDList, nodeName)
if err != nil {
glog.Errorf(
"Error checking if volumes (%v) are attached to current node (%q). err=%v",
@ -247,7 +248,7 @@ func (detacher *photonPersistentDiskDetacher) Detach(volumeName string, nodeName
hostName := string(nodeName)
pdID := volumeName
attached, err := detacher.photonDisks.DiskIsAttached(pdID, nodeName)
attached, err := detacher.photonDisks.DiskIsAttached(context.TODO(), pdID, nodeName)
if err != nil {
// Log error and continue with detach
glog.Errorf(
@ -261,7 +262,7 @@ func (detacher *photonPersistentDiskDetacher) Detach(volumeName string, nodeName
return nil
}
if err := detacher.photonDisks.DetachDisk(pdID, nodeName); err != nil {
if err := detacher.photonDisks.DetachDisk(context.TODO(), pdID, nodeName); err != nil {
glog.Errorf("Error detaching volume %q: %v", pdID, err)
return err
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package photon_pd
import (
"context"
"errors"
"testing"
@ -233,7 +234,7 @@ type diskIsAttachedCall struct {
ret error
}
func (testcase *testcase) AttachDisk(diskName string, nodeName types.NodeName) error {
func (testcase *testcase) AttachDisk(ctx context.Context, diskName string, nodeName types.NodeName) error {
expected := &testcase.attach
if expected.diskName == "" && expected.nodeName == "" {
@ -258,7 +259,7 @@ func (testcase *testcase) AttachDisk(diskName string, nodeName types.NodeName) e
return expected.ret
}
func (testcase *testcase) DetachDisk(diskName string, nodeName types.NodeName) error {
func (testcase *testcase) DetachDisk(ctx context.Context, diskName string, nodeName types.NodeName) error {
expected := &testcase.detach
if expected.diskName == "" && expected.nodeName == "" {
@ -283,7 +284,7 @@ func (testcase *testcase) DetachDisk(diskName string, nodeName types.NodeName) e
return expected.ret
}
func (testcase *testcase) DiskIsAttached(diskName string, nodeName types.NodeName) (bool, error) {
func (testcase *testcase) DiskIsAttached(ctx context.Context, diskName string, nodeName types.NodeName) (bool, error) {
expected := &testcase.diskIsAttached
if expected.diskName == "" && expected.nodeName == "" {
@ -308,7 +309,7 @@ func (testcase *testcase) DiskIsAttached(diskName string, nodeName types.NodeNam
return expected.isAttached, expected.ret
}
func (testcase *testcase) DisksAreAttached(diskNames []string, nodeName types.NodeName) (map[string]bool, error) {
func (testcase *testcase) DisksAreAttached(ctx context.Context, diskNames []string, nodeName types.NodeName) (map[string]bool, error) {
return nil, errors.New("Not implemented")
}