Report events to apiserver in local volume plugin.

- Add VolumeHost.GetEventRecorder() method
- Add related e2e tests
pull/8/head
Yecheng Fu 2018-04-16 20:39:23 +08:00
parent 2ef566d0c3
commit 55ef18ad42
12 changed files with 120 additions and 25 deletions

View File

@ -607,3 +607,7 @@ func (adc *attachDetachController) GetNodeLabels() (map[string]string, error) {
func (adc *attachDetachController) GetNodeName() types.NodeName {
return ""
}
func (adc *attachDetachController) GetEventRecorder() record.EventRecorder {
return adc.recorder
}

View File

@ -298,3 +298,7 @@ func (expc *expandController) GetNodeLabels() (map[string]string, error) {
func (expc *expandController) GetNodeName() types.NodeName {
return ""
}
func (expc *expandController) GetEventRecorder() record.EventRecorder {
return expc.recorder
}

View File

@ -23,6 +23,7 @@ import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/util/io"
"k8s.io/kubernetes/pkg/util/mount"
@ -112,3 +113,7 @@ func (ctrl *PersistentVolumeController) GetNodeLabels() (map[string]string, erro
func (ctrl *PersistentVolumeController) GetNodeName() types.NodeName {
return ""
}
func (ctrl *PersistentVolumeController) GetEventRecorder() record.EventRecorder {
return ctrl.eventRecorder
}

View File

@ -27,6 +27,7 @@ import (
"k8s.io/apimachinery/pkg/types"
utilfeature "k8s.io/apiserver/pkg/util/feature"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/configmap"
@ -198,6 +199,10 @@ func (kvh *kubeletVolumeHost) GetNodeName() types.NodeName {
return kvh.kubelet.nodeName
}
func (kvh *kubeletVolumeHost) GetEventRecorder() record.EventRecorder {
return kvh.kubelet.recorder
}
func (kvh *kubeletVolumeHost) GetExec(pluginName string) mount.Exec {
exec, err := kvh.getMountExec(pluginName)
if err != nil {

View File

@ -63,6 +63,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/validation:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
],
)

View File

@ -20,7 +20,6 @@ go_library(
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/client-go/kubernetes/scheme:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
],
)

View File

@ -27,7 +27,6 @@ import (
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/util/keymutex"
@ -60,10 +59,7 @@ const (
func (plugin *localVolumePlugin) Init(host volume.VolumeHost) error {
plugin.host = host
plugin.volumeLocks = keymutex.NewKeyMutex()
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "localvolume"})
plugin.recorder = recorder
plugin.recorder = host.GetEventRecorder()
return nil
}

View File

@ -30,6 +30,7 @@ import (
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/validation"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/util/io"
"k8s.io/kubernetes/pkg/util/mount"
@ -313,6 +314,9 @@ type VolumeHost interface {
// Returns the name of the node
GetNodeName() types.NodeName
// Returns the event recorder of kubelet.
GetEventRecorder() record.EventRecorder
}
// VolumePluginMgr tracks registered plugins.

View File

@ -28,6 +28,7 @@ go_library(
"//vendor/k8s.io/apimachinery/pkg/types:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//vendor/k8s.io/client-go/kubernetes:go_default_library",
"//vendor/k8s.io/client-go/tools/record:go_default_library",
"//vendor/k8s.io/client-go/util/testing:go_default_library",
],
)

View File

@ -33,6 +33,7 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/uuid"
clientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/record"
utiltesting "k8s.io/client-go/util/testing"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/util/io"
@ -189,6 +190,10 @@ func (f *fakeVolumeHost) GetNodeName() types.NodeName {
return types.NodeName(f.nodeName)
}
func (f *fakeVolumeHost) GetEventRecorder() record.EventRecorder {
return nil
}
func ProbeVolumePlugins(config VolumeConfig) []VolumePlugin {
if _, ok := config.OtherAttributes["fake-property"]; ok {
return []VolumePlugin{

View File

@ -901,12 +901,16 @@ func MakeNginxPod(ns string, nodeSelector map[string]string, pvclaims []*v1.Pers
// Returns a pod definition based on the namespace. The pod references the PVC's
// name. A slice of BASH commands can be supplied as args to be run by the pod.
// SELinux testing requires to pass HostIPC and HostPID as booleansi arguments.
func MakeSecPod(ns string, pvclaims []*v1.PersistentVolumeClaim, isPrivileged bool, command string, hostIPC bool, hostPID bool, seLinuxLabel *v1.SELinuxOptions) *v1.Pod {
func MakeSecPod(ns string, pvclaims []*v1.PersistentVolumeClaim, isPrivileged bool, command string, hostIPC bool, hostPID bool, seLinuxLabel *v1.SELinuxOptions, fsGroup *int64) *v1.Pod {
if len(command) == 0 {
command = "while true; do sleep 1; done"
}
podName := "security-context-" + string(uuid.NewUUID())
fsGroup := int64(1000)
if fsGroup == nil {
fsGroup = func(i int64) *int64 {
return &i
}(1000)
}
podSpec := &v1.Pod{
TypeMeta: metav1.TypeMeta{
Kind: "Pod",
@ -920,7 +924,7 @@ func MakeSecPod(ns string, pvclaims []*v1.PersistentVolumeClaim, isPrivileged bo
HostIPC: hostIPC,
HostPID: hostPID,
SecurityContext: &v1.PodSecurityContext{
FSGroup: &fsGroup,
FSGroup: fsGroup,
},
Containers: []v1.Container{
{
@ -996,8 +1000,8 @@ func CreateNginxPod(client clientset.Interface, namespace string, nodeSelector m
}
// create security pod with given claims
func CreateSecPod(client clientset.Interface, namespace string, pvclaims []*v1.PersistentVolumeClaim, isPrivileged bool, command string, hostIPC bool, hostPID bool, seLinuxLabel *v1.SELinuxOptions) (*v1.Pod, error) {
pod := MakeSecPod(namespace, pvclaims, isPrivileged, command, hostIPC, hostPID, seLinuxLabel)
func CreateSecPod(client clientset.Interface, namespace string, pvclaims []*v1.PersistentVolumeClaim, isPrivileged bool, command string, hostIPC bool, hostPID bool, seLinuxLabel *v1.SELinuxOptions, fsGroup *int64) (*v1.Pod, error) {
pod := MakeSecPod(namespace, pvclaims, isPrivileged, command, hostIPC, hostPID, seLinuxLabel, fsGroup)
pod, err := client.CoreV1().Pods(namespace).Create(pod)
if err != nil {
return nil, fmt.Errorf("pod Create API error: %v", err)

View File

@ -227,7 +227,7 @@ var _ = utils.SIGDescribe("PersistentVolumes-local ", func() {
BeforeEach(func() {
By("Creating pod1")
pod1, pod1Err = createLocalPod(config, testVol)
pod1, pod1Err = createLocalPod(config, testVol, nil)
Expect(pod1Err).NotTo(HaveOccurred())
verifyLocalPod(config, testVol, pod1, config.node0.Name)
})
@ -265,6 +265,64 @@ var _ = utils.SIGDescribe("PersistentVolumes-local ", func() {
})
})
Context("Set fsGroup for local volume", func() {
It("should set fsGroup for one pod", func() {
By("Checking fsGroup is set")
pod := createPodWithFsGroupTest(config, testVol, 1234, 1234)
By("Deleting pod")
framework.DeletePodOrFail(config.client, config.ns, pod.Name)
})
It("should set same fsGroup for two pods simultaneously", func() {
fsGroup := int64(1234)
By("Create first pod and check fsGroup is set")
pod1 := createPodWithFsGroupTest(config, testVol, fsGroup, fsGroup)
By("Create second pod with same fsGroup and check fsGroup is correct")
pod2 := createPodWithFsGroupTest(config, testVol, fsGroup, fsGroup)
By("Deleting first pod")
framework.DeletePodOrFail(config.client, config.ns, pod1.Name)
By("Deleting second pod")
framework.DeletePodOrFail(config.client, config.ns, pod2.Name)
})
It("should set different fsGroup for second pod if first pod is deleted", func() {
fsGroup1, fsGroup2 := int64(1234), int64(4321)
By("Create first pod and check fsGroup is set")
pod1 := createPodWithFsGroupTest(config, testVol, fsGroup1, fsGroup1)
By("Deleting first pod")
err := framework.DeletePodWithWait(f, config.client, pod1)
Expect(err).NotTo(HaveOccurred(), "while deleting first pod")
By("Create second pod and check fsGroup is the new one")
pod2 := createPodWithFsGroupTest(config, testVol, fsGroup2, fsGroup2)
By("Deleting second pod")
framework.DeletePodOrFail(config.client, config.ns, pod2.Name)
})
It("should not set different fsGroups for two pods simultaneously", func() {
if testVolType == DirectoryLocalVolumeType {
// TODO(cofyc): Test it when bug is fixed.
framework.Skipf("Skipped when volume type is %v", testVolType)
}
fsGroup1, fsGroup2 := int64(1234), int64(4321)
By("Create first pod and check fsGroup is set")
pod1 := createPodWithFsGroupTest(config, testVol, fsGroup1, fsGroup1)
By("Create second pod and check fsGroup is still the old one")
pod2 := createPodWithFsGroupTest(config, testVol, fsGroup2, fsGroup1)
ep := &eventPatterns{
reason: "AlreadyMountedVolume",
pattern: make([]string, 2),
}
ep.pattern = append(ep.pattern, fmt.Sprintf("The requested fsGroup is %d", fsGroup2))
ep.pattern = append(ep.pattern, "The volume may not be shareable.")
checkPodEvents(config, pod2.Name, ep)
By("Deleting first pod")
framework.DeletePodOrFail(config.client, config.ns, pod1.Name)
By("Deleting second pod")
framework.DeletePodOrFail(config.client, config.ns, pod2.Name)
})
})
})
}
@ -286,7 +344,7 @@ var _ = utils.SIGDescribe("PersistentVolumes-local ", func() {
}
By("Creating local PVC and PV")
createLocalPVCsPVs(config, []*localTestVolume{testVol}, immediateMode)
pod, err := createLocalPod(config, testVol)
pod, err := createLocalPod(config, testVol, nil)
Expect(err).To(HaveOccurred())
checkPodEvents(config, pod.Name, ep)
cleanupLocalPVCsPVs(config, []*localTestVolume{testVol})
@ -567,7 +625,7 @@ var _ = utils.SIGDescribe("PersistentVolumes-local ", func() {
pvcs = append(pvcs, pvc)
}
pod := framework.MakeSecPod(config.ns, pvcs, false, "sleep 1", false, false, selinuxLabel)
pod := framework.MakeSecPod(config.ns, pvcs, false, "sleep 1", false, false, selinuxLabel, nil)
pod, err := config.client.CoreV1().Pods(config.ns).Create(pod)
Expect(err).NotTo(HaveOccurred())
pods[pod.Name] = pod
@ -682,7 +740,7 @@ func checkPodEvents(config *localTestConfig, podName string, ep *eventPatterns)
// Test two pods at the same time, write from pod1, and read from pod2
func twoPodsReadWriteTest(config *localTestConfig, testVol *localTestVolume) {
By("Creating pod1 to write to the PV")
pod1, pod1Err := createLocalPod(config, testVol)
pod1, pod1Err := createLocalPod(config, testVol, nil)
Expect(pod1Err).NotTo(HaveOccurred())
verifyLocalPod(config, testVol, pod1, config.node0.Name)
@ -690,7 +748,7 @@ func twoPodsReadWriteTest(config *localTestConfig, testVol *localTestVolume) {
testReadFileContent(volumeDir, testFile, testFileContent, pod1, testVol.localVolumeType)
By("Creating pod2 to read from the PV")
pod2, pod2Err := createLocalPod(config, testVol)
pod2, pod2Err := createLocalPod(config, testVol, nil)
Expect(pod2Err).NotTo(HaveOccurred())
verifyLocalPod(config, testVol, pod2, config.node0.Name)
@ -714,7 +772,7 @@ func twoPodsReadWriteTest(config *localTestConfig, testVol *localTestVolume) {
// Test two pods one after other, write from pod1, and read from pod2
func twoPodsReadWriteSerialTest(config *localTestConfig, testVol *localTestVolume) {
By("Creating pod1")
pod1, pod1Err := createLocalPod(config, testVol)
pod1, pod1Err := createLocalPod(config, testVol, nil)
Expect(pod1Err).NotTo(HaveOccurred())
verifyLocalPod(config, testVol, pod1, config.node0.Name)
@ -730,7 +788,7 @@ func twoPodsReadWriteSerialTest(config *localTestConfig, testVol *localTestVolum
framework.DeletePodOrFail(config.client, config.ns, pod1.Name)
By("Creating pod2")
pod2, pod2Err := createLocalPod(config, testVol)
pod2, pod2Err := createLocalPod(config, testVol, nil)
Expect(pod2Err).NotTo(HaveOccurred())
verifyLocalPod(config, testVol, pod2, config.node0.Name)
@ -741,6 +799,15 @@ func twoPodsReadWriteSerialTest(config *localTestConfig, testVol *localTestVolum
framework.DeletePodOrFail(config.client, config.ns, pod2.Name)
}
// Test creating pod with fsGroup, and check fsGroup is expected fsGroup.
func createPodWithFsGroupTest(config *localTestConfig, testVol *localTestVolume, fsGroup int64, expectedFsGroup int64) *v1.Pod {
pod, err := createLocalPod(config, testVol, &fsGroup)
framework.ExpectNoError(err)
_, err = framework.LookForStringInPodExec(config.ns, pod.Name, []string{"stat", "-c", "%g", volumeDir}, strconv.FormatInt(expectedFsGroup, 10), time.Second*3)
Expect(err).NotTo(HaveOccurred(), "failed to get expected fsGroup %d on directory %s in pod %s", fsGroup, volumeDir, pod.Name)
return pod
}
func setupStorageClass(config *localTestConfig, mode *storagev1.VolumeBindingMode) {
sc := &storagev1.StorageClass{
ObjectMeta: metav1.ObjectMeta{
@ -986,7 +1053,7 @@ func createLocalPVCsPVs(config *localTestConfig, volumes []*localTestVolume, mod
}
func makeLocalPod(config *localTestConfig, volume *localTestVolume, cmd string) *v1.Pod {
pod := framework.MakeSecPod(config.ns, []*v1.PersistentVolumeClaim{volume.pvc}, false, cmd, false, false, selinuxLabel)
pod := framework.MakeSecPod(config.ns, []*v1.PersistentVolumeClaim{volume.pvc}, false, cmd, false, false, selinuxLabel, nil)
if pod == nil {
return pod
}
@ -998,7 +1065,7 @@ func makeLocalPod(config *localTestConfig, volume *localTestVolume, cmd string)
}
func makeLocalPodWithNodeAffinity(config *localTestConfig, volume *localTestVolume, nodeName string) (pod *v1.Pod) {
pod = framework.MakeSecPod(config.ns, []*v1.PersistentVolumeClaim{volume.pvc}, false, "", false, false, selinuxLabel)
pod = framework.MakeSecPod(config.ns, []*v1.PersistentVolumeClaim{volume.pvc}, false, "", false, false, selinuxLabel, nil)
if pod == nil {
return
}
@ -1024,7 +1091,7 @@ func makeLocalPodWithNodeAffinity(config *localTestConfig, volume *localTestVolu
}
func makeLocalPodWithNodeSelector(config *localTestConfig, volume *localTestVolume, nodeName string) (pod *v1.Pod) {
pod = framework.MakeSecPod(config.ns, []*v1.PersistentVolumeClaim{volume.pvc}, false, "", false, false, selinuxLabel)
pod = framework.MakeSecPod(config.ns, []*v1.PersistentVolumeClaim{volume.pvc}, false, "", false, false, selinuxLabel, nil)
if pod == nil {
return
}
@ -1036,7 +1103,7 @@ func makeLocalPodWithNodeSelector(config *localTestConfig, volume *localTestVolu
}
func makeLocalPodWithNodeName(config *localTestConfig, volume *localTestVolume, nodeName string) (pod *v1.Pod) {
pod = framework.MakeSecPod(config.ns, []*v1.PersistentVolumeClaim{volume.pvc}, false, "", false, false, selinuxLabel)
pod = framework.MakeSecPod(config.ns, []*v1.PersistentVolumeClaim{volume.pvc}, false, "", false, false, selinuxLabel, nil)
if pod == nil {
return
}
@ -1046,7 +1113,7 @@ func makeLocalPodWithNodeName(config *localTestConfig, volume *localTestVolume,
// createSecPod should be used when Pod requires non default SELinux labels
func createSecPod(config *localTestConfig, volume *localTestVolume, hostIPC bool, hostPID bool, seLinuxLabel *v1.SELinuxOptions) (*v1.Pod, error) {
pod, err := framework.CreateSecPod(config.client, config.ns, []*v1.PersistentVolumeClaim{volume.pvc}, false, "", hostIPC, hostPID, seLinuxLabel)
pod, err := framework.CreateSecPod(config.client, config.ns, []*v1.PersistentVolumeClaim{volume.pvc}, false, "", hostIPC, hostPID, seLinuxLabel, nil)
podNodeName, podNodeNameErr := podNodeName(config, pod)
Expect(podNodeNameErr).NotTo(HaveOccurred())
framework.Logf("Security Context POD %q created on Node %q", pod.Name, podNodeName)
@ -1054,9 +1121,9 @@ func createSecPod(config *localTestConfig, volume *localTestVolume, hostIPC bool
return pod, err
}
func createLocalPod(config *localTestConfig, volume *localTestVolume) (*v1.Pod, error) {
func createLocalPod(config *localTestConfig, volume *localTestVolume, fsGroup *int64) (*v1.Pod, error) {
By("Creating a pod")
return framework.CreateSecPod(config.client, config.ns, []*v1.PersistentVolumeClaim{volume.pvc}, false, "", false, false, selinuxLabel)
return framework.CreateSecPod(config.client, config.ns, []*v1.PersistentVolumeClaim{volume.pvc}, false, "", false, false, selinuxLabel, fsGroup)
}
func createAndMountTmpfsLocalVolume(config *localTestConfig, dir string, node *v1.Node) {