Merge pull request #44897 from msau42/local-storage-plugin

Automatic merge from submit-queue (batch tested with PRs 46076, 43879, 44897, 46556, 46654)

Local storage plugin

**What this PR does / why we need it**:
Volume plugin implementation for local persistent volumes. A scheduler predicate directs pods whose PVCs are already bound to a local PV onto the node where that PV resides; PVC binding itself still happens independently.

**Which issue this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close that issue when PR gets merged)*: 
Part of #43640

**Release note**:

```
Alpha feature: the local volume plugin allows local directories to be created and consumed as Persistent Volumes. These volumes carry node affinity, so pods are scheduled only onto the node where the volume resides.
```
Kubernetes Submit Queue 2017-05-30 23:20:02 -07:00 committed by GitHub
commit 0aad9d30e3
32 changed files with 1365 additions and 18 deletions
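
To make the release note concrete, here is a hedged sketch (not part of this PR's diff) of building such a PV in Go, mirroring what the e2e framework changes below do: a `v1.LocalVolumeSource` for the host path, with node affinity recorded through the alpha annotation helper that this PR touches. The PV name, path, and capacity are illustrative; `kubernetes.io/hostname` is the label key the e2e test uses.

```go
package example

import (
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/kubernetes/pkg/api/v1"
	"k8s.io/kubernetes/pkg/api/v1/helper"
)

// buildLocalPV sketches a PV backed by a directory on one specific node.
// The affinity is stored as an alpha annotation (see the helper changes
// below); both the scheduler predicate and the mount-time check consume it.
func buildLocalPV(nodeName string) (*v1.PersistentVolume, error) {
	pv := &v1.PersistentVolume{
		ObjectMeta: metav1.ObjectMeta{
			Name:        "example-local-pv",
			Annotations: map[string]string{},
		},
		Spec: v1.PersistentVolumeSpec{
			Capacity: v1.ResourceList{
				v1.ResourceStorage: resource.MustParse("5Gi"),
			},
			PersistentVolumeSource: v1.PersistentVolumeSource{
				Local: &v1.LocalVolumeSource{Path: "/mnt/disks/vol1"},
			},
			AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
		},
	}
	affinity := &v1.NodeAffinity{
		RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
			NodeSelectorTerms: []v1.NodeSelectorTerm{{
				MatchExpressions: []v1.NodeSelectorRequirement{{
					Key:      "kubernetes.io/hostname",
					Operator: v1.NodeSelectorOpIn,
					Values:   []string{nodeName},
				}},
			}},
		},
	}
	if err := helper.StorageNodeAffinityToAlphaAnnotation(pv.Annotations, affinity); err != nil {
		return nil, err
	}
	return pv, nil
}
```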

@@ -84,6 +84,7 @@ go_library(
"//pkg/volume/gce_pd:go_default_library",
"//pkg/volume/glusterfs:go_default_library",
"//pkg/volume/host_path:go_default_library",
"//pkg/volume/local:go_default_library",
"//pkg/volume/nfs:go_default_library",
"//pkg/volume/photon_pd:go_default_library",
"//pkg/volume/portworx:go_default_library",

@@ -47,6 +47,7 @@ import (
"k8s.io/kubernetes/pkg/volume/gce_pd"
"k8s.io/kubernetes/pkg/volume/glusterfs"
"k8s.io/kubernetes/pkg/volume/host_path"
"k8s.io/kubernetes/pkg/volume/local"
"k8s.io/kubernetes/pkg/volume/nfs"
"k8s.io/kubernetes/pkg/volume/photon_pd"
"k8s.io/kubernetes/pkg/volume/portworx"
@@ -121,6 +122,7 @@ func ProbeControllerVolumePlugins(cloud cloudprovider.Interface, config componen
allPlugins = append(allPlugins, flocker.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, portworx.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, scaleio.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, local.ProbeVolumePlugins()...)
if cloud != nil {
switch {

@@ -92,6 +92,7 @@ go_library(
"//pkg/volume/glusterfs:go_default_library",
"//pkg/volume/host_path:go_default_library",
"//pkg/volume/iscsi:go_default_library",
"//pkg/volume/local:go_default_library",
"//pkg/volume/nfs:go_default_library",
"//pkg/volume/photon_pd:go_default_library",
"//pkg/volume/portworx:go_default_library",

@@ -45,6 +45,7 @@ import (
"k8s.io/kubernetes/pkg/volume/glusterfs"
"k8s.io/kubernetes/pkg/volume/host_path"
"k8s.io/kubernetes/pkg/volume/iscsi"
"k8s.io/kubernetes/pkg/volume/local"
"k8s.io/kubernetes/pkg/volume/nfs"
"k8s.io/kubernetes/pkg/volume/photon_pd"
"k8s.io/kubernetes/pkg/volume/portworx"
@@ -95,6 +96,7 @@ func ProbeVolumePlugins(pluginDir string) []volume.VolumePlugin {
allPlugins = append(allPlugins, projected.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, portworx.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, scaleio.ProbeVolumePlugins()...)
allPlugins = append(allPlugins, local.ProbeVolumePlugins()...)
return allPlugins
}

@@ -590,6 +590,10 @@ func GetStorageNodeAffinityFromAnnotation(annotations map[string]string) (*api.N
// Converts NodeAffinity type to Alpha annotation for use in PersistentVolumes
// TODO: update when storage node affinity graduates to beta
func StorageNodeAffinityToAlphaAnnotation(annotations map[string]string, affinity *api.NodeAffinity) error {
if affinity == nil {
return nil
}
json, err := json.Marshal(*affinity)
if err != nil {
return err

@@ -517,6 +517,10 @@ func GetStorageNodeAffinityFromAnnotation(annotations map[string]string) (*v1.No
// Converts NodeAffinity type to Alpha annotation for use in PersistentVolumes
// TODO: update when storage node affinity graduates to beta
func StorageNodeAffinityToAlphaAnnotation(annotations map[string]string, affinity *v1.NodeAffinity) error {
if affinity == nil {
return nil
}
json, err := json.Marshal(*affinity)
if err != nil {
return err
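
With the nil guard added above, callers may pass a nil affinity and no annotation is written. A minimal round-trip sketch under that assumption, calling the helpers qualified as `helper` from outside the package (the annotation key itself is internal to these helpers; reading back is expected to yield nil when nothing was stored):

```go
// roundTrip stores affinity (possibly nil) as the alpha annotation and
// immediately reads it back with the getter defined above.
func roundTrip(affinity *v1.NodeAffinity) (*v1.NodeAffinity, error) {
	annotations := map[string]string{}
	// Nil affinity: a no-op after this change, instead of marshaling nil.
	if err := helper.StorageNodeAffinityToAlphaAnnotation(annotations, affinity); err != nil {
		return nil, err
	}
	// Non-nil affinity round-trips through its JSON representation.
	return helper.GetStorageNodeAffinityFromAnnotation(annotations)
}
```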

@@ -565,3 +565,7 @@ func (adc *attachDetachController) addNodeToDswp(node *v1.Node, nodeName types.N
adc.desiredStateOfWorld.AddNode(nodeName, keepTerminatedPodVolumes)
}
}
func (adc *attachDetachController) GetNodeLabels() (map[string]string, error) {
return nil, fmt.Errorf("GetNodeLabels() unsupported in Attach/Detach controller")
}

@@ -86,3 +86,7 @@ func (adc *PersistentVolumeController) GetSecretFunc() func(namespace, name stri
return nil, fmt.Errorf("GetSecret unsupported in PersistentVolumeController")
}
}
func (ctrl *PersistentVolumeController) GetNodeLabels() (map[string]string, error) {
return nil, fmt.Errorf("GetNodeLabels() unsupported in PersistentVolumeController")
}

@@ -140,3 +140,11 @@ func (kvh *kubeletVolumeHost) GetNodeAllocatable() (v1.ResourceList, error) {
func (kvh *kubeletVolumeHost) GetSecretFunc() func(namespace, name string) (*v1.Secret, error) {
return kvh.secretManager.GetSecret
}
func (kvh *kubeletVolumeHost) GetNodeLabels() (map[string]string, error) {
node, err := kvh.kubelet.GetNode()
if err != nil {
return nil, fmt.Errorf("error retrieving node: %v", err)
}
return node.Labels, nil
}

@@ -106,6 +106,7 @@ filegroup(
"//pkg/volume/glusterfs:all-srcs",
"//pkg/volume/host_path:all-srcs",
"//pkg/volume/iscsi:all-srcs",
"//pkg/volume/local:all-srcs",
"//pkg/volume/nfs:all-srcs",
"//pkg/volume/photon_pd:all-srcs",
"//pkg/volume/portworx:all-srcs",

pkg/volume/local/BUILD (new file)

@@ -0,0 +1,56 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_library(
name = "go_default_library",
srcs = [
"doc.go",
"local.go",
],
tags = ["automanaged"],
deps = [
"//pkg/api/v1:go_default_library",
"//pkg/util/mount:go_default_library",
"//pkg/util/strings:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/util:go_default_library",
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["local_test.go"],
library = ":go_default_library",
tags = ["automanaged"],
deps = [
"//pkg/api/v1:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/testing:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/client-go/util/testing:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)

pkg/volume/local/OWNERS (new file)

@@ -0,0 +1,21 @@
approvers:
- saad-ali
- thockin
- vishh
- msau42
- jingxu97
- jsafrane
reviewers:
- thockin
- smarterclayton
- deads2k
- brendandburns
- derekwaynecarr
- pmorie
- saad-ali
- justinsb
- jsafrane
- rootfs
- jingxu97
- msau42
- vishh

pkg/volume/local/doc.go (new file)

@@ -0,0 +1,18 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package local contains the internal representation of local volumes
package local // import "k8s.io/kubernetes/pkg/volume/local"

pkg/volume/local/local.go (new file)

@@ -0,0 +1,267 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package local
import (
"fmt"
"os"
"github.com/golang/glog"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/util/strings"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
)
// This is the primary entrypoint for volume plugins.
func ProbeVolumePlugins() []volume.VolumePlugin {
return []volume.VolumePlugin{&localVolumePlugin{}}
}
type localVolumePlugin struct {
host volume.VolumeHost
}
var _ volume.VolumePlugin = &localVolumePlugin{}
var _ volume.PersistentVolumePlugin = &localVolumePlugin{}
const (
localVolumePluginName = "kubernetes.io/local-volume"
)
func (plugin *localVolumePlugin) Init(host volume.VolumeHost) error {
plugin.host = host
return nil
}
func (plugin *localVolumePlugin) GetPluginName() string {
return localVolumePluginName
}
func (plugin *localVolumePlugin) GetVolumeName(spec *volume.Spec) (string, error) {
// This volume is only supported as a PersistentVolumeSource, so the PV name is unique
return spec.Name(), nil
}
func (plugin *localVolumePlugin) CanSupport(spec *volume.Spec) bool {
// This volume is only supported as a PersistentVolumeSource
return (spec.PersistentVolume != nil && spec.PersistentVolume.Spec.Local != nil)
}
func (plugin *localVolumePlugin) RequiresRemount() bool {
return false
}
func (plugin *localVolumePlugin) SupportsMountOption() bool {
return false
}
func (plugin *localVolumePlugin) SupportsBulkVolumeVerification() bool {
return false
}
func (plugin *localVolumePlugin) GetAccessModes() []v1.PersistentVolumeAccessMode {
// The current meaning of AccessMode is how many nodes can attach to it, not how many pods can mount it
return []v1.PersistentVolumeAccessMode{
v1.ReadWriteOnce,
}
}
func getVolumeSource(spec *volume.Spec) (*v1.LocalVolumeSource, bool, error) {
if spec.PersistentVolume != nil && spec.PersistentVolume.Spec.Local != nil {
return spec.PersistentVolume.Spec.Local, spec.ReadOnly, nil
}
return nil, false, fmt.Errorf("Spec does not reference a Local volume type")
}
func (plugin *localVolumePlugin) NewMounter(spec *volume.Spec, pod *v1.Pod, _ volume.VolumeOptions) (volume.Mounter, error) {
volumeSource, readOnly, err := getVolumeSource(spec)
if err != nil {
return nil, err
}
return &localVolumeMounter{
localVolume: &localVolume{
podUID: pod.UID,
volName: spec.Name(),
mounter: plugin.host.GetMounter(),
plugin: plugin,
globalPath: volumeSource.Path,
},
readOnly: readOnly,
}, nil
}
func (plugin *localVolumePlugin) NewUnmounter(volName string, podUID types.UID) (volume.Unmounter, error) {
return &localVolumeUnmounter{
localVolume: &localVolume{
podUID: podUID,
volName: volName,
mounter: plugin.host.GetMounter(),
plugin: plugin,
},
}, nil
}
// TODO: check if no path and no topology constraints are ok
func (plugin *localVolumePlugin) ConstructVolumeSpec(volumeName, mountPath string) (*volume.Spec, error) {
localVolume := &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: volumeName,
},
Spec: v1.PersistentVolumeSpec{
PersistentVolumeSource: v1.PersistentVolumeSource{
Local: &v1.LocalVolumeSource{
Path: "",
},
},
},
}
return volume.NewSpecFromPersistentVolume(localVolume, false), nil
}
// Local volumes represent a local directory on a node.
// The directory at the globalPath will be bind-mounted to the pod's directory
type localVolume struct {
volName string
podUID types.UID
// Global path to the volume
globalPath string
// Mounter interface that provides system calls to mount the global path to the pod local path.
mounter mount.Interface
plugin *localVolumePlugin
// TODO: add metrics
volume.MetricsNil
}
func (l *localVolume) GetPath() string {
return l.plugin.host.GetPodVolumeDir(l.podUID, strings.EscapeQualifiedNameForDisk(localVolumePluginName), l.volName)
}
type localVolumeMounter struct {
*localVolume
readOnly bool
}
var _ volume.Mounter = &localVolumeMounter{}
func (m *localVolumeMounter) GetAttributes() volume.Attributes {
return volume.Attributes{
ReadOnly: m.readOnly,
Managed: !m.readOnly,
SupportsSELinux: true,
}
}
// CanMount checks prior to mount operations to verify that the required components (binaries, etc.)
// to mount the volume are available on the underlying node.
// If not, it returns an error
func (m *localVolumeMounter) CanMount() error {
return nil
}
// SetUp bind mounts the directory to the volume path
func (m *localVolumeMounter) SetUp(fsGroup *types.UnixGroupID) error {
return m.SetUpAt(m.GetPath(), fsGroup)
}
// SetUpAt bind mounts the directory to the volume path and sets up volume ownership
func (m *localVolumeMounter) SetUpAt(dir string, fsGroup *types.UnixGroupID) error {
if m.globalPath == "" {
err := fmt.Errorf("LocalVolume volume %q path is empty", m.volName)
return err
}
notMnt, err := m.mounter.IsLikelyNotMountPoint(dir)
glog.V(4).Infof("LocalVolume mount setup: PodDir(%s) VolDir(%s) Mounted(%t) Error(%v), ReadOnly(%t)", dir, m.globalPath, !notMnt, err, m.readOnly)
if err != nil && !os.IsNotExist(err) {
glog.Errorf("cannot validate mount point: %s %v", dir, err)
return err
}
if !notMnt {
return nil
}
if err := os.MkdirAll(dir, 0750); err != nil {
glog.Errorf("mkdir failed on disk %s (%v)", dir, err)
return err
}
// Perform a bind mount to the full path to allow duplicate mounts of the same volume.
options := []string{"bind"}
if m.readOnly {
options = append(options, "ro")
}
glog.V(4).Infof("attempting to mount %s", dir)
err = m.mounter.Mount(m.globalPath, dir, "", options)
if err != nil {
glog.Errorf("Mount of volume %s failed: %v", dir, err)
notMnt, mntErr := m.mounter.IsLikelyNotMountPoint(dir)
if mntErr != nil {
glog.Errorf("IsLikelyNotMountPoint check failed: %v", mntErr)
return err
}
if !notMnt {
if mntErr = m.mounter.Unmount(dir); mntErr != nil {
glog.Errorf("Failed to unmount: %v", mntErr)
return err
}
notMnt, mntErr = m.mounter.IsLikelyNotMountPoint(dir)
if mntErr != nil {
glog.Errorf("IsLikelyNotMountPoint check failed: %v", mntErr)
return err
}
if !notMnt {
// This is very odd, we don't expect it. We'll try again next sync loop.
glog.Errorf("%s is still mounted, despite call to unmount(). Will try again next sync loop.", dir)
return err
}
}
os.Remove(dir)
return err
}
if !m.readOnly {
// TODO: how to prevent multiple mounts with conflicting fsGroup?
return volume.SetVolumeOwnership(m, fsGroup)
}
return nil
}
type localVolumeUnmounter struct {
*localVolume
}
var _ volume.Unmounter = &localVolumeUnmounter{}
// TearDown unmounts the bind mount
func (u *localVolumeUnmounter) TearDown() error {
return u.TearDownAt(u.GetPath())
}
// TearDownAt unmounts the bind mount
func (u *localVolumeUnmounter) TearDownAt(dir string) error {
glog.V(4).Infof("Unmounting volume %q at path %q\n", u.volName, dir)
return util.UnmountPath(dir, u.mounter)
}
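
For context on the options used in SetUpAt above: at the Linux syscall level, "bind" plus "ro" is a two-step sequence, because MS_RDONLY is ignored on the initial bind and only takes effect on a remount (newer util-linux versions of mount(8) perform the remount automatically). A minimal Linux-only sketch of that semantics; the plugin itself goes through mount.Interface rather than raw syscalls:

```go
package example

import "syscall"

// bindMountReadOnly mirrors what mount(8) does for "-o bind,ro":
// bind first, then remount the bind read-only.
func bindMountReadOnly(source, target string) error {
	if err := syscall.Mount(source, target, "", syscall.MS_BIND, ""); err != nil {
		return err
	}
	return syscall.Mount("", target, "", syscall.MS_BIND|syscall.MS_REMOUNT|syscall.MS_RDONLY, "")
}
```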

pkg/volume/local/local_test.go (new file)

@@ -0,0 +1,288 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package local
import (
"os"
"path"
"testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utiltesting "k8s.io/client-go/util/testing"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/volume"
volumetest "k8s.io/kubernetes/pkg/volume/testing"
)
const (
testPVName = "pvA"
testMountPath = "pods/poduid/volumes/kubernetes.io~local-volume/pvA"
testNodeName = "fakeNodeName"
)
func getPlugin(t *testing.T) (string, volume.VolumePlugin) {
tmpDir, err := utiltesting.MkTmpdir("localVolumeTest")
if err != nil {
t.Fatalf("can't make a temp dir: %v", err)
}
plugMgr := volume.VolumePluginMgr{}
plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil))
plug, err := plugMgr.FindPluginByName(localVolumePluginName)
if err != nil {
os.RemoveAll(tmpDir)
t.Fatalf("Can't find the plugin by name")
}
if plug.GetPluginName() != localVolumePluginName {
t.Errorf("Wrong name: %s", plug.GetPluginName())
}
return tmpDir, plug
}
func getPersistentPlugin(t *testing.T) (string, volume.PersistentVolumePlugin) {
tmpDir, err := utiltesting.MkTmpdir("localVolumeTest")
if err != nil {
t.Fatalf("can't make a temp dir: %v", err)
}
plugMgr := volume.VolumePluginMgr{}
plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil))
plug, err := plugMgr.FindPersistentPluginByName(localVolumePluginName)
if err != nil {
os.RemoveAll(tmpDir)
t.Fatalf("Can't find the plugin by name")
}
if plug.GetPluginName() != localVolumePluginName {
t.Errorf("Wrong name: %s", plug.GetPluginName())
}
return tmpDir, plug
}
func getTestVolume(readOnly bool) *volume.Spec {
pv := &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
Name: testPVName,
},
Spec: v1.PersistentVolumeSpec{
PersistentVolumeSource: v1.PersistentVolumeSource{
Local: &v1.LocalVolumeSource{
Path: "/test-vol",
},
},
},
}
return volume.NewSpecFromPersistentVolume(pv, readOnly)
}
func contains(modes []v1.PersistentVolumeAccessMode, mode v1.PersistentVolumeAccessMode) bool {
for _, m := range modes {
if m == mode {
return true
}
}
return false
}
func TestCanSupport(t *testing.T) {
tmpDir, plug := getPlugin(t)
defer os.RemoveAll(tmpDir)
if !plug.CanSupport(getTestVolume(false)) {
t.Errorf("Expected true")
}
}
func TestGetAccessModes(t *testing.T) {
tmpDir, plug := getPersistentPlugin(t)
defer os.RemoveAll(tmpDir)
modes := plug.GetAccessModes()
if !contains(modes, v1.ReadWriteOnce) {
t.Errorf("Expected AccessModeType %q", v1.ReadWriteOnce)
}
if contains(modes, v1.ReadWriteMany) {
t.Errorf("Found AccessModeType %q, expected not", v1.ReadWriteMany)
}
if contains(modes, v1.ReadOnlyMany) {
t.Errorf("Found AccessModeType %q, expected not", v1.ReadOnlyMany)
}
}
func TestGetVolumeName(t *testing.T) {
tmpDir, plug := getPersistentPlugin(t)
defer os.RemoveAll(tmpDir)
volName, err := plug.GetVolumeName(getTestVolume(false))
if err != nil {
t.Errorf("Failed to get volume name: %v", err)
}
if volName != testPVName {
t.Errorf("Expected volume name %q, got %q", testPVName, volName)
}
}
func TestMountUnmount(t *testing.T) {
tmpDir, plug := getPlugin(t)
defer os.RemoveAll(tmpDir)
pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: types.UID("poduid")}}
mounter, err := plug.NewMounter(getTestVolume(false), pod, volume.VolumeOptions{})
if err != nil {
t.Errorf("Failed to make a new Mounter: %v", err)
}
if mounter == nil {
t.Fatalf("Got a nil Mounter")
}
volPath := path.Join(tmpDir, testMountPath)
path := mounter.GetPath()
if path != volPath {
t.Errorf("Got unexpected path: %s", path)
}
if err := mounter.SetUp(nil); err != nil {
t.Errorf("Expected success, got: %v", err)
}
if _, err := os.Stat(path); err != nil {
if os.IsNotExist(err) {
t.Errorf("SetUp() failed, volume path not created: %s", path)
} else {
t.Errorf("SetUp() failed: %v", err)
}
}
unmounter, err := plug.NewUnmounter(testPVName, pod.UID)
if err != nil {
t.Errorf("Failed to make a new Unmounter: %v", err)
}
if unmounter == nil {
t.Fatalf("Got a nil Unmounter")
}
if err := unmounter.TearDown(); err != nil {
t.Errorf("Expected success, got: %v", err)
}
if _, err := os.Stat(path); err == nil {
t.Errorf("TearDown() failed, volume path still exists: %s", path)
} else if !os.IsNotExist(err) {
t.Errorf("SetUp() failed: %v", err)
}
}
func TestConstructVolumeSpec(t *testing.T) {
tmpDir, plug := getPlugin(t)
defer os.RemoveAll(tmpDir)
volPath := path.Join(tmpDir, testMountPath)
spec, err := plug.ConstructVolumeSpec(testPVName, volPath)
if err != nil {
t.Errorf("ConstructVolumeSpec() failed: %v", err)
}
if spec == nil {
t.Fatalf("ConstructVolumeSpec() returned nil")
}
volName := spec.Name()
if volName != testPVName {
t.Errorf("Expected volume name %q, got %q", testPVName, volName)
}
if spec.Volume != nil {
t.Errorf("Volume object returned, expected nil")
}
pv := spec.PersistentVolume
if pv == nil {
t.Fatalf("PersistentVolume object nil")
}
ls := pv.Spec.PersistentVolumeSource.Local
if ls == nil {
t.Fatalf("LocalVolumeSource object nil")
}
}
func TestPersistentClaimReadOnlyFlag(t *testing.T) {
tmpDir, plug := getPlugin(t)
defer os.RemoveAll(tmpDir)
// Read only == true
pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{UID: types.UID("poduid")}}
mounter, err := plug.NewMounter(getTestVolume(true), pod, volume.VolumeOptions{})
if err != nil {
t.Errorf("Failed to make a new Mounter: %v", err)
}
if mounter == nil {
t.Fatalf("Got a nil Mounter")
}
if !mounter.GetAttributes().ReadOnly {
t.Errorf("Expected true for mounter.IsReadOnly")
}
// Read only == false
mounter, err = plug.NewMounter(getTestVolume(false), pod, volume.VolumeOptions{})
if err != nil {
t.Errorf("Failed to make a new Mounter: %v", err)
}
if mounter == nil {
t.Fatalf("Got a nil Mounter")
}
if mounter.GetAttributes().ReadOnly {
t.Errorf("Expected false for mounter.IsReadOnly")
}
}
func TestUnsupportedPlugins(t *testing.T) {
tmpDir, err := utiltesting.MkTmpdir("localVolumeTest")
if err != nil {
t.Fatalf("can't make a temp dir: %v", err)
}
defer os.RemoveAll(tmpDir)
plugMgr := volume.VolumePluginMgr{}
plugMgr.InitPlugins(ProbeVolumePlugins(), volumetest.NewFakeVolumeHost(tmpDir, nil, nil))
spec := getTestVolume(false)
recyclePlug, err := plugMgr.FindRecyclablePluginBySpec(spec)
if err == nil && recyclePlug != nil {
t.Errorf("Recyclable plugin found, expected none")
}
deletePlug, err := plugMgr.FindDeletablePluginByName(localVolumePluginName)
if err == nil && deletePlug != nil {
t.Errorf("Deletable plugin found, expected none")
}
attachPlug, err := plugMgr.FindAttachablePluginByName(localVolumePluginName)
if err == nil && attachPlug != nil {
t.Errorf("Attachable plugin found, expected none")
}
createPlug, err := plugMgr.FindCreatablePluginBySpec(spec)
if err == nil && createPlug != nil {
t.Errorf("Creatable plugin found, expected none")
}
provisionPlug, err := plugMgr.FindProvisionablePluginByName(localVolumePluginName)
if err == nil && provisionPlug != nil {
t.Errorf("Provisionable plugin found, expected none")
}
}

@@ -232,12 +232,16 @@ type VolumeHost interface {
// Returns a function that returns a secret.
GetSecretFunc() func(namespace, name string) (*v1.Secret, error)
// Returns the labels on the node
GetNodeLabels() (map[string]string, error)
}
// VolumePluginMgr tracks registered plugins.
type VolumePluginMgr struct {
mutex sync.Mutex
plugins map[string]VolumePlugin
Host VolumeHost
}
// Spec is an internal representation of a volume. All API volume types translate to Spec.
@@ -336,6 +340,7 @@ func (pm *VolumePluginMgr) InitPlugins(plugins []VolumePlugin, host VolumeHost)
pm.mutex.Lock()
defer pm.mutex.Unlock()
pm.Host = host
if pm.plugins == nil {
pm.plugins = map[string]VolumePlugin{}
}

@@ -134,6 +134,10 @@ func (f *fakeVolumeHost) GetSecretFunc() func(namespace, name string) (*v1.Secre
}
}
func (f *fakeVolumeHost) GetNodeLabels() (map[string]string, error) {
return map[string]string{"test-label": "test-value"}, nil
}
func ProbeVolumePlugins(config VolumeConfig) []VolumePlugin {
if _, ok := config.OtherAttributes["fake-property"]; ok {
return []VolumePlugin{

@@ -29,6 +29,7 @@ go_library(
"//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
],
)
@@ -38,10 +39,14 @@ go_test(
srcs = [
"atomic_writer_test.go",
"device_util_linux_test.go",
"util_test.go",
],
library = ":go_default_library",
tags = ["automanaged"],
deps = [
"//pkg/api/v1:go_default_library",
"//pkg/api/v1/helper:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//vendor/k8s.io/client-go/util/testing:go_default_library",
],

@@ -18,9 +18,11 @@ go_library(
deps = [
"//pkg/api/v1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/features:go_default_library",
"//pkg/kubelet/events:go_default_library",
"//pkg/util/mount:go_default_library",
"//pkg/volume:go_default_library",
"//pkg/volume/util:go_default_library",
"//pkg/volume/util/nestedpendingoperations:go_default_library",
"//pkg/volume/util/types:go_default_library",
"//pkg/volume/util/volumehelper:go_default_library",
@@ -28,6 +30,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
],
)

@@ -24,12 +24,15 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/features"
kevents "k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/volume"
"k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/pkg/volume/util/volumehelper"
)
@@ -363,6 +366,11 @@ func (og *operationGenerator) GenerateMountVolumeFunc(
return nil, volumeToMount.GenerateErrorDetailed("MountVolume.FindPluginBySpec failed", err)
}
affinityErr := checkNodeAffinity(og, volumeToMount, volumePlugin)
if affinityErr != nil {
return nil, affinityErr
}
volumeMounter, newMounterErr := volumePlugin.NewMounter(
volumeToMount.VolumeSpec,
volumeToMount.Pod,
@@ -713,3 +721,27 @@ func checkMountOptionSupport(og *operationGenerator, volumeToMount VolumeToMount
}
return nil
}
// checkNodeAffinity looks at the PV node affinity, and checks if the node has the same corresponding labels
// This ensures that we don't mount a volume that doesn't belong to this node
func checkNodeAffinity(og *operationGenerator, volumeToMount VolumeToMount, plugin volume.VolumePlugin) error {
if !utilfeature.DefaultFeatureGate.Enabled(features.PersistentLocalVolumes) {
return nil
}
pv := volumeToMount.VolumeSpec.PersistentVolume
if pv != nil {
nodeLabels, err := og.volumePluginMgr.Host.GetNodeLabels()
if err != nil {
return volumeToMount.GenerateErrorDetailed("Error getting node labels", err)
}
err = util.CheckNodeAffinity(pv, nodeLabels)
if err != nil {
eventErr, detailedErr := volumeToMount.GenerateError("Storage node affinity check failed", err)
og.recorder.Eventf(volumeToMount.Pod, v1.EventTypeWarning, kevents.FailedMountVolume, eventErr.Error())
return detailedErr
}
}
return nil
}

@@ -23,6 +23,7 @@ import (
"github.com/golang/glog"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/kubernetes/pkg/api/v1"
v1helper "k8s.io/kubernetes/pkg/api/v1/helper"
storage "k8s.io/kubernetes/pkg/apis/storage/v1"
@@ -164,3 +165,30 @@ func GetClassForVolume(kubeClient clientset.Interface, pv *v1.PersistentVolume)
}
return class, nil
}
// CheckNodeAffinity looks at the PV node affinity, and checks if the node has the same corresponding labels
// This ensures that we don't mount a volume that doesn't belong to this node
func CheckNodeAffinity(pv *v1.PersistentVolume, nodeLabels map[string]string) error {
affinity, err := v1helper.GetStorageNodeAffinityFromAnnotation(pv.Annotations)
if err != nil {
return fmt.Errorf("Error getting storage node affinity: %v", err)
}
if affinity == nil {
return nil
}
if affinity.RequiredDuringSchedulingIgnoredDuringExecution != nil {
terms := affinity.RequiredDuringSchedulingIgnoredDuringExecution.NodeSelectorTerms
glog.V(10).Infof("Match for RequiredDuringSchedulingIgnoredDuringExecution node selector terms %+v", terms)
for _, term := range terms {
selector, err := v1helper.NodeSelectorRequirementsAsSelector(term.MatchExpressions)
if err != nil {
return fmt.Errorf("Failed to parse MatchExpressions: %v", err)
}
if !selector.Matches(labels.Set(nodeLabels)) {
return fmt.Errorf("NodeSelectorTerm %+v does not match node labels", term.MatchExpressions)
}
}
}
return nil
}
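
A hedged usage sketch of the check just defined, assuming a PV whose alpha annotation requires `kubernetes.io/hostname` in `{"node-a"}` (the table-driven test in the new util_test.go below exercises the same paths):

```go
// whereCanItMount illustrates both CheckNodeAffinity outcomes.
func whereCanItMount(pv *v1.PersistentVolume) {
	// Labels satisfy the NodeSelectorTerm: nil error, the mount may proceed.
	if err := CheckNodeAffinity(pv, map[string]string{"kubernetes.io/hostname": "node-a"}); err == nil {
		glog.V(4).Infof("volume %q fits this node", pv.Name)
	}
	// Labels do not satisfy it: this is the error the scheduler predicate
	// reports as NoVolumeNodeConflict and the mount path reports as a
	// FailedMountVolume event.
	if err := CheckNodeAffinity(pv, map[string]string{"kubernetes.io/hostname": "node-b"}); err != nil {
		glog.V(4).Infof("volume %q rejected: %v", pv.Name, err)
	}
}
```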

pkg/volume/util/util_test.go (new file)

@@ -0,0 +1,142 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package util
import (
"testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/api/v1/helper"
)
var nodeLabels map[string]string = map[string]string{
"test-key1": "test-value1",
"test-key2": "test-value2",
}
func TestCheckNodeAffinity(t *testing.T) {
type affinityTest struct {
name string
expectSuccess bool
pv *v1.PersistentVolume
}
cases := []affinityTest{
{
name: "valid-no-constraints",
expectSuccess: true,
pv: testVolumeWithNodeAffinity(t, &v1.NodeAffinity{}),
},
{
name: "valid-constraints",
expectSuccess: true,
pv: testVolumeWithNodeAffinity(t, &v1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: "test-key1",
Operator: v1.NodeSelectorOpIn,
Values: []string{"test-value1", "test-value3"},
},
{
Key: "test-key2",
Operator: v1.NodeSelectorOpIn,
Values: []string{"test-value0", "test-value2"},
},
},
},
},
},
}),
},
{
name: "invalid-key",
expectSuccess: false,
pv: testVolumeWithNodeAffinity(t, &v1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: "test-key1",
Operator: v1.NodeSelectorOpIn,
Values: []string{"test-value1", "test-value3"},
},
{
Key: "test-key3",
Operator: v1.NodeSelectorOpIn,
Values: []string{"test-value0", "test-value2"},
},
},
},
},
},
}),
},
{
name: "invalid-values",
expectSuccess: false,
pv: testVolumeWithNodeAffinity(t, &v1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: "test-key1",
Operator: v1.NodeSelectorOpIn,
Values: []string{"test-value3", "test-value4"},
},
{
Key: "test-key2",
Operator: v1.NodeSelectorOpIn,
Values: []string{"test-value0", "test-value2"},
},
},
},
},
},
}),
},
}
for _, c := range cases {
err := CheckNodeAffinity(c.pv, nodeLabels)
if err != nil && c.expectSuccess {
t.Errorf("CheckTopology %v returned error: %v", c.name, err)
}
if err == nil && !c.expectSuccess {
t.Errorf("CheckTopology %v returned success, expected error", c.name)
}
}
}
func testVolumeWithNodeAffinity(t *testing.T, affinity *v1.NodeAffinity) *v1.PersistentVolume {
objMeta := metav1.ObjectMeta{Name: "test-constraints"}
objMeta.Annotations = map[string]string{}
err := helper.StorageNodeAffinityToAlphaAnnotation(objMeta.Annotations, affinity)
if err != nil {
t.Fatalf("Failed to get node affinity annotation: %v", err)
}
return &v1.PersistentVolume{
ObjectMeta: objMeta,
}
}

@@ -22,6 +22,8 @@ go_library(
"//pkg/api/v1/helper:go_default_library",
"//pkg/api/v1/helper/qos:go_default_library",
"//pkg/client/listers/core/v1:go_default_library",
"//pkg/features:go_default_library",
"//pkg/volume/util:go_default_library",
"//plugin/pkg/scheduler/algorithm:go_default_library",
"//plugin/pkg/scheduler/algorithm/priorities/util:go_default_library",
"//plugin/pkg/scheduler/schedulercache:go_default_library",
@@ -30,7 +32,9 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/labels:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//vendor/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/k8s.io/client-go/util/workqueue:go_default_library",
"//vendor/k8s.io/metrics/pkg/client/clientset_generated/clientset:go_default_library",
],
)

@@ -37,6 +37,7 @@ var (
ErrMaxVolumeCountExceeded = newPredicateFailureError("MaxVolumeCount")
ErrNodeUnderMemoryPressure = newPredicateFailureError("NodeUnderMemoryPressure")
ErrNodeUnderDiskPressure = newPredicateFailureError("NodeUnderDiskPressure")
ErrVolumeNodeConflict = newPredicateFailureError("NoVolumeNodeConflict")
// ErrFakePredicate is used for test only. The fake predicates returning false also returns error
// as ErrFakePredicate.
ErrFakePredicate = newPredicateFailureError("FakePredicateError")

@@ -29,14 +29,18 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/api/v1"
v1helper "k8s.io/kubernetes/pkg/api/v1/helper"
v1qos "k8s.io/kubernetes/pkg/api/v1/helper/qos"
corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
"k8s.io/kubernetes/pkg/features"
volumeutil "k8s.io/kubernetes/pkg/volume/util"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
priorityutil "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities/util"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
"k8s.io/metrics/pkg/client/clientset_generated/clientset"
)
// predicatePrecomputations: Helper types/variables...
@@ -1264,3 +1268,81 @@ func CheckNodeDiskPressurePredicate(pod *v1.Pod, meta interface{}, nodeInfo *sch
}
return true, nil, nil
}
type VolumeNodeChecker struct {
pvInfo PersistentVolumeInfo
pvcInfo PersistentVolumeClaimInfo
client clientset.Interface
}
// VolumeNodeChecker evaluates if a pod can fit due to the volumes it requests, given
// that some volumes have node topology constraints, particularly when using Local PVs.
// The requirement is that any pod that uses a PVC that is bound to a PV with topology constraints
// must be scheduled to a node that satisfies the PV's topology labels.
func NewVolumeNodePredicate(pvInfo PersistentVolumeInfo, pvcInfo PersistentVolumeClaimInfo, client clientset.Interface) algorithm.FitPredicate {
c := &VolumeNodeChecker{
pvInfo: pvInfo,
pvcInfo: pvcInfo,
client: client,
}
return c.predicate
}
func (c *VolumeNodeChecker) predicate(pod *v1.Pod, meta interface{}, nodeInfo *schedulercache.NodeInfo) (bool, []algorithm.PredicateFailureReason, error) {
if !utilfeature.DefaultFeatureGate.Enabled(features.PersistentLocalVolumes) {
return true, nil, nil
}
// If a pod doesn't have any volume attached to it, the predicate will always be true.
// Thus we make a fast path for it, to avoid unnecessary computations in this case.
if len(pod.Spec.Volumes) == 0 {
return true, nil, nil
}
node := nodeInfo.Node()
if node == nil {
return false, nil, fmt.Errorf("node not found")
}
glog.V(2).Infof("Checking for prebound volumes with node affinity")
namespace := pod.Namespace
manifest := &(pod.Spec)
for i := range manifest.Volumes {
volume := &manifest.Volumes[i]
if volume.PersistentVolumeClaim == nil {
continue
}
pvcName := volume.PersistentVolumeClaim.ClaimName
if pvcName == "" {
return false, nil, fmt.Errorf("PersistentVolumeClaim had no name")
}
pvc, err := c.pvcInfo.GetPersistentVolumeClaimInfo(namespace, pvcName)
if err != nil {
return false, nil, err
}
if pvc == nil {
return false, nil, fmt.Errorf("PersistentVolumeClaim was not found: %q", pvcName)
}
pvName := pvc.Spec.VolumeName
if pvName == "" {
return false, nil, fmt.Errorf("PersistentVolumeClaim is not bound: %q", pvcName)
}
pv, err := c.pvInfo.GetPersistentVolumeInfo(pvName)
if err != nil {
return false, nil, err
}
if pv == nil {
return false, nil, fmt.Errorf("PersistentVolume not found: %q", pvName)
}
err = volumeutil.CheckNodeAffinity(pv, node.Labels)
if err != nil {
glog.V(2).Infof("Won't schedule pod %q onto node %q due to volume %q node mismatch: %v", pod.Name, node.Name, pvName, err.Error())
return false, []algorithm.PredicateFailureReason{ErrVolumeNodeConflict}, nil
}
glog.V(4).Infof("VolumeNode predicate allows node %q for pod %q due to volume %q", node.Name, pod.Name, pvName)
}
return true, nil, nil
}

@@ -313,6 +313,77 @@ func TestCompatibility_v1_Scheduler(t *testing.T) {
},
},
},
// Do not change this JSON after the corresponding release has been tagged.
// A failure indicates backwards compatibility with the specified release was broken.
"1.7": {
JSON: `{
"kind": "Policy",
"apiVersion": "v1",
"predicates": [
{"name": "MatchNodeSelector"},
{"name": "PodFitsResources"},
{"name": "PodFitsHostPorts"},
{"name": "HostName"},
{"name": "NoDiskConflict"},
{"name": "NoVolumeZoneConflict"},
{"name": "PodToleratesNodeTaints"},
{"name": "CheckNodeMemoryPressure"},
{"name": "CheckNodeDiskPressure"},
{"name": "MaxEBSVolumeCount"},
{"name": "MaxGCEPDVolumeCount"},
{"name": "MaxAzureDiskVolumeCount"},
{"name": "MatchInterPodAffinity"},
{"name": "GeneralPredicates"},
{"name": "TestServiceAffinity", "argument": {"serviceAffinity" : {"labels" : ["region"]}}},
{"name": "TestLabelsPresence", "argument": {"labelsPresence" : {"labels" : ["foo"], "presence":true}}},
{"name": "NoVolumeNodeConflict"}
],"priorities": [
{"name": "EqualPriority", "weight": 2},
{"name": "ImageLocalityPriority", "weight": 2},
{"name": "LeastRequestedPriority", "weight": 2},
{"name": "BalancedResourceAllocation", "weight": 2},
{"name": "SelectorSpreadPriority", "weight": 2},
{"name": "NodePreferAvoidPodsPriority", "weight": 2},
{"name": "NodeAffinityPriority", "weight": 2},
{"name": "TaintTolerationPriority", "weight": 2},
{"name": "InterPodAffinityPriority", "weight": 2},
{"name": "MostRequestedPriority", "weight": 2}
]
}`,
ExpectedPolicy: schedulerapi.Policy{
Predicates: []schedulerapi.PredicatePolicy{
{Name: "MatchNodeSelector"},
{Name: "PodFitsResources"},
{Name: "PodFitsHostPorts"},
{Name: "HostName"},
{Name: "NoDiskConflict"},
{Name: "NoVolumeZoneConflict"},
{Name: "PodToleratesNodeTaints"},
{Name: "CheckNodeMemoryPressure"},
{Name: "CheckNodeDiskPressure"},
{Name: "MaxEBSVolumeCount"},
{Name: "MaxGCEPDVolumeCount"},
{Name: "MaxAzureDiskVolumeCount"},
{Name: "MatchInterPodAffinity"},
{Name: "GeneralPredicates"},
{Name: "TestServiceAffinity", Argument: &schedulerapi.PredicateArgument{ServiceAffinity: &schedulerapi.ServiceAffinity{Labels: []string{"region"}}}},
{Name: "TestLabelsPresence", Argument: &schedulerapi.PredicateArgument{LabelsPresence: &schedulerapi.LabelsPresence{Labels: []string{"foo"}, Presence: true}}},
{Name: "NoVolumeNodeConflict"},
},
Priorities: []schedulerapi.PriorityPolicy{
{Name: "EqualPriority", Weight: 2},
{Name: "ImageLocalityPriority", Weight: 2},
{Name: "LeastRequestedPriority", Weight: 2},
{Name: "BalancedResourceAllocation", Weight: 2},
{Name: "SelectorSpreadPriority", Weight: 2},
{Name: "NodePreferAvoidPodsPriority", Weight: 2},
{Name: "NodeAffinityPriority", Weight: 2},
{Name: "TaintTolerationPriority", Weight: 2},
{Name: "InterPodAffinityPriority", Weight: 2},
{Name: "MostRequestedPriority", Weight: 2},
},
},
},
}
registeredPredicates := sets.NewString(factory.ListRegisteredFitPredicates()...)

@@ -176,6 +176,14 @@ func defaultPredicates() sets.String {
// Fit is determined by node disk pressure condition.
factory.RegisterFitPredicate("CheckNodeDiskPressure", predicates.CheckNodeDiskPressurePredicate),
// Fit is determined by volume zone requirements.
factory.RegisterFitPredicateFactory(
"NoVolumeNodeConflict",
func(args factory.PluginFactoryArgs) algorithm.FitPredicate {
return predicates.NewVolumeNodePredicate(args.PVInfo, args.PVCInfo, nil)
},
),
)
}

@@ -590,6 +590,10 @@ func GetStorageNodeAffinityFromAnnotation(annotations map[string]string) (*api.N
// Converts NodeAffinity type to Alpha annotation for use in PersistentVolumes
// TODO: update when storage node affinity graduates to beta
func StorageNodeAffinityToAlphaAnnotation(annotations map[string]string, affinity *api.NodeAffinity) error {
if affinity == nil {
return nil
}
json, err := json.Marshal(*affinity)
if err != nil {
return err

@@ -35,6 +35,7 @@ import (
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/api/v1/helper"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
awscloud "k8s.io/kubernetes/pkg/cloudprovider/providers/aws"
gcecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/gce"
@@ -73,11 +74,13 @@ type PVCMap map[types.NamespacedName]pvcval
// },
// }
type PersistentVolumeConfig struct {
PVSource v1.PersistentVolumeSource
Prebind *v1.PersistentVolumeClaim
ReclaimPolicy v1.PersistentVolumeReclaimPolicy
NamePrefix string
Labels labels.Set
PVSource v1.PersistentVolumeSource
Prebind *v1.PersistentVolumeClaim
ReclaimPolicy v1.PersistentVolumeReclaimPolicy
NamePrefix string
Labels labels.Set
StorageClassName string
NodeAffinity *v1.NodeAffinity
}
// PersistentVolumeClaimConfig is consumed by MakePersistentVolumeClaim() to generate a PVC object.
@@ -85,9 +88,10 @@ type PersistentVolumeConfig struct {
// (+optional) Annotations defines the PVC's annotations
type PersistentVolumeClaimConfig struct {
AccessModes []v1.PersistentVolumeAccessMode
Annotations map[string]string
Selector *metav1.LabelSelector
AccessModes []v1.PersistentVolumeAccessMode
Annotations map[string]string
Selector *metav1.LabelSelector
StorageClassName *string
}
// Clean up a pv and pvc in a single pv/pvc test case.
@@ -561,7 +565,7 @@ func makePvcKey(ns, name string) types.NamespacedName {
// is assigned, assumes "Retain". Specs are expected to match the test's PVC.
// Note: the passed-in claim does not have a name until it is created and thus the PV's
// ClaimRef cannot be completely filled-in in this func. Therefore, the ClaimRef's name
// is added later in createPVCPV.
// is added later in CreatePVCPV.
func MakePersistentVolume(pvConfig PersistentVolumeConfig) *v1.PersistentVolume {
var claimRef *v1.ObjectReference
// If the reclaimPolicy is not provided, assume Retain
@@ -575,7 +579,7 @@ func MakePersistentVolume(pvConfig PersistentVolumeConfig) *v1.PersistentVolume
Namespace: pvConfig.Prebind.Namespace,
}
}
return &v1.PersistentVolume{
pv := &v1.PersistentVolume{
ObjectMeta: metav1.ObjectMeta{
GenerateName: pvConfig.NamePrefix,
Labels: pvConfig.Labels,
@@ -594,9 +598,16 @@ func MakePersistentVolume(pvConfig PersistentVolumeConfig) *v1.PersistentVolume
v1.ReadOnlyMany,
v1.ReadWriteMany,
},
ClaimRef: claimRef,
ClaimRef: claimRef,
StorageClassName: pvConfig.StorageClassName,
},
}
err := helper.StorageNodeAffinityToAlphaAnnotation(pv.Annotations, pvConfig.NodeAffinity)
if err != nil {
Logf("Setting storage node affinity failed: %v", err)
return nil
}
return pv
}
// Returns a PVC definition based on the namespace.
@@ -625,6 +636,7 @@ func MakePersistentVolumeClaim(cfg PersistentVolumeClaimConfig, ns string) *v1.P
v1.ResourceName(v1.ResourceStorage): resource.MustParse("1Gi"),
},
},
StorageClassName: cfg.StorageClassName,
},
}
}

@@ -76,11 +76,18 @@ type VolumeTestConfig struct {
ServerImage string
// Ports to export from the server pod. TCP only.
ServerPorts []int
// Commands to run in the container image.
ServerCmds []string
// Arguments to pass to the container image.
ServerArgs []string
// Volumes needed to be mounted to the server container from the host
// map <host (source) path> -> <container (dst.) path>
ServerVolumes map[string]string
// Wait for the pod to terminate successfully
// False indicates that the pod is long running
WaitForCompletion bool
// NodeName to run pod on. Default is any node.
NodeName string
}
// VolumeTest contains a volume to mount into a client pod and its
@@ -133,6 +140,11 @@ func StartVolumeServer(client clientset.Interface, config VolumeTestConfig) *v1.
By(fmt.Sprint("creating ", serverPodName, " pod"))
privileged := new(bool)
*privileged = true
restartPolicy := v1.RestartPolicyAlways
if config.WaitForCompletion {
restartPolicy = v1.RestartPolicyNever
}
serverPod := &v1.Pod{
TypeMeta: metav1.TypeMeta{
Kind: "Pod",
@@ -153,12 +165,15 @@ func StartVolumeServer(client clientset.Interface, config VolumeTestConfig) *v1.
SecurityContext: &v1.SecurityContext{
Privileged: privileged,
},
Command: config.ServerCmds,
Args: config.ServerArgs,
Ports: serverPodPorts,
VolumeMounts: mounts,
},
},
Volumes: volumes,
Volumes: volumes,
RestartPolicy: restartPolicy,
NodeName: config.NodeName,
},
}
@@ -176,12 +191,16 @@ func StartVolumeServer(client clientset.Interface, config VolumeTestConfig) *v1.
ExpectNoError(err, "Failed to create %q pod: %v", serverPodName, err)
}
}
ExpectNoError(WaitForPodRunningInNamespace(client, serverPod))
if pod == nil {
By(fmt.Sprintf("locating the %q server pod", serverPodName))
pod, err = podClient.Get(serverPodName, metav1.GetOptions{})
ExpectNoError(err, "Cannot locate the server pod %q: %v", serverPodName, err)
if config.WaitForCompletion {
ExpectNoError(WaitForPodSuccessInNamespace(client, serverPod.Name, serverPod.Namespace))
ExpectNoError(podClient.Delete(serverPod.Name, nil))
} else {
ExpectNoError(WaitForPodRunningInNamespace(client, serverPod))
if pod == nil {
By(fmt.Sprintf("locating the %q server pod", serverPodName))
pod, err = podClient.Get(serverPodName, metav1.GetOptions{})
ExpectNoError(err, "Cannot locate the server pod %q: %v", serverPodName, err)
}
}
return pod
}

@@ -14,6 +14,7 @@ go_library(
"persistent_volumes.go",
"persistent_volumes-disruptive.go",
"persistent_volumes-gce.go",
"persistent_volumes-local.go",
"persistent_volumes-vsphere.go",
"pv_reclaimpolicy.go",
"pvc_label_selector.go",

test/e2e/storage/persistent_volumes-local.go (new file)

@@ -0,0 +1,245 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package storage
import (
"fmt"
"path/filepath"
. "github.com/onsi/ginkgo"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/test/e2e/framework"
)
type localTestConfig struct {
ns string
nodes []v1.Node
client clientset.Interface
}
type localTestVolume struct {
// Node that the volume is on
node *v1.Node
// Path to the volume on the host node
hostDir string
// Path to the volume in the local util container
containerDir string
// PVC for this volume
pvc *v1.PersistentVolumeClaim
// PV for this volume
pv *v1.PersistentVolume
}
const (
// TODO: This may not be available/writable on all images.
hostBase = "/tmp"
containerBase = "/myvol"
testFile = "test-file"
testContents = "testdata"
testSC = "local-test-storagclass"
)
var _ = framework.KubeDescribe("[Volume] PersistentVolumes-local [Feature:LocalPersistentVolumes] [Serial]", func() {
f := framework.NewDefaultFramework("persistent-local-volumes-test")
var (
config *localTestConfig
)
BeforeEach(func() {
config = &localTestConfig{
ns: f.Namespace.Name,
client: f.ClientSet,
nodes: []v1.Node{},
}
// Get all the schedulable nodes
nodes, err := config.client.CoreV1().Nodes().List(metav1.ListOptions{})
if err != nil {
framework.Failf("Failed to get nodes: %v", err)
}
for _, node := range nodes.Items {
if !node.Spec.Unschedulable {
// TODO: does this need to be a deep copy?
config.nodes = append(config.nodes, node)
}
}
if len(config.nodes) == 0 {
framework.Failf("No available nodes for scheduling")
}
})
Context("when one pod requests one prebound PVC", func() {
var (
testVol *localTestVolume
node *v1.Node
)
BeforeEach(func() {
// Choose the first node
node = &config.nodes[0]
})
AfterEach(func() {
cleanupLocalVolume(config, testVol)
testVol = nil
})
It("should be able to mount and read from the volume", func() {
By("Initializing test volume")
testVol = setupLocalVolume(config, node)
By("Creating local PVC and PV")
createLocalPVCPV(config, testVol)
By("Creating a pod to consume the PV")
readCmd := fmt.Sprintf("cat /mnt/volume1/%s", testFile)
podSpec := createLocalPod(config, testVol, readCmd)
f.TestContainerOutput("pod consumes PV", podSpec, 0, []string{testContents})
})
It("should be able to mount and write to the volume", func() {
By("Initializing test volume")
testVol = setupLocalVolume(config, node)
By("Creating local PVC and PV")
createLocalPVCPV(config, testVol)
By("Creating a pod to write to the PV")
testFilePath := filepath.Join("/mnt/volume1", testFile)
cmd := fmt.Sprintf("echo %s > %s; cat %s", testVol.hostDir, testFilePath, testFilePath)
podSpec := createLocalPod(config, testVol, cmd)
f.TestContainerOutput("pod writes to PV", podSpec, 0, []string{testVol.hostDir})
})
})
})
// Launches a pod with hostpath volume on a specific node to setup a directory to use
// for the local PV
func setupLocalVolume(config *localTestConfig, node *v1.Node) *localTestVolume {
testDirName := "local-volume-test-" + string(uuid.NewUUID())
testDir := filepath.Join(containerBase, testDirName)
hostDir := filepath.Join(hostBase, testDirName)
testFilePath := filepath.Join(testDir, testFile)
writeCmd := fmt.Sprintf("mkdir %s; echo %s > %s", testDir, testContents, testFilePath)
framework.Logf("Creating local volume on node %q at path %q", node.Name, hostDir)
runLocalUtil(config, node.Name, writeCmd)
return &localTestVolume{
node: node,
hostDir: hostDir,
containerDir: testDir,
}
}
// Deletes the PVC/PV, and launches a pod with hostpath volume to remove the test directory
func cleanupLocalVolume(config *localTestConfig, volume *localTestVolume) {
if volume == nil {
return
}
By("Cleaning up PVC and PV")
errs := framework.PVPVCCleanup(config.client, config.ns, volume.pv, volume.pvc)
if len(errs) > 0 {
framework.Logf("AfterEach: Failed to delete PV and/or PVC: %v", utilerrors.NewAggregate(errs))
}
By("Removing the test directory")
removeCmd := fmt.Sprintf("rm -r %s", volume.containerDir)
runLocalUtil(config, volume.node.Name, removeCmd)
}
func runLocalUtil(config *localTestConfig, nodeName, cmd string) {
framework.StartVolumeServer(config.client, framework.VolumeTestConfig{
Namespace: config.ns,
Prefix: "local-volume-init",
ServerImage: "gcr.io/google_containers/busybox:1.24",
ServerCmds: []string{"/bin/sh"},
ServerArgs: []string{"-c", cmd},
ServerVolumes: map[string]string{
hostBase: containerBase,
},
WaitForCompletion: true,
NodeName: nodeName,
})
}
func makeLocalPVCConfig() framework.PersistentVolumeClaimConfig {
sc := testSC
return framework.PersistentVolumeClaimConfig{
AccessModes: []v1.PersistentVolumeAccessMode{v1.ReadWriteOnce},
StorageClassName: &sc,
}
}
func makeLocalPVConfig(volume *localTestVolume) framework.PersistentVolumeConfig {
// TODO: hostname may not be the best option
nodeKey := "kubernetes.io/hostname"
if volume.node.Labels == nil {
framework.Failf("Node does not have labels")
}
nodeValue, found := volume.node.Labels[nodeKey]
if !found {
framework.Failf("Node does not have required label %q", nodeKey)
}
return framework.PersistentVolumeConfig{
PVSource: v1.PersistentVolumeSource{
Local: &v1.LocalVolumeSource{
Path: volume.hostDir,
},
},
NamePrefix: "local-pv",
StorageClassName: testSC,
NodeAffinity: &v1.NodeAffinity{
RequiredDuringSchedulingIgnoredDuringExecution: &v1.NodeSelector{
NodeSelectorTerms: []v1.NodeSelectorTerm{
{
MatchExpressions: []v1.NodeSelectorRequirement{
{
Key: nodeKey,
Operator: v1.NodeSelectorOpIn,
Values: []string{nodeValue},
},
},
},
},
},
},
}
}
// Creates a PVC and PV with prebinding
func createLocalPVCPV(config *localTestConfig, volume *localTestVolume) {
pvcConfig := makeLocalPVCConfig()
pvConfig := makeLocalPVConfig(volume)
var err error
volume.pv, volume.pvc, err = framework.CreatePVPVC(config.client, pvConfig, pvcConfig, config.ns, true)
framework.ExpectNoError(err)
framework.WaitOnPVandPVC(config.client, config.ns, volume.pv, volume.pvc)
}
func createLocalPod(config *localTestConfig, volume *localTestVolume, cmd string) *v1.Pod {
return framework.MakePod(config.ns, []*v1.PersistentVolumeClaim{volume.pvc}, false, cmd)
}