From a5842503eb6932f72c3e14f6bf47c6bc65a0a87f Mon Sep 17 00:00:00 2001 From: vikaschoudhary16 Date: Wed, 24 Jan 2018 12:06:07 -0500 Subject: [PATCH] Use probe based plugin discovery mechanism in device manager --- pkg/kubelet/BUILD | 1 + pkg/kubelet/cm/BUILD | 1 + pkg/kubelet/cm/container_manager.go | 2 + pkg/kubelet/cm/container_manager_linux.go | 5 + pkg/kubelet/cm/container_manager_stub.go | 7 + pkg/kubelet/cm/devicemanager/BUILD | 4 + .../cm/devicemanager/device_plugin_stub.go | 54 ++++++- pkg/kubelet/cm/devicemanager/endpoint_test.go | 2 +- pkg/kubelet/cm/devicemanager/manager.go | 132 ++++++++++----- pkg/kubelet/cm/devicemanager/manager_stub.go | 8 + pkg/kubelet/cm/devicemanager/manager_test.go | 152 ++++++++++++++---- pkg/kubelet/cm/devicemanager/types.go | 2 + pkg/kubelet/kubelet.go | 3 + test/e2e_node/device_plugin.go | 40 +++-- 14 files changed, 326 insertions(+), 87 deletions(-) diff --git a/pkg/kubelet/BUILD b/pkg/kubelet/BUILD index be3cb4fd18..c268a69f91 100644 --- a/pkg/kubelet/BUILD +++ b/pkg/kubelet/BUILD @@ -44,6 +44,7 @@ go_library( "//pkg/kubelet/apis/cri:go_default_library", "//pkg/kubelet/apis/cri/runtime/v1alpha2:go_default_library", "//pkg/kubelet/apis/kubeletconfig:go_default_library", + "//pkg/kubelet/apis/pluginregistration/v1alpha1:go_default_library", "//pkg/kubelet/cadvisor:go_default_library", "//pkg/kubelet/certificate:go_default_library", "//pkg/kubelet/checkpointmanager:go_default_library", diff --git a/pkg/kubelet/cm/BUILD b/pkg/kubelet/cm/BUILD index e9318821e0..132c08b8b1 100644 --- a/pkg/kubelet/cm/BUILD +++ b/pkg/kubelet/cm/BUILD @@ -32,6 +32,7 @@ go_library( "//pkg/kubelet/eviction/api:go_default_library", "//pkg/kubelet/lifecycle:go_default_library", "//pkg/kubelet/status:go_default_library", + "//pkg/kubelet/util/pluginwatcher:go_default_library", "//pkg/scheduler/cache:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", diff --git a/pkg/kubelet/cm/container_manager.go b/pkg/kubelet/cm/container_manager.go index 14a17396ab..60382616a1 100644 --- a/pkg/kubelet/cm/container_manager.go +++ b/pkg/kubelet/cm/container_manager.go @@ -28,6 +28,7 @@ import ( evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api" "k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/status" + "k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher" schedulercache "k8s.io/kubernetes/pkg/scheduler/cache" "fmt" @@ -94,6 +95,7 @@ type ContainerManager interface { // GetPodCgroupRoot returns the cgroup which contains all pods. GetPodCgroupRoot() string + GetPluginRegistrationHandlerCallback() pluginwatcher.RegisterCallbackFn } type NodeConfig struct { diff --git a/pkg/kubelet/cm/container_manager_linux.go b/pkg/kubelet/cm/container_manager_linux.go index fc27adb762..d2a9b9115f 100644 --- a/pkg/kubelet/cm/container_manager_linux.go +++ b/pkg/kubelet/cm/container_manager_linux.go @@ -52,6 +52,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/qos" "k8s.io/kubernetes/pkg/kubelet/status" + "k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher" schedulercache "k8s.io/kubernetes/pkg/scheduler/cache" utilfile "k8s.io/kubernetes/pkg/util/file" "k8s.io/kubernetes/pkg/util/mount" @@ -600,6 +601,10 @@ func (cm *containerManagerImpl) Start(node *v1.Node, return nil } +func (cm *containerManagerImpl) GetPluginRegistrationHandlerCallback() pluginwatcher.RegisterCallbackFn { + return cm.deviceManager.GetWatcherCallback() +} + // TODO: move the GetResources logic to PodContainerManager. func (cm *containerManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*kubecontainer.RunContainerOptions, error) { opts := &kubecontainer.RunContainerOptions{} diff --git a/pkg/kubelet/cm/container_manager_stub.go b/pkg/kubelet/cm/container_manager_stub.go index 8a464adc0d..ed21980849 100644 --- a/pkg/kubelet/cm/container_manager_stub.go +++ b/pkg/kubelet/cm/container_manager_stub.go @@ -27,6 +27,7 @@ import ( kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/status" + "k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher" schedulercache "k8s.io/kubernetes/pkg/scheduler/cache" ) @@ -76,6 +77,12 @@ func (cm *containerManagerStub) GetCapacity() v1.ResourceList { return c } +func (cm *containerManagerStub) GetPluginRegistrationHandlerCallback() pluginwatcher.RegisterCallbackFn { + return func(name string, endpoint string, versions []string, sockPath string) (chan bool, error) { + return nil, nil + } +} + func (cm *containerManagerStub) GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string) { return nil, nil, []string{} } diff --git a/pkg/kubelet/cm/devicemanager/BUILD b/pkg/kubelet/cm/devicemanager/BUILD index 0152617407..719dbbd65e 100644 --- a/pkg/kubelet/cm/devicemanager/BUILD +++ b/pkg/kubelet/cm/devicemanager/BUILD @@ -15,6 +15,7 @@ go_library( deps = [ "//pkg/apis/core/v1/helper:go_default_library", "//pkg/kubelet/apis/deviceplugin/v1beta1:go_default_library", + "//pkg/kubelet/apis/pluginregistration/v1alpha1:go_default_library", "//pkg/kubelet/checkpointmanager:go_default_library", "//pkg/kubelet/checkpointmanager/errors:go_default_library", "//pkg/kubelet/cm/devicemanager/checkpoint:go_default_library", @@ -22,6 +23,7 @@ go_library( "//pkg/kubelet/container:go_default_library", "//pkg/kubelet/lifecycle:go_default_library", "//pkg/kubelet/metrics:go_default_library", + "//pkg/kubelet/util/pluginwatcher:go_default_library", "//pkg/scheduler/cache:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", @@ -40,8 +42,10 @@ go_test( embed = [":go_default_library"], deps = [ "//pkg/kubelet/apis/deviceplugin/v1beta1:go_default_library", + "//pkg/kubelet/apis/pluginregistration/v1alpha1:go_default_library", "//pkg/kubelet/checkpointmanager:go_default_library", "//pkg/kubelet/lifecycle:go_default_library", + "//pkg/kubelet/util/pluginwatcher:go_default_library", "//pkg/scheduler/cache:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", diff --git a/pkg/kubelet/cm/devicemanager/device_plugin_stub.go b/pkg/kubelet/cm/devicemanager/device_plugin_stub.go index 73943039d9..62072ba555 100644 --- a/pkg/kubelet/cm/devicemanager/device_plugin_stub.go +++ b/pkg/kubelet/cm/devicemanager/device_plugin_stub.go @@ -28,12 +28,15 @@ import ( "google.golang.org/grpc" pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1" + watcherapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1alpha1" ) // Stub implementation for DevicePlugin. type Stub struct { - devs []*pluginapi.Device - socket string + devs []*pluginapi.Device + socket string + resourceName string + preStartContainerFlag bool stop chan interface{} wg sync.WaitGroup @@ -43,6 +46,10 @@ type Stub struct { // allocFunc is used for handling allocation request allocFunc stubAllocFunc + + registrationStatus chan watcherapi.RegistrationStatus // for testing + endpoint string // for testing + } // stubAllocFunc is the function called when receive an allocation request from Kubelet @@ -55,10 +62,12 @@ func defaultAllocFunc(r *pluginapi.AllocateRequest, devs map[string]pluginapi.De } // NewDevicePluginStub returns an initialized DevicePlugin Stub. -func NewDevicePluginStub(devs []*pluginapi.Device, socket string) *Stub { +func NewDevicePluginStub(devs []*pluginapi.Device, socket string, name string, preStartContainerFlag bool) *Stub { return &Stub{ - devs: devs, - socket: socket, + devs: devs, + socket: socket, + resourceName: name, + preStartContainerFlag: preStartContainerFlag, stop: make(chan interface{}), update: make(chan []*pluginapi.Device), @@ -88,6 +97,7 @@ func (m *Stub) Start() error { m.wg.Add(1) m.server = grpc.NewServer([]grpc.ServerOption{}...) pluginapi.RegisterDevicePluginServer(m.server, m) + watcherapi.RegisterRegistrationServer(m.server, m) go func() { defer m.wg.Done() @@ -118,8 +128,36 @@ func (m *Stub) Stop() error { return m.cleanup() } +// GetInfo is the RPC which return pluginInfo +func (m *Stub) GetInfo(ctx context.Context, req *watcherapi.InfoRequest) (*watcherapi.PluginInfo, error) { + log.Println("GetInfo") + return &watcherapi.PluginInfo{ + Type: watcherapi.DevicePlugin, + Name: m.resourceName, + Endpoint: m.endpoint, + SupportedVersions: []string{pluginapi.Version}}, nil +} + +// NotifyRegistrationStatus receives the registration notification from watcher +func (m *Stub) NotifyRegistrationStatus(ctx context.Context, status *watcherapi.RegistrationStatus) (*watcherapi.RegistrationStatusResponse, error) { + if m.registrationStatus != nil { + m.registrationStatus <- *status + } + if !status.PluginRegistered { + log.Println("Registration failed: ", status.Error) + } + return &watcherapi.RegistrationStatusResponse{}, nil +} + // Register registers the device plugin for the given resourceName with Kubelet. -func (m *Stub) Register(kubeletEndpoint, resourceName string, preStartContainerFlag bool) error { +func (m *Stub) Register(kubeletEndpoint, resourceName string, pluginSockDir string) error { + if pluginSockDir != "" { + if _, err := os.Stat(pluginSockDir + "DEPRECATION"); err == nil { + log.Println("Deprecation file found. Skip registration.") + return nil + } + } + log.Println("Deprecation file not found. Invoke registration") ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() @@ -136,7 +174,7 @@ func (m *Stub) Register(kubeletEndpoint, resourceName string, preStartContainerF Version: pluginapi.Version, Endpoint: path.Base(m.socket), ResourceName: resourceName, - Options: &pluginapi.DevicePluginOptions{PreStartRequired: preStartContainerFlag}, + Options: &pluginapi.DevicePluginOptions{PreStartRequired: m.preStartContainerFlag}, } _, err = client.Register(context.Background(), reqt) @@ -148,7 +186,7 @@ func (m *Stub) Register(kubeletEndpoint, resourceName string, preStartContainerF // GetDevicePluginOptions returns DevicePluginOptions settings for the device plugin. func (m *Stub) GetDevicePluginOptions(ctx context.Context, e *pluginapi.Empty) (*pluginapi.DevicePluginOptions, error) { - return &pluginapi.DevicePluginOptions{}, nil + return &pluginapi.DevicePluginOptions{PreStartRequired: m.preStartContainerFlag}, nil } // PreStartContainer resets the devices received diff --git a/pkg/kubelet/cm/devicemanager/endpoint_test.go b/pkg/kubelet/cm/devicemanager/endpoint_test.go index 59edf6baf2..80ffb68d13 100644 --- a/pkg/kubelet/cm/devicemanager/endpoint_test.go +++ b/pkg/kubelet/cm/devicemanager/endpoint_test.go @@ -180,7 +180,7 @@ func TestGetDevices(t *testing.T) { } func esetup(t *testing.T, devs []*pluginapi.Device, socket, resourceName string, callback monitorCallback) (*Stub, *endpointImpl) { - p := NewDevicePluginStub(devs, socket) + p := NewDevicePluginStub(devs, socket, resourceName, false) err := p.Start() require.NoError(t, err) diff --git a/pkg/kubelet/cm/devicemanager/manager.go b/pkg/kubelet/cm/devicemanager/manager.go index 90ae034c61..df97edb719 100644 --- a/pkg/kubelet/cm/devicemanager/manager.go +++ b/pkg/kubelet/cm/devicemanager/manager.go @@ -39,6 +39,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/metrics" + watcher "k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher" schedulercache "k8s.io/kubernetes/pkg/scheduler/cache" ) @@ -239,6 +240,43 @@ func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.Sourc return nil } +// GetWatcherCallback returns callback function to be registered with plugin watcher +func (m *ManagerImpl) GetWatcherCallback() watcher.RegisterCallbackFn { + if f, err := os.Create(m.socketdir + "DEPRECATION"); err != nil { + glog.Errorf("Failed to create deprecation file at %s", m.socketdir) + } else { + f.Close() + glog.V(4).Infof("created deprecation file %s", f.Name()) + } + + return func(name string, endpoint string, versions []string, sockPath string) (chan bool, error) { + if !m.isVersionCompatibleWithPlugin(versions) { + return nil, fmt.Errorf("manager version, %s, is not among plugin supported versions %v", pluginapi.Version, versions) + } + + if !v1helper.IsExtendedResourceName(v1.ResourceName(name)) { + return nil, fmt.Errorf("invalid name of device plugin socket: %v", fmt.Sprintf(errInvalidResourceName, name)) + } + + return m.addEndpointProbeMode(name, sockPath) + } +} + +func (m *ManagerImpl) isVersionCompatibleWithPlugin(versions []string) bool { + // TODO(vikasc): Currently this is fine as we only have a single supported version. When we do need to support + // multiple versions in the future, we may need to extend this function to return a supported version. + // E.g., say kubelet supports v1beta1 and v1beta2, and we get v1alpha1 and v1beta1 from a device plugin, + // this function should return v1beta1 + for _, version := range versions { + for _, supportedVersion := range pluginapi.SupportedVersions { + if version == supportedVersion { + return true + } + } + } + return false +} + // Devices is the map of devices that are known by the Device // Plugin manager with the kind of the devices as key func (m *ManagerImpl) Devices() map[string][]pluginapi.Device { @@ -335,10 +373,41 @@ func (m *ManagerImpl) Stop() error { return nil } -func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) { - existingDevs := make(map[string]pluginapi.Device) +func (m *ManagerImpl) addEndpointProbeMode(resourceName string, socketPath string) (chan bool, error) { + chanForAckOfNotification := make(chan bool) + + new, err := newEndpointImpl(socketPath, resourceName, make(map[string]pluginapi.Device), m.callback) + if err != nil { + glog.Errorf("Failed to dial device plugin with socketPath %s: %v", socketPath, err) + return nil, fmt.Errorf("Failed to dial device plugin with socketPath %s: %v", socketPath, err) + } + + options, err := new.client.GetDevicePluginOptions(context.Background(), &pluginapi.Empty{}) + if err != nil { + glog.Errorf("Failed to get device plugin options: %v", err) + return nil, fmt.Errorf("Failed to get device plugin options: %v", err) + } + m.registerEndpoint(resourceName, options, new) + + go func() { + select { + case <-chanForAckOfNotification: + close(chanForAckOfNotification) + m.runEndpoint(resourceName, new) + case <-time.After(time.Second): + glog.Errorf("Timed out while waiting for notification ack from plugin") + } + }() + return chanForAckOfNotification, nil +} + +func (m *ManagerImpl) registerEndpoint(resourceName string, options *pluginapi.DevicePluginOptions, e *endpointImpl) { m.mutex.Lock() - old, ok := m.endpoints[r.ResourceName] + defer m.mutex.Unlock() + if options != nil { + m.pluginOpts[resourceName] = options + } + old, ok := m.endpoints[resourceName] if ok && old != nil { // Pass devices of previous endpoint into re-registered one, // to avoid potential orphaned devices upon re-registration @@ -347,49 +416,40 @@ func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) { device.Health = pluginapi.Unhealthy devices[device.ID] = device } - existingDevs = devices - } - m.mutex.Unlock() - - socketPath := filepath.Join(m.socketdir, r.Endpoint) - e, err := newEndpointImpl(socketPath, r.ResourceName, existingDevs, m.callback) - if err != nil { - glog.Errorf("Failed to dial device plugin with request %v: %v", r, err) - return - } - m.mutex.Lock() - if r.Options != nil { - m.pluginOpts[r.ResourceName] = r.Options - } - // Check for potential re-registration during the initialization of new endpoint, - // and skip updating if re-registration happens. - // TODO: simplify the part once we have a better way to handle registered devices - ext := m.endpoints[r.ResourceName] - if ext != old { - glog.Warningf("Some other endpoint %v is added while endpoint %v is initialized", ext, e) - m.mutex.Unlock() - e.stop() - return + e.devices = devices } // Associates the newly created endpoint with the corresponding resource name. // Stops existing endpoint if there is any. - m.endpoints[r.ResourceName] = e + m.endpoints[resourceName] = e glog.V(2).Infof("Registered endpoint %v", e) - m.mutex.Unlock() if old != nil { old.stop() } + return +} + +func (m *ManagerImpl) runEndpoint(resourceName string, e *endpointImpl) { + e.run() + e.stop() + m.mutex.Lock() + defer m.mutex.Unlock() + if old, ok := m.endpoints[resourceName]; ok && old == e { + m.markResourceUnhealthy(resourceName) + } + glog.V(2).Infof("Unregistered endpoint %v", e) +} + +func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) { + new, err := newEndpointImpl(filepath.Join(m.socketdir, r.Endpoint), r.ResourceName, make(map[string]pluginapi.Device), m.callback) + if err != nil { + glog.Errorf("Failed to dial device plugin with request %v: %v", r, err) + return + } + m.registerEndpoint(r.ResourceName, r.Options, new) go func() { - e.run() - e.stop() - m.mutex.Lock() - if old, ok := m.endpoints[r.ResourceName]; ok && old == e { - m.markResourceUnhealthy(r.ResourceName) - } - glog.V(2).Infof("Unregistered endpoint %v", e) - m.mutex.Unlock() + m.runEndpoint(r.ResourceName, new) }() } diff --git a/pkg/kubelet/cm/devicemanager/manager_stub.go b/pkg/kubelet/cm/devicemanager/manager_stub.go index b42c1b3eab..ce852f697a 100644 --- a/pkg/kubelet/cm/devicemanager/manager_stub.go +++ b/pkg/kubelet/cm/devicemanager/manager_stub.go @@ -21,6 +21,7 @@ import ( pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1" "k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/lifecycle" + "k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher" schedulercache "k8s.io/kubernetes/pkg/scheduler/cache" ) @@ -61,3 +62,10 @@ func (h *ManagerStub) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Co func (h *ManagerStub) GetCapacity() (v1.ResourceList, v1.ResourceList, []string) { return nil, nil, []string{} } + +// GetWatcherCallback returns plugin watcher callback +func (h *ManagerStub) GetWatcherCallback() pluginwatcher.RegisterCallbackFn { + return func(name string, endpoint string, versions []string, sockPath string) (chan bool, error) { + return nil, nil + } +} diff --git a/pkg/kubelet/cm/devicemanager/manager_test.go b/pkg/kubelet/cm/devicemanager/manager_test.go index d27f6b5f56..da64d5ae63 100644 --- a/pkg/kubelet/cm/devicemanager/manager_test.go +++ b/pkg/kubelet/cm/devicemanager/manager_test.go @@ -20,6 +20,7 @@ import ( "fmt" "io/ioutil" "os" + "path/filepath" "reflect" "sync/atomic" "testing" @@ -33,8 +34,10 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/uuid" pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1" + watcherapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1alpha1" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" "k8s.io/kubernetes/pkg/kubelet/lifecycle" + "k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher" schedulercache "k8s.io/kubernetes/pkg/scheduler/cache" ) @@ -65,31 +68,16 @@ func TestNewManagerImplStart(t *testing.T) { socketDir, socketName, pluginSocketName, err := tmpSocketDir() require.NoError(t, err) defer os.RemoveAll(socketDir) - m, p := setup(t, []*pluginapi.Device{}, func(n string, a, u, r []pluginapi.Device) {}, socketName, pluginSocketName) - cleanup(t, m, p) - // Stop should tolerate being called more than once. - cleanup(t, m, p) + m, p := setup(t, []*pluginapi.Device{}, func(n string, a, u, r []pluginapi.Device) {}, socketName, pluginSocketName, false) + cleanup(t, m, p, nil) } -func TestNewManagerImplStop(t *testing.T) { +func TestNewManagerImplStartProbeMode(t *testing.T) { socketDir, socketName, pluginSocketName, err := tmpSocketDir() require.NoError(t, err) defer os.RemoveAll(socketDir) - - m, err := newManagerImpl(socketName) - require.NoError(t, err) - // No prior Start, but that should be okay. - err = m.Stop() - require.NoError(t, err) - - devs := []*pluginapi.Device{ - {ID: "Dev1", Health: pluginapi.Healthy}, - {ID: "Dev2", Health: pluginapi.Healthy}, - } - p := NewDevicePluginStub(devs, pluginSocketName) - // Same here. - err = p.Stop() - require.NoError(t, err) + m, p, w := setupInProbeMode(t, []*pluginapi.Device{}, func(n string, a, u, r []pluginapi.Device) {}, socketName, pluginSocketName) + cleanup(t, m, p, w) } // Tests that the device plugin manager correctly handles registration and re-registration by @@ -118,9 +106,9 @@ func TestDevicePluginReRegistration(t *testing.T) { } callbackChan <- callbackCount } - m, p1 := setup(t, devs, callback, socketName, pluginSocketName) + m, p1 := setup(t, devs, callback, socketName, pluginSocketName, preStartContainerFlag) atomic.StoreInt32(&expCallbackCount, 1) - p1.Register(socketName, testResourceName, preStartContainerFlag) + p1.Register(socketName, testResourceName, "") // Wait for the first callback to be issued. select { @@ -132,11 +120,11 @@ func TestDevicePluginReRegistration(t *testing.T) { devices := m.Devices() require.Equal(t, 2, len(devices[testResourceName]), "Devices are not updated.") - p2 := NewDevicePluginStub(devs, pluginSocketName+".new") + p2 := NewDevicePluginStub(devs, pluginSocketName+".new", testResourceName, preStartContainerFlag) err = p2.Start() require.NoError(t, err) atomic.StoreInt32(&expCallbackCount, 2) - p2.Register(socketName, testResourceName, preStartContainerFlag) + p2.Register(socketName, testResourceName, "") // Wait for the second callback to be issued. select { case <-callbackChan: @@ -149,11 +137,11 @@ func TestDevicePluginReRegistration(t *testing.T) { require.Equal(t, 2, len(devices2[testResourceName]), "Devices shouldn't change.") // Test the scenario that a plugin re-registers with different devices. - p3 := NewDevicePluginStub(devsForRegistration, pluginSocketName+".third") + p3 := NewDevicePluginStub(devsForRegistration, pluginSocketName+".third", testResourceName, preStartContainerFlag) err = p3.Start() require.NoError(t, err) atomic.StoreInt32(&expCallbackCount, 3) - p3.Register(socketName, testResourceName, preStartContainerFlag) + p3.Register(socketName, testResourceName, "") // Wait for the second callback to be issued. select { case <-callbackChan: @@ -161,16 +149,93 @@ func TestDevicePluginReRegistration(t *testing.T) { case <-time.After(time.Second): t.FailNow() } + devices3 := m.Devices() require.Equal(t, 1, len(devices3[testResourceName]), "Devices of plugin previously registered should be removed.") p2.Stop() p3.Stop() - cleanup(t, m, p1) + cleanup(t, m, p1, nil) close(callbackChan) + } } -func setup(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, *Stub) { +// Tests that the device plugin manager correctly handles registration and re-registration by +// making sure that after registration, devices are correctly updated and if a re-registration +// happens, we will NOT delete devices; and no orphaned devices left. +// While testing above scenario, plugin discovery and registration will be done using +// Kubelet probe based mechanism +func TestDevicePluginReRegistrationProbeMode(t *testing.T) { + socketDir, socketName, pluginSocketName, err := tmpSocketDir() + require.NoError(t, err) + defer os.RemoveAll(socketDir) + devs := []*pluginapi.Device{ + {ID: "Dev1", Health: pluginapi.Healthy}, + {ID: "Dev2", Health: pluginapi.Healthy}, + } + devsForRegistration := []*pluginapi.Device{ + {ID: "Dev3", Health: pluginapi.Healthy}, + } + + expCallbackCount := int32(0) + callbackCount := int32(0) + callbackChan := make(chan int32) + callback := func(n string, a, u, r []pluginapi.Device) { + callbackCount++ + if callbackCount > atomic.LoadInt32(&expCallbackCount) { + t.FailNow() + } + callbackChan <- callbackCount + } + m, p1, w := setupInProbeMode(t, devs, callback, socketName, pluginSocketName) + atomic.StoreInt32(&expCallbackCount, 1) + // Wait for the first callback to be issued. + select { + case <-callbackChan: + break + case <-time.After(time.Second): + t.FailNow() + } + devices := m.Devices() + require.Equal(t, 2, len(devices[testResourceName]), "Devices are not updated.") + + p2 := NewDevicePluginStub(devs, pluginSocketName+".new", testResourceName, false) + err = p2.Start() + require.NoError(t, err) + atomic.StoreInt32(&expCallbackCount, 2) + // Wait for the second callback to be issued. + select { + case <-callbackChan: + break + case <-time.After(time.Second): + t.FailNow() + } + + devices2 := m.Devices() + require.Equal(t, 2, len(devices2[testResourceName]), "Devices shouldn't change.") + + // Test the scenario that a plugin re-registers with different devices. + p3 := NewDevicePluginStub(devsForRegistration, pluginSocketName+".third", testResourceName, false) + err = p3.Start() + require.NoError(t, err) + atomic.StoreInt32(&expCallbackCount, 3) + // Wait for the second callback to be issued. + select { + case <-callbackChan: + break + case <-time.After(time.Second): + t.FailNow() + } + + devices3 := m.Devices() + require.Equal(t, 1, len(devices3[testResourceName]), "Devices of plugin previously registered should be removed.") + p2.Stop() + p3.Stop() + cleanup(t, m, p1, w) + close(callbackChan) +} + +func setup(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string, preStartContainerFlag bool) (Manager, *Stub) { m, err := newManagerImpl(socketName) require.NoError(t, err) @@ -182,16 +247,43 @@ func setup(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, soc err = m.Start(activePods, &sourcesReadyStub{}) require.NoError(t, err) - p := NewDevicePluginStub(devs, pluginSocketName) + p := NewDevicePluginStub(devs, pluginSocketName, testResourceName, preStartContainerFlag) err = p.Start() require.NoError(t, err) return m, p } -func cleanup(t *testing.T, m Manager, p *Stub) { +func setupInProbeMode(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, *Stub, *pluginwatcher.Watcher) { + w := pluginwatcher.NewWatcher(filepath.Dir(pluginSocketName)) + + m, err := newManagerImpl(socketName) + require.NoError(t, err) + + w.AddHandler(watcherapi.DevicePlugin, m.GetWatcherCallback()) + w.Start() + + m.callback = callback + + activePods := func() []*v1.Pod { + return []*v1.Pod{} + } + err = m.Start(activePods, &sourcesReadyStub{}) + require.NoError(t, err) + + p := NewDevicePluginStub(devs, pluginSocketName, testResourceName, false /*preStart*/) + err = p.Start() + require.NoError(t, err) + + return m, p, &w +} + +func cleanup(t *testing.T, m Manager, p *Stub, w *pluginwatcher.Watcher) { p.Stop() m.Stop() + if w != nil { + require.NoError(t, w.Stop()) + } } func TestUpdateCapacityAllocatable(t *testing.T) { diff --git a/pkg/kubelet/cm/devicemanager/types.go b/pkg/kubelet/cm/devicemanager/types.go index 4e34749b67..c6ba5f6490 100644 --- a/pkg/kubelet/cm/devicemanager/types.go +++ b/pkg/kubelet/cm/devicemanager/types.go @@ -24,6 +24,7 @@ import ( "k8s.io/kubernetes/pkg/kubelet/config" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/lifecycle" + watcher "k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher" schedulercache "k8s.io/kubernetes/pkg/scheduler/cache" ) @@ -58,6 +59,7 @@ type Manager interface { // GetCapacity returns the amount of available device plugin resource capacity, resource allocatable // and inactive device plugin resources previously registered on the node. GetCapacity() (v1.ResourceList, v1.ResourceList, []string) + GetWatcherCallback() watcher.RegisterCallbackFn } // DeviceRunContainerOptions contains the combined container runtime settings to consume its allocated devices. diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 0ca7771b0a..1f596b648d 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -58,6 +58,7 @@ import ( "k8s.io/kubernetes/pkg/features" internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri" kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig" + pluginwatcherapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1alpha1" "k8s.io/kubernetes/pkg/kubelet/cadvisor" kubeletcertificate "k8s.io/kubernetes/pkg/kubelet/certificate" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" @@ -1352,6 +1353,8 @@ func (kl *Kubelet) initializeRuntimeDependentModules() { if kl.enablePluginsWatcher { // Adding Registration Callback function for CSI Driver kl.pluginWatcher.AddHandler("CSIPlugin", csi.RegistrationCallback) + // Adding Registration Callback function for Device Manager + kl.pluginWatcher.AddHandler(pluginwatcherapi.DevicePlugin, kl.containerManager.GetPluginRegistrationHandlerCallback()) // Start the plugin watcher glog.V(4).Infof("starting watcher") if err := kl.pluginWatcher.Start(); err != nil { diff --git a/test/e2e_node/device_plugin.go b/test/e2e_node/device_plugin.go index 2f928acb63..6315404aa9 100644 --- a/test/e2e_node/device_plugin.go +++ b/test/e2e_node/device_plugin.go @@ -26,9 +26,10 @@ import ( "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" - "k8s.io/apimachinery/pkg/util/uuid" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/uuid" + "k8s.io/kubernetes/pkg/features" + "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig" "k8s.io/kubernetes/test/e2e/framework" pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1" @@ -40,14 +41,28 @@ import ( const ( // fake resource name - resourceName = "fake.com/resource" + resourceName = "fake.com/resource" + resourceNameWithProbeSupport = "fake.com/resource2" ) // Serial because the test restarts Kubelet var _ = framework.KubeDescribe("Device Plugin [Feature:DevicePlugin][NodeFeature:DevicePlugin][Serial]", func() { f := framework.NewDefaultFramework("device-plugin-errors") + testDevicePlugin(f, false, pluginapi.DevicePluginPath) +}) +var _ = framework.KubeDescribe("Device Plugin [Feature:DevicePluginProbe][NodeFeature:DevicePluginProbe][Serial]", func() { + f := framework.NewDefaultFramework("device-plugin-errors") + testDevicePlugin(f, true, "/var/lib/kubelet/plugins/") +}) + +func testDevicePlugin(f *framework.Framework, enablePluginWatcher bool, pluginSockDir string) { Context("DevicePlugin", func() { + By("Enabling support for Kubelet Plugins Watcher") + tempSetCurrentKubeletConfig(f, func(initialConfig *kubeletconfig.KubeletConfiguration) { + initialConfig.FeatureGates[string(features.KubeletPluginsWatcher)] = enablePluginWatcher + }) + //devicePluginSockPaths := []string{pluginapi.DevicePluginPath} It("Verifies the Kubelet device plugin functionality.", func() { By("Start stub device plugin") // fake devices for e2e test @@ -56,15 +71,16 @@ var _ = framework.KubeDescribe("Device Plugin [Feature:DevicePlugin][NodeFeature {ID: "Dev-2", Health: pluginapi.Healthy}, } - socketPath := pluginapi.DevicePluginPath + "dp." + fmt.Sprintf("%d", time.Now().Unix()) + socketPath := pluginSockDir + "dp." + fmt.Sprintf("%d", time.Now().Unix()) + framework.Logf("socketPath %v", socketPath) - dp1 := dm.NewDevicePluginStub(devs, socketPath) + dp1 := dm.NewDevicePluginStub(devs, socketPath, resourceName, false) dp1.SetAllocFunc(stubAllocFunc) err := dp1.Start() framework.ExpectNoError(err) By("Register resources") - err = dp1.Register(pluginapi.KubeletSocket, resourceName, false) + err = dp1.Register(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath) framework.ExpectNoError(err) By("Waiting for the resource exported by the stub device plugin to become available on the local node") @@ -103,13 +119,13 @@ var _ = framework.KubeDescribe("Device Plugin [Feature:DevicePlugin][NodeFeature By("Wait for node is ready") framework.WaitForAllNodesSchedulable(f.ClientSet, framework.TestContext.NodeSchedulableTimeout) - By("Re-Register resources after kubelet restart") - dp1 = dm.NewDevicePluginStub(devs, socketPath) + By("Re-Register resources") + dp1 = dm.NewDevicePluginStub(devs, socketPath, resourceName, false) dp1.SetAllocFunc(stubAllocFunc) err = dp1.Start() framework.ExpectNoError(err) - err = dp1.Register(pluginapi.KubeletSocket, resourceName, false) + err = dp1.Register(pluginapi.KubeletSocket, resourceName, pluginSockDir) framework.ExpectNoError(err) By("Waiting for resource to become available on the local node after re-registration") @@ -149,12 +165,12 @@ var _ = framework.KubeDescribe("Device Plugin [Feature:DevicePlugin][NodeFeature Expect(devIdRestart2).To(Equal(devId2)) By("Re-register resources") - dp1 = dm.NewDevicePluginStub(devs, socketPath) + dp1 = dm.NewDevicePluginStub(devs, socketPath, resourceName, false) dp1.SetAllocFunc(stubAllocFunc) err = dp1.Start() framework.ExpectNoError(err) - err = dp1.Register(pluginapi.KubeletSocket, resourceName, false) + err = dp1.Register(pluginapi.KubeletSocket, resourceName, pluginSockDir) framework.ExpectNoError(err) By("Waiting for the resource exported by the stub device plugin to become healthy on the local node") @@ -192,7 +208,7 @@ var _ = framework.KubeDescribe("Device Plugin [Feature:DevicePlugin][NodeFeature f.PodClient().DeleteSync(pod2.Name, &metav1.DeleteOptions{}, framework.DefaultPodDeletionTimeout) }) }) -}) +} // makeBusyboxPod returns a simple Pod spec with a busybox container // that requests resourceName and runs the specified command.