Use probe based plugin discovery mechanism in device manager

pull/8/head
vikaschoudhary16 2018-01-24 12:06:07 -05:00 committed by vikaschoudhary16
parent e35ecf1618
commit a5842503eb
14 changed files with 326 additions and 87 deletions

View File

@ -44,6 +44,7 @@ go_library(
"//pkg/kubelet/apis/cri:go_default_library", "//pkg/kubelet/apis/cri:go_default_library",
"//pkg/kubelet/apis/cri/runtime/v1alpha2:go_default_library", "//pkg/kubelet/apis/cri/runtime/v1alpha2:go_default_library",
"//pkg/kubelet/apis/kubeletconfig:go_default_library", "//pkg/kubelet/apis/kubeletconfig:go_default_library",
"//pkg/kubelet/apis/pluginregistration/v1alpha1:go_default_library",
"//pkg/kubelet/cadvisor:go_default_library", "//pkg/kubelet/cadvisor:go_default_library",
"//pkg/kubelet/certificate:go_default_library", "//pkg/kubelet/certificate:go_default_library",
"//pkg/kubelet/checkpointmanager:go_default_library", "//pkg/kubelet/checkpointmanager:go_default_library",

View File

@ -32,6 +32,7 @@ go_library(
"//pkg/kubelet/eviction/api:go_default_library", "//pkg/kubelet/eviction/api:go_default_library",
"//pkg/kubelet/lifecycle:go_default_library", "//pkg/kubelet/lifecycle:go_default_library",
"//pkg/kubelet/status:go_default_library", "//pkg/kubelet/status:go_default_library",
"//pkg/kubelet/util/pluginwatcher:go_default_library",
"//pkg/scheduler/cache:go_default_library", "//pkg/scheduler/cache:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",

View File

@ -28,6 +28,7 @@ import (
evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api" evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
"k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/status" "k8s.io/kubernetes/pkg/kubelet/status"
"k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher"
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache" schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
"fmt" "fmt"
@ -94,6 +95,7 @@ type ContainerManager interface {
// GetPodCgroupRoot returns the cgroup which contains all pods. // GetPodCgroupRoot returns the cgroup which contains all pods.
GetPodCgroupRoot() string GetPodCgroupRoot() string
GetPluginRegistrationHandlerCallback() pluginwatcher.RegisterCallbackFn
} }
type NodeConfig struct { type NodeConfig struct {

View File

@ -52,6 +52,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/qos" "k8s.io/kubernetes/pkg/kubelet/qos"
"k8s.io/kubernetes/pkg/kubelet/status" "k8s.io/kubernetes/pkg/kubelet/status"
"k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher"
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache" schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
utilfile "k8s.io/kubernetes/pkg/util/file" utilfile "k8s.io/kubernetes/pkg/util/file"
"k8s.io/kubernetes/pkg/util/mount" "k8s.io/kubernetes/pkg/util/mount"
@ -600,6 +601,10 @@ func (cm *containerManagerImpl) Start(node *v1.Node,
return nil return nil
} }
func (cm *containerManagerImpl) GetPluginRegistrationHandlerCallback() pluginwatcher.RegisterCallbackFn {
return cm.deviceManager.GetWatcherCallback()
}
// TODO: move the GetResources logic to PodContainerManager. // TODO: move the GetResources logic to PodContainerManager.
func (cm *containerManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*kubecontainer.RunContainerOptions, error) { func (cm *containerManagerImpl) GetResources(pod *v1.Pod, container *v1.Container) (*kubecontainer.RunContainerOptions, error) {
opts := &kubecontainer.RunContainerOptions{} opts := &kubecontainer.RunContainerOptions{}

View File

@ -27,6 +27,7 @@ import (
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/status" "k8s.io/kubernetes/pkg/kubelet/status"
"k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher"
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache" schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
) )
@ -76,6 +77,12 @@ func (cm *containerManagerStub) GetCapacity() v1.ResourceList {
return c return c
} }
func (cm *containerManagerStub) GetPluginRegistrationHandlerCallback() pluginwatcher.RegisterCallbackFn {
return func(name string, endpoint string, versions []string, sockPath string) (chan bool, error) {
return nil, nil
}
}
func (cm *containerManagerStub) GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string) { func (cm *containerManagerStub) GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string) {
return nil, nil, []string{} return nil, nil, []string{}
} }

View File

@ -15,6 +15,7 @@ go_library(
deps = [ deps = [
"//pkg/apis/core/v1/helper:go_default_library", "//pkg/apis/core/v1/helper:go_default_library",
"//pkg/kubelet/apis/deviceplugin/v1beta1:go_default_library", "//pkg/kubelet/apis/deviceplugin/v1beta1:go_default_library",
"//pkg/kubelet/apis/pluginregistration/v1alpha1:go_default_library",
"//pkg/kubelet/checkpointmanager:go_default_library", "//pkg/kubelet/checkpointmanager:go_default_library",
"//pkg/kubelet/checkpointmanager/errors:go_default_library", "//pkg/kubelet/checkpointmanager/errors:go_default_library",
"//pkg/kubelet/cm/devicemanager/checkpoint:go_default_library", "//pkg/kubelet/cm/devicemanager/checkpoint:go_default_library",
@ -22,6 +23,7 @@ go_library(
"//pkg/kubelet/container:go_default_library", "//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/lifecycle:go_default_library", "//pkg/kubelet/lifecycle:go_default_library",
"//pkg/kubelet/metrics:go_default_library", "//pkg/kubelet/metrics:go_default_library",
"//pkg/kubelet/util/pluginwatcher:go_default_library",
"//pkg/scheduler/cache:go_default_library", "//pkg/scheduler/cache:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
@ -40,8 +42,10 @@ go_test(
embed = [":go_default_library"], embed = [":go_default_library"],
deps = [ deps = [
"//pkg/kubelet/apis/deviceplugin/v1beta1:go_default_library", "//pkg/kubelet/apis/deviceplugin/v1beta1:go_default_library",
"//pkg/kubelet/apis/pluginregistration/v1alpha1:go_default_library",
"//pkg/kubelet/checkpointmanager:go_default_library", "//pkg/kubelet/checkpointmanager:go_default_library",
"//pkg/kubelet/lifecycle:go_default_library", "//pkg/kubelet/lifecycle:go_default_library",
"//pkg/kubelet/util/pluginwatcher:go_default_library",
"//pkg/scheduler/cache:go_default_library", "//pkg/scheduler/cache:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",

View File

@ -28,12 +28,15 @@ import (
"google.golang.org/grpc" "google.golang.org/grpc"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1" pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
watcherapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1alpha1"
) )
// Stub implementation for DevicePlugin. // Stub implementation for DevicePlugin.
type Stub struct { type Stub struct {
devs []*pluginapi.Device devs []*pluginapi.Device
socket string socket string
resourceName string
preStartContainerFlag bool
stop chan interface{} stop chan interface{}
wg sync.WaitGroup wg sync.WaitGroup
@ -43,6 +46,10 @@ type Stub struct {
// allocFunc is used for handling allocation request // allocFunc is used for handling allocation request
allocFunc stubAllocFunc allocFunc stubAllocFunc
registrationStatus chan watcherapi.RegistrationStatus // for testing
endpoint string // for testing
} }
// stubAllocFunc is the function called when receive an allocation request from Kubelet // stubAllocFunc is the function called when receive an allocation request from Kubelet
@ -55,10 +62,12 @@ func defaultAllocFunc(r *pluginapi.AllocateRequest, devs map[string]pluginapi.De
} }
// NewDevicePluginStub returns an initialized DevicePlugin Stub. // NewDevicePluginStub returns an initialized DevicePlugin Stub.
func NewDevicePluginStub(devs []*pluginapi.Device, socket string) *Stub { func NewDevicePluginStub(devs []*pluginapi.Device, socket string, name string, preStartContainerFlag bool) *Stub {
return &Stub{ return &Stub{
devs: devs, devs: devs,
socket: socket, socket: socket,
resourceName: name,
preStartContainerFlag: preStartContainerFlag,
stop: make(chan interface{}), stop: make(chan interface{}),
update: make(chan []*pluginapi.Device), update: make(chan []*pluginapi.Device),
@ -88,6 +97,7 @@ func (m *Stub) Start() error {
m.wg.Add(1) m.wg.Add(1)
m.server = grpc.NewServer([]grpc.ServerOption{}...) m.server = grpc.NewServer([]grpc.ServerOption{}...)
pluginapi.RegisterDevicePluginServer(m.server, m) pluginapi.RegisterDevicePluginServer(m.server, m)
watcherapi.RegisterRegistrationServer(m.server, m)
go func() { go func() {
defer m.wg.Done() defer m.wg.Done()
@ -118,8 +128,36 @@ func (m *Stub) Stop() error {
return m.cleanup() return m.cleanup()
} }
// GetInfo is the RPC which return pluginInfo
func (m *Stub) GetInfo(ctx context.Context, req *watcherapi.InfoRequest) (*watcherapi.PluginInfo, error) {
log.Println("GetInfo")
return &watcherapi.PluginInfo{
Type: watcherapi.DevicePlugin,
Name: m.resourceName,
Endpoint: m.endpoint,
SupportedVersions: []string{pluginapi.Version}}, nil
}
// NotifyRegistrationStatus receives the registration notification from watcher
func (m *Stub) NotifyRegistrationStatus(ctx context.Context, status *watcherapi.RegistrationStatus) (*watcherapi.RegistrationStatusResponse, error) {
if m.registrationStatus != nil {
m.registrationStatus <- *status
}
if !status.PluginRegistered {
log.Println("Registration failed: ", status.Error)
}
return &watcherapi.RegistrationStatusResponse{}, nil
}
// Register registers the device plugin for the given resourceName with Kubelet. // Register registers the device plugin for the given resourceName with Kubelet.
func (m *Stub) Register(kubeletEndpoint, resourceName string, preStartContainerFlag bool) error { func (m *Stub) Register(kubeletEndpoint, resourceName string, pluginSockDir string) error {
if pluginSockDir != "" {
if _, err := os.Stat(pluginSockDir + "DEPRECATION"); err == nil {
log.Println("Deprecation file found. Skip registration.")
return nil
}
}
log.Println("Deprecation file not found. Invoke registration")
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel() defer cancel()
@ -136,7 +174,7 @@ func (m *Stub) Register(kubeletEndpoint, resourceName string, preStartContainerF
Version: pluginapi.Version, Version: pluginapi.Version,
Endpoint: path.Base(m.socket), Endpoint: path.Base(m.socket),
ResourceName: resourceName, ResourceName: resourceName,
Options: &pluginapi.DevicePluginOptions{PreStartRequired: preStartContainerFlag}, Options: &pluginapi.DevicePluginOptions{PreStartRequired: m.preStartContainerFlag},
} }
_, err = client.Register(context.Background(), reqt) _, err = client.Register(context.Background(), reqt)
@ -148,7 +186,7 @@ func (m *Stub) Register(kubeletEndpoint, resourceName string, preStartContainerF
// GetDevicePluginOptions returns DevicePluginOptions settings for the device plugin. // GetDevicePluginOptions returns DevicePluginOptions settings for the device plugin.
func (m *Stub) GetDevicePluginOptions(ctx context.Context, e *pluginapi.Empty) (*pluginapi.DevicePluginOptions, error) { func (m *Stub) GetDevicePluginOptions(ctx context.Context, e *pluginapi.Empty) (*pluginapi.DevicePluginOptions, error) {
return &pluginapi.DevicePluginOptions{}, nil return &pluginapi.DevicePluginOptions{PreStartRequired: m.preStartContainerFlag}, nil
} }
// PreStartContainer resets the devices received // PreStartContainer resets the devices received

View File

@ -180,7 +180,7 @@ func TestGetDevices(t *testing.T) {
} }
func esetup(t *testing.T, devs []*pluginapi.Device, socket, resourceName string, callback monitorCallback) (*Stub, *endpointImpl) { func esetup(t *testing.T, devs []*pluginapi.Device, socket, resourceName string, callback monitorCallback) (*Stub, *endpointImpl) {
p := NewDevicePluginStub(devs, socket) p := NewDevicePluginStub(devs, socket, resourceName, false)
err := p.Start() err := p.Start()
require.NoError(t, err) require.NoError(t, err)

View File

@ -39,6 +39,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/metrics" "k8s.io/kubernetes/pkg/kubelet/metrics"
watcher "k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher"
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache" schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
) )
@ -239,6 +240,43 @@ func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.Sourc
return nil return nil
} }
// GetWatcherCallback returns callback function to be registered with plugin watcher
func (m *ManagerImpl) GetWatcherCallback() watcher.RegisterCallbackFn {
if f, err := os.Create(m.socketdir + "DEPRECATION"); err != nil {
glog.Errorf("Failed to create deprecation file at %s", m.socketdir)
} else {
f.Close()
glog.V(4).Infof("created deprecation file %s", f.Name())
}
return func(name string, endpoint string, versions []string, sockPath string) (chan bool, error) {
if !m.isVersionCompatibleWithPlugin(versions) {
return nil, fmt.Errorf("manager version, %s, is not among plugin supported versions %v", pluginapi.Version, versions)
}
if !v1helper.IsExtendedResourceName(v1.ResourceName(name)) {
return nil, fmt.Errorf("invalid name of device plugin socket: %v", fmt.Sprintf(errInvalidResourceName, name))
}
return m.addEndpointProbeMode(name, sockPath)
}
}
func (m *ManagerImpl) isVersionCompatibleWithPlugin(versions []string) bool {
// TODO(vikasc): Currently this is fine as we only have a single supported version. When we do need to support
// multiple versions in the future, we may need to extend this function to return a supported version.
// E.g., say kubelet supports v1beta1 and v1beta2, and we get v1alpha1 and v1beta1 from a device plugin,
// this function should return v1beta1
for _, version := range versions {
for _, supportedVersion := range pluginapi.SupportedVersions {
if version == supportedVersion {
return true
}
}
}
return false
}
// Devices is the map of devices that are known by the Device // Devices is the map of devices that are known by the Device
// Plugin manager with the kind of the devices as key // Plugin manager with the kind of the devices as key
func (m *ManagerImpl) Devices() map[string][]pluginapi.Device { func (m *ManagerImpl) Devices() map[string][]pluginapi.Device {
@ -335,10 +373,41 @@ func (m *ManagerImpl) Stop() error {
return nil return nil
} }
func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) { func (m *ManagerImpl) addEndpointProbeMode(resourceName string, socketPath string) (chan bool, error) {
existingDevs := make(map[string]pluginapi.Device) chanForAckOfNotification := make(chan bool)
new, err := newEndpointImpl(socketPath, resourceName, make(map[string]pluginapi.Device), m.callback)
if err != nil {
glog.Errorf("Failed to dial device plugin with socketPath %s: %v", socketPath, err)
return nil, fmt.Errorf("Failed to dial device plugin with socketPath %s: %v", socketPath, err)
}
options, err := new.client.GetDevicePluginOptions(context.Background(), &pluginapi.Empty{})
if err != nil {
glog.Errorf("Failed to get device plugin options: %v", err)
return nil, fmt.Errorf("Failed to get device plugin options: %v", err)
}
m.registerEndpoint(resourceName, options, new)
go func() {
select {
case <-chanForAckOfNotification:
close(chanForAckOfNotification)
m.runEndpoint(resourceName, new)
case <-time.After(time.Second):
glog.Errorf("Timed out while waiting for notification ack from plugin")
}
}()
return chanForAckOfNotification, nil
}
func (m *ManagerImpl) registerEndpoint(resourceName string, options *pluginapi.DevicePluginOptions, e *endpointImpl) {
m.mutex.Lock() m.mutex.Lock()
old, ok := m.endpoints[r.ResourceName] defer m.mutex.Unlock()
if options != nil {
m.pluginOpts[resourceName] = options
}
old, ok := m.endpoints[resourceName]
if ok && old != nil { if ok && old != nil {
// Pass devices of previous endpoint into re-registered one, // Pass devices of previous endpoint into re-registered one,
// to avoid potential orphaned devices upon re-registration // to avoid potential orphaned devices upon re-registration
@ -347,49 +416,40 @@ func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
device.Health = pluginapi.Unhealthy device.Health = pluginapi.Unhealthy
devices[device.ID] = device devices[device.ID] = device
} }
existingDevs = devices e.devices = devices
}
m.mutex.Unlock()
socketPath := filepath.Join(m.socketdir, r.Endpoint)
e, err := newEndpointImpl(socketPath, r.ResourceName, existingDevs, m.callback)
if err != nil {
glog.Errorf("Failed to dial device plugin with request %v: %v", r, err)
return
}
m.mutex.Lock()
if r.Options != nil {
m.pluginOpts[r.ResourceName] = r.Options
}
// Check for potential re-registration during the initialization of new endpoint,
// and skip updating if re-registration happens.
// TODO: simplify the part once we have a better way to handle registered devices
ext := m.endpoints[r.ResourceName]
if ext != old {
glog.Warningf("Some other endpoint %v is added while endpoint %v is initialized", ext, e)
m.mutex.Unlock()
e.stop()
return
} }
// Associates the newly created endpoint with the corresponding resource name. // Associates the newly created endpoint with the corresponding resource name.
// Stops existing endpoint if there is any. // Stops existing endpoint if there is any.
m.endpoints[r.ResourceName] = e m.endpoints[resourceName] = e
glog.V(2).Infof("Registered endpoint %v", e) glog.V(2).Infof("Registered endpoint %v", e)
m.mutex.Unlock()
if old != nil { if old != nil {
old.stop() old.stop()
} }
return
}
func (m *ManagerImpl) runEndpoint(resourceName string, e *endpointImpl) {
e.run()
e.stop()
m.mutex.Lock()
defer m.mutex.Unlock()
if old, ok := m.endpoints[resourceName]; ok && old == e {
m.markResourceUnhealthy(resourceName)
}
glog.V(2).Infof("Unregistered endpoint %v", e)
}
func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
new, err := newEndpointImpl(filepath.Join(m.socketdir, r.Endpoint), r.ResourceName, make(map[string]pluginapi.Device), m.callback)
if err != nil {
glog.Errorf("Failed to dial device plugin with request %v: %v", r, err)
return
}
m.registerEndpoint(r.ResourceName, r.Options, new)
go func() { go func() {
e.run() m.runEndpoint(r.ResourceName, new)
e.stop()
m.mutex.Lock()
if old, ok := m.endpoints[r.ResourceName]; ok && old == e {
m.markResourceUnhealthy(r.ResourceName)
}
glog.V(2).Infof("Unregistered endpoint %v", e)
m.mutex.Unlock()
}() }()
} }

View File

@ -21,6 +21,7 @@ import (
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1" pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
"k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher"
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache" schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
) )
@ -61,3 +62,10 @@ func (h *ManagerStub) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Co
func (h *ManagerStub) GetCapacity() (v1.ResourceList, v1.ResourceList, []string) { func (h *ManagerStub) GetCapacity() (v1.ResourceList, v1.ResourceList, []string) {
return nil, nil, []string{} return nil, nil, []string{}
} }
// GetWatcherCallback returns plugin watcher callback
func (h *ManagerStub) GetWatcherCallback() pluginwatcher.RegisterCallbackFn {
return func(name string, endpoint string, versions []string, sockPath string) (chan bool, error) {
return nil, nil
}
}

View File

@ -20,6 +20,7 @@ import (
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"os" "os"
"path/filepath"
"reflect" "reflect"
"sync/atomic" "sync/atomic"
"testing" "testing"
@ -33,8 +34,10 @@ import (
"k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/uuid"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1" pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
watcherapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher"
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache" schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
) )
@ -65,31 +68,16 @@ func TestNewManagerImplStart(t *testing.T) {
socketDir, socketName, pluginSocketName, err := tmpSocketDir() socketDir, socketName, pluginSocketName, err := tmpSocketDir()
require.NoError(t, err) require.NoError(t, err)
defer os.RemoveAll(socketDir) defer os.RemoveAll(socketDir)
m, p := setup(t, []*pluginapi.Device{}, func(n string, a, u, r []pluginapi.Device) {}, socketName, pluginSocketName) m, p := setup(t, []*pluginapi.Device{}, func(n string, a, u, r []pluginapi.Device) {}, socketName, pluginSocketName, false)
cleanup(t, m, p) cleanup(t, m, p, nil)
// Stop should tolerate being called more than once.
cleanup(t, m, p)
} }
func TestNewManagerImplStop(t *testing.T) { func TestNewManagerImplStartProbeMode(t *testing.T) {
socketDir, socketName, pluginSocketName, err := tmpSocketDir() socketDir, socketName, pluginSocketName, err := tmpSocketDir()
require.NoError(t, err) require.NoError(t, err)
defer os.RemoveAll(socketDir) defer os.RemoveAll(socketDir)
m, p, w := setupInProbeMode(t, []*pluginapi.Device{}, func(n string, a, u, r []pluginapi.Device) {}, socketName, pluginSocketName)
m, err := newManagerImpl(socketName) cleanup(t, m, p, w)
require.NoError(t, err)
// No prior Start, but that should be okay.
err = m.Stop()
require.NoError(t, err)
devs := []*pluginapi.Device{
{ID: "Dev1", Health: pluginapi.Healthy},
{ID: "Dev2", Health: pluginapi.Healthy},
}
p := NewDevicePluginStub(devs, pluginSocketName)
// Same here.
err = p.Stop()
require.NoError(t, err)
} }
// Tests that the device plugin manager correctly handles registration and re-registration by // Tests that the device plugin manager correctly handles registration and re-registration by
@ -118,9 +106,9 @@ func TestDevicePluginReRegistration(t *testing.T) {
} }
callbackChan <- callbackCount callbackChan <- callbackCount
} }
m, p1 := setup(t, devs, callback, socketName, pluginSocketName) m, p1 := setup(t, devs, callback, socketName, pluginSocketName, preStartContainerFlag)
atomic.StoreInt32(&expCallbackCount, 1) atomic.StoreInt32(&expCallbackCount, 1)
p1.Register(socketName, testResourceName, preStartContainerFlag) p1.Register(socketName, testResourceName, "")
// Wait for the first callback to be issued. // Wait for the first callback to be issued.
select { select {
@ -132,11 +120,11 @@ func TestDevicePluginReRegistration(t *testing.T) {
devices := m.Devices() devices := m.Devices()
require.Equal(t, 2, len(devices[testResourceName]), "Devices are not updated.") require.Equal(t, 2, len(devices[testResourceName]), "Devices are not updated.")
p2 := NewDevicePluginStub(devs, pluginSocketName+".new") p2 := NewDevicePluginStub(devs, pluginSocketName+".new", testResourceName, preStartContainerFlag)
err = p2.Start() err = p2.Start()
require.NoError(t, err) require.NoError(t, err)
atomic.StoreInt32(&expCallbackCount, 2) atomic.StoreInt32(&expCallbackCount, 2)
p2.Register(socketName, testResourceName, preStartContainerFlag) p2.Register(socketName, testResourceName, "")
// Wait for the second callback to be issued. // Wait for the second callback to be issued.
select { select {
case <-callbackChan: case <-callbackChan:
@ -149,11 +137,11 @@ func TestDevicePluginReRegistration(t *testing.T) {
require.Equal(t, 2, len(devices2[testResourceName]), "Devices shouldn't change.") require.Equal(t, 2, len(devices2[testResourceName]), "Devices shouldn't change.")
// Test the scenario that a plugin re-registers with different devices. // Test the scenario that a plugin re-registers with different devices.
p3 := NewDevicePluginStub(devsForRegistration, pluginSocketName+".third") p3 := NewDevicePluginStub(devsForRegistration, pluginSocketName+".third", testResourceName, preStartContainerFlag)
err = p3.Start() err = p3.Start()
require.NoError(t, err) require.NoError(t, err)
atomic.StoreInt32(&expCallbackCount, 3) atomic.StoreInt32(&expCallbackCount, 3)
p3.Register(socketName, testResourceName, preStartContainerFlag) p3.Register(socketName, testResourceName, "")
// Wait for the second callback to be issued. // Wait for the second callback to be issued.
select { select {
case <-callbackChan: case <-callbackChan:
@ -161,16 +149,93 @@ func TestDevicePluginReRegistration(t *testing.T) {
case <-time.After(time.Second): case <-time.After(time.Second):
t.FailNow() t.FailNow()
} }
devices3 := m.Devices() devices3 := m.Devices()
require.Equal(t, 1, len(devices3[testResourceName]), "Devices of plugin previously registered should be removed.") require.Equal(t, 1, len(devices3[testResourceName]), "Devices of plugin previously registered should be removed.")
p2.Stop() p2.Stop()
p3.Stop() p3.Stop()
cleanup(t, m, p1) cleanup(t, m, p1, nil)
close(callbackChan) close(callbackChan)
} }
} }
func setup(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, *Stub) { // Tests that the device plugin manager correctly handles registration and re-registration by
// making sure that after registration, devices are correctly updated and if a re-registration
// happens, we will NOT delete devices; and no orphaned devices left.
// While testing above scenario, plugin discovery and registration will be done using
// Kubelet probe based mechanism
func TestDevicePluginReRegistrationProbeMode(t *testing.T) {
socketDir, socketName, pluginSocketName, err := tmpSocketDir()
require.NoError(t, err)
defer os.RemoveAll(socketDir)
devs := []*pluginapi.Device{
{ID: "Dev1", Health: pluginapi.Healthy},
{ID: "Dev2", Health: pluginapi.Healthy},
}
devsForRegistration := []*pluginapi.Device{
{ID: "Dev3", Health: pluginapi.Healthy},
}
expCallbackCount := int32(0)
callbackCount := int32(0)
callbackChan := make(chan int32)
callback := func(n string, a, u, r []pluginapi.Device) {
callbackCount++
if callbackCount > atomic.LoadInt32(&expCallbackCount) {
t.FailNow()
}
callbackChan <- callbackCount
}
m, p1, w := setupInProbeMode(t, devs, callback, socketName, pluginSocketName)
atomic.StoreInt32(&expCallbackCount, 1)
// Wait for the first callback to be issued.
select {
case <-callbackChan:
break
case <-time.After(time.Second):
t.FailNow()
}
devices := m.Devices()
require.Equal(t, 2, len(devices[testResourceName]), "Devices are not updated.")
p2 := NewDevicePluginStub(devs, pluginSocketName+".new", testResourceName, false)
err = p2.Start()
require.NoError(t, err)
atomic.StoreInt32(&expCallbackCount, 2)
// Wait for the second callback to be issued.
select {
case <-callbackChan:
break
case <-time.After(time.Second):
t.FailNow()
}
devices2 := m.Devices()
require.Equal(t, 2, len(devices2[testResourceName]), "Devices shouldn't change.")
// Test the scenario that a plugin re-registers with different devices.
p3 := NewDevicePluginStub(devsForRegistration, pluginSocketName+".third", testResourceName, false)
err = p3.Start()
require.NoError(t, err)
atomic.StoreInt32(&expCallbackCount, 3)
// Wait for the second callback to be issued.
select {
case <-callbackChan:
break
case <-time.After(time.Second):
t.FailNow()
}
devices3 := m.Devices()
require.Equal(t, 1, len(devices3[testResourceName]), "Devices of plugin previously registered should be removed.")
p2.Stop()
p3.Stop()
cleanup(t, m, p1, w)
close(callbackChan)
}
func setup(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string, preStartContainerFlag bool) (Manager, *Stub) {
m, err := newManagerImpl(socketName) m, err := newManagerImpl(socketName)
require.NoError(t, err) require.NoError(t, err)
@ -182,16 +247,43 @@ func setup(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, soc
err = m.Start(activePods, &sourcesReadyStub{}) err = m.Start(activePods, &sourcesReadyStub{})
require.NoError(t, err) require.NoError(t, err)
p := NewDevicePluginStub(devs, pluginSocketName) p := NewDevicePluginStub(devs, pluginSocketName, testResourceName, preStartContainerFlag)
err = p.Start() err = p.Start()
require.NoError(t, err) require.NoError(t, err)
return m, p return m, p
} }
func cleanup(t *testing.T, m Manager, p *Stub) { func setupInProbeMode(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, *Stub, *pluginwatcher.Watcher) {
w := pluginwatcher.NewWatcher(filepath.Dir(pluginSocketName))
m, err := newManagerImpl(socketName)
require.NoError(t, err)
w.AddHandler(watcherapi.DevicePlugin, m.GetWatcherCallback())
w.Start()
m.callback = callback
activePods := func() []*v1.Pod {
return []*v1.Pod{}
}
err = m.Start(activePods, &sourcesReadyStub{})
require.NoError(t, err)
p := NewDevicePluginStub(devs, pluginSocketName, testResourceName, false /*preStart*/)
err = p.Start()
require.NoError(t, err)
return m, p, &w
}
func cleanup(t *testing.T, m Manager, p *Stub, w *pluginwatcher.Watcher) {
p.Stop() p.Stop()
m.Stop() m.Stop()
if w != nil {
require.NoError(t, w.Stop())
}
} }
func TestUpdateCapacityAllocatable(t *testing.T) { func TestUpdateCapacityAllocatable(t *testing.T) {

View File

@ -24,6 +24,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/lifecycle" "k8s.io/kubernetes/pkg/kubelet/lifecycle"
watcher "k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher"
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache" schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
) )
@ -58,6 +59,7 @@ type Manager interface {
// GetCapacity returns the amount of available device plugin resource capacity, resource allocatable // GetCapacity returns the amount of available device plugin resource capacity, resource allocatable
// and inactive device plugin resources previously registered on the node. // and inactive device plugin resources previously registered on the node.
GetCapacity() (v1.ResourceList, v1.ResourceList, []string) GetCapacity() (v1.ResourceList, v1.ResourceList, []string)
GetWatcherCallback() watcher.RegisterCallbackFn
} }
// DeviceRunContainerOptions contains the combined container runtime settings to consume its allocated devices. // DeviceRunContainerOptions contains the combined container runtime settings to consume its allocated devices.

View File

@ -58,6 +58,7 @@ import (
"k8s.io/kubernetes/pkg/features" "k8s.io/kubernetes/pkg/features"
internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri" internalapi "k8s.io/kubernetes/pkg/kubelet/apis/cri"
kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig" kubeletconfiginternal "k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
pluginwatcherapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/cadvisor" "k8s.io/kubernetes/pkg/kubelet/cadvisor"
kubeletcertificate "k8s.io/kubernetes/pkg/kubelet/certificate" kubeletcertificate "k8s.io/kubernetes/pkg/kubelet/certificate"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager" "k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
@ -1352,6 +1353,8 @@ func (kl *Kubelet) initializeRuntimeDependentModules() {
if kl.enablePluginsWatcher { if kl.enablePluginsWatcher {
// Adding Registration Callback function for CSI Driver // Adding Registration Callback function for CSI Driver
kl.pluginWatcher.AddHandler("CSIPlugin", csi.RegistrationCallback) kl.pluginWatcher.AddHandler("CSIPlugin", csi.RegistrationCallback)
// Adding Registration Callback function for Device Manager
kl.pluginWatcher.AddHandler(pluginwatcherapi.DevicePlugin, kl.containerManager.GetPluginRegistrationHandlerCallback())
// Start the plugin watcher // Start the plugin watcher
glog.V(4).Infof("starting watcher") glog.V(4).Infof("starting watcher")
if err := kl.pluginWatcher.Start(); err != nil { if err := kl.pluginWatcher.Start(); err != nil {

View File

@ -26,9 +26,10 @@ import (
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource" "k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/uuid"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/kubernetes/pkg/features"
"k8s.io/kubernetes/pkg/kubelet/apis/kubeletconfig"
"k8s.io/kubernetes/test/e2e/framework" "k8s.io/kubernetes/test/e2e/framework"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1" pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
@ -40,14 +41,28 @@ import (
const ( const (
// fake resource name // fake resource name
resourceName = "fake.com/resource" resourceName = "fake.com/resource"
resourceNameWithProbeSupport = "fake.com/resource2"
) )
// Serial because the test restarts Kubelet // Serial because the test restarts Kubelet
var _ = framework.KubeDescribe("Device Plugin [Feature:DevicePlugin][NodeFeature:DevicePlugin][Serial]", func() { var _ = framework.KubeDescribe("Device Plugin [Feature:DevicePlugin][NodeFeature:DevicePlugin][Serial]", func() {
f := framework.NewDefaultFramework("device-plugin-errors") f := framework.NewDefaultFramework("device-plugin-errors")
testDevicePlugin(f, false, pluginapi.DevicePluginPath)
})
var _ = framework.KubeDescribe("Device Plugin [Feature:DevicePluginProbe][NodeFeature:DevicePluginProbe][Serial]", func() {
f := framework.NewDefaultFramework("device-plugin-errors")
testDevicePlugin(f, true, "/var/lib/kubelet/plugins/")
})
func testDevicePlugin(f *framework.Framework, enablePluginWatcher bool, pluginSockDir string) {
Context("DevicePlugin", func() { Context("DevicePlugin", func() {
By("Enabling support for Kubelet Plugins Watcher")
tempSetCurrentKubeletConfig(f, func(initialConfig *kubeletconfig.KubeletConfiguration) {
initialConfig.FeatureGates[string(features.KubeletPluginsWatcher)] = enablePluginWatcher
})
//devicePluginSockPaths := []string{pluginapi.DevicePluginPath}
It("Verifies the Kubelet device plugin functionality.", func() { It("Verifies the Kubelet device plugin functionality.", func() {
By("Start stub device plugin") By("Start stub device plugin")
// fake devices for e2e test // fake devices for e2e test
@ -56,15 +71,16 @@ var _ = framework.KubeDescribe("Device Plugin [Feature:DevicePlugin][NodeFeature
{ID: "Dev-2", Health: pluginapi.Healthy}, {ID: "Dev-2", Health: pluginapi.Healthy},
} }
socketPath := pluginapi.DevicePluginPath + "dp." + fmt.Sprintf("%d", time.Now().Unix()) socketPath := pluginSockDir + "dp." + fmt.Sprintf("%d", time.Now().Unix())
framework.Logf("socketPath %v", socketPath)
dp1 := dm.NewDevicePluginStub(devs, socketPath) dp1 := dm.NewDevicePluginStub(devs, socketPath, resourceName, false)
dp1.SetAllocFunc(stubAllocFunc) dp1.SetAllocFunc(stubAllocFunc)
err := dp1.Start() err := dp1.Start()
framework.ExpectNoError(err) framework.ExpectNoError(err)
By("Register resources") By("Register resources")
err = dp1.Register(pluginapi.KubeletSocket, resourceName, false) err = dp1.Register(pluginapi.KubeletSocket, resourceName, pluginapi.DevicePluginPath)
framework.ExpectNoError(err) framework.ExpectNoError(err)
By("Waiting for the resource exported by the stub device plugin to become available on the local node") By("Waiting for the resource exported by the stub device plugin to become available on the local node")
@ -103,13 +119,13 @@ var _ = framework.KubeDescribe("Device Plugin [Feature:DevicePlugin][NodeFeature
By("Wait for node is ready") By("Wait for node is ready")
framework.WaitForAllNodesSchedulable(f.ClientSet, framework.TestContext.NodeSchedulableTimeout) framework.WaitForAllNodesSchedulable(f.ClientSet, framework.TestContext.NodeSchedulableTimeout)
By("Re-Register resources after kubelet restart") By("Re-Register resources")
dp1 = dm.NewDevicePluginStub(devs, socketPath) dp1 = dm.NewDevicePluginStub(devs, socketPath, resourceName, false)
dp1.SetAllocFunc(stubAllocFunc) dp1.SetAllocFunc(stubAllocFunc)
err = dp1.Start() err = dp1.Start()
framework.ExpectNoError(err) framework.ExpectNoError(err)
err = dp1.Register(pluginapi.KubeletSocket, resourceName, false) err = dp1.Register(pluginapi.KubeletSocket, resourceName, pluginSockDir)
framework.ExpectNoError(err) framework.ExpectNoError(err)
By("Waiting for resource to become available on the local node after re-registration") By("Waiting for resource to become available on the local node after re-registration")
@ -149,12 +165,12 @@ var _ = framework.KubeDescribe("Device Plugin [Feature:DevicePlugin][NodeFeature
Expect(devIdRestart2).To(Equal(devId2)) Expect(devIdRestart2).To(Equal(devId2))
By("Re-register resources") By("Re-register resources")
dp1 = dm.NewDevicePluginStub(devs, socketPath) dp1 = dm.NewDevicePluginStub(devs, socketPath, resourceName, false)
dp1.SetAllocFunc(stubAllocFunc) dp1.SetAllocFunc(stubAllocFunc)
err = dp1.Start() err = dp1.Start()
framework.ExpectNoError(err) framework.ExpectNoError(err)
err = dp1.Register(pluginapi.KubeletSocket, resourceName, false) err = dp1.Register(pluginapi.KubeletSocket, resourceName, pluginSockDir)
framework.ExpectNoError(err) framework.ExpectNoError(err)
By("Waiting for the resource exported by the stub device plugin to become healthy on the local node") By("Waiting for the resource exported by the stub device plugin to become healthy on the local node")
@ -192,7 +208,7 @@ var _ = framework.KubeDescribe("Device Plugin [Feature:DevicePlugin][NodeFeature
f.PodClient().DeleteSync(pod2.Name, &metav1.DeleteOptions{}, framework.DefaultPodDeletionTimeout) f.PodClient().DeleteSync(pod2.Name, &metav1.DeleteOptions{}, framework.DefaultPodDeletionTimeout)
}) })
}) })
}) }
// makeBusyboxPod returns a simple Pod spec with a busybox container // makeBusyboxPod returns a simple Pod spec with a busybox container
// that requests resourceName and runs the specified command. // that requests resourceName and runs the specified command.