Updated the device manager pluginwatcher handler

pull/8/head
Renaud Gaubert 2018-08-12 01:59:39 +02:00
parent 78b55eb5bf
commit 8dd1d27c03
8 changed files with 136 additions and 120 deletions


@@ -95,7 +95,11 @@ type ContainerManager interface {
// GetPodCgroupRoot returns the cgroup which contains all pods.
GetPodCgroupRoot() string
GetPluginRegistrationHandlerCallback() pluginwatcher.RegisterCallbackFn
// GetPluginRegistrationHandler returns a plugin registration handler.
// The pluginwatcher's Handlers make it possible to have a single module
// for handling plugin registration.
GetPluginRegistrationHandler() pluginwatcher.PluginHandler
}
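The RegisterCallbackFn-based API is replaced by the pluginwatcher.PluginHandler interface. Its definition is not part of this diff; judging from the three methods ManagerImpl implements further down (ValidatePlugin, RegisterPlugin, DeRegisterPlugin), it plausibly looks like the following sketch:

package pluginwatcher

// Sketch inferred from the handler methods added in this commit; the
// authoritative definition lives in the pluginwatcher package and may
// carry additional methods or documentation.
type PluginHandler interface {
	// ValidatePlugin is called against a newly discovered plugin socket,
	// before registration, so incompatible plugins can be rejected early.
	ValidatePlugin(pluginName string, endpoint string, versions []string) error
	// RegisterPlugin is called once validation has succeeded.
	RegisterPlugin(pluginName string, endpoint string) error
	// DeRegisterPlugin is called when the plugin goes away.
	DeRegisterPlugin(pluginName string)
}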
type NodeConfig struct {


@@ -605,8 +605,8 @@ func (cm *containerManagerImpl) Start(node *v1.Node,
return nil
}
func (cm *containerManagerImpl) GetPluginRegistrationHandlerCallback() pluginwatcher.RegisterCallbackFn {
return cm.deviceManager.GetWatcherCallback()
func (cm *containerManagerImpl) GetPluginRegistrationHandler() pluginwatcher.PluginHandler {
return cm.deviceManager.GetWatcherHandler()
}
// TODO: move the GetResources logic to PodContainerManager.


@@ -77,10 +77,8 @@ func (cm *containerManagerStub) GetCapacity() v1.ResourceList {
return c
}
func (cm *containerManagerStub) GetPluginRegistrationHandlerCallback() pluginwatcher.RegisterCallbackFn {
return func(name string, endpoint string, versions []string, sockPath string) (chan bool, error) {
return nil, nil
}
func (cm *containerManagerStub) GetPluginRegistrationHandler() pluginwatcher.PluginHandler {
return nil
}
func (cm *containerManagerStub) GetDevicePluginResourceCapacity() (v1.ResourceList, v1.ResourceList, []string) {


@@ -56,7 +56,7 @@ type ManagerImpl struct {
socketname string
socketdir string
endpoints map[string]endpoint // Key is ResourceName
endpoints map[string]endpointInfo // Key is ResourceName
mutex sync.Mutex
server *grpc.Server
@@ -86,10 +86,14 @@ type ManagerImpl struct {
// podDevices contains pod to allocated device mapping.
podDevices podDevices
pluginOpts map[string]*pluginapi.DevicePluginOptions
checkpointManager checkpointmanager.CheckpointManager
}
type endpointInfo struct {
e endpoint
opts *pluginapi.DevicePluginOptions
}
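endpointInfo folds an endpoint and its DevicePluginOptions into one value, replacing the separate endpoints and pluginOpts maps that could previously drift out of sync. A minimal, self-contained sketch of the pattern, using stand-in types rather than the real ones:

package main

import "fmt"

// Stand-ins for the real endpoint and pluginapi types; illustrative only.
type endpoint interface{ resourceName() string }

type devicePluginOptions struct{ PreStartRequired bool }

type endpointInfo struct {
	e    endpoint
	opts *devicePluginOptions
}

type stubEndpoint string

func (s stubEndpoint) resourceName() string { return string(s) }

func main() {
	// One map now carries both pieces of per-resource state, so a lookup
	// can never observe an endpoint without its options or vice versa.
	endpoints := map[string]endpointInfo{
		"domain1.com/resource1": {
			e:    stubEndpoint("domain1.com/resource1"),
			opts: &devicePluginOptions{PreStartRequired: true},
		},
	}
	if eI, ok := endpoints["domain1.com/resource1"]; ok {
		fmt.Println(eI.e.resourceName(), eI.opts.PreStartRequired)
	}
}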
type sourcesReadyStub struct{}
func (s *sourcesReadyStub) AddSource(source string) {}
@@ -109,13 +113,13 @@ func newManagerImpl(socketPath string) (*ManagerImpl, error) {
dir, file := filepath.Split(socketPath)
manager := &ManagerImpl{
endpoints: make(map[string]endpoint),
endpoints: make(map[string]endpointInfo),
socketname: file,
socketdir: dir,
healthyDevices: make(map[string]sets.String),
unhealthyDevices: make(map[string]sets.String),
allocatedDevices: make(map[string]sets.String),
pluginOpts: make(map[string]*pluginapi.DevicePluginOptions),
podDevices: make(podDevices),
}
manager.callback = manager.genericDeviceUpdateCallback
@@ -228,8 +232,8 @@ func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.Sourc
return nil
}
// GetWatcherCallback returns callback function to be registered with plugin watcher
func (m *ManagerImpl) GetWatcherCallback() watcher.RegisterCallbackFn {
// GetWatcherHandler returns the plugin handler
func (m *ManagerImpl) GetWatcherHandler() watcher.PluginHandler {
if f, err := os.Create(m.socketdir + "DEPRECATION"); err != nil {
glog.Errorf("Failed to create deprecation file at %s", m.socketdir)
} else {
@@ -237,16 +241,57 @@ func (m *ManagerImpl) GetWatcherCallback() watcher.RegisterCallbackFn {
glog.V(4).Infof("created deprecation file %s", f.Name())
}
return func(name string, endpoint string, versions []string, sockPath string) (chan bool, error) {
if !m.isVersionCompatibleWithPlugin(versions) {
return nil, fmt.Errorf("manager version, %s, is not among plugin supported versions %v", pluginapi.Version, versions)
}
return watcher.PluginHandler(m)
}
if !v1helper.IsExtendedResourceName(v1.ResourceName(name)) {
return nil, fmt.Errorf("invalid name of device plugin socket: %s", fmt.Sprintf(errInvalidResourceName, name))
}
// ValidatePlugin validates a plugin, checking that its version is supported and that its name has the format of an extended resource
func (m *ManagerImpl) ValidatePlugin(pluginName string, endpoint string, versions []string) error {
glog.V(2).Infof("Got Plugin %s at endpoint %s with versions %v", pluginName, endpoint, versions)
return m.addEndpointProbeMode(name, sockPath)
if !m.isVersionCompatibleWithPlugin(versions) {
return fmt.Errorf("manager version, %s, is not among plugin supported versions %v", pluginapi.Version, versions)
}
if !v1helper.IsExtendedResourceName(v1.ResourceName(pluginName)) {
return fmt.Errorf("invalid name of device plugin socket: %s", fmt.Sprintf(errInvalidResourceName, pluginName))
}
return nil
}
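isVersionCompatibleWithPlugin is called above but not shown in this diff. Given the error message referencing pluginapi.Version, a plausible sketch, assuming the device plugin API package exports a SupportedVersions list:

// Sketch only: accept the plugin if any version it advertises is one
// the manager supports.
func (m *ManagerImpl) isVersionCompatibleWithPlugin(versions []string) bool {
	for _, version := range versions {
		for _, supported := range pluginapi.SupportedVersions {
			if version == supported {
				return true
			}
		}
	}
	return false
}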
// RegisterPlugin starts the endpoint and registers it
// TODO: Start the endpoint and wait for the first ListAndWatch call
// before registering the plugin
func (m *ManagerImpl) RegisterPlugin(pluginName string, endpoint string) error {
glog.V(2).Infof("Registering Plugin %s at endpoint %s", pluginName, endpoint)
e, err := newEndpointImpl(endpoint, pluginName, m.callback)
if err != nil {
return fmt.Errorf("Failed to dial device plugin with socketPath %s: %v", endpoint, err)
}
options, err := e.client.GetDevicePluginOptions(context.Background(), &pluginapi.Empty{})
if err != nil {
return fmt.Errorf("Failed to get device plugin options: %v", err)
}
m.registerEndpoint(pluginName, options, e)
go m.runEndpoint(pluginName, e)
return nil
}
// DeRegisterPlugin deregisters the plugin
// TODO: work out the behavior for deregistering plugins,
// e.g. should we delete the resource?
func (m *ManagerImpl) DeRegisterPlugin(pluginName string) {
m.mutex.Lock()
defer m.mutex.Unlock()
// Note: This will mark the resource unhealthy as per the behavior
// in runEndpoint
if eI, ok := m.endpoints[pluginName]; ok {
eI.e.stop()
}
}
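Taken together, ValidatePlugin, RegisterPlugin and DeRegisterPlugin give the watcher a single, uniform entry point per plugin type. The watcher itself is not part of this diff; a rough, illustrative sketch of how it might drive a handler when a socket appears or disappears:

// Illustrative only; not the pluginwatcher's actual code.
func dispatch(h PluginHandler, pluginName, endpoint string, versions []string, removed bool) error {
	if removed {
		// Socket disappeared: tell the handler to drop the plugin.
		h.DeRegisterPlugin(pluginName)
		return nil
	}
	// Validate first so registration never sees an incompatible plugin.
	if err := h.ValidatePlugin(pluginName, endpoint, versions); err != nil {
		return err
	}
	return h.RegisterPlugin(pluginName, endpoint)
}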
@@ -333,8 +378,8 @@ func (m *ManagerImpl) Register(ctx context.Context, r *pluginapi.RegisterRequest
func (m *ManagerImpl) Stop() error {
m.mutex.Lock()
defer m.mutex.Unlock()
for _, e := range m.endpoints {
e.stop()
for _, eI := range m.endpoints {
eI.e.stop()
}
if m.server == nil {
@@ -346,51 +391,26 @@ func (m *ManagerImpl) Stop() error {
return nil
}
func (m *ManagerImpl) addEndpointProbeMode(resourceName string, socketPath string) (chan bool, error) {
chanForAckOfNotification := make(chan bool)
new, err := newEndpointImpl(socketPath, resourceName, m.callback)
if err != nil {
glog.Errorf("Failed to dial device plugin with socketPath %s: %v", socketPath, err)
return nil, fmt.Errorf("Failed to dial device plugin with socketPath %s: %v", socketPath, err)
}
options, err := new.client.GetDevicePluginOptions(context.Background(), &pluginapi.Empty{})
if err != nil {
glog.Errorf("Failed to get device plugin options: %v", err)
return nil, fmt.Errorf("Failed to get device plugin options: %v", err)
}
m.registerEndpoint(resourceName, options, new)
go func() {
select {
case <-chanForAckOfNotification:
close(chanForAckOfNotification)
m.runEndpoint(resourceName, new)
case <-time.After(time.Second):
glog.Errorf("Timed out while waiting for notification ack from plugin")
}
}()
return chanForAckOfNotification, nil
}
func (m *ManagerImpl) registerEndpoint(resourceName string, options *pluginapi.DevicePluginOptions, e *endpointImpl) {
func (m *ManagerImpl) registerEndpoint(resourceName string, options *pluginapi.DevicePluginOptions, e endpoint) {
m.mutex.Lock()
defer m.mutex.Unlock()
m.pluginOpts[resourceName] = options
m.endpoints[resourceName] = e
m.endpoints[resourceName] = endpointInfo{e: e, opts: options}
glog.V(2).Infof("Registered endpoint %v", e)
}
func (m *ManagerImpl) runEndpoint(resourceName string, e *endpointImpl) {
func (m *ManagerImpl) runEndpoint(resourceName string, e endpoint) {
e.run()
e.stop()
m.mutex.Lock()
defer m.mutex.Unlock()
if old, ok := m.endpoints[resourceName]; ok && old == e {
if old, ok := m.endpoints[resourceName]; ok && old.e == e {
m.markResourceUnhealthy(resourceName)
}
glog.V(2).Infof("Unregistered endpoint %v", e)
glog.V(2).Infof("Endpoint (%s, %v) became unhealthy", resourceName, e)
}
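registerEndpoint and runEndpoint now take the endpoint interface rather than the concrete *endpointImpl, which is what lets the tests below substitute a MockEndpoint. The interface itself is defined elsewhere; inferred from the call sites in this commit (run, stop, allocate, preStartContainer, stopGracePeriodExpired), it plausibly includes at least:

// Inferred from usage in this diff; the real interface in endpoint.go
// may have additional methods.
type endpoint interface {
	run()
	stop()
	allocate(devs []string) (*pluginapi.AllocateResponse, error)
	preStartContainer(devs []string) (*pluginapi.PreStartContainerResponse, error)
	stopGracePeriodExpired() bool
}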
func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
@@ -437,8 +457,8 @@ func (m *ManagerImpl) GetCapacity() (v1.ResourceList, v1.ResourceList, []string)
deletedResources := sets.NewString()
m.mutex.Lock()
for resourceName, devices := range m.healthyDevices {
e, ok := m.endpoints[resourceName]
if (ok && e.stopGracePeriodExpired()) || !ok {
eI, ok := m.endpoints[resourceName]
if (ok && eI.e.stopGracePeriodExpired()) || !ok {
// The resources contained in endpoints and (un)healthyDevices
// should always be consistent. Otherwise, we run the risk
// of failing to garbage collect non-existing resources or devices.
@@ -455,8 +475,8 @@ func (m *ManagerImpl) GetCapacity() (v1.ResourceList, v1.ResourceList, []string)
}
}
for resourceName, devices := range m.unhealthyDevices {
e, ok := m.endpoints[resourceName]
if (ok && e.stopGracePeriodExpired()) || !ok {
eI, ok := m.endpoints[resourceName]
if (ok && eI.e.stopGracePeriodExpired()) || !ok {
if !ok {
glog.Errorf("unexpected: unhealthyDevices and endpoints are out of sync")
}
@@ -519,7 +539,7 @@ func (m *ManagerImpl) readCheckpoint() error {
// will stay zero till the corresponding device plugin re-registers.
m.healthyDevices[resource] = sets.NewString()
m.unhealthyDevices[resource] = sets.NewString()
m.endpoints[resource] = newStoppedEndpointImpl(resource)
m.endpoints[resource] = endpointInfo{e: newStoppedEndpointImpl(resource), opts: nil}
}
return nil
}
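newStoppedEndpointImpl gives a checkpointed resource a placeholder endpoint whose grace-period clock is already running, so GetCapacity can eventually expire the resource if its plugin never re-registers. The helper is not shown here; a plausible sketch, assuming endpointImpl records a stop timestamp that stopGracePeriodExpired later checks against a grace period:

// Sketch only: a pre-stopped endpoint for resources restored from the
// checkpoint before their plugin has re-registered.
func newStoppedEndpointImpl(resourceName string) *endpointImpl {
	return &endpointImpl{
		resourceName: resourceName,
		stopTime:     time.Now(),
	}
}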
@@ -652,7 +672,7 @@ func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Cont
// plugin Allocate grpc calls if it becomes common that a container may require
// resources from multiple device plugins.
m.mutex.Lock()
e, ok := m.endpoints[resource]
eI, ok := m.endpoints[resource]
m.mutex.Unlock()
if !ok {
m.mutex.Lock()
@@ -665,7 +685,7 @@ func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Cont
// TODO: refactor this part of the code to just append a ContainerAllocationRequest
// in a passed-in AllocateRequest pointer, and issue a single Allocate call per pod.
glog.V(3).Infof("Making allocation request for devices %v for device plugin %s", devs, resource)
resp, err := e.allocate(devs)
resp, err := eI.e.allocate(devs)
metrics.DevicePluginAllocationLatency.WithLabelValues(resource).Observe(metrics.SinceInMicroseconds(startRPCTime))
if err != nil {
// In case of allocation failure, we want to restore m.allocatedDevices
@@ -715,11 +735,13 @@ func (m *ManagerImpl) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Co
// with PreStartRequired option set.
func (m *ManagerImpl) callPreStartContainerIfNeeded(podUID, contName, resource string) error {
m.mutex.Lock()
opts, ok := m.pluginOpts[resource]
eI, ok := m.endpoints[resource]
if !ok {
m.mutex.Unlock()
return fmt.Errorf("Plugin options not found in cache for resource: %s", resource)
} else if opts == nil || !opts.PreStartRequired {
return fmt.Errorf("endpoint not found in cache for a registered resource: %s", resource)
}
if eI.opts == nil || !eI.opts.PreStartRequired {
m.mutex.Unlock()
glog.V(4).Infof("Plugin options indicate to skip PreStartContainer for resource: %s", resource)
return nil
@@ -731,16 +753,10 @@ func (m *ManagerImpl) callPreStartContainerIfNeeded(podUID, contName, resource s
return fmt.Errorf("no devices found allocated in local cache for pod %s, container %s, resource %s", podUID, contName, resource)
}
e, ok := m.endpoints[resource]
if !ok {
m.mutex.Unlock()
return fmt.Errorf("endpoint not found in cache for a registered resource: %s", resource)
}
m.mutex.Unlock()
devs := devices.UnsortedList()
glog.V(4).Infof("Issuing an PreStartContainer call for container, %s, of pod %s", contName, podUID)
_, err := e.preStartContainer(devs)
_, err := eI.e.preStartContainer(devs)
if err != nil {
return fmt.Errorf("device plugin PreStartContainer rpc failed with err: %v", err)
}


@@ -57,9 +57,7 @@ func (h *ManagerStub) GetCapacity() (v1.ResourceList, v1.ResourceList, []string)
return nil, nil, []string{}
}
// GetWatcherCallback returns plugin watcher callback
func (h *ManagerStub) GetWatcherCallback() pluginwatcher.RegisterCallbackFn {
return func(name string, endpoint string, versions []string, sockPath string) (chan bool, error) {
return nil, nil
}
// GetWatcherHandler returns the plugin watcher interface
func (h *ManagerStub) GetWatcherHandler() pluginwatcher.PluginHandler {
return nil
}


@@ -249,9 +249,10 @@ func setupDevicePlugin(t *testing.T, devs []*pluginapi.Device, pluginSocketName
func setupPluginWatcher(pluginSocketName string, m Manager) *pluginwatcher.Watcher {
w := pluginwatcher.NewWatcher(filepath.Dir(pluginSocketName))
w.AddHandler(watcherapi.DevicePlugin, m.GetWatcherCallback())
w.AddHandler(watcherapi.DevicePlugin, m.GetWatcherHandler())
w.Start()
return &w
return w
}
func setup(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, <-chan interface{}, *Stub) {
@@ -295,7 +296,7 @@ func TestUpdateCapacityAllocatable(t *testing.T) {
// Expects capacity for resource1 to be 2.
resourceName1 := "domain1.com/resource1"
e1 := &endpointImpl{}
testManager.endpoints[resourceName1] = e1
testManager.endpoints[resourceName1] = endpointInfo{e: e1, opts: nil}
callback(resourceName1, devs)
capacity, allocatable, removedResources := testManager.GetCapacity()
resource1Capacity, ok := capacity[v1.ResourceName(resourceName1)]
@@ -345,7 +346,7 @@ func TestUpdateCapacityAllocatable(t *testing.T) {
// Tests adding another resource.
resourceName2 := "resource2"
e2 := &endpointImpl{}
testManager.endpoints[resourceName2] = e2
testManager.endpoints[resourceName2] = endpointInfo{e: e2, opts: nil}
callback(resourceName2, devs)
capacity, allocatable, removedResources = testManager.GetCapacity()
as.Equal(2, len(capacity))
@@ -456,7 +457,7 @@ func TestCheckpoint(t *testing.T) {
ckm, err := checkpointmanager.NewCheckpointManager(tmpDir)
as.Nil(err)
testManager := &ManagerImpl{
endpoints: make(map[string]endpoint),
endpoints: make(map[string]endpointInfo),
healthyDevices: make(map[string]sets.String),
unhealthyDevices: make(map[string]sets.String),
allocatedDevices: make(map[string]sets.String),
@@ -577,7 +578,7 @@ func makePod(limits v1.ResourceList) *v1.Pod {
}
}
func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestResource, opts map[string]*pluginapi.DevicePluginOptions) (*ManagerImpl, error) {
func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestResource) (*ManagerImpl, error) {
monitorCallback := func(resourceName string, devices []pluginapi.Device) {}
ckm, err := checkpointmanager.NewCheckpointManager(tmpDir)
if err != nil {
@@ -589,41 +590,45 @@ func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestReso
healthyDevices: make(map[string]sets.String),
unhealthyDevices: make(map[string]sets.String),
allocatedDevices: make(map[string]sets.String),
endpoints: make(map[string]endpoint),
pluginOpts: opts,
endpoints: make(map[string]endpointInfo),
podDevices: make(podDevices),
activePods: activePods,
sourcesReady: &sourcesReadyStub{},
checkpointManager: ckm,
}
for _, res := range testRes {
testManager.healthyDevices[res.resourceName] = sets.NewString()
for _, dev := range res.devs {
testManager.healthyDevices[res.resourceName].Insert(dev)
}
if res.resourceName == "domain1.com/resource1" {
testManager.endpoints[res.resourceName] = &MockEndpoint{
allocateFunc: allocateStubFunc(),
testManager.endpoints[res.resourceName] = endpointInfo{
e: &MockEndpoint{allocateFunc: allocateStubFunc()},
opts: nil,
}
}
if res.resourceName == "domain2.com/resource2" {
testManager.endpoints[res.resourceName] = &MockEndpoint{
allocateFunc: func(devs []string) (*pluginapi.AllocateResponse, error) {
resp := new(pluginapi.ContainerAllocateResponse)
resp.Envs = make(map[string]string)
for _, dev := range devs {
switch dev {
case "dev3":
resp.Envs["key2"] = "val2"
testManager.endpoints[res.resourceName] = endpointInfo{
e: &MockEndpoint{
allocateFunc: func(devs []string) (*pluginapi.AllocateResponse, error) {
resp := new(pluginapi.ContainerAllocateResponse)
resp.Envs = make(map[string]string)
for _, dev := range devs {
switch dev {
case "dev3":
resp.Envs["key2"] = "val2"
case "dev4":
resp.Envs["key2"] = "val3"
case "dev4":
resp.Envs["key2"] = "val3"
}
}
}
resps := new(pluginapi.AllocateResponse)
resps.ContainerResponses = append(resps.ContainerResponses, resp)
return resps, nil
resps := new(pluginapi.AllocateResponse)
resps.ContainerResponses = append(resps.ContainerResponses, resp)
return resps, nil
},
},
opts: nil,
}
}
}
@@ -669,10 +674,7 @@ func TestPodContainerDeviceAllocation(t *testing.T) {
as.Nil(err)
defer os.RemoveAll(tmpDir)
nodeInfo := getTestNodeInfo(v1.ResourceList{})
pluginOpts := make(map[string]*pluginapi.DevicePluginOptions)
pluginOpts[res1.resourceName] = nil
pluginOpts[res2.resourceName] = nil
testManager, err := getTestManager(tmpDir, podsStub.getActivePods, testResources, pluginOpts)
testManager, err := getTestManager(tmpDir, podsStub.getActivePods, testResources)
as.Nil(err)
testPods := []*v1.Pod{
@@ -767,10 +769,8 @@ func TestInitContainerDeviceAllocation(t *testing.T) {
tmpDir, err := ioutil.TempDir("", "checkpoint")
as.Nil(err)
defer os.RemoveAll(tmpDir)
pluginOpts := make(map[string]*pluginapi.DevicePluginOptions)
pluginOpts[res1.resourceName] = nil
pluginOpts[res2.resourceName] = nil
testManager, err := getTestManager(tmpDir, podsStub.getActivePods, testResources, pluginOpts)
testManager, err := getTestManager(tmpDir, podsStub.getActivePods, testResources)
as.Nil(err)
podWithPluginResourcesInInitContainers := &v1.Pod{
@@ -904,18 +904,18 @@ func TestDevicePreStartContainer(t *testing.T) {
as.Nil(err)
defer os.RemoveAll(tmpDir)
nodeInfo := getTestNodeInfo(v1.ResourceList{})
pluginOpts := make(map[string]*pluginapi.DevicePluginOptions)
pluginOpts[res1.resourceName] = &pluginapi.DevicePluginOptions{PreStartRequired: true}
testManager, err := getTestManager(tmpDir, podsStub.getActivePods, []TestResource{res1}, pluginOpts)
testManager, err := getTestManager(tmpDir, podsStub.getActivePods, []TestResource{res1})
as.Nil(err)
ch := make(chan []string, 1)
testManager.endpoints[res1.resourceName] = &MockEndpoint{
initChan: ch,
allocateFunc: allocateStubFunc(),
testManager.endpoints[res1.resourceName] = endpointInfo{
e: &MockEndpoint{
initChan: ch,
allocateFunc: allocateStubFunc(),
},
opts: &pluginapi.DevicePluginOptions{PreStartRequired: true},
}
pod := makePod(v1.ResourceList{
v1.ResourceName(res1.resourceName): res1.resourceQuantity})
activePods := []*v1.Pod{}


@@ -53,7 +53,7 @@ type Manager interface {
// GetCapacity returns the amount of available device plugin resource capacity, resource allocatable
// and inactive device plugin resources previously registered on the node.
GetCapacity() (v1.ResourceList, v1.ResourceList, []string)
GetWatcherCallback() watcher.RegisterCallbackFn
GetWatcherHandler() watcher.PluginHandler
}
// DeviceRunContainerOptions contains the combined container runtime settings to consume its allocated devices.


@@ -1367,7 +1367,7 @@ func (kl *Kubelet) initializeRuntimeDependentModules() {
// Adding Registration Callback function for CSI Driver
kl.pluginWatcher.AddHandler("CSIPlugin", pluginwatcher.PluginHandler(csi.PluginHandler))
// Adding Registration Callback function for Device Manager
kl.pluginWatcher.AddHandler(pluginwatcherapi.DevicePlugin, kl.containerManager.GetPluginRegistrationHandlerCallback())
kl.pluginWatcher.AddHandler(pluginwatcherapi.DevicePlugin, kl.containerManager.GetPluginRegistrationHandler())
// Start the plugin watcher
glog.V(4).Infof("starting watcher")
if err := kl.pluginWatcher.Start(); err != nil {