Merge pull request #33432 from pmorie/kubelet-move

Automatic merge from submit-queue

Move Kubelet pod-management code into kubelet_pods.go

Finish the kubelet code moves started during the 1.3 dev cycle -- move pod management code into a file called `kubelet_pods.go`.
pull/6/head
Kubernetes Submit Queue 2016-09-27 08:59:34 -07:00 committed by GitHub
commit 95fae4baf4
5 changed files with 2778 additions and 2704 deletions

File diff suppressed because it is too large Load Diff

1210
pkg/kubelet/kubelet_pods.go Normal file

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

View File

@ -17,9 +17,16 @@ limitations under the License.
package kubelet
import (
"fmt"
"testing"
"github.com/stretchr/testify/assert"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/testing/core"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/volume"
volumetest "k8s.io/kubernetes/pkg/volume/testing"
"k8s.io/kubernetes/pkg/volume/util/volumehelper"
)
func TestPodVolumesExist(t *testing.T) {
@ -105,3 +112,298 @@ func TestPodVolumesExist(t *testing.T) {
}
}
}
func TestVolumeAttachAndMountControllerDisabled(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
kubelet := testKubelet.kubelet
pod := podWithUidNameNsSpec("12345678", "foo", "test", api.PodSpec{
Volumes: []api.Volume{
{
Name: "vol1",
VolumeSource: api.VolumeSource{
GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{
PDName: "fake-device",
},
},
},
},
})
stopCh := runVolumeManager(kubelet)
defer func() {
close(stopCh)
}()
kubelet.podManager.SetPods([]*api.Pod{pod})
err := kubelet.volumeManager.WaitForAttachAndMount(pod)
assert.NoError(t, err)
podVolumes := kubelet.volumeManager.GetMountedVolumesForPod(
volumehelper.GetUniquePodName(pod))
expectedPodVolumes := []string{"vol1"}
assert.Len(t, podVolumes, len(expectedPodVolumes), "Volumes for pod %+v", pod)
for _, name := range expectedPodVolumes {
assert.Contains(t, podVolumes, name, "Volumes for pod %+v", pod)
}
assert.True(t, testKubelet.volumePlugin.GetNewAttacherCallCount() >= 1, "Expected plugin NewAttacher to be called at least once")
assert.NoError(t, volumetest.VerifyWaitForAttachCallCount(
1 /* expectedWaitForAttachCallCount */, testKubelet.volumePlugin))
assert.NoError(t, volumetest.VerifyAttachCallCount(
1 /* expectedAttachCallCount */, testKubelet.volumePlugin))
assert.NoError(t, volumetest.VerifyMountDeviceCallCount(
1 /* expectedMountDeviceCallCount */, testKubelet.volumePlugin))
assert.NoError(t, volumetest.VerifySetUpCallCount(
1 /* expectedSetUpCallCount */, testKubelet.volumePlugin))
}
func TestVolumeUnmountAndDetachControllerDisabled(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
kubelet := testKubelet.kubelet
pod := podWithUidNameNsSpec("12345678", "foo", "test", api.PodSpec{
Volumes: []api.Volume{
{
Name: "vol1",
VolumeSource: api.VolumeSource{
GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{
PDName: "fake-device",
},
},
},
},
})
stopCh := runVolumeManager(kubelet)
defer func() {
close(stopCh)
}()
// Add pod
kubelet.podManager.SetPods([]*api.Pod{pod})
// Verify volumes attached
err := kubelet.volumeManager.WaitForAttachAndMount(pod)
assert.NoError(t, err)
podVolumes := kubelet.volumeManager.GetMountedVolumesForPod(
volumehelper.GetUniquePodName(pod))
expectedPodVolumes := []string{"vol1"}
assert.Len(t, podVolumes, len(expectedPodVolumes), "Volumes for pod %+v", pod)
for _, name := range expectedPodVolumes {
assert.Contains(t, podVolumes, name, "Volumes for pod %+v", pod)
}
assert.True(t, testKubelet.volumePlugin.GetNewAttacherCallCount() >= 1, "Expected plugin NewAttacher to be called at least once")
assert.NoError(t, volumetest.VerifyWaitForAttachCallCount(
1 /* expectedWaitForAttachCallCount */, testKubelet.volumePlugin))
assert.NoError(t, volumetest.VerifyAttachCallCount(
1 /* expectedAttachCallCount */, testKubelet.volumePlugin))
assert.NoError(t, volumetest.VerifyMountDeviceCallCount(
1 /* expectedMountDeviceCallCount */, testKubelet.volumePlugin))
assert.NoError(t, volumetest.VerifySetUpCallCount(
1 /* expectedSetUpCallCount */, testKubelet.volumePlugin))
// Remove pod
kubelet.podManager.SetPods([]*api.Pod{})
assert.NoError(t, waitForVolumeUnmount(kubelet.volumeManager, pod))
// Verify volumes unmounted
podVolumes = kubelet.volumeManager.GetMountedVolumesForPod(
volumehelper.GetUniquePodName(pod))
assert.Len(t, podVolumes, 0,
"Expected volumes to be unmounted and detached. But some volumes are still mounted: %#v", podVolumes)
assert.NoError(t, volumetest.VerifyTearDownCallCount(
1 /* expectedTearDownCallCount */, testKubelet.volumePlugin))
// Verify volumes detached and no longer reported as in use
assert.NoError(t, waitForVolumeDetach(api.UniqueVolumeName("fake/vol1"), kubelet.volumeManager))
assert.True(t, testKubelet.volumePlugin.GetNewAttacherCallCount() >= 1, "Expected plugin NewAttacher to be called at least once")
assert.NoError(t, volumetest.VerifyDetachCallCount(
1 /* expectedDetachCallCount */, testKubelet.volumePlugin))
}
func TestVolumeAttachAndMountControllerEnabled(t *testing.T) {
testKubelet := newTestKubelet(t, true /* controllerAttachDetachEnabled */)
kubelet := testKubelet.kubelet
kubeClient := testKubelet.fakeKubeClient
kubeClient.AddReactor("get", "nodes",
func(action core.Action) (bool, runtime.Object, error) {
return true, &api.Node{
ObjectMeta: api.ObjectMeta{Name: testKubeletHostname},
Status: api.NodeStatus{
VolumesAttached: []api.AttachedVolume{
{
Name: "fake/vol1",
DevicePath: "fake/path",
},
}},
Spec: api.NodeSpec{ExternalID: testKubeletHostname},
}, nil
})
kubeClient.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, fmt.Errorf("no reaction implemented for %s", action)
})
pod := podWithUidNameNsSpec("12345678", "foo", "test", api.PodSpec{
Volumes: []api.Volume{
{
Name: "vol1",
VolumeSource: api.VolumeSource{
GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{
PDName: "fake-device",
},
},
},
},
})
stopCh := runVolumeManager(kubelet)
defer func() {
close(stopCh)
}()
kubelet.podManager.SetPods([]*api.Pod{pod})
// Fake node status update
go simulateVolumeInUseUpdate(
api.UniqueVolumeName("fake/vol1"),
stopCh,
kubelet.volumeManager)
assert.NoError(t, kubelet.volumeManager.WaitForAttachAndMount(pod))
podVolumes := kubelet.volumeManager.GetMountedVolumesForPod(
volumehelper.GetUniquePodName(pod))
expectedPodVolumes := []string{"vol1"}
assert.Len(t, podVolumes, len(expectedPodVolumes), "Volumes for pod %+v", pod)
for _, name := range expectedPodVolumes {
assert.Contains(t, podVolumes, name, "Volumes for pod %+v", pod)
}
assert.True(t, testKubelet.volumePlugin.GetNewAttacherCallCount() >= 1, "Expected plugin NewAttacher to be called at least once")
assert.NoError(t, volumetest.VerifyWaitForAttachCallCount(
1 /* expectedWaitForAttachCallCount */, testKubelet.volumePlugin))
assert.NoError(t, volumetest.VerifyZeroAttachCalls(testKubelet.volumePlugin))
assert.NoError(t, volumetest.VerifyMountDeviceCallCount(
1 /* expectedMountDeviceCallCount */, testKubelet.volumePlugin))
assert.NoError(t, volumetest.VerifySetUpCallCount(
1 /* expectedSetUpCallCount */, testKubelet.volumePlugin))
}
func TestVolumeUnmountAndDetachControllerEnabled(t *testing.T) {
testKubelet := newTestKubelet(t, true /* controllerAttachDetachEnabled */)
kubelet := testKubelet.kubelet
kubeClient := testKubelet.fakeKubeClient
kubeClient.AddReactor("get", "nodes",
func(action core.Action) (bool, runtime.Object, error) {
return true, &api.Node{
ObjectMeta: api.ObjectMeta{Name: testKubeletHostname},
Status: api.NodeStatus{
VolumesAttached: []api.AttachedVolume{
{
Name: "fake/vol1",
DevicePath: "fake/path",
},
}},
Spec: api.NodeSpec{ExternalID: testKubeletHostname},
}, nil
})
kubeClient.AddReactor("*", "*", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, fmt.Errorf("no reaction implemented for %s", action)
})
pod := podWithUidNameNsSpec("12345678", "foo", "test", api.PodSpec{
Volumes: []api.Volume{
{
Name: "vol1",
VolumeSource: api.VolumeSource{
GCEPersistentDisk: &api.GCEPersistentDiskVolumeSource{
PDName: "fake-device",
},
},
},
},
})
stopCh := runVolumeManager(kubelet)
defer func() {
close(stopCh)
}()
// Add pod
kubelet.podManager.SetPods([]*api.Pod{pod})
// Fake node status update
go simulateVolumeInUseUpdate(
api.UniqueVolumeName("fake/vol1"),
stopCh,
kubelet.volumeManager)
// Verify volumes attached
assert.NoError(t, kubelet.volumeManager.WaitForAttachAndMount(pod))
podVolumes := kubelet.volumeManager.GetMountedVolumesForPod(
volumehelper.GetUniquePodName(pod))
expectedPodVolumes := []string{"vol1"}
assert.Len(t, podVolumes, len(expectedPodVolumes), "Volumes for pod %+v", pod)
for _, name := range expectedPodVolumes {
assert.Contains(t, podVolumes, name, "Volumes for pod %+v", pod)
}
assert.True(t, testKubelet.volumePlugin.GetNewAttacherCallCount() >= 1, "Expected plugin NewAttacher to be called at least once")
assert.NoError(t, volumetest.VerifyWaitForAttachCallCount(
1 /* expectedWaitForAttachCallCount */, testKubelet.volumePlugin))
assert.NoError(t, volumetest.VerifyZeroAttachCalls(testKubelet.volumePlugin))
assert.NoError(t, volumetest.VerifyMountDeviceCallCount(
1 /* expectedMountDeviceCallCount */, testKubelet.volumePlugin))
assert.NoError(t, volumetest.VerifySetUpCallCount(
1 /* expectedSetUpCallCount */, testKubelet.volumePlugin))
// Remove pod
kubelet.podManager.SetPods([]*api.Pod{})
assert.NoError(t, waitForVolumeUnmount(kubelet.volumeManager, pod))
// Verify volumes unmounted
podVolumes = kubelet.volumeManager.GetMountedVolumesForPod(
volumehelper.GetUniquePodName(pod))
assert.Len(t, podVolumes, 0,
"Expected volumes to be unmounted and detached. But some volumes are still mounted: %#v", podVolumes)
assert.NoError(t, volumetest.VerifyTearDownCallCount(
1 /* expectedTearDownCallCount */, testKubelet.volumePlugin))
// Verify volumes detached and no longer reported as in use
assert.NoError(t, waitForVolumeDetach(api.UniqueVolumeName("fake/vol1"), kubelet.volumeManager))
assert.True(t, testKubelet.volumePlugin.GetNewAttacherCallCount() >= 1, "Expected plugin NewAttacher to be called at least once")
assert.NoError(t, volumetest.VerifyZeroDetachCallCount(testKubelet.volumePlugin))
}
type stubVolume struct {
path string
volume.MetricsNil
}
func (f *stubVolume) GetPath() string {
return f.path
}
func (f *stubVolume) GetAttributes() volume.Attributes {
return volume.Attributes{}
}
func (f *stubVolume) SetUp(fsGroup *int64) error {
return nil
}
func (f *stubVolume) SetUpAt(dir string, fsGroup *int64) error {
return nil
}