remove cloud provider dependencies on pkg/volume

Co-authored-by: Weibin Lin <linweibin1@huawei.com>
pull/564/head
Andrew Kim 2019-02-05 13:44:45 -05:00
parent fd633d192f
commit ca6a051b00
53 changed files with 2200 additions and 2227 deletions
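At a high level, every file below applies the same substitution: symbols that used to come from k8s.io/kubernetes/pkg/volume and pkg/volume/util now come from the k8s.io/cloud-provider staging packages. A compiling sketch of the new import surface, using the aliases this diff uses (the old paths are shown only as comments):

```go
package example

// Before this commit, the providers and controllers below imported these
// symbols from the main tree:
//
//	"k8s.io/kubernetes/pkg/volume"       (ProvisionedVolumeName, NewDeletedVolumeInUseError)
//	"k8s.io/kubernetes/pkg/volume/util"  (zone helpers, DanglingAttachError)
//
// After it, the same symbols come from the cloud-provider staging repo:
import (
	cloudvolume "k8s.io/cloud-provider/volume"           // ProvisionedVolumeName, LabelMultiZoneDelimiter
	volerr "k8s.io/cloud-provider/volume/errors"         // NewDeletedVolumeInUseError, DanglingAttachError
	volumehelpers "k8s.io/cloud-provider/volume/helpers" // LabelZonesToSet, ZonesToSet, SelectZonesForVolume
)

// Blank references keep this sketch compiling on its own.
var (
	_ = cloudvolume.ProvisionedVolumeName
	_ = volerr.IsDeletedVolumeInUse
	_ = volumehelpers.ZonesToSet
)
```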

View File

@ -28,7 +28,6 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/cloudprovider/providers/aws",
deps = [
"//pkg/credentialprovider/aws:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
@ -44,6 +43,8 @@ go_library(
"//staging/src/k8s.io/cloud-provider:go_default_library",
"//staging/src/k8s.io/cloud-provider/node/helpers:go_default_library",
"//staging/src/k8s.io/cloud-provider/service/helpers:go_default_library",
"//staging/src/k8s.io/cloud-provider/volume:go_default_library",
"//staging/src/k8s.io/cloud-provider/volume/errors:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/aws:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/aws/awserr:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/aws/credentials:go_default_library",
@ -78,11 +79,11 @@ go_test(
],
embed = [":go_default_library"],
deps = [
"//pkg/volume:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/types:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/cloud-provider/volume:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/aws:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/service/ec2:go_default_library",
"//vendor/github.com/aws/aws-sdk-go/service/elb:go_default_library",

View File

@ -60,7 +60,8 @@ import (
cloudprovider "k8s.io/cloud-provider"
nodehelpers "k8s.io/cloud-provider/node/helpers"
servicehelpers "k8s.io/cloud-provider/service/helpers"
"k8s.io/kubernetes/pkg/volume"
cloudvolume "k8s.io/cloud-provider/volume"
volerr "k8s.io/cloud-provider/volume/errors"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
)
@ -2035,7 +2036,7 @@ func (d *awsDisk) deleteVolume() (bool, error) {
}
if awsError, ok := err.(awserr.Error); ok {
if awsError.Code() == "VolumeInUse" {
return false, volume.NewDeletedVolumeInUseError(err.Error())
return false, volerr.NewDeletedVolumeInUseError(err.Error())
}
}
return false, fmt.Errorf("error deleting EBS volume %q: %q", d.awsID, err)
@ -2424,7 +2425,7 @@ func (c *Cloud) checkIfAvailable(disk *awsDisk, opName string, instance string)
devicePath := aws.StringValue(attachment.Device)
nodeName := mapInstanceToNodeName(attachedInstance)
danglingErr := volumeutil.NewDanglingError(attachErr, nodeName, devicePath)
danglingErr := volerr.NewDanglingError(attachErr, nodeName, devicePath)
return false, danglingErr
}
@ -2443,7 +2444,7 @@ func (c *Cloud) GetLabelsForVolume(ctx context.Context, pv *v1.PersistentVolume)
}
// Ignore any volumes that are being provisioned
if pv.Spec.AWSElasticBlockStore.VolumeID == volume.ProvisionedVolumeName {
if pv.Spec.AWSElasticBlockStore.VolumeID == cloudvolume.ProvisionedVolumeName {
return nil, nil
}

View File

@ -35,7 +35,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/volume"
cloudvolume "k8s.io/cloud-provider/volume"
)
const TestClusterID = "clusterid.test"
@ -1221,7 +1221,7 @@ func TestGetLabelsForVolume(t *testing.T) {
Spec: v1.PersistentVolumeSpec{
PersistentVolumeSource: v1.PersistentVolumeSource{
AWSElasticBlockStore: &v1.AWSElasticBlockStoreVolumeSource{
VolumeID: volume.ProvisionedVolumeName,
VolumeID: cloudvolume.ProvisionedVolumeName,
},
},
},

View File

@ -37,7 +37,6 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/cloudprovider/providers/azure",
deps = [
"//pkg/cloudprovider/providers/azure/auth:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
@ -57,6 +56,8 @@ go_library(
"//staging/src/k8s.io/client-go/util/flowcontrol:go_default_library",
"//staging/src/k8s.io/cloud-provider:go_default_library",
"//staging/src/k8s.io/cloud-provider/service/helpers:go_default_library",
"//staging/src/k8s.io/cloud-provider/volume:go_default_library",
"//staging/src/k8s.io/cloud-provider/volume/errors:go_default_library",
"//vendor/github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2018-10-01/compute:go_default_library",
"//vendor/github.com/Azure/azure-sdk-for-go/services/network/mgmt/2017-09-01/network:go_default_library",
"//vendor/github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2018-07-01/storage:go_default_library",

View File

@ -31,10 +31,10 @@ import (
azstorage "github.com/Azure/azure-sdk-for-go/storage"
"github.com/Azure/go-autorest/autorest/to"
"github.com/rubiojr/go-vhd/vhd"
"k8s.io/klog"
kwait "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/volume"
volerr "k8s.io/cloud-provider/volume/errors"
"k8s.io/klog"
)
const (
@ -119,7 +119,7 @@ func (c *BlobDiskController) DeleteVolume(diskURI string) error {
if strings.Contains(detail, errLeaseIDMissing) {
// disk is still being used
// see https://msdn.microsoft.com/en-us/library/microsoft.windowsazure.storage.blob.protocol.bloberrorcodestrings.leaseidmissing.aspx
return volume.NewDeletedVolumeInUseError(fmt.Sprintf("disk %q is still in use while being deleted", diskURI))
return volerr.NewDeletedVolumeInUseError(fmt.Sprintf("disk %q is still in use while being deleted", diskURI))
}
return fmt.Errorf("failed to delete vhd %v, account %s, blob %s, err: %v", diskURI, accountName, blob, err)
}

View File

@ -30,7 +30,7 @@ import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
kwait "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/kubernetes/pkg/volume"
cloudvolume "k8s.io/cloud-provider/volume"
"k8s.io/kubernetes/pkg/volume/util"
)
@ -281,7 +281,7 @@ func (c *Cloud) GetLabelsForVolume(ctx context.Context, pv *v1.PersistentVolume)
}
// Ignore any volumes that are being provisioned
if pv.Spec.AzureDisk.DiskName == volume.ProvisionedVolumeName {
if pv.Spec.AzureDisk.DiskName == cloudvolume.ProvisionedVolumeName {
return nil, nil
}

View File

@ -41,7 +41,6 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/cloudprovider/providers/gce",
visibility = ["//visibility:public"],
deps = [
"//pkg/volume:go_default_library",
"//pkg/volume/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
@ -66,6 +65,9 @@ go_library(
"//staging/src/k8s.io/cloud-provider:go_default_library",
"//staging/src/k8s.io/cloud-provider/features:go_default_library",
"//staging/src/k8s.io/cloud-provider/service/helpers:go_default_library",
"//staging/src/k8s.io/cloud-provider/volume:go_default_library",
"//staging/src/k8s.io/cloud-provider/volume/errors:go_default_library",
"//staging/src/k8s.io/cloud-provider/volume/helpers:go_default_library",
"//vendor/cloud.google.com/go/compute/metadata:go_default_library",
"//vendor/github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud:go_default_library",
"//vendor/github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud/filter:go_default_library",

View File

@ -29,7 +29,9 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
cloudprovider "k8s.io/cloud-provider"
"k8s.io/kubernetes/pkg/volume"
cloudvolume "k8s.io/cloud-provider/volume"
volerr "k8s.io/cloud-provider/volume/errors"
volumehelpers "k8s.io/cloud-provider/volume/helpers"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
"github.com/GoogleCloudPlatform/k8s-cloud-provider/pkg/cloud"
@ -504,7 +506,7 @@ func newDiskMetricContextRegional(request, region string) *metricContext {
// GetLabelsForVolume retrieved the label info for the provided volume
func (g *Cloud) GetLabelsForVolume(ctx context.Context, pv *v1.PersistentVolume) (map[string]string, error) {
// Ignore any volumes that are being provisioned
if pv.Spec.GCEPersistentDisk.PDName == volume.ProvisionedVolumeName {
if pv.Spec.GCEPersistentDisk.PDName == cloudvolume.ProvisionedVolumeName {
return nil, nil
}
@ -730,7 +732,7 @@ func getDiskType(diskType string) (string, error) {
func (g *Cloud) DeleteDisk(diskToDelete string) error {
err := g.doDeleteDisk(diskToDelete)
if isGCEError(err, "resourceInUseByAnotherResource") {
return volume.NewDeletedVolumeInUseError(err.Error())
return volerr.NewDeletedVolumeInUseError(err.Error())
}
if err == cloudprovider.DiskNotFound {
@ -811,7 +813,7 @@ func (g *Cloud) GetAutoLabelsForPD(name string, zone string) (map[string]string,
// However it is more consistent to ensure the disk exists,
// and in future we may gather addition information (e.g. disk type, IOPS etc)
if utilfeature.DefaultFeatureGate.Enabled(cloudfeatures.GCERegionalPersistentDisk) {
zoneSet, err := volumeutil.LabelZonesToSet(zone)
zoneSet, err := volumehelpers.LabelZonesToSet(zone)
if err != nil {
klog.Warningf("Failed to parse zone field: %q. Will use raw field.", zone)
}
@ -852,7 +854,7 @@ func (g *Cloud) GetAutoLabelsForPD(name string, zone string) (map[string]string,
return nil, fmt.Errorf("PD is regional but does not have any replicaZones specified: %v", disk)
}
labels[v1.LabelZoneFailureDomain] =
volumeutil.ZonesSetToLabelValue(zoneInfo.replicaZones)
volumehelpers.ZonesSetToLabelValue(zoneInfo.replicaZones)
labels[v1.LabelZoneRegion] = disk.Region
case nil:
// Unexpected, but sanity-check

View File

@ -21,7 +21,6 @@ go_library(
importpath = "k8s.io/kubernetes/pkg/cloudprovider/providers/openstack",
deps = [
"//pkg/util/mount:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
@ -33,6 +32,8 @@ go_library(
"//staging/src/k8s.io/cloud-provider:go_default_library",
"//staging/src/k8s.io/cloud-provider/node/helpers:go_default_library",
"//staging/src/k8s.io/cloud-provider/service/helpers:go_default_library",
"//staging/src/k8s.io/cloud-provider/volume:go_default_library",
"//staging/src/k8s.io/cloud-provider/volume/errors:go_default_library",
"//vendor/github.com/gophercloud/gophercloud:go_default_library",
"//vendor/github.com/gophercloud/gophercloud/openstack:go_default_library",
"//vendor/github.com/gophercloud/gophercloud/openstack/blockstorage/extensions/volumeactions:go_default_library",

View File

@ -30,7 +30,8 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/types"
cloudprovider "k8s.io/cloud-provider"
k8s_volume "k8s.io/kubernetes/pkg/volume"
cloudvolume "k8s.io/cloud-provider/volume"
volerr "k8s.io/cloud-provider/volume/errors"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
"github.com/gophercloud/gophercloud"
@ -344,7 +345,7 @@ func (os *OpenStack) AttachDisk(instanceID, volumeID string) (string, error) {
}
// using volume.AttachedDevice may cause problems because cinder does not report device path correctly see issue #33128
devicePath := volume.AttachedDevice
danglingErr := volumeutil.NewDanglingError(attachErr, nodeName, devicePath)
danglingErr := volerr.NewDanglingError(attachErr, nodeName, devicePath)
klog.V(2).Infof("Found dangling volume %s attached to node %s", volumeID, nodeName)
return "", danglingErr
}
@ -571,7 +572,7 @@ func (os *OpenStack) DeleteVolume(volumeID string) error {
}
if used {
msg := fmt.Sprintf("Cannot delete the volume %q, it's still attached to a node", volumeID)
return k8s_volume.NewDeletedVolumeInUseError(msg)
return volerr.NewDeletedVolumeInUseError(msg)
}
volumes, err := os.volumeService("")
@ -702,7 +703,7 @@ func (os *OpenStack) GetLabelsForVolume(ctx context.Context, pv *v1.PersistentVo
}
// Ignore any volumes that are being provisioned
if pv.Spec.Cinder.VolumeID == k8s_volume.ProvisionedVolumeName {
if pv.Spec.Cinder.VolumeID == cloudvolume.ProvisionedVolumeName {
return nil, nil
}
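For the dangling-attach path the pattern is the same across providers: the cloud layer wraps the situation in the relocated error type, and the attach logic detects it by type assertion. A rough sketch with made-up function names; the CurrentNode/DevicePath fields follow the upstream struct, which this excerpt does not show in full:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/types"
	volerr "k8s.io/cloud-provider/volume/errors"
)

// attachDisk stands in for a provider's attach call: the disk turns out to be
// attached to a different node, so report where it is instead of failing opaquely.
func attachDisk(volumeID string, actualNode types.NodeName, devicePath string) error {
	msg := fmt.Sprintf("volume %s is already attached to node %s", volumeID, actualNode)
	return volerr.NewDanglingError(msg, actualNode, devicePath)
}

func main() {
	err := attachDisk("vol-0abc", "node-1", "/dev/xvdf")
	if derr, ok := err.(*volerr.DanglingAttachError); ok {
		// The real caller (the operation generator below) marks the volume as
		// attached to the reported node rather than treating this as a hard failure.
		fmt.Printf("dangling attach: node=%s device=%s\n", derr.CurrentNode, derr.DevicePath)
	}
}
```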

View File

@ -21,7 +21,6 @@ go_library(
"//pkg/kubelet/apis:go_default_library",
"//pkg/scheduler/api:go_default_library",
"//pkg/util/node:go_default_library",
"//pkg/volume/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
@ -42,6 +41,7 @@ go_library(
"//staging/src/k8s.io/client-go/util/retry:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//staging/src/k8s.io/cloud-provider:go_default_library",
"//staging/src/k8s.io/cloud-provider/volume/helpers:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)
@ -60,7 +60,6 @@ go_test(
"//pkg/controller/testutil:go_default_library",
"//pkg/kubelet/apis:go_default_library",
"//pkg/scheduler/api:go_default_library",
"//pkg/volume/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library",
@ -73,6 +72,7 @@ go_test(
"//staging/src/k8s.io/client-go/testing:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//staging/src/k8s.io/cloud-provider:go_default_library",
"//staging/src/k8s.io/cloud-provider/volume/helpers:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],

View File

@ -38,9 +38,9 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
cloudprovider "k8s.io/cloud-provider"
volumehelpers "k8s.io/cloud-provider/volume/helpers"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/controller"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
)
// PersistentVolumeLabelController handles adding labels to persistent volumes when they are created
@ -210,7 +210,7 @@ func (pvlc *PersistentVolumeLabelController) createPatch(vol *v1.PersistentVolum
if populateAffinity {
var values []string
if k == v1.LabelZoneFailureDomain {
zones, err := volumeutil.LabelZonesToSet(v)
zones, err := volumehelpers.LabelZonesToSet(v)
if err != nil {
return nil, fmt.Errorf("failed to convert label string for Zone: %s to a Set", v)
}

View File

@ -29,7 +29,7 @@ import (
sets "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/kubernetes/fake"
core "k8s.io/client-go/testing"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
volumehelpers "k8s.io/cloud-provider/volume/helpers"
fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake"
)
@ -368,7 +368,7 @@ func TestCreatePatch(t *testing.T) {
},
}
zones, _ := volumeutil.ZonesToSet("1,2,3")
zones, _ := volumehelpers.ZonesToSet("1,2,3")
testCases := map[string]struct {
vol v1.PersistentVolume
labels map[string]string
@ -416,12 +416,12 @@ func TestCreatePatch(t *testing.T) {
},
"cloudprovider multizone": {
vol: awsPV,
labels: map[string]string{v1.LabelZoneFailureDomain: volumeutil.ZonesSetToLabelValue(zones)},
labels: map[string]string{v1.LabelZoneFailureDomain: volumehelpers.ZonesSetToLabelValue(zones)},
expectedAffinity: &expectedAffinityZonesMergedWithAWSPV,
},
"cloudprovider multizone pre-existing affinity non-conflicting": {
vol: awsPVWithAffinity,
labels: map[string]string{v1.LabelZoneFailureDomain: volumeutil.ZonesSetToLabelValue(zones)},
labels: map[string]string{v1.LabelZoneFailureDomain: volumehelpers.ZonesSetToLabelValue(zones)},
expectedAffinity: &expectedAffinityZonesMergedWithAWSPVWithAffinity,
},
}

View File

@ -59,6 +59,7 @@ go_library(
"//staging/src/k8s.io/client-go/tools/reference:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//staging/src/k8s.io/cloud-provider:go_default_library",
"//staging/src/k8s.io/cloud-provider/volume/errors:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned:go_default_library",
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
"//vendor/k8s.io/klog:go_default_library",

View File

@ -38,6 +38,7 @@ import (
ref "k8s.io/client-go/tools/reference"
"k8s.io/client-go/util/workqueue"
cloudprovider "k8s.io/cloud-provider"
volerr "k8s.io/cloud-provider/volume/errors"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/controller/volume/events"
"k8s.io/kubernetes/pkg/controller/volume/persistentvolume/metrics"
@ -1177,7 +1178,7 @@ func (ctrl *PersistentVolumeController) deleteVolumeOperation(volume *v1.Persist
if err != nil {
// Delete failed, update the volume and emit an event.
klog.V(3).Infof("deletion of volume %q failed: %v", volume.Name, err)
if vol.IsDeletedVolumeInUse(err) {
if volerr.IsDeletedVolumeInUse(err) {
// The plugin needs more time, don't mark the volume as Failed
// and send Normal event only
ctrl.eventRecorder.Event(volume, v1.EventTypeNormal, events.VolumeDelete, err.Error())
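The error round trip behind this hunk: a provider reports "deleted volume still in use" with the constructor from k8s.io/cloud-provider/volume/errors, and the PV controller recognizes it with IsDeletedVolumeInUse instead of the old pkg/volume check. A minimal sketch with made-up function names:

```go
package main

import (
	"fmt"

	volerr "k8s.io/cloud-provider/volume/errors"
)

// deleteVolume stands in for a provider's DeleteVolume: when the cloud reports
// the disk as still attached, return the typed "in use" error instead of a
// generic one so callers can back off and retry.
func deleteVolume(volumeID string, stillAttached bool) error {
	if stillAttached {
		return volerr.NewDeletedVolumeInUseError(
			fmt.Sprintf("volume %q is still attached to a node", volumeID))
	}
	return nil // a real provider would issue the cloud delete call here
}

func main() {
	if err := deleteVolume("vol-0abc", true); err != nil {
		if volerr.IsDeletedVolumeInUse(err) {
			// Mirrors the PV controller hunk above: emit a Normal event and
			// retry later rather than marking the volume Failed.
			fmt.Println("volume busy, will retry:", err)
			return
		}
		fmt.Println("deletion failed:", err)
	}
}
```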

View File

@ -40,6 +40,7 @@ go_library(
"//staging/src/k8s.io/client-go/listers/core/v1:go_default_library",
"//staging/src/k8s.io/client-go/listers/storage/v1:go_default_library",
"//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
"//staging/src/k8s.io/cloud-provider/volume/helpers:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)

View File

@ -36,6 +36,7 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
corelisters "k8s.io/client-go/listers/core/v1"
storagelisters "k8s.io/client-go/listers/storage/v1"
volumehelpers "k8s.io/cloud-provider/volume/helpers"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
v1qos "k8s.io/kubernetes/pkg/apis/core/v1/helper/qos"
"k8s.io/kubernetes/pkg/features"
@ -691,7 +692,7 @@ func (c *VolumeZoneChecker) predicate(pod *v1.Pod, meta PredicateMetadata, nodeI
continue
}
nodeV, _ := nodeConstraints[k]
volumeVSet, err := volumeutil.LabelZonesToSet(v)
volumeVSet, err := volumehelpers.LabelZonesToSet(v)
if err != nil {
klog.Warningf("Failed to parse label for %q: %q. Ignoring the label. err=%v. ", k, v, err)
continue

View File

@ -30,6 +30,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/cloud-provider:go_default_library",
"//staging/src/k8s.io/cloud-provider/volume/helpers:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
"//vendor/k8s.io/utils/strings:go_default_library",
],

View File

@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/sets"
cloudprovider "k8s.io/cloud-provider"
volumehelpers "k8s.io/cloud-provider/volume/helpers"
"k8s.io/kubernetes/pkg/cloudprovider/providers/aws"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/volume"
@ -132,7 +133,7 @@ func (util *AWSDiskUtil) CreateVolume(c *awsElasticBlockStoreProvisioner, node *
// returns volumeOptions for EBS based on storageclass parameters and node configuration
func populateVolumeOptions(pluginName, pvcName string, capacityGB resource.Quantity, tags map[string]string, storageParams map[string]string, node *v1.Node, allowedTopologies []v1.TopologySelectorTerm, zonesWithNodes sets.String) (*aws.VolumeOptions, error) {
requestGiB, err := volumeutil.RoundUpToGiBInt(capacityGB)
requestGiB, err := volumehelpers.RoundUpToGiBInt(capacityGB)
if err != nil {
return nil, err
}
@ -157,7 +158,7 @@ func populateVolumeOptions(pluginName, pvcName string, capacityGB resource.Quant
zone = v
case "zones":
zonesPresent = true
zones, err = volumeutil.ZonesToSet(v)
zones, err = volumehelpers.ZonesToSet(v)
if err != nil {
return nil, fmt.Errorf("error parsing zones %s, must be strings separated by commas: %v", zones, err)
}
@ -180,7 +181,7 @@ func populateVolumeOptions(pluginName, pvcName string, capacityGB resource.Quant
}
}
volumeOptions.AvailabilityZone, err = volumeutil.SelectZoneForVolume(zonePresent, zonesPresent, zone, zones, zonesWithNodes, node, allowedTopologies, pvcName)
volumeOptions.AvailabilityZone, err = volumehelpers.SelectZoneForVolume(zonePresent, zonesPresent, zone, zones, zonesWithNodes, node, allowedTopologies, pvcName)
if err != nil {
return nil, err
}

View File

@ -36,6 +36,7 @@ go_library(
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/cloud-provider:go_default_library",
"//staging/src/k8s.io/cloud-provider/volume/helpers:go_default_library",
"//vendor/github.com/Azure/azure-sdk-for-go/services/compute/mgmt/2018-10-01/compute:go_default_library",
"//vendor/github.com/Azure/azure-sdk-for-go/services/storage/mgmt/2018-07-01/storage:go_default_library",
"//vendor/k8s.io/klog:go_default_library",

View File

@ -28,6 +28,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
volumehelpers "k8s.io/cloud-provider/volume/helpers"
"k8s.io/kubernetes/pkg/cloudprovider/providers/azure"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/volume"
@ -134,7 +135,7 @@ func (p *azureDiskProvisioner) Provision(selectedNode *v1.Node, allowedTopologie
// maxLength = 79 - (4 for ".vhd") = 75
name := util.GenerateVolumeName(p.options.ClusterName, p.options.PVName, 75)
capacity := p.options.PVC.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)]
requestGiB, err := util.RoundUpToGiBInt(capacity)
requestGiB, err := volumehelpers.RoundUpToGiBInt(capacity)
if err != nil {
return nil, err
}
@ -162,7 +163,7 @@ func (p *azureDiskProvisioner) Provision(selectedNode *v1.Node, allowedTopologie
availabilityZone = v
case "zones":
zonesPresent = true
availabilityZones, err = util.ZonesToSet(v)
availabilityZones, err = volumehelpers.ZonesToSet(v)
if err != nil {
return nil, fmt.Errorf("error parsing zones %s, must be strings separated by commas: %v", v, err)
}
@ -224,7 +225,7 @@ func (p *azureDiskProvisioner) Provision(selectedNode *v1.Node, allowedTopologie
}
if availabilityZone != "" || availabilityZones.Len() != 0 || activeZones.Len() != 0 || len(allowedTopologies) != 0 {
selectedAvailabilityZone, err = util.SelectZoneForVolume(zonePresent, zonesPresent, availabilityZone, availabilityZones, activeZones, selectedNode, allowedTopologies, p.options.PVC.Name)
selectedAvailabilityZone, err = volumehelpers.SelectZoneForVolume(zonePresent, zonesPresent, availabilityZone, availabilityZones, activeZones, selectedNode, allowedTopologies, p.options.PVC.Name)
if err != nil {
return nil, err
}
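The capacity handling in this hunk leans on the rounding helpers referenced throughout the diff (RoundUpToGiBInt and the GiB constant). Assuming they keep the shapes visible at these call sites (resource.Quantity in, int plus error out), the conversion the provisioners perform looks roughly like this; values are illustrative:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/resource"
	volumehelpers "k8s.io/cloud-provider/volume/helpers"
)

func main() {
	// A 100Gi PVC request rounds to 100 GiB; a 1500Mi request rounds up to 2 GiB.
	for _, q := range []string{"100Gi", "1500Mi"} {
		capacity := resource.MustParse(q)
		requestGiB, err := volumehelpers.RoundUpToGiBInt(capacity)
		if err != nil {
			panic(err)
		}
		fmt.Printf("%s -> %d GiB (%d bytes)\n", q, requestGiB, int64(requestGiB)*volumehelpers.GiB)
	}
}
```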

View File

@ -32,6 +32,7 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/cloud-provider:go_default_library",
"//staging/src/k8s.io/cloud-provider/volume/helpers:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
"//vendor/k8s.io/utils/exec:go_default_library",
"//vendor/k8s.io/utils/keymutex:go_default_library",

View File

@ -30,6 +30,7 @@ import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
clientset "k8s.io/client-go/kubernetes"
cloudvolumehelpers "k8s.io/cloud-provider/volume/helpers"
"k8s.io/kubernetes/pkg/volume"
volutil "k8s.io/kubernetes/pkg/volume/util"
"k8s.io/utils/exec"
@ -206,7 +207,7 @@ func (util *DiskUtil) CreateVolume(c *cinderVolumeProvisioner, node *v1.Node, al
// if we did not get any zones, lets leave it blank and gophercloud will
// use zone "nova" as default
if len(zones) > 0 {
availability, err = volutil.SelectZoneForVolume(false, false, "", nil, zones, node, allowedTopologies, c.options.PVC.Name)
availability, err = cloudvolumehelpers.SelectZoneForVolume(false, false, "", nil, zones, node, allowedTopologies, c.options.PVC.Name)
if err != nil {
klog.V(2).Infof("error selecting zone for volume: %v", err)
return "", 0, nil, "", err

View File

@ -31,6 +31,8 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/cloud-provider:go_default_library",
"//staging/src/k8s.io/cloud-provider/features:go_default_library",
"//staging/src/k8s.io/cloud-provider/volume:go_default_library",
"//staging/src/k8s.io/cloud-provider/volume/helpers:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
"//vendor/k8s.io/utils/exec:go_default_library",
"//vendor/k8s.io/utils/path:go_default_library",
@ -59,6 +61,8 @@ go_test(
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/fake:go_default_library",
"//staging/src/k8s.io/client-go/util/testing:go_default_library",
"//staging/src/k8s.io/cloud-provider/volume:go_default_library",
"//staging/src/k8s.io/cloud-provider/volume/helpers:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)

View File

@ -24,9 +24,9 @@ import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/sets"
cloudvolume "k8s.io/cloud-provider/volume"
"k8s.io/kubernetes/pkg/volume"
volumetest "k8s.io/kubernetes/pkg/volume/testing"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
"k8s.io/apimachinery/pkg/types"
"k8s.io/klog"
@ -284,7 +284,7 @@ func createPVSpec(name string, readOnly bool, zones []string) *volume.Spec {
}
if zones != nil {
zonesLabel := strings.Join(zones, volumeutil.LabelMultiZoneDelimiter)
zonesLabel := strings.Join(zones, cloudvolume.LabelMultiZoneDelimiter)
spec.PersistentVolume.ObjectMeta.Labels = map[string]string{
v1.LabelZoneFailureDomain: zonesLabel,
}

View File

@ -29,6 +29,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilfeature "k8s.io/apiserver/pkg/util/feature"
volumehelpers "k8s.io/cloud-provider/volume/helpers"
"k8s.io/klog"
gcecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
"k8s.io/kubernetes/pkg/features"
@ -544,7 +545,7 @@ func (c *gcePersistentDiskProvisioner) Provision(selectedNode *v1.Node, allowedT
pv.Labels[k] = v
var values []string
if k == v1.LabelZoneFailureDomain {
values, err = util.LabelZonesToList(v)
values, err = volumehelpers.LabelZonesToList(v)
if err != nil {
return nil, fmt.Errorf("failed to convert label string for Zone: %s to a List: %v", v, err)
}

View File

@ -29,11 +29,11 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/fake"
utiltesting "k8s.io/client-go/util/testing"
volumehelpers "k8s.io/cloud-provider/volume/helpers"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/volume"
volumetest "k8s.io/kubernetes/pkg/volume/testing"
"k8s.io/kubernetes/pkg/volume/util"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
)
func TestCanSupport(t *testing.T) {
@ -190,7 +190,7 @@ func TestPlugin(t *testing.T) {
}
cap := persistentSpec.Spec.Capacity[v1.ResourceStorage]
size := cap.Value()
if size != 100*util.GIB {
if size != 100*volumehelpers.GiB {
t.Errorf("Provision() returned unexpected volume size: %v", size)
}
@ -216,7 +216,7 @@ func TestPlugin(t *testing.T) {
if r == nil || r.Values[0] != "yes" || r.Operator != v1.NodeSelectorOpIn {
t.Errorf("NodeSelectorRequirement fakepdmanager-in-yes not found in volume NodeAffinity")
}
zones, _ := volumeutil.ZonesToSet("zone1,zone2")
zones, _ := volumehelpers.ZonesToSet("zone1,zone2")
r, _ = getNodeSelectorRequirementWithKey(v1.LabelZoneFailureDomain, term)
if r == nil {
t.Errorf("NodeSelectorRequirement %s-in-%v not found in volume NodeAffinity", v1.LabelZoneFailureDomain, zones)

View File

@ -29,6 +29,8 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
cloudprovider "k8s.io/cloud-provider"
cloudfeatures "k8s.io/cloud-provider/features"
cloudvolume "k8s.io/cloud-provider/volume"
volumehelpers "k8s.io/cloud-provider/volume/helpers"
"k8s.io/klog"
gcecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
"k8s.io/kubernetes/pkg/util/mount"
@ -99,7 +101,7 @@ func (util *GCEDiskUtil) CreateVolume(c *gcePersistentDiskProvisioner, node *v1.
name := volumeutil.GenerateVolumeName(c.options.ClusterName, c.options.PVName, 63) // GCE PD name can have up to 63 characters
capacity := c.options.PVC.Spec.Resources.Requests[v1.ResourceName(v1.ResourceStorage)]
// GCE PDs are allocated in chunks of GiBs
requestGB := volumeutil.RoundUpToGiB(capacity)
requestGB := volumehelpers.RoundUpToGiB(capacity)
// Apply Parameters.
// Values for parameter "replication-type" are canonicalized to lower case.
@ -121,7 +123,7 @@ func (util *GCEDiskUtil) CreateVolume(c *gcePersistentDiskProvisioner, node *v1.
configuredZone = v
case "zones":
zonesPresent = true
configuredZones, err = volumeutil.ZonesToSet(v)
configuredZones, err = volumehelpers.ZonesToSet(v)
if err != nil {
return "", 0, nil, "", err
}
@ -152,7 +154,7 @@ func (util *GCEDiskUtil) CreateVolume(c *gcePersistentDiskProvisioner, node *v1.
switch replicationType {
case replicationTypeRegionalPD:
selectedZones, err := volumeutil.SelectZonesForVolume(zonePresent, zonesPresent, configuredZone, configuredZones, activezones, node, allowedTopologies, c.options.PVC.Name, maxRegionalPDZones)
selectedZones, err := volumehelpers.SelectZonesForVolume(zonePresent, zonesPresent, configuredZone, configuredZones, activezones, node, allowedTopologies, c.options.PVC.Name, maxRegionalPDZones)
if err != nil {
klog.V(2).Infof("Error selecting zones for regional GCE PD volume: %v", err)
return "", 0, nil, "", err
@ -169,7 +171,7 @@ func (util *GCEDiskUtil) CreateVolume(c *gcePersistentDiskProvisioner, node *v1.
klog.V(2).Infof("Successfully created Regional GCE PD volume %s", name)
case replicationTypeNone:
selectedZone, err := volumeutil.SelectZoneForVolume(zonePresent, zonesPresent, configuredZone, configuredZones, activezones, node, allowedTopologies, c.options.PVC.Name)
selectedZone, err := volumehelpers.SelectZoneForVolume(zonePresent, zonesPresent, configuredZone, configuredZones, activezones, node, allowedTopologies, c.options.PVC.Name)
if err != nil {
return "", 0, nil, "", err
}
@ -354,7 +356,7 @@ func udevadmChangeToDrive(drivePath string) error {
func isRegionalPD(spec *volume.Spec) bool {
if spec.PersistentVolume != nil {
zonesLabel := spec.PersistentVolume.Labels[v1.LabelZoneFailureDomain]
zones := strings.Split(zonesLabel, volumeutil.LabelMultiZoneDelimiter)
zones := strings.Split(zonesLabel, cloudvolume.LabelMultiZoneDelimiter)
return len(zones) > 1
}
return false
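The zone-selection calls above keep the same parameter order as the old pkg/volume/util helpers; the new signature is visible in zones.go at the bottom of this diff. A compact, hypothetical provisioner-style sketch of parsing a StorageClass "zones" parameter and picking a single zone:

```go
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/sets"
	volumehelpers "k8s.io/cloud-provider/volume/helpers"
)

// pickZone mirrors the provisioner pattern in this diff: parse the "zones"
// StorageClass parameter, then let SelectZoneForVolume reconcile it with the
// node, allowedTopologies and the zones that currently have nodes.
func pickZone(zonesParam string, node *v1.Node, zonesWithNodes sets.String,
	allowedTopologies []v1.TopologySelectorTerm, pvcName string) (string, error) {

	zones, err := volumehelpers.ZonesToSet(zonesParam) // e.g. "us-east-1a,us-east-1b"
	if err != nil {
		return "", err
	}
	return volumehelpers.SelectZoneForVolume(
		false /* zone parameter present */, true /* zones parameter present */,
		"", zones, zonesWithNodes, node, allowedTopologies, pvcName)
}

func main() {
	zone, err := pickZone("us-east-1a,us-east-1b", nil,
		sets.NewString("us-east-1a", "us-east-1b"), nil, "data-db-0")
	fmt.Println(zone, err)
}
```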

View File

@ -188,12 +188,6 @@ type DeletableVolumePlugin interface {
NewDeleter(spec *Spec) (Deleter, error)
}
const (
// Name of a volume in external cloud that is being provisioned and thus
// should be ignored by rest of Kubernetes.
ProvisionedVolumeName = "placeholder-for-provisioning"
)
// ProvisionableVolumePlugin is an extended interface of VolumePlugin and is
// used to create volumes for the cluster.
type ProvisionableVolumePlugin interface {

View File

@ -9,7 +9,6 @@ go_library(
"device_util_linux.go",
"device_util_unsupported.go",
"doc.go",
"error.go",
"finalizer.go",
"io_util.go",
"metrics.go",

View File

@ -30,6 +30,7 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//staging/src/k8s.io/cloud-provider/volume/errors:go_default_library",
"//staging/src/k8s.io/csi-translation-lib:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],

View File

@ -29,6 +29,7 @@ import (
utilfeature "k8s.io/apiserver/pkg/util/feature"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
volerr "k8s.io/cloud-provider/volume/errors"
csilib "k8s.io/csi-translation-lib"
"k8s.io/klog"
expandcache "k8s.io/kubernetes/pkg/controller/volume/expand/cache"
@ -316,7 +317,7 @@ func (og *operationGenerator) GenerateAttachVolumeFunc(
volumeToAttach.VolumeSpec, volumeToAttach.NodeName)
if attachErr != nil {
if derr, ok := attachErr.(*util.DanglingAttachError); ok {
if derr, ok := attachErr.(*volerr.DanglingAttachError); ok {
addErr := actualStateOfWorld.MarkVolumeAsAttached(
v1.UniqueVolumeName(""),
volumeToAttach.VolumeSpec,

View File

@ -18,14 +18,11 @@ package util
import (
"fmt"
"hash/fnv"
"io/ioutil"
"math/rand"
"os"
"path"
"path/filepath"
"reflect"
"strconv"
"strings"
v1 "k8s.io/api/core/v1"
@ -73,21 +70,8 @@ const (
// VolumeDynamicallyCreatedByKey is the key of the annotation on PersistentVolume
// object created dynamically
VolumeDynamicallyCreatedByKey = "kubernetes.io/createdby"
// LabelMultiZoneDelimiter separates zones for volumes
LabelMultiZoneDelimiter = "__"
)
// VolumeZoneConfig contains config information about zonal volume.
type VolumeZoneConfig struct {
ZonePresent bool
ZonesPresent bool
ReplicaZoneFromNodePresent bool
Zone string
Zones string
ReplicaZoneFromNode string
}
// IsReady checks for the existence of a regular file
// called 'ready' in the given directory and returns
// true if that file exists.
@ -219,174 +203,6 @@ func LoadPodFromFile(filePath string) (*v1.Pod, error) {
return pod, nil
}
// SelectZoneForVolume is a wrapper around SelectZonesForVolume
// to select a single zone for a volume based on parameters
func SelectZoneForVolume(zoneParameterPresent, zonesParameterPresent bool, zoneParameter string, zonesParameter, zonesWithNodes sets.String, node *v1.Node, allowedTopologies []v1.TopologySelectorTerm, pvcName string) (string, error) {
zones, err := SelectZonesForVolume(zoneParameterPresent, zonesParameterPresent, zoneParameter, zonesParameter, zonesWithNodes, node, allowedTopologies, pvcName, 1)
if err != nil {
return "", err
}
zone, ok := zones.PopAny()
if !ok {
return "", fmt.Errorf("could not determine a zone to provision volume in")
}
return zone, nil
}
// SelectZonesForVolume selects zones for a volume based on several factors:
// node.zone, allowedTopologies, zone/zones parameters from storageclass,
// zones with active nodes from the cluster. The number of zones = replicas.
func SelectZonesForVolume(zoneParameterPresent, zonesParameterPresent bool, zoneParameter string, zonesParameter, zonesWithNodes sets.String, node *v1.Node, allowedTopologies []v1.TopologySelectorTerm, pvcName string, numReplicas uint32) (sets.String, error) {
if zoneParameterPresent && zonesParameterPresent {
return nil, fmt.Errorf("both zone and zones StorageClass parameters must not be used at the same time")
}
var zoneFromNode string
// pick one zone from node if present
if node != nil {
// VolumeScheduling implicit since node is not nil
if zoneParameterPresent || zonesParameterPresent {
return nil, fmt.Errorf("zone[s] cannot be specified in StorageClass if VolumeBindingMode is set to WaitForFirstConsumer. Please specify allowedTopologies in StorageClass for constraining zones")
}
// pick node's zone for one of the replicas
var ok bool
zoneFromNode, ok = node.ObjectMeta.Labels[v1.LabelZoneFailureDomain]
if !ok {
return nil, fmt.Errorf("%s Label for node missing", v1.LabelZoneFailureDomain)
}
// if single replica volume and node with zone found, return immediately
if numReplicas == 1 {
return sets.NewString(zoneFromNode), nil
}
}
// pick zone from allowedZones if specified
allowedZones, err := ZonesFromAllowedTopologies(allowedTopologies)
if err != nil {
return nil, err
}
if (len(allowedTopologies) > 0) && (allowedZones.Len() == 0) {
return nil, fmt.Errorf("no matchLabelExpressions with %s key found in allowedTopologies. Please specify matchLabelExpressions with %s key", v1.LabelZoneFailureDomain, v1.LabelZoneFailureDomain)
}
if allowedZones.Len() > 0 {
// VolumeScheduling implicit since allowedZones present
if zoneParameterPresent || zonesParameterPresent {
return nil, fmt.Errorf("zone[s] cannot be specified in StorageClass if allowedTopologies specified")
}
// scheduler will guarantee if node != null above, zoneFromNode is member of allowedZones.
// so if zoneFromNode != "", we can safely assume it is part of allowedZones.
zones, err := chooseZonesForVolumeIncludingZone(allowedZones, pvcName, zoneFromNode, numReplicas)
if err != nil {
return nil, fmt.Errorf("cannot process zones in allowedTopologies: %v", err)
}
return zones, nil
}
// pick zone from parameters if present
if zoneParameterPresent {
if numReplicas > 1 {
return nil, fmt.Errorf("zone cannot be specified if desired number of replicas for pv is greather than 1. Please specify zones or allowedTopologies to specify desired zones")
}
return sets.NewString(zoneParameter), nil
}
if zonesParameterPresent {
if uint32(zonesParameter.Len()) < numReplicas {
return nil, fmt.Errorf("not enough zones found in zones parameter to provision a volume with %d replicas. Found %d zones, need %d zones", numReplicas, zonesParameter.Len(), numReplicas)
}
// directly choose from zones parameter; no zone from node need to be considered
return ChooseZonesForVolume(zonesParameter, pvcName, numReplicas), nil
}
// pick zone from zones with nodes
if zonesWithNodes.Len() > 0 {
// If node != null (and thus zoneFromNode != ""), zoneFromNode will be member of zonesWithNodes
zones, err := chooseZonesForVolumeIncludingZone(zonesWithNodes, pvcName, zoneFromNode, numReplicas)
if err != nil {
return nil, fmt.Errorf("cannot process zones where nodes exist in the cluster: %v", err)
}
return zones, nil
}
return nil, fmt.Errorf("cannot determine zones to provision volume in")
}
// ZonesFromAllowedTopologies returns a list of zones specified in allowedTopologies
func ZonesFromAllowedTopologies(allowedTopologies []v1.TopologySelectorTerm) (sets.String, error) {
zones := make(sets.String)
for _, term := range allowedTopologies {
for _, exp := range term.MatchLabelExpressions {
if exp.Key == v1.LabelZoneFailureDomain {
for _, value := range exp.Values {
zones.Insert(value)
}
} else {
return nil, fmt.Errorf("unsupported key found in matchLabelExpressions: %s", exp.Key)
}
}
}
return zones, nil
}
// ZonesSetToLabelValue converts zones set to label value
func ZonesSetToLabelValue(strSet sets.String) string {
return strings.Join(strSet.UnsortedList(), LabelMultiZoneDelimiter)
}
// ZonesToSet converts a string containing a comma separated list of zones to set
func ZonesToSet(zonesString string) (sets.String, error) {
zones, err := stringToSet(zonesString, ",")
if err != nil {
return nil, fmt.Errorf("error parsing zones %s, must be strings separated by commas: %v", zonesString, err)
}
return zones, nil
}
// LabelZonesToSet converts a PV label value from string containing a delimited list of zones to set
func LabelZonesToSet(labelZonesValue string) (sets.String, error) {
return stringToSet(labelZonesValue, LabelMultiZoneDelimiter)
}
// StringToSet converts a string containing list separated by specified delimiter to a set
func stringToSet(str, delimiter string) (sets.String, error) {
zonesSlice := strings.Split(str, delimiter)
zonesSet := make(sets.String)
for _, zone := range zonesSlice {
trimmedZone := strings.TrimSpace(zone)
if trimmedZone == "" {
return make(sets.String), fmt.Errorf(
"%q separated list (%q) must not contain an empty string",
delimiter,
str)
}
zonesSet.Insert(trimmedZone)
}
return zonesSet, nil
}
// LabelZonesToList converts a PV label value from string containing a delimited list of zones to list
func LabelZonesToList(labelZonesValue string) ([]string, error) {
return stringToList(labelZonesValue, LabelMultiZoneDelimiter)
}
// StringToList converts a string containing list separated by specified delimiter to a list
func stringToList(str, delimiter string) ([]string, error) {
zonesSlice := make([]string, 0)
for _, zone := range strings.Split(str, delimiter) {
trimmedZone := strings.TrimSpace(zone)
if trimmedZone == "" {
return nil, fmt.Errorf(
"%q separated list (%q) must not contain an empty string",
delimiter,
str)
}
zonesSlice = append(zonesSlice, trimmedZone)
}
return zonesSlice, nil
}
// CalculateTimeoutForVolume calculates time for a Recycler pod to complete a
// recycle operation. The calculation and return value is either the
// minimumTimeout or the timeoutIncrement per Gi of storage size, whichever is
@ -479,148 +295,6 @@ func GetPath(mounter volume.Mounter) (string, error) {
return path, nil
}
// ChooseZoneForVolume implements our heuristics for choosing a zone for volume creation based on the volume name
// Volumes are generally round-robin-ed across all active zones, using the hash of the PVC Name.
// However, if the PVCName ends with `-<integer>`, we will hash the prefix, and then add the integer to the hash.
// This means that a StatefulSet's volumes (`claimname-statefulsetname-id`) will spread across available zones,
// assuming the id values are consecutive.
func ChooseZoneForVolume(zones sets.String, pvcName string) string {
// No zones available, return empty string.
if zones.Len() == 0 {
return ""
}
// We create the volume in a zone determined by the name
// Eventually the scheduler will coordinate placement into an available zone
hash, index := getPVCNameHashAndIndexOffset(pvcName)
// Zones.List returns zones in a consistent order (sorted)
// We do have a potential failure case where volumes will not be properly spread,
// if the set of zones changes during StatefulSet volume creation. However, this is
// probably relatively unlikely because we expect the set of zones to be essentially
// static for clusters.
// Hopefully we can address this problem if/when we do full scheduler integration of
// PVC placement (which could also e.g. avoid putting volumes in overloaded or
// unhealthy zones)
zoneSlice := zones.List()
zone := zoneSlice[(hash+index)%uint32(len(zoneSlice))]
klog.V(2).Infof("Creating volume for PVC %q; chose zone=%q from zones=%q", pvcName, zone, zoneSlice)
return zone
}
// chooseZonesForVolumeIncludingZone is a wrapper around ChooseZonesForVolume that ensures zoneToInclude is chosen
// zoneToInclude can either be empty in which case it is ignored. If non-empty, zoneToInclude is expected to be member of zones.
// numReplicas is expected to be > 0 and <= zones.Len()
func chooseZonesForVolumeIncludingZone(zones sets.String, pvcName, zoneToInclude string, numReplicas uint32) (sets.String, error) {
if numReplicas == 0 {
return nil, fmt.Errorf("invalid number of replicas passed")
}
if uint32(zones.Len()) < numReplicas {
return nil, fmt.Errorf("not enough zones found to provision a volume with %d replicas. Need at least %d distinct zones for a volume with %d replicas", numReplicas, numReplicas, numReplicas)
}
if zoneToInclude != "" && !zones.Has(zoneToInclude) {
return nil, fmt.Errorf("zone to be included: %s needs to be member of set: %v", zoneToInclude, zones)
}
if uint32(zones.Len()) == numReplicas {
return zones, nil
}
if zoneToInclude != "" {
zones.Delete(zoneToInclude)
numReplicas = numReplicas - 1
}
zonesChosen := ChooseZonesForVolume(zones, pvcName, numReplicas)
if zoneToInclude != "" {
zonesChosen.Insert(zoneToInclude)
}
return zonesChosen, nil
}
// ChooseZonesForVolume is identical to ChooseZoneForVolume, but selects a multiple zones, for multi-zone disks.
func ChooseZonesForVolume(zones sets.String, pvcName string, numZones uint32) sets.String {
// No zones available, return empty set.
replicaZones := sets.NewString()
if zones.Len() == 0 {
return replicaZones
}
// We create the volume in a zone determined by the name
// Eventually the scheduler will coordinate placement into an available zone
hash, index := getPVCNameHashAndIndexOffset(pvcName)
// Zones.List returns zones in a consistent order (sorted)
// We do have a potential failure case where volumes will not be properly spread,
// if the set of zones changes during StatefulSet volume creation. However, this is
// probably relatively unlikely because we expect the set of zones to be essentially
// static for clusters.
// Hopefully we can address this problem if/when we do full scheduler integration of
// PVC placement (which could also e.g. avoid putting volumes in overloaded or
// unhealthy zones)
zoneSlice := zones.List()
startingIndex := index * numZones
for index = startingIndex; index < startingIndex+numZones; index++ {
zone := zoneSlice[(hash+index)%uint32(len(zoneSlice))]
replicaZones.Insert(zone)
}
klog.V(2).Infof("Creating volume for replicated PVC %q; chosen zones=%q from zones=%q",
pvcName, replicaZones.UnsortedList(), zoneSlice)
return replicaZones
}
func getPVCNameHashAndIndexOffset(pvcName string) (hash uint32, index uint32) {
if pvcName == "" {
// We should always be called with a name; this shouldn't happen
klog.Warningf("No name defined during volume create; choosing random zone")
hash = rand.Uint32()
} else {
hashString := pvcName
// Heuristic to make sure that volumes in a StatefulSet are spread across zones
// StatefulSet PVCs are (currently) named ClaimName-StatefulSetName-Id,
// where Id is an integer index.
// Note though that if a StatefulSet pod has multiple claims, we need them to be
// in the same zone, because otherwise the pod will be unable to mount both volumes,
// and will be unschedulable. So we hash _only_ the "StatefulSetName" portion when
// it looks like `ClaimName-StatefulSetName-Id`.
// We continue to round-robin volume names that look like `Name-Id` also; this is a useful
// feature for users that are creating statefulset-like functionality without using statefulsets.
lastDash := strings.LastIndexByte(pvcName, '-')
if lastDash != -1 {
statefulsetIDString := pvcName[lastDash+1:]
statefulsetID, err := strconv.ParseUint(statefulsetIDString, 10, 32)
if err == nil {
// Offset by the statefulsetID, so we round-robin across zones
index = uint32(statefulsetID)
// We still hash the volume name, but only the prefix
hashString = pvcName[:lastDash]
// In the special case where it looks like `ClaimName-StatefulSetName-Id`,
// hash only the StatefulSetName, so that different claims on the same StatefulSet
// member end up in the same zone.
// Note that StatefulSetName (and ClaimName) might themselves both have dashes.
// We actually just take the portion after the final - of ClaimName-StatefulSetName.
// For our purposes it doesn't much matter (just suboptimal spreading).
lastDash := strings.LastIndexByte(hashString, '-')
if lastDash != -1 {
hashString = hashString[lastDash+1:]
}
klog.V(2).Infof("Detected StatefulSet-style volume name %q; index=%d", pvcName, index)
}
}
// We hash the (base) volume name, so we don't bias towards the first N zones
h := fnv.New32()
h.Write([]byte(hashString))
hash = h.Sum32()
}
return hash, index
}
// UnmountViaEmptyDir delegates the tear down operation for secret, configmap, git_repo and downwardapi
// to empty_dir
func UnmountViaEmptyDir(dir string, host volume.VolumeHost, volName string, volSpec volume.Spec, podUID utypes.UID) error {
@ -669,16 +343,6 @@ func JoinMountOptions(userOptions []string, systemOptions []string) []string {
return allMountOptions.List()
}
// ValidateZone returns:
// - an error in case zone is an empty string or contains only any combination of spaces and tab characters
// - nil otherwise
func ValidateZone(zone string) error {
if strings.TrimSpace(zone) == "" {
return fmt.Errorf("the provided %q zone is not valid, it's an empty string or contains only spaces and tab characters", zone)
}
return nil
}
// AccessModesContains returns whether the requested mode is contained by modes
func AccessModesContains(modes []v1.PersistentVolumeAccessMode, mode v1.PersistentVolumeAccessMode) bool {
for _, m := range modes {

File diff suppressed because it is too large

View File

@ -260,28 +260,3 @@ type DeviceUnmounter interface {
// unmounted.
UnmountDevice(deviceMountPath string) error
}
// NewDeletedVolumeInUseError returns a new instance of DeletedVolumeInUseError
// error.
func NewDeletedVolumeInUseError(message string) error {
return deletedVolumeInUseError(message)
}
type deletedVolumeInUseError string
var _ error = deletedVolumeInUseError("")
// IsDeletedVolumeInUse returns true if an error returned from Delete() is
// deletedVolumeInUseError
func IsDeletedVolumeInUse(err error) bool {
switch err.(type) {
case deletedVolumeInUseError:
return true
default:
return false
}
}
func (err deletedVolumeInUseError) Error() string {
return string(err)
}

View File

@ -17,11 +17,11 @@ go_library(
"//pkg/apis/core:go_default_library",
"//pkg/apis/core/v1:go_default_library",
"//pkg/kubeapiserver/admission:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/util:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/admission:go_default_library",
"//staging/src/k8s.io/cloud-provider:go_default_library",
"//staging/src/k8s.io/cloud-provider/volume:go_default_library",
"//staging/src/k8s.io/cloud-provider/volume/helpers:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)

View File

@ -27,12 +27,12 @@ import (
v1 "k8s.io/api/core/v1"
"k8s.io/apiserver/pkg/admission"
cloudprovider "k8s.io/cloud-provider"
cloudvolume "k8s.io/cloud-provider/volume"
volumehelpers "k8s.io/cloud-provider/volume/helpers"
"k8s.io/klog"
api "k8s.io/kubernetes/pkg/apis/core"
k8s_api_v1 "k8s.io/kubernetes/pkg/apis/core/v1"
kubeapiserveradmission "k8s.io/kubernetes/pkg/kubeapiserver/admission"
vol "k8s.io/kubernetes/pkg/volume"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
)
const (
@ -153,7 +153,7 @@ func (l *persistentVolumeLabel) Admit(a admission.Attributes) (err error) {
// Set NodeSelectorRequirements based on the labels
var values []string
if k == v1.LabelZoneFailureDomain {
zones, err := volumeutil.LabelZonesToSet(v)
zones, err := volumehelpers.LabelZonesToSet(v)
if err != nil {
return admission.NewForbidden(a, fmt.Errorf("failed to convert label string for Zone: %s to a Set", v))
}
@ -192,7 +192,7 @@ func (l *persistentVolumeLabel) Admit(a admission.Attributes) (err error) {
func (l *persistentVolumeLabel) findAWSEBSLabels(volume *api.PersistentVolume) (map[string]string, error) {
// Ignore any volumes that are being provisioned
if volume.Spec.AWSElasticBlockStore.VolumeID == vol.ProvisionedVolumeName {
if volume.Spec.AWSElasticBlockStore.VolumeID == cloudvolume.ProvisionedVolumeName {
return nil, nil
}
pvlabler, err := l.getAWSPVLabeler()
@ -240,7 +240,7 @@ func (l *persistentVolumeLabel) getAWSPVLabeler() (cloudprovider.PVLabeler, erro
func (l *persistentVolumeLabel) findGCEPDLabels(volume *api.PersistentVolume) (map[string]string, error) {
// Ignore any volumes that are being provisioned
if volume.Spec.GCEPersistentDisk.PDName == vol.ProvisionedVolumeName {
if volume.Spec.GCEPersistentDisk.PDName == cloudvolume.ProvisionedVolumeName {
return nil, nil
}
@ -315,7 +315,7 @@ func (l *persistentVolumeLabel) getAzurePVLabeler() (cloudprovider.PVLabeler, er
func (l *persistentVolumeLabel) findAzureDiskLabels(volume *api.PersistentVolume) (map[string]string, error) {
// Ignore any volumes that are being provisioned
if volume.Spec.AzureDisk.DiskName == vol.ProvisionedVolumeName {
if volume.Spec.AzureDisk.DiskName == cloudvolume.ProvisionedVolumeName {
return nil, nil
}
@ -364,7 +364,7 @@ func (l *persistentVolumeLabel) getOpenStackPVLabeler() (cloudprovider.PVLabeler
func (l *persistentVolumeLabel) findCinderDiskLabels(volume *api.PersistentVolume) (map[string]string, error) {
// Ignore any volumes that are being provisioned
if volume.Spec.Cinder.VolumeID == vol.ProvisionedVolumeName {
if volume.Spec.Cinder.VolumeID == cloudvolume.ProvisionedVolumeName {
return nil, nil
}
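All of these call sites rely on the same label round trip, now provided entirely by the cloud-provider packages: a PV's failure-domain label may carry several zones joined by the "__" delimiter. A short sketch:

```go
package main

import (
	"fmt"

	cloudvolume "k8s.io/cloud-provider/volume"
	volumehelpers "k8s.io/cloud-provider/volume/helpers"
)

func main() {
	// A PV's zone label may carry several zones joined by "__".
	label := "us-central1-a" + cloudvolume.LabelMultiZoneDelimiter + "us-central1-b"

	zones, err := volumehelpers.LabelZonesToSet(label)
	if err != nil {
		panic(err)
	}
	fmt.Println(zones.List()) // [us-central1-a us-central1-b]

	// ...and back to a label value when writing labels onto a PV.
	fmt.Println(volumehelpers.ZonesSetToLabelValue(zones))
}
```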

View File

@ -190,6 +190,7 @@
- k8s.io/apimachinery
- k8s.io/apiserver
- k8s.io/client-go
- k8s.io/cloud-provider
- k8s.io/klog
- k8s.io/utils
@ -213,4 +214,5 @@
- k8s.io/api
- k8s.io/apimachinery
- k8s.io/klog
- k8s.io/cloud-provider
- k8s.io/csi-translation-lib

View File

@ -989,3 +989,5 @@ rules:
branch: master
- repository: apimachinery
branch: master
- repository: cloud-provider
branch: master

View File

@ -38,6 +38,7 @@ filegroup(
"//staging/src/k8s.io/cloud-provider/features:all-srcs",
"//staging/src/k8s.io/cloud-provider/node:all-srcs",
"//staging/src/k8s.io/cloud-provider/service/helpers:all-srcs",
"//staging/src/k8s.io/cloud-provider/volume:all-srcs",
],
tags = ["automanaged"],
)

View File

@ -0,0 +1,27 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["constants.go"],
importmap = "k8s.io/kubernetes/vendor/k8s.io/cloud-provider/volume",
importpath = "k8s.io/cloud-provider/volume",
visibility = ["//visibility:public"],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//staging/src/k8s.io/cloud-provider/volume/errors:all-srcs",
"//staging/src/k8s.io/cloud-provider/volume/helpers:all-srcs",
],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,26 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package volume
const (
// ProvisionedVolumeName is the name of a volume in an external cloud
// that is being provisioned and thus should be ignored by rest of Kubernetes.
ProvisionedVolumeName = "placeholder-for-provisioning"
// LabelMultiZoneDelimiter separates zones for volumes
LabelMultiZoneDelimiter = "__"
)

View File

@ -0,0 +1,24 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["errors.go"],
importmap = "k8s.io/kubernetes/vendor/k8s.io/cloud-provider/volume/errors",
importpath = "k8s.io/cloud-provider/volume/errors",
visibility = ["//visibility:public"],
deps = ["//staging/src/k8s.io/apimachinery/pkg/types:go_default_library"],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -14,12 +14,37 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package util
package errors
import (
k8stypes "k8s.io/apimachinery/pkg/types"
)
// NewDeletedVolumeInUseError returns a new instance of DeletedVolumeInUseError
// error.
func NewDeletedVolumeInUseError(message string) error {
return deletedVolumeInUseError(message)
}
type deletedVolumeInUseError string
var _ error = deletedVolumeInUseError("")
// IsDeletedVolumeInUse returns true if an error returned from Delete() is
// deletedVolumeInUseError
func IsDeletedVolumeInUse(err error) bool {
switch err.(type) {
case deletedVolumeInUseError:
return true
default:
return false
}
}
func (err deletedVolumeInUseError) Error() string {
return string(err)
}
// DanglingAttachError indicates volume is attached to a different node
// than we expected.
type DanglingAttachError struct {

View File

@ -0,0 +1,39 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["zones.go"],
importmap = "k8s.io/kubernetes/vendor/k8s.io/cloud-provider/volume/helpers",
importpath = "k8s.io/cloud-provider/volume/helpers",
visibility = ["//visibility:public"],
deps = [
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/cloud-provider/volume:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["zones_test.go"],
embed = [":go_default_library"],
deps = [
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)
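The StatefulSet-aware spreading heuristic described in the removed pkg/volume/util code (hash the PVC name; if it ends in -<integer>, hash only the prefix and use the ordinal as a round-robin offset) travels with these helpers. A small sketch, assuming ChooseZonesForVolume lands in zones.go with the rest of the moved code (only the top of that file appears in this excerpt); zone names are illustrative:

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
	volumehelpers "k8s.io/cloud-provider/volume/helpers"
)

func main() {
	zones := sets.NewString("zone-a", "zone-b", "zone-c")

	// Claims that differ only in their trailing ordinal hash to the same base
	// value, so the ordinal walks them round-robin across the sorted zones.
	for _, pvc := range []string{"data-web-0", "data-web-1", "data-web-2"} {
		chosen := volumehelpers.ChooseZonesForVolume(zones, pvc, 1)
		fmt.Println(pvc, "->", chosen.UnsortedList())
	}
}
```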

View File

@ -0,0 +1,310 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package helpers
import (
"fmt"
"hash/fnv"
"math/rand"
"strconv"
"strings"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
cloudvolume "k8s.io/cloud-provider/volume"
"k8s.io/klog"
)
// LabelZonesToSet converts a PV label value from a string containing a delimited list of zones to a set
func LabelZonesToSet(labelZonesValue string) (sets.String, error) {
return stringToSet(labelZonesValue, cloudvolume.LabelMultiZoneDelimiter)
}
// ZonesSetToLabelValue converts a zone set to a label value
func ZonesSetToLabelValue(strSet sets.String) string {
return strings.Join(strSet.UnsortedList(), cloudvolume.LabelMultiZoneDelimiter)
}
// ZonesToSet converts a string containing a comma-separated list of zones to a set
func ZonesToSet(zonesString string) (sets.String, error) {
zones, err := stringToSet(zonesString, ",")
if err != nil {
return nil, fmt.Errorf("error parsing zones %s, must be strings separated by commas: %v", zonesString, err)
}
return zones, nil
}
// stringToSet converts a string containing a list separated by the specified delimiter to a set
func stringToSet(str, delimiter string) (sets.String, error) {
zonesSlice := strings.Split(str, delimiter)
zonesSet := make(sets.String)
for _, zone := range zonesSlice {
trimmedZone := strings.TrimSpace(zone)
if trimmedZone == "" {
return make(sets.String), fmt.Errorf(
"%q separated list (%q) must not contain an empty string",
delimiter,
str)
}
zonesSet.Insert(trimmedZone)
}
return zonesSet, nil
}
// LabelZonesToList converts a PV label value from a string containing a delimited list of zones to a list
func LabelZonesToList(labelZonesValue string) ([]string, error) {
return stringToList(labelZonesValue, cloudvolume.LabelMultiZoneDelimiter)
}
// stringToList converts a string containing a list separated by the specified delimiter to a list
func stringToList(str, delimiter string) ([]string, error) {
zonesSlice := make([]string, 0)
for _, zone := range strings.Split(str, delimiter) {
trimmedZone := strings.TrimSpace(zone)
if trimmedZone == "" {
return nil, fmt.Errorf(
"%q separated list (%q) must not contain an empty string",
delimiter,
str)
}
zonesSlice = append(zonesSlice, trimmedZone)
}
return zonesSlice, nil
}
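Editor's note: a small usage sketch of the parsing helpers above, round-tripping a multi-zone PV label; the zone names are placeholders.
package main

import (
	"fmt"

	volumehelpers "k8s.io/cloud-provider/volume/helpers"
)

func main() {
	// Comma-separated input, e.g. a StorageClass "zones" parameter (zone names invented).
	zones, err := volumehelpers.ZonesToSet("us-east-1a, us-east-1b")
	if err != nil {
		panic(err)
	}
	// Round-trip through the "__"-delimited PV label value and back to a set.
	label := volumehelpers.ZonesSetToLabelValue(zones)
	parsed, err := volumehelpers.LabelZonesToSet(label)
	if err != nil {
		panic(err)
	}
	fmt.Println(label, parsed.List())
}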
// SelectZoneForVolume is a wrapper around SelectZonesForVolume
// to select a single zone for a volume based on parameters
func SelectZoneForVolume(zoneParameterPresent, zonesParameterPresent bool, zoneParameter string, zonesParameter, zonesWithNodes sets.String, node *v1.Node, allowedTopologies []v1.TopologySelectorTerm, pvcName string) (string, error) {
zones, err := SelectZonesForVolume(zoneParameterPresent, zonesParameterPresent, zoneParameter, zonesParameter, zonesWithNodes, node, allowedTopologies, pvcName, 1)
if err != nil {
return "", err
}
zone, ok := zones.PopAny()
if !ok {
return "", fmt.Errorf("could not determine a zone to provision volume in")
}
return zone, nil
}
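Editor's note: a sketch of the single-zone wrapper when topology-aware provisioning supplies a node; the node labels, PVC name, and zone are invented for illustration.
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	volumehelpers "k8s.io/cloud-provider/volume/helpers"
)

func main() {
	// With no zone/zones parameters and no allowedTopologies, the node's zone label decides.
	node := &v1.Node{ObjectMeta: metav1.ObjectMeta{
		Labels: map[string]string{v1.LabelZoneFailureDomain: "us-central1-a"},
	}}
	zone, err := volumehelpers.SelectZoneForVolume(false, false, "", nil, nil, node, nil, "pvc-data-0")
	if err != nil {
		panic(err)
	}
	fmt.Println(zone) // "us-central1-a"
}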
// SelectZonesForVolume selects zones for a volume based on several factors:
// node.zone, allowedTopologies, the zone/zones parameters from the StorageClass,
// and zones with active nodes in the cluster. The number of zones selected equals numReplicas.
func SelectZonesForVolume(zoneParameterPresent, zonesParameterPresent bool, zoneParameter string, zonesParameter, zonesWithNodes sets.String, node *v1.Node, allowedTopologies []v1.TopologySelectorTerm, pvcName string, numReplicas uint32) (sets.String, error) {
if zoneParameterPresent && zonesParameterPresent {
return nil, fmt.Errorf("both zone and zones StorageClass parameters must not be used at the same time")
}
var zoneFromNode string
// pick one zone from node if present
if node != nil {
// VolumeScheduling implicit since node is not nil
if zoneParameterPresent || zonesParameterPresent {
return nil, fmt.Errorf("zone[s] cannot be specified in StorageClass if VolumeBindingMode is set to WaitForFirstConsumer. Please specify allowedTopologies in StorageClass for constraining zones")
}
// pick node's zone for one of the replicas
var ok bool
zoneFromNode, ok = node.ObjectMeta.Labels[v1.LabelZoneFailureDomain]
if !ok {
return nil, fmt.Errorf("%s Label for node missing", v1.LabelZoneFailureDomain)
}
// if single replica volume and node with zone found, return immediately
if numReplicas == 1 {
return sets.NewString(zoneFromNode), nil
}
}
// pick zone from allowedZones if specified
allowedZones, err := ZonesFromAllowedTopologies(allowedTopologies)
if err != nil {
return nil, err
}
if (len(allowedTopologies) > 0) && (allowedZones.Len() == 0) {
return nil, fmt.Errorf("no matchLabelExpressions with %s key found in allowedTopologies. Please specify matchLabelExpressions with %s key", v1.LabelZoneFailureDomain, v1.LabelZoneFailureDomain)
}
if allowedZones.Len() > 0 {
// VolumeScheduling implicit since allowedZones present
if zoneParameterPresent || zonesParameterPresent {
return nil, fmt.Errorf("zone[s] cannot be specified in StorageClass if allowedTopologies specified")
}
// The scheduler guarantees that, if node != nil above, zoneFromNode is a member of allowedZones,
// so if zoneFromNode != "" we can safely assume it is part of allowedZones.
zones, err := chooseZonesForVolumeIncludingZone(allowedZones, pvcName, zoneFromNode, numReplicas)
if err != nil {
return nil, fmt.Errorf("cannot process zones in allowedTopologies: %v", err)
}
return zones, nil
}
// pick zone from parameters if present
if zoneParameterPresent {
if numReplicas > 1 {
return nil, fmt.Errorf("zone cannot be specified if desired number of replicas for pv is greather than 1. Please specify zones or allowedTopologies to specify desired zones")
}
return sets.NewString(zoneParameter), nil
}
if zonesParameterPresent {
if uint32(zonesParameter.Len()) < numReplicas {
return nil, fmt.Errorf("not enough zones found in zones parameter to provision a volume with %d replicas. Found %d zones, need %d zones", numReplicas, zonesParameter.Len(), numReplicas)
}
// directly choose from the zones parameter; no zone from the node needs to be considered
return ChooseZonesForVolume(zonesParameter, pvcName, numReplicas), nil
}
// pick zone from zones with nodes
if zonesWithNodes.Len() > 0 {
// If node != nil (and thus zoneFromNode != ""), zoneFromNode will be a member of zonesWithNodes
zones, err := chooseZonesForVolumeIncludingZone(zonesWithNodes, pvcName, zoneFromNode, numReplicas)
if err != nil {
return nil, fmt.Errorf("cannot process zones where nodes exist in the cluster: %v", err)
}
return zones, nil
}
return nil, fmt.Errorf("cannot determine zones to provision volume in")
}
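Editor's note: a hedged sketch of the precedence implemented above, exercising the zones-parameter path for a two-replica volume; the zone names and PVC name are placeholders.
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
	volumehelpers "k8s.io/cloud-provider/volume/helpers"
)

func main() {
	// No node and no allowedTopologies, so the explicit zones parameter is used.
	zonesParam := sets.NewString("eu-west-1a", "eu-west-1b", "eu-west-1c")
	chosen, err := volumehelpers.SelectZonesForVolume(
		false,      // zone parameter not set
		true,       // zones parameter set
		"",         // zone
		zonesParam, // zones
		nil,        // zones with active nodes (unused on this path)
		nil,        // node
		nil,        // allowedTopologies
		"data-db-0",
		2, // numReplicas
	)
	if err != nil {
		panic(err)
	}
	fmt.Println(chosen.List()) // two zones picked deterministically from the PVC name
}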
// ZonesFromAllowedTopologies returns a list of zones specified in allowedTopologies
func ZonesFromAllowedTopologies(allowedTopologies []v1.TopologySelectorTerm) (sets.String, error) {
zones := make(sets.String)
for _, term := range allowedTopologies {
for _, exp := range term.MatchLabelExpressions {
if exp.Key == v1.LabelZoneFailureDomain {
for _, value := range exp.Values {
zones.Insert(value)
}
} else {
return nil, fmt.Errorf("unsupported key found in matchLabelExpressions: %s", exp.Key)
}
}
}
return zones, nil
}
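Editor's note: for illustration (zone values invented), extracting candidate zones from a StorageClass's allowedTopologies with the helper above.
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	volumehelpers "k8s.io/cloud-provider/volume/helpers"
)

func main() {
	allowed := []v1.TopologySelectorTerm{{
		MatchLabelExpressions: []v1.TopologySelectorLabelRequirement{{
			Key:    v1.LabelZoneFailureDomain,
			Values: []string{"us-east-1a", "us-east-1b"},
		}},
	}}
	zones, err := volumehelpers.ZonesFromAllowedTopologies(allowed)
	if err != nil {
		panic(err)
	}
	fmt.Println(zones.List()) // [us-east-1a us-east-1b]
}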
// chooseZonesForVolumeIncludingZone is a wrapper around ChooseZonesForVolume that ensures zoneToInclude is chosen
// zoneToInclude may be empty, in which case it is ignored; if non-empty, it is expected to be a member of zones.
// numReplicas is expected to be > 0 and <= zones.Len()
func chooseZonesForVolumeIncludingZone(zones sets.String, pvcName, zoneToInclude string, numReplicas uint32) (sets.String, error) {
if numReplicas == 0 {
return nil, fmt.Errorf("invalid number of replicas passed")
}
if uint32(zones.Len()) < numReplicas {
return nil, fmt.Errorf("not enough zones found to provision a volume with %d replicas. Need at least %d distinct zones for a volume with %d replicas", numReplicas, numReplicas, numReplicas)
}
if zoneToInclude != "" && !zones.Has(zoneToInclude) {
return nil, fmt.Errorf("zone to be included: %s needs to be member of set: %v", zoneToInclude, zones)
}
if uint32(zones.Len()) == numReplicas {
return zones, nil
}
if zoneToInclude != "" {
zones.Delete(zoneToInclude)
numReplicas = numReplicas - 1
}
zonesChosen := ChooseZonesForVolume(zones, pvcName, numReplicas)
if zoneToInclude != "" {
zonesChosen.Insert(zoneToInclude)
}
return zonesChosen, nil
}
// ChooseZonesForVolume is identical to ChooseZoneForVolume, but selects multiple zones, for multi-zone disks.
func ChooseZonesForVolume(zones sets.String, pvcName string, numZones uint32) sets.String {
// No zones available, return empty set.
replicaZones := sets.NewString()
if zones.Len() == 0 {
return replicaZones
}
// We create the volume in a zone determined by the name
// Eventually the scheduler will coordinate placement into an available zone
hash, index := getPVCNameHashAndIndexOffset(pvcName)
// Zones.List returns zones in a consistent order (sorted)
// We do have a potential failure case where volumes will not be properly spread,
// if the set of zones changes during StatefulSet volume creation. However, this is
// probably relatively unlikely because we expect the set of zones to be essentially
// static for clusters.
// Hopefully we can address this problem if/when we do full scheduler integration of
// PVC placement (which could also e.g. avoid putting volumes in overloaded or
// unhealthy zones)
zoneSlice := zones.List()
startingIndex := index * numZones
for index = startingIndex; index < startingIndex+numZones; index++ {
zone := zoneSlice[(hash+index)%uint32(len(zoneSlice))]
replicaZones.Insert(zone)
}
klog.V(2).Infof("Creating volume for replicated PVC %q; chosen zones=%q from zones=%q",
pvcName, replicaZones.UnsortedList(), zoneSlice)
return replicaZones
}
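Editor's note: a quick illustration (zone and claim names arbitrary) of the deterministic, name-driven spreading: the same PVC name always maps to the same zones.
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
	volumehelpers "k8s.io/cloud-provider/volume/helpers"
)

func main() {
	zones := sets.NewString("zone-a", "zone-b", "zone-c")
	// Two calls with the same PVC name yield the same pair of zones.
	first := volumehelpers.ChooseZonesForVolume(zones, "data-web-0", 2)
	second := volumehelpers.ChooseZonesForVolume(zones, "data-web-0", 2)
	fmt.Println(first.List(), second.List()) // identical output on both calls
}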
func getPVCNameHashAndIndexOffset(pvcName string) (hash uint32, index uint32) {
if pvcName == "" {
// We should always be called with a name; this shouldn't happen
klog.Warningf("No name defined during volume create; choosing random zone")
hash = rand.Uint32()
} else {
hashString := pvcName
// Heuristic to make sure that volumes in a StatefulSet are spread across zones
// StatefulSet PVCs are (currently) named ClaimName-StatefulSetName-Id,
// where Id is an integer index.
// Note though that if a StatefulSet pod has multiple claims, we need them to be
// in the same zone, because otherwise the pod will be unable to mount both volumes,
// and will be unschedulable. So we hash _only_ the "StatefulSetName" portion when
// it looks like `ClaimName-StatefulSetName-Id`.
// We continue to round-robin volume names that look like `Name-Id` as well; this is a useful
// feature for users who are creating statefulset-like functionality without using statefulsets.
lastDash := strings.LastIndexByte(pvcName, '-')
if lastDash != -1 {
statefulsetIDString := pvcName[lastDash+1:]
statefulsetID, err := strconv.ParseUint(statefulsetIDString, 10, 32)
if err == nil {
// Offset by the statefulsetID, so we round-robin across zones
index = uint32(statefulsetID)
// We still hash the volume name, but only the prefix
hashString = pvcName[:lastDash]
// In the special case where it looks like `ClaimName-StatefulSetName-Id`,
// hash only the StatefulSetName, so that different claims on the same StatefulSet
// member end up in the same zone.
// Note that StatefulSetName (and ClaimName) might themselves both have dashes.
// We actually just take the portion after the final - of ClaimName-StatefulSetName.
// For our purposes it doesn't much matter (just suboptimal spreading).
lastDash := strings.LastIndexByte(hashString, '-')
if lastDash != -1 {
hashString = hashString[lastDash+1:]
}
klog.V(2).Infof("Detected StatefulSet-style volume name %q; index=%d", pvcName, index)
}
}
// We hash the (base) volume name, so we don't bias towards the first N zones
h := fnv.New32()
h.Write([]byte(hashString))
hash = h.Sum32()
}
return hash, index
}
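Editor's note: to illustrate the StatefulSet heuristic above (claim and zone names invented), two claims belonging to the same StatefulSet member land in the same zone, because only the StatefulSetName portion is hashed and the trailing ordinal drives the round-robin offset.
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/sets"
	volumehelpers "k8s.io/cloud-provider/volume/helpers"
)

func main() {
	zones := sets.NewString("zone-a", "zone-b", "zone-c")
	// Both names parse as ClaimName-StatefulSetName-Id with the same
	// StatefulSetName ("web") and ordinal (0), so they hash identically.
	www := volumehelpers.ChooseZonesForVolume(zones, "www-web-0", 1)
	logs := volumehelpers.ChooseZonesForVolume(zones, "logs-web-0", 1)
	fmt.Println(www.List(), logs.List()) // the same single zone for both claims
}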

File diff suppressed because it is too large

View File

@ -142,6 +142,10 @@
"ImportPath": "k8s.io/apimachinery/third_party/forked/golang/reflect",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/cloud-provider/volume",
"Rev": "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx"
},
{
"ImportPath": "k8s.io/klog",
"Rev": "8139d8cb77af419532b33dfa7dd09fbc5f1d344f"

View File

@ -14,6 +14,7 @@ go_library(
deps = [
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/cloud-provider/volume:go_default_library",
],
)

View File

@ -23,6 +23,7 @@ import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/sets"
cloudvolume "k8s.io/cloud-provider/volume"
)
const (
@ -39,10 +40,6 @@ const (
volIDDiskNameValue = 5
volIDTotalElements = 6
// LabelZoneFailureDomain is the label on PVs indicating the zone they are provisioned in
LabelZoneFailureDomain = "failure-domain.beta.kubernetes.io/zone"
// LabelMultiZoneDelimiter separates zones for RePD volumes
LabelMultiZoneDelimiter = "__"
// UnspecifiedValue is used for an unknown zone string
UnspecifiedValue = "UNSPECIFIED"
)
@ -72,8 +69,8 @@ func (g *gcePersistentDiskCSITranslator) TranslateInTreePVToCSI(pv *v1.Persisten
return nil, fmt.Errorf("pv is nil or GCE Persistent Disk source not defined on pv")
}
zonesLabel := pv.Labels[LabelZoneFailureDomain]
zones := strings.Split(zonesLabel, LabelMultiZoneDelimiter)
zonesLabel := pv.Labels[v1.LabelZoneFailureDomain]
zones := strings.Split(zonesLabel, cloudvolume.LabelMultiZoneDelimiter)
if len(zones) == 1 && len(zones[0]) != 0 {
// Zonal
volID = fmt.Sprintf(volIDZonalFmt, UnspecifiedValue, zones[0], pv.Spec.GCEPersistentDisk.PDName)

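Editor's note: a rough sketch of the zonal-versus-regional check the translator performs after this change; the label value is a placeholder and the print statements stand in for the real volume-ID formatting.
package main

import (
	"fmt"
	"strings"

	cloudvolume "k8s.io/cloud-provider/volume"
)

func main() {
	// Value of the failure-domain.beta.kubernetes.io/zone label on the PV (invented).
	zonesLabel := "us-central1-a__us-central1-b"
	zones := strings.Split(zonesLabel, cloudvolume.LabelMultiZoneDelimiter)
	if len(zones) == 1 && len(zones[0]) != 0 {
		fmt.Println("zonal PD in", zones[0])
	} else {
		fmt.Println("regional PD across", zones)
	}
}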
View File

@ -63,6 +63,7 @@ go_library(
"//staging/src/k8s.io/apiserver/pkg/authentication/serviceaccount:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes:go_default_library",
"//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
"//staging/src/k8s.io/cloud-provider/volume/helpers:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/apis/csi/v1alpha1:go_default_library",
"//staging/src/k8s.io/csi-api/pkg/client/clientset/versioned:go_default_library",
"//test/e2e/framework:go_default_library",

View File

@ -37,8 +37,8 @@ import (
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/wait"
clientset "k8s.io/client-go/kubernetes"
volumehelpers "k8s.io/cloud-provider/volume/helpers"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/test/e2e/framework"
"k8s.io/kubernetes/test/e2e/storage/testsuites"
"k8s.io/kubernetes/test/e2e/storage/utils"
@ -583,7 +583,7 @@ func waitForStatefulSetReplicasNotReady(statefulSetName, ns string, c clientset.
// If match is true, check if zones in PV exactly match zones given.
// Otherwise, check whether zones in PV is superset of zones given.
func verifyZonesInPV(volume *v1.PersistentVolume, zones sets.String, match bool) error {
pvZones, err := util.LabelZonesToSet(volume.Labels[v1.LabelZoneFailureDomain])
pvZones, err := volumehelpers.LabelZonesToSet(volume.Labels[v1.LabelZoneFailureDomain])
if err != nil {
return err
}

View File

@ -43,8 +43,8 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/authentication/serviceaccount"
clientset "k8s.io/client-go/kubernetes"
volumehelpers "k8s.io/cloud-provider/volume/helpers"
storageutil "k8s.io/kubernetes/pkg/apis/storage/v1/util"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/test/e2e/framework"
"k8s.io/kubernetes/test/e2e/framework/providers/gce"
"k8s.io/kubernetes/test/e2e/storage/testsuites"
@ -75,7 +75,7 @@ func checkZonesFromLabelAndAffinity(pv *v1.PersistentVolume, zones sets.String,
framework.Failf("label %s not found on PV", v1.LabelZoneFailureDomain)
}
zonesFromLabel, err := volumeutil.LabelZonesToSet(pvLabel)
zonesFromLabel, err := volumehelpers.LabelZonesToSet(pvLabel)
if err != nil {
framework.Failf("unable to parse zone labels %s: %v", pvLabel, err)
}