Merge pull request #11561 from saad-ali/issue11231

Fix GCE PD attach/detach issues
Vish Kannan 2015-07-23 14:53:45 -07:00
commit cc326c714b
5 changed files with 352 additions and 201 deletions


@@ -37,6 +37,9 @@ type OperationManager interface {
// Attempts to send msg to the channel associated with ID.
// Returns an error if no associated channel exists.
Send(id string, msg interface{}) error
// Returns true if an entry with the specified ID already exists.
Exists(id string) bool
}
// Returns a new instance of a channel manager.
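The new Exists method gives callers a way to probe for an in-flight operation without mutating any state. As a rough illustration (ID and flow invented here, not taken from this PR), a caller might combine it with Start/Close like so:

    // Sketch: avoid kicking off a duplicate operation for the same ID.
    cm := NewOperationManager()
    if cm.Exists("disk-123") {
        return // an operation for this ID is already pending
    }
    ch, err := cm.Start("disk-123", 0 /* bufferSize */)
    if err != nil {
        return // lost a race: Start fails if the ID now exists
    }
    defer cm.Close("disk-123")
    <-ch // block until some other caller Sends a signal for this ID

Note the Exists/Start pair is not atomic; the pattern tolerates that because Start itself errors out if the ID was registered in between.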
@@ -90,3 +93,11 @@ func (cm *operationManager) Send(id string, msg interface{}) error {
cm.chanMap[id] <- msg
return nil
}
// Returns true if an entry with the specified ID already exists.
func (cm *operationManager) Exists(id string) (exists bool) {
cm.RLock()
defer cm.RUnlock()
_, exists = cm.chanMap[id]
return
}
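Exists takes only the read lock, which is safe as long as Start and Close take the write lock around their map mutations. The receiver's fields are not part of this diff; presumably the struct looks roughly like this (an assumed sketch inferred from the RLock and chanMap usage above, not the actual declaration):

    // Assumed shape of operationManager; inferred, not from this diff.
    type operationManager struct {
        sync.RWMutex
        chanMap map[string]chan interface{}
    }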


@@ -32,15 +32,10 @@ func TestStart(t *testing.T) {
sigErr := cm.Send(chanId, testMsg)
// Assert
if startErr != nil {
t.Fatalf("Unexpected error on Start. Expected: <no error> Actual: <%v>", startErr)
}
if sigErr != nil {
t.Fatalf("Unexpected error on Send. Expected: <no error> Actual: <%v>", sigErr)
}
if actual := <-ch; actual != testMsg {
t.Fatalf("Unexpected testMsg value. Expected: <%v> Actual: <%v>", testMsg, actual)
}
verifyNoError(t, startErr, "Start")
verifyNoError(t, sigErr, "Send")
actualMsg := <-ch
verifyMsg(t, testMsg /* expected */, actualMsg.(string) /* actual */)
}
func TestStartIdExists(t *testing.T) {
@@ -53,12 +48,8 @@ func TestStartIdExists(t *testing.T) {
_, startErr2 := cm.Start(chanId, 1 /* bufferSize */)
// Assert
if startErr1 != nil {
t.Fatalf("Unexpected error on Start1. Expected: <no error> Actual: <%v>", startErr1)
}
if startErr2 == nil {
t.Fatalf("Expected error on Start2. Expected: <id already exists error> Actual: <no error>")
}
verifyNoError(t, startErr1, "Start1")
verifyError(t, startErr2, "Start2")
}
func TestStartAndAdd2Chans(t *testing.T) {
@@ -76,25 +67,14 @@ func TestStartAndAdd2Chans(t *testing.T) {
sigErr2 := cm.Send(chanId2, testMsg2)
// Assert
if startErr1 != nil {
t.Fatalf("Unexpected error on Start1. Expected: <no error> Actual: <%v>", startErr1)
}
if startErr2 != nil {
t.Fatalf("Unexpected error on Start2. Expected: <no error> Actual: <%v>", startErr2)
}
if sigErr1 != nil {
t.Fatalf("Unexpected error on Send1. Expected: <no error> Actual: <%v>", sigErr1)
}
if sigErr2 != nil {
t.Fatalf("Unexpected error on Send2. Expected: <no error> Actual: <%v>", sigErr2)
}
if actual := <-ch1; actual != testMsg1 {
t.Fatalf("Unexpected testMsg value. Expected: <%v> Actual: <%v>", testMsg1, actual)
}
if actual := <-ch2; actual != testMsg2 {
t.Fatalf("Unexpected testMsg value. Expected: <%v> Actual: <%v>", testMsg2, actual)
}
verifyNoError(t, startErr1, "Start1")
verifyNoError(t, startErr2, "Start2")
verifyNoError(t, sigErr1, "Send1")
verifyNoError(t, sigErr2, "Send2")
actualMsg1 := <-ch1
actualMsg2 := <-ch2
verifyMsg(t, testMsg1 /* expected */, actualMsg1.(string) /* actual */)
verifyMsg(t, testMsg2 /* expected */, actualMsg2.(string) /* actual */)
}
func TestStartAndAdd2ChansAndClose(t *testing.T) {
@@ -114,26 +94,66 @@ func TestStartAndAdd2ChansAndClose(t *testing.T) {
sigErr3 := cm.Send(chanId1, testMsg1)
// Assert
if startErr1 != nil {
t.Fatalf("Unexpected error on Start1. Expected: <no error> Actual: <%v>", startErr1)
}
if startErr2 != nil {
t.Fatalf("Unexpected error on Start2. Expected: <no error> Actual: <%v>", startErr2)
}
if sigErr1 != nil {
t.Fatalf("Unexpected error on Send1. Expected: <no error> Actual: <%v>", sigErr1)
}
if sigErr2 != nil {
t.Fatalf("Unexpected error on Send2. Expected: <no error> Actual: <%v>", sigErr2)
}
if sigErr3 == nil {
t.Fatalf("Expected error on Send3. Expected: <error> Actual: <no error>", sigErr2)
}
if actual := <-ch1; actual != testMsg1 {
t.Fatalf("Unexpected testMsg value. Expected: <%v> Actual: <%v>", testMsg1, actual)
}
if actual := <-ch2; actual != testMsg2 {
t.Fatalf("Unexpected testMsg value. Expected: <%v> Actual: <%v>", testMsg2, actual)
}
verifyNoError(t, startErr1, "Start1")
verifyNoError(t, startErr2, "Start2")
verifyNoError(t, sigErr1, "Send1")
verifyNoError(t, sigErr2, "Send2")
verifyError(t, sigErr3, "Send3")
actualMsg1 := <-ch1
actualMsg2 := <-ch2
verifyMsg(t, testMsg1 /* expected */, actualMsg1.(string) /* actual */)
verifyMsg(t, testMsg2 /* expected */, actualMsg2.(string) /* actual */)
}
func TestExists(t *testing.T) {
// Arrange
cm := NewOperationManager()
chanId1 := "testChanId1"
chanId2 := "testChanId2"
// Act & Assert
verifyExists(t, cm, chanId1, false /* expected */)
verifyExists(t, cm, chanId2, false /* expected */)
_, startErr1 := cm.Start(chanId1, 1 /* bufferSize */)
verifyNoError(t, startErr1, "Start1")
verifyExists(t, cm, chanId1, true /* expected */)
verifyExists(t, cm, chanId2, false /* expected */)
_, startErr2 := cm.Start(chanId2, 1 /* bufferSize */)
verifyNoError(t, startErr2, "Start2")
verifyExists(t, cm, chanId1, true /* expected */)
verifyExists(t, cm, chanId2, true /* expected */)
cm.Close(chanId1)
verifyExists(t, cm, chanId1, false /* expected */)
verifyExists(t, cm, chanId2, true /* expected */)
cm.Close(chanId2)
verifyExists(t, cm, chanId1, false /* expected */)
verifyExists(t, cm, chanId2, false /* expected */)
}
func verifyExists(t *testing.T, cm OperationManager, id string, expected bool) {
if actual := cm.Exists(id); expected != actual {
t.Fatalf("Unexpected Exists(%q) response. Expected: <%v> Actual: <%v>", id, expected, actual)
}
}
func verifyNoError(t *testing.T, err error, name string) {
if err != nil {
t.Fatalf("Unexpected response on %q. Expected: <no error> Actual: <%v>", name, err)
}
}
func verifyError(t *testing.T, err error, name string) {
if err == nil {
t.Fatalf("Unexpected response on %q. Expected: <error> Actual: <no error>")
}
}
func verifyMsg(t *testing.T, expected, actual string) {
if actual != expected {
t.Fatalf("Unexpected testMsg value. Expected: <%v> Actual: <%v>", expected, actual)
}
}


@@ -43,6 +43,7 @@ const (
maxChecks = 10
maxRetries = 10
checkSleepDuration = time.Second
errorSleepDuration = 5 * time.Second
)
// Singleton operation manager for managing detach cleanup goroutines
@@ -54,24 +55,17 @@ type GCEDiskUtil struct{}
// Mounts the disk to its global path.
func (diskUtil *GCEDiskUtil) AttachAndMountDisk(pd *gcePersistentDisk, globalPDPath string) error {
glog.V(5).Infof("AttachAndMountDisk(pd, %q) where pd is %#v\r\n", globalPDPath, pd)
// Terminate any in-progress verify-detach goroutines; this will block until the goroutine is ready to exit because the channel is unbuffered
// Block execution until any pending detach goroutines for this pd have completed
detachCleanupManager.Send(pd.pdName, true)
sdBefore, err := filepath.Glob(diskSDPattern)
if err != nil {
glog.Errorf("Error filepath.Glob(\"%s\"): %v\r\n", diskSDPattern, err)
}
sdBeforeSet := util.NewStringSet(sdBefore...)
gce, err := cloudprovider.GetCloudProvider("gce", nil)
if err != nil {
return err
}
if err := gce.(*gce_cloud.GCECloud).AttachDisk(pd.pdName, pd.readOnly); err != nil {
return err
}
devicePath, err := verifyAttached(pd, sdBeforeSet, gce)
devicePath, err := attachDiskAndVerify(pd, sdBeforeSet)
if err != nil {
return err
}
@@ -108,33 +102,51 @@ func (util *GCEDiskUtil) DetachDisk(pd *gcePersistentDisk) error {
globalPDPath := makeGlobalPDName(pd.plugin.host, pd.pdName)
glog.V(5).Infof("DetachDisk(pd) where pd is %#v and the globalPDPath is %q\r\n", pd, globalPDPath)
// Terminate any in-progress verify-detach goroutines; this will block until the goroutine is ready to exit because the channel is unbuffered
detachCleanupManager.Send(pd.pdName, true)
if err := pd.mounter.Unmount(globalPDPath); err != nil {
return err
}
if err := os.Remove(globalPDPath); err != nil {
return err
}
// Detach the disk
gce, err := cloudprovider.GetCloudProvider("gce", nil)
if err != nil {
return err
}
if err := gce.(*gce_cloud.GCECloud).DetachDisk(pd.pdName); err != nil {
return err
if detachCleanupManager.Exists(pd.pdName) {
glog.Warningf("Terminating new DetachDisk call for GCE PD %q. A previous detach call for this PD is still pending.", pd.pdName)
return nil
}
// Verify disk detached, retry if needed.
go verifyDetached(pd, gce)
// Detach disk, retry if needed.
go detachDiskAndVerify(pd)
return nil
}
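The early return above works together with the unbuffered channels: when no detach is pending, DetachDisk starts one; when one is pending, the new call simply bows out, and an AttachAndMountDisk call instead blocks in Send until the detach goroutine receives. Reduced to its essentials, the rendezvous looks like this (a generic sketch, not code from this PR):

    done := make(chan interface{}) // bufferSize 0: a send blocks until received
    go func() {
        // ... perform cleanup work ...
        <-done // receiving releases exactly one blocked sender
    }()
    done <- true // returns only once the goroutine has received the signal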
// Verifies that the disk device to be created has been successfully attached, and retries if it fails.
func verifyAttached(pd *gcePersistentDisk, sdBeforeSet util.StringSet, gce cloudprovider.Interface) (string, error) {
// Attaches the specified persistent disk device to the node, verifies that it is attached, and retries if it fails.
func attachDiskAndVerify(pd *gcePersistentDisk, sdBeforeSet util.StringSet) (string, error) {
devicePaths := getDiskByIdPaths(pd)
var gce cloudprovider.Interface
for numRetries := 0; numRetries < maxRetries; numRetries++ {
if gce == nil {
var err error
gce, err = cloudprovider.GetCloudProvider("gce", nil)
if err != nil || gce == nil {
// Retry on error. See issue #11321
glog.Errorf("Error getting GCECloudProvider while attaching PD %q: %v", pd.pdName, err)
gce = nil
time.Sleep(errorSleepDuration)
continue
}
}
if numRetries > 0 {
glog.Warningf("Timed out waiting for GCE PD %q to attach. Retrying attach.", pd.pdName)
}
if err := gce.(*gce_cloud.GCECloud).AttachDisk(pd.pdName, pd.readOnly); err != nil {
// Retry on error. See issue #11321. Continue and verify if disk is attached, because a
// previous attach operation may still succeed.
glog.Errorf("Error attaching PD %q: %v", pd.pdName, err)
}
for numChecks := 0; numChecks < maxChecks; numChecks++ {
if err := udevadmChangeToNewDrives(sdBeforeSet); err != nil {
// udevadm errors should not block disk attachment, log and continue
@@ -143,84 +155,107 @@ func verifyAttached(pd *gcePersistentDisk, sdBeforeSet util.StringSet, gce cloud
for _, path := range devicePaths {
if pathExists, err := pathExists(path); err != nil {
return "", err
// Retry on error. See issue #11321
glog.Errorf("Error checking if path exists: %v", err)
} else if pathExists {
// A device path has successfully been created for the PD
glog.V(5).Infof("Successfully attached GCE PD %q.", pd.pdName)
glog.Infof("Successfully attached GCE PD %q.", pd.pdName)
return path, nil
}
}
// Sleep then check again
glog.V(5).Infof("Waiting for GCE PD %q to attach.", pd.pdName)
glog.V(3).Infof("Waiting for GCE PD %q to attach.", pd.pdName)
time.Sleep(checkSleepDuration)
}
// Try attaching the disk again
glog.Warningf("Timed out waiting for GCE PD %q to attach. Retrying attach.", pd.pdName)
if err := gce.(*gce_cloud.GCECloud).AttachDisk(pd.pdName, pd.readOnly); err != nil {
return "", err
}
}
return "", fmt.Errorf("Could not attach GCE PD %q. Timeout waiting for mount paths to be created.", pd.pdName)
}
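For a rough sense of the timing budget here: with maxChecks = 10 and checkSleepDuration = time.Second, each attach attempt is polled for up to about 10 seconds before AttachDisk is re-issued, and with maxRetries = 10 the loop as a whole gives up after on the order of 100 seconds of polling, plus any errorSleepDuration (5 second) pauses spent retrying the cloud-provider lookup.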
// Verifies that the specified persistent disk device has been successfully detached, and retries if it fails.
// Detaches the specified persistent disk device from the node, verifies that it is detached, and retries if it fails.
// This function is intended to be called asynchronously as a go routine.
func verifyDetached(pd *gcePersistentDisk, gce cloudprovider.Interface) {
// It starts the detachCleanupManager with the specified pdName so that callers can wait for completion.
func detachDiskAndVerify(pd *gcePersistentDisk) {
glog.V(5).Infof("detachDiskAndVerify for pd %q.", pd.pdName)
defer util.HandleCrash()
// Setting bufferSize to 0 so that when senders send, they are blocked until we receive. This avoids the need to have a separate exit check.
// Start operation, so that other threads can wait on this detach operation.
// Set bufferSize to 0 so senders are blocked on send until we receive.
ch, err := detachCleanupManager.Start(pd.pdName, 0 /* bufferSize */)
if err != nil {
glog.Errorf("Error adding %q to detachCleanupManager: %v", pd.pdName, err)
return
}
defer detachCleanupManager.Close(pd.pdName)
devicePaths := getDiskByIdPaths(pd)
for numRetries := 0; numRetries < maxRetries; numRetries++ {
for numChecks := 0; numChecks < maxChecks; numChecks++ {
defer func() {
// Unblock any callers that have been waiting for this detach routine to complete.
for {
select {
case <-ch:
glog.Warningf("Terminating GCE PD %q detach verification. Another attach/detach call was made for this PD.", pd.pdName)
return
glog.V(5).Infof("detachDiskAndVerify for pd %q clearing chan.", pd.pdName)
default:
allPathsRemoved := true
for _, path := range devicePaths {
if err := udevadmChangeToDrive(path); err != nil {
// udevadm errors should not block disk detachment, log and continue
glog.Errorf("%v", err)
}
if exists, err := pathExists(path); err != nil {
glog.Errorf("Error check path: %v", err)
return
} else {
allPathsRemoved = allPathsRemoved && !exists
}
}
if allPathsRemoved {
// All paths to the PD have been successfully removed
glog.V(5).Infof("Successfully detached GCE PD %q.", pd.pdName)
return
}
glog.V(5).Infof("detachDiskAndVerify for pd %q done clearing chans.", pd.pdName)
return
}
}
}()
// Sleep then check again
glog.V(5).Infof("Waiting for GCE PD %q to detach.", pd.pdName)
time.Sleep(checkSleepDuration)
devicePaths := getDiskByIdPaths(pd)
var gce cloudprovider.Interface
for numRetries := 0; numRetries < maxRetries; numRetries++ {
if gce == nil {
var err error
gce, err = cloudprovider.GetCloudProvider("gce", nil)
if err != nil || gce == nil {
// Retry on error. See issue #11321
glog.Errorf("Error getting GCECloudProvider while detaching PD %q: %v", pd.pdName, err)
gce = nil
time.Sleep(errorSleepDuration)
continue
}
}
// Try detaching disk again
glog.Warningf("Timed out waiting for GCE PD %q to detach. Retrying detach.", pd.pdName)
if err := gce.(*gce_cloud.GCECloud).DetachDisk(pd.pdName); err != nil {
glog.Errorf("Error on retry detach PD %q: %v", pd.pdName, err)
return
if numRetries > 0 {
glog.Warningf("Timed out waiting for GCE PD %q to detach. Retrying detach.", pd.pdName)
}
if err := gce.(*gce_cloud.GCECloud).DetachDisk(pd.pdName); err != nil {
// Retry on error. See issue #11321. Continue and verify if disk is detached, because a
// previous detach operation may still succeed.
glog.Errorf("Error detaching PD %q: %v", pd.pdName, err)
}
for numChecks := 0; numChecks < maxChecks; numChecks++ {
allPathsRemoved := true
for _, path := range devicePaths {
if err := udevadmChangeToDrive(path); err != nil {
// udevadm errors should not block disk detachment, log and continue
glog.Errorf("%v", err)
}
if exists, err := pathExists(path); err != nil {
// Retry on error. See issue #11321
glog.Errorf("Error checking if path exists: %v", err)
} else {
allPathsRemoved = allPathsRemoved && !exists
}
}
if allPathsRemoved {
// All paths to the PD have been successfully removed
glog.Infof("Successfully detached GCE PD %q.", pd.pdName)
return
}
// Sleep then check again
glog.V(3).Infof("Waiting for GCE PD %q to detach.", pd.pdName)
time.Sleep(checkSleepDuration)
}
}
glog.Errorf("Could not detach GCE PD %q. One or more mount paths was not removed.", pd.pdName)
glog.Errorf("Failed to detach GCE PD %q. One or more mount paths was not removed.", pd.pdName)
}
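The deferred drain loop at the top of detachDiskAndVerify is what keeps Send callers from deadlocking: every sender blocked on the unbuffered channel is received exactly once, and the loop exits on the first empty poll. The pattern in isolation (a sketch with hypothetical names):

    defer func() {
        for {
            select {
            case <-ch: // release one caller blocked in Send
            default:
                return // nobody left waiting; done draining
            }
        }
    }()

Since this defer is registered after the deferred detachCleanupManager.Close, it runs first (defers run LIFO), so the channel is drained before the manager entry is removed.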
// Returns list of all /dev/disk/by-id/* paths for given PD.
@@ -326,7 +361,7 @@ func (mounter *gceSafeFormatAndMount) Mount(source string, target string, fstype
cmd := mounter.runner.Command("/usr/share/google/safe_format_and_mount", args...)
dataOut, err := cmd.CombinedOutput()
if err != nil {
glog.V(5).Infof("error running /usr/share/google/safe_format_and_mount\n%s", string(dataOut))
glog.Errorf("error running /usr/share/google/safe_format_and_mount\n%s", string(dataOut))
}
return err
}


@@ -17,7 +17,9 @@ limitations under the License.
package e2e
import (
"bytes"
"fmt"
"strings"
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@@ -155,3 +157,50 @@ func (f *Framework) WaitForAnEndpoint(serviceName string) error {
}
}
}
// Write a file using kubectl exec echo <contents> > <path> via specified container
// Because of the primitive technique we're using here, we only allow ASCII alphanumeric characters
func (f *Framework) WriteFileViaContainer(podName, containerName string, path string, contents string) error {
By("writing a file in the container")
allowedCharacters := "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
for _, c := range contents {
if !strings.ContainsRune(allowedCharacters, c) {
return fmt.Errorf("Unsupported character in string to write: %v", c)
}
}
command := fmt.Sprintf("echo '%s' > '%s'", contents, path)
stdout, stderr, err := kubectlExec(f.Namespace.Name, podName, containerName, "--", "/bin/sh", "-c", command)
if err != nil {
Logf("error running kubectl exec to write file: %v\nstdout=%v\nstderr=%v)", err, string(stdout), string(stderr))
}
return err
}
// Read a file using kubectl exec cat <path>
func (f *Framework) ReadFileViaContainer(podName, containerName string, path string) (string, error) {
By("reading a file in the container")
stdout, stderr, err := kubectlExec(f.Namespace.Name, podName, containerName, "--", "cat", path)
if err != nil {
Logf("error running kubectl exec to read file: %v\nstdout=%v\nstderr=%v)", err, string(stdout), string(stderr))
}
return string(stdout), err
}
func kubectlExec(namespace string, podName, containerName string, args ...string) ([]byte, []byte, error) {
var stdout, stderr bytes.Buffer
cmdArgs := []string{
"exec",
fmt.Sprintf("--namespace=%v", namespace),
podName,
fmt.Sprintf("-c=%v", containerName),
}
cmdArgs = append(cmdArgs, args...)
cmd := kubectlCmd(cmdArgs...)
cmd.Stdout, cmd.Stderr = &stdout, &stderr
Logf("Running '%s %s'", cmd.Path, strings.Join(cmd.Args, " "))
err := cmd.Run()
return stdout.Bytes(), stderr.Bytes(), err
}
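Together these helpers give e2e tests a crude remote file API over kubectl exec. A hypothetical round trip (pod and container names invented for illustration):

    err := f.WriteFileViaContainer("mypod", "mycontainer", "/testpd/tracker", "12345")
    // ...
    v, err := f.ReadFileViaContainer("mypod", "mycontainer", "/testpd/tracker")
    // cat returns the echoed contents plus a trailing newline, which is why
    // the PD test below compares with strings.TrimSpace.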


@@ -23,9 +23,9 @@ import (
"strings"
"time"
"bytes"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/aws"
"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
@@ -55,6 +55,8 @@ var _ = Describe("Pod Disks", func() {
host0Name = nodes.Items[0].ObjectMeta.Name
host1Name = nodes.Items[1].ObjectMeta.Name
math_rand.Seed(time.Now().UTC().UnixNano())
})
It("should schedule a pod w/ a RW PD, remove it, then schedule it on another host", func() {
@@ -64,8 +66,8 @@
diskName, err := createPD()
expectNoError(err, "Error creating PD")
host0Pod := testPDPod(diskName, host0Name, false)
host1Pod := testPDPod(diskName, host1Name, false)
host0Pod := testPDPod(diskName, host0Name, false /* readOnly */, 1 /* numContainers */)
host1Pod := testPDPod(diskName, host1Name, false /* readOnly */, 1 /* numContainers */)
defer func() {
By("cleaning up PD-RW test environment")
@@ -87,7 +89,7 @@ var _ = Describe("Pod Disks", func() {
testFile := "/testpd/tracker"
testFileContents := fmt.Sprintf("%v", math_rand.Int())
expectNoError(writeFileOnPod(framework.Client, host0Pod.Name, testFile, testFileContents))
expectNoError(framework.WriteFileViaContainer(host0Pod.Name, "testpd" /* containerName */, testFile, testFileContents))
Logf("Wrote value: %v", testFileContents)
By("deleting host0Pod")
@@ -99,7 +101,7 @@ var _ = Describe("Pod Disks", func() {
expectNoError(framework.WaitForPodRunning(host1Pod.Name))
v, err := readFileOnPod(framework.Client, host1Pod.Name, testFile)
v, err := framework.ReadFileViaContainer(host1Pod.Name, "testpd", testFile)
expectNoError(err)
Logf("Read value: %v", v)
@@ -109,15 +111,7 @@ var _ = Describe("Pod Disks", func() {
expectNoError(podClient.Delete(host1Pod.Name, nil), "Failed to delete host1Pod")
By(fmt.Sprintf("deleting PD %q", diskName))
for start := time.Now(); time.Since(start) < 180*time.Second; time.Sleep(5 * time.Second) {
if err = deletePD(diskName); err != nil {
Logf("Couldn't delete PD. Sleeping 5 seconds (%v)", err)
continue
}
Logf("Deleted PD %v", diskName)
break
}
expectNoError(err, "Error deleting PD")
deletePDWithRetry(diskName)
return
})
@@ -129,9 +123,9 @@ var _ = Describe("Pod Disks", func() {
diskName, err := createPD()
expectNoError(err, "Error creating PD")
rwPod := testPDPod(diskName, host0Name, false)
host0ROPod := testPDPod(diskName, host0Name, true)
host1ROPod := testPDPod(diskName, host1Name, true)
rwPod := testPDPod(diskName, host0Name, false /* readOnly */, 1 /* numContainers */)
host0ROPod := testPDPod(diskName, host0Name, true /* readOnly */, 1 /* numContainers */)
host1ROPod := testPDPod(diskName, host1Name, true /* readOnly */, 1 /* numContainers */)
defer func() {
By("cleaning up PD-RO test environment")
@@ -171,58 +165,89 @@ var _ = Describe("Pod Disks", func() {
expectNoError(podClient.Delete(host1ROPod.Name, nil), "Failed to delete host1ROPod")
By(fmt.Sprintf("deleting PD %q", diskName))
for start := time.Now(); time.Since(start) < 180*time.Second; time.Sleep(5 * time.Second) {
if err = deletePD(diskName); err != nil {
Logf("Couldn't delete PD. Sleeping 5 seconds")
continue
}
Logf("Successfully deleted PD %q", diskName)
break
}
deletePDWithRetry(diskName)
expectNoError(err, "Error deleting PD")
})
It("should schedule a pod w/ a RW PD shared between multiple containers, write to PD, delete pod, verify contents, and repeat in rapid succession", func() {
SkipUnlessProviderIs("gce", "gke", "aws")
By("creating PD")
diskName, err := createPD()
expectNoError(err, "Error creating PD")
numContainers := 4
host0Pod := testPDPod(diskName, host0Name, false /* readOnly */, numContainers)
defer func() {
By("cleaning up PD-RW test environment")
// Teardown pods, PD. Ignore errors.
// Teardown should do nothing unless test failed.
podClient.Delete(host0Pod.Name, nil)
detachPD(host0Name, diskName)
deletePD(diskName)
}()
fileAndContentToVerify := make(map[string]string)
for i := 0; i < 3; i++ {
Logf("PD Read/Writer Iteration #%v", i)
By("submitting host0Pod to kubernetes")
_, err = podClient.Create(host0Pod)
expectNoError(err, fmt.Sprintf("Failed to create host0Pod: %v", err))
expectNoError(framework.WaitForPodRunning(host0Pod.Name))
// randomly select a container and read/verify pd contents from it
containerName := fmt.Sprintf("testpd%v", math_rand.Intn(numContainers)+1)
verifyPDContentsViaContainer(framework, host0Pod.Name, containerName, fileAndContentToVerify)
// Randomly select a container to write a file to PD from
containerName = fmt.Sprintf("testpd%v", math_rand.Intn(numContainers)+1)
testFile := fmt.Sprintf("/testpd/tracker%v", i)
testFileContents := fmt.Sprintf("%v", math_rand.Int())
fileAndContentToVerify[testFile] = testFileContents
expectNoError(framework.WriteFileViaContainer(host0Pod.Name, containerName, testFile, testFileContents))
Logf("Wrote value: \"%v\" to PD %q from pod %q container %q", testFileContents, diskName, host0Pod.Name, containerName)
// Randomly select a container and read/verify pd contents from it
containerName = fmt.Sprintf("testpd%v", math_rand.Intn(numContainers)+1)
verifyPDContentsViaContainer(framework, host0Pod.Name, containerName, fileAndContentToVerify)
By("deleting host0Pod")
expectNoError(podClient.Delete(host0Pod.Name, nil), "Failed to delete host0Pod")
}
By(fmt.Sprintf("deleting PD %q", diskName))
deletePDWithRetry(diskName)
return
})
})
func kubectlExec(namespace string, podName string, args ...string) ([]byte, []byte, error) {
var stdout, stderr bytes.Buffer
cmdArgs := []string{"exec", fmt.Sprintf("--namespace=%v", namespace), podName}
cmdArgs = append(cmdArgs, args...)
cmd := kubectlCmd(cmdArgs...)
cmd.Stdout, cmd.Stderr = &stdout, &stderr
Logf("Running '%s %s'", cmd.Path, strings.Join(cmd.Args, " "))
err := cmd.Run()
return stdout.Bytes(), stderr.Bytes(), err
}
// Write a file using kubectl exec echo <contents> > <path>
// Because of the primitive technique we're using here, we only allow ASCII alphanumeric characters
func writeFileOnPod(c *client.Client, podName string, path string, contents string) error {
By("writing a file in the container")
allowedCharacters := "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"
for _, c := range contents {
if !strings.ContainsRune(allowedCharacters, c) {
return fmt.Errorf("Unsupported character in string to write: %v", c)
func deletePDWithRetry(diskName string) {
var err error
for start := time.Now(); time.Since(start) < 180*time.Second; time.Sleep(5 * time.Second) {
if err = deletePD(diskName); err != nil {
Logf("Couldn't delete PD %q. Sleeping 5 seconds (%v)", diskName, err)
continue
}
Logf("Deleted PD %v", diskName)
break
}
command := fmt.Sprintf("echo '%s' > '%s'", contents, path)
stdout, stderr, err := kubectlExec(api.NamespaceDefault, podName, "--", "/bin/sh", "-c", command)
if err != nil {
Logf("error running kubectl exec to write file: %v\nstdout=%v\nstderr=%v)", err, string(stdout), string(stderr))
}
return err
expectNoError(err, "Error deleting PD")
}
// Read a file using kubectl exec cat <path>
func readFileOnPod(c *client.Client, podName string, path string) (string, error) {
By("reading a file in the container")
stdout, stderr, err := kubectlExec(api.NamespaceDefault, podName, "--", "cat", path)
if err != nil {
Logf("error running kubectl exec to read file: %v\nstdout=%v\nstderr=%v)", err, string(stdout), string(stderr))
func verifyPDContentsViaContainer(f *Framework, podName, containerName string, fileAndContentToVerify map[string]string) {
for filePath, expectedContents := range fileAndContentToVerify {
v, err := f.ReadFileViaContainer(podName, containerName, filePath)
if err != nil {
Logf("Error reading file: %v", err)
}
expectNoError(err)
Logf("Read file %q with content: %v", filePath, v)
Expect(strings.TrimSpace(v)).To(Equal(strings.TrimSpace(expectedContents)))
}
return string(stdout), err
}
func createPD() (string, error) {
@@ -284,7 +309,30 @@ func detachPD(hostName, pdName string) error {
}
}
func testPDPod(diskName, targetHost string, readOnly bool) *api.Pod {
func testPDPod(diskName, targetHost string, readOnly bool, numContainers int) *api.Pod {
containers := make([]api.Container, numContainers)
for i := range containers {
containers[i].Name = "testpd"
if numContainers > 1 {
containers[i].Name = fmt.Sprintf("testpd%v", i+1)
}
containers[i].Image = "gcr.io/google_containers/busybox"
containers[i].Command = []string{"sleep", "6000"}
containers[i].VolumeMounts = []api.VolumeMount{
{
Name: "testpd",
MountPath: "/testpd",
},
}
containers[i].Resources.Limits = api.ResourceList{}
containers[i].Resources.Limits[api.ResourceCPU] = *resource.NewQuantity(int64(0), resource.DecimalSI)
}
pod := &api.Pod{
TypeMeta: api.TypeMeta{
Kind: "Pod",
@@ -294,20 +342,8 @@ func testPDPod(diskName, targetHost string, readOnly bool) *api.Pod {
Name: "pd-test-" + string(util.NewUUID()),
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: "testpd",
Image: "gcr.io/google_containers/busybox",
Command: []string{"sleep", "600"},
VolumeMounts: []api.VolumeMount{
{
Name: "testpd",
MountPath: "/testpd",
},
},
},
},
NodeName: targetHost,
Containers: containers,
NodeName: targetHost,
},
}
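One naming detail worth noting: with numContainers == 1 the single container keeps the legacy name testpd, while for N > 1 the containers are named testpd1 through testpdN. The rapid-succession test above depends on this when it picks a random container via fmt.Sprintf("testpd%v", math_rand.Intn(numContainers)+1). For example (a sketch of a call, not a line from the diff):

    pod := testPDPod(diskName, host0Name, false /* readOnly */, 4 /* numContainers */)
    // pod.Spec.Containers[0].Name == "testpd1", ..., [3].Name == "testpd4";
    // all four containers mount the shared "testpd" volume at /testpd.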