mirror of https://github.com/k3s-io/k3s
Merge pull request #55730 from nqn/niklas/device-move
Automatic merge from submit-queue (batch tested with PRs 54824, 55911, 55730, 55979, 55961). If you want to cherry-pick this change to another branch, please follow the instructions here: https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md

Device plugin API merge of handler and manager

**What this PR does / why we need it**: We are trying different approaches to make the device plugin implementation simpler and more robust. One option is to merge the notion of the `device_plugin_handler` into the `device_manager`, for several reasons:

1. Some calls go directly from the handler to the manager, adding little value.
2. The separation of concerns between the two components is not clear, and they have a 1:1 relationship.
3. The separation and abstractions needed are at a different level: code that can be refactored will most likely live in abstractions that hide details around lock acquisition and checkpointing.

In this PR we **just** merge the two interfaces. After that, there are several opportunities for simplifying and cleaning up the device plugin.

Fixes #55180

**Special notes for your reviewer**: This is a WIP. It may very well get dropped, but it is kept up for the sake of early sharing and to show the progress of the code move.

**Release note**:

```release-note
NONE
```
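For orientation, below is a minimal sketch (not the verbatim upstream source) of the merged `deviceplugin.Manager` interface implied by the diff that follows. Method names and signatures are taken from the `Handler` interface being removed and from the updated call sites; `DeviceRunContainerOptions` is the existing type from `pod_devices.go` and is stubbed here only so the sketch stands on its own.

```go
package deviceplugin

import (
	"k8s.io/api/core/v1"
	pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha"
	"k8s.io/kubernetes/pkg/kubelet/lifecycle"
	"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)

// ActivePodsFunc is a function that returns a list of pods to reconcile.
type ActivePodsFunc func() []*v1.Pod

// DeviceRunContainerOptions is defined in pod_devices.go in the real package;
// an empty placeholder is declared here only to keep the sketch self-contained.
type DeviceRunContainerOptions struct{}

// Manager absorbs the responsibilities previously split between Handler and
// Manager: plugin registration, device bookkeeping, allocation during pod
// admission, and per-container device runtime options.
type Manager interface {
	// Start starts the device plugin registration service and restores
	// checkpointed allocation state.
	Start(activePods ActivePodsFunc) error
	// Devices returns all registered devices keyed by resourceName.
	Devices() map[string][]pluginapi.Device
	// Allocate allocates any requested device plugin resources for the pod
	// being admitted and updates allocatable resources in nodeInfo if needed.
	Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error
	// GetDeviceRunContainerOptions returns the cached device runtime options
	// for the given <pod, container>, if any.
	GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) *DeviceRunContainerOptions
}
```

With this shape, the kubelet's container manager holds a single `devicePluginManager` field (built via `deviceplugin.NewManagerImpl` or `deviceplugin.NewManagerStub`) and delegates `Start`, `Allocate`, and `GetDeviceRunContainerOptions` to it directly, as the first hunks of the diff show.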
commit
bb0dccf602
@ -128,7 +128,7 @@ type containerManagerImpl struct {
// Interface for QoS cgroup management
qosContainerManager QOSContainerManager
// Interface for exporting and allocating devices reported by device plugins.
devicePluginHandler deviceplugin.Handler
devicePluginManager deviceplugin.Manager
// Interface for CPU affinity management.
cpuManager cpumanager.Manager
}

@ -274,11 +274,11 @@ func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.I
}
}

glog.Infof("Creating device plugin handler: %t", devicePluginEnabled)
glog.Infof("Creating device plugin manager: %t", devicePluginEnabled)
if devicePluginEnabled {
cm.devicePluginHandler, err = deviceplugin.NewHandlerImpl(updateDeviceCapacityFunc)
cm.devicePluginManager, err = deviceplugin.NewManagerImpl(updateDeviceCapacityFunc)
} else {
cm.devicePluginHandler, err = deviceplugin.NewHandlerStub()
cm.devicePluginManager, err = deviceplugin.NewManagerStub()
}
if err != nil {
return nil, err

@ -597,7 +597,7 @@ func (cm *containerManagerImpl) Start(node *v1.Node,
}, time.Second, stopChan)

// Starts device plugin manager.
if err := cm.devicePluginHandler.Start(deviceplugin.ActivePodsFunc(activePods)); err != nil {
if err := cm.devicePluginManager.Start(deviceplugin.ActivePodsFunc(activePods)); err != nil {
return err
}
return nil

@ -622,7 +622,7 @@ func (cm *containerManagerImpl) GetResources(pod *v1.Pod, container *v1.Containe
opts := &kubecontainer.RunContainerOptions{}
// Allocate should already be called during predicateAdmitHandler.Admit(),
// just try to fetch device runtime information from cached state here
devOpts := cm.devicePluginHandler.GetDeviceRunContainerOptions(pod, container)
devOpts := cm.devicePluginManager.GetDeviceRunContainerOptions(pod, container)
if devOpts == nil {
return opts, nil
}

@ -633,7 +633,7 @@ func (cm *containerManagerImpl) GetResources(pod *v1.Pod, container *v1.Containe
}

func (cm *containerManagerImpl) UpdatePluginResources(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
return cm.devicePluginHandler.Allocate(node, attrs)
return cm.devicePluginManager.Allocate(node, attrs)
}

func (cm *containerManagerImpl) SystemCgroupsLimit() v1.ResourceList {

@ -9,11 +9,10 @@ load(
go_library(
name = "go_default_library",
srcs = [
"device_plugin_handler.go",
"device_plugin_handler_stub.go",
"device_plugin_stub.go",
"endpoint.go",
"manager.go",
"manager_stub.go",
"pod_devices.go",
"types.go",
],

@ -49,7 +48,6 @@ filegroup(
go_test(
name = "go_default_test",
srcs = [
"device_plugin_handler_test.go",
"endpoint_test.go",
"manager_test.go",
],

@ -1,369 +0,0 @@
|
|||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package deviceplugin
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"sync"
|
||||
|
||||
"github.com/golang/glog"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha"
|
||||
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
|
||||
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
|
||||
)
|
||||
|
||||
// ActivePodsFunc is a function that returns a list of pods to reconcile.
|
||||
type ActivePodsFunc func() []*v1.Pod
|
||||
|
||||
// Handler defines the functions used to manage and access device plugin resources.
|
||||
type Handler interface {
|
||||
// Start starts device plugin registration service.
|
||||
Start(activePods ActivePodsFunc) error
|
||||
// Devices returns all of registered devices keyed by resourceName.
|
||||
Devices() map[string][]pluginapi.Device
|
||||
// Allocate scans through containers in the pod spec
|
||||
// If it finds the container requires device plugin resource, it:
|
||||
// 1. Checks whether it already has this information in its cached state.
|
||||
// 2. If not, it calls Allocate and populate its cached state afterwards.
|
||||
// 3. If there is no cached state and Allocate fails, it returns an error.
|
||||
// 4. Otherwise, it updates allocatableResource in nodeInfo if necessary,
|
||||
// to make sure it is at least equal to the pod's requested capacity for
|
||||
// any registered device plugin resource
|
||||
Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error
|
||||
// GetDeviceRunContainerOptions checks whether we have cached containerDevices
|
||||
// for the passed-in <pod, container> and returns its DeviceRunContainerOptions
|
||||
// for the found one. An empty struct is returned in case no cached state is found.
|
||||
GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) *DeviceRunContainerOptions
|
||||
}
|
||||
|
||||
// HandlerImpl implements the actual functionality to manage device plugin resources.
|
||||
type HandlerImpl struct {
|
||||
// TODO: consider to change this to RWMutex.
|
||||
sync.Mutex
|
||||
// devicePluginManager is an implementation of deviceplugin.Manager interface.
|
||||
devicePluginManager Manager
|
||||
// activePods is a method for listing active pods on the node
|
||||
// so the amount of pluginResources requested by existing pods
|
||||
// could be counted when updating allocated devices
|
||||
activePods ActivePodsFunc
|
||||
// devicePluginManagerMonitorCallback is used for updating devices' states in one time call.
|
||||
// e.g. a new device is advertised, two old devices are deleted and a running device fails.
|
||||
devicePluginManagerMonitorCallback MonitorCallback
|
||||
// allDevices contains all of registered resourceNames and their exported device IDs.
|
||||
allDevices map[string]sets.String
|
||||
// allocatedDevices contains allocated deviceIds, keyed by resourceName.
|
||||
allocatedDevices map[string]sets.String
|
||||
// podDevices contains pod to allocated device mapping.
|
||||
podDevices podDevices
|
||||
}
|
||||
|
||||
// NewHandlerImpl creates a HandlerImpl to manage device plugin resources.
|
||||
// updateCapacityFunc is called to update ContainerManager capacity when
|
||||
// device capacity changes.
|
||||
func NewHandlerImpl(updateCapacityFunc func(v1.ResourceList)) (*HandlerImpl, error) {
|
||||
glog.V(2).Infof("Creating Device Plugin Handler")
|
||||
handler := &HandlerImpl{
|
||||
allDevices: make(map[string]sets.String),
|
||||
allocatedDevices: make(map[string]sets.String),
|
||||
podDevices: make(podDevices),
|
||||
}
|
||||
|
||||
deviceManagerMonitorCallback := func(resourceName string, added, updated, deleted []pluginapi.Device) {
|
||||
var capacity = v1.ResourceList{}
|
||||
kept := append(updated, added...)
|
||||
|
||||
handler.Lock()
|
||||
defer handler.Unlock()
|
||||
|
||||
if _, ok := handler.allDevices[resourceName]; !ok {
|
||||
handler.allDevices[resourceName] = sets.NewString()
|
||||
}
|
||||
// For now, Handler only keeps track of healthy devices.
|
||||
// We can revisit this later when the need comes to track unhealthy devices here.
|
||||
for _, dev := range kept {
|
||||
if dev.Health == pluginapi.Healthy {
|
||||
handler.allDevices[resourceName].Insert(dev.ID)
|
||||
} else {
|
||||
handler.allDevices[resourceName].Delete(dev.ID)
|
||||
}
|
||||
}
|
||||
for _, dev := range deleted {
|
||||
handler.allDevices[resourceName].Delete(dev.ID)
|
||||
}
|
||||
capacity[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(handler.allDevices[resourceName].Len()), resource.DecimalSI)
|
||||
updateCapacityFunc(capacity)
|
||||
}
|
||||
|
||||
mgr, err := NewManagerImpl(pluginapi.KubeletSocket, deviceManagerMonitorCallback)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Failed to initialize device plugin manager: %+v", err)
|
||||
}
|
||||
|
||||
handler.devicePluginManager = mgr
|
||||
handler.devicePluginManagerMonitorCallback = deviceManagerMonitorCallback
|
||||
|
||||
return handler, nil
|
||||
}
|
||||
|
||||
// Start initializes podDevices and allocatedDevices information from checkpoint-ed state
|
||||
// and starts device plugin registration service.
|
||||
func (h *HandlerImpl) Start(activePods ActivePodsFunc) error {
|
||||
h.activePods = activePods
|
||||
|
||||
// Loads in allocatedDevices information from disk.
|
||||
err := h.readCheckpoint()
|
||||
if err != nil {
|
||||
glog.Warningf("Continue after failing to read checkpoint file. Device allocation info may NOT be up-to-date. Err: %v", err)
|
||||
}
|
||||
|
||||
return h.devicePluginManager.Start()
|
||||
}
|
||||
|
||||
// Devices returns all of registered devices keyed by resourceName.
|
||||
func (h *HandlerImpl) Devices() map[string][]pluginapi.Device {
|
||||
return h.devicePluginManager.Devices()
|
||||
}
|
||||
|
||||
// Returns list of device Ids we need to allocate with Allocate rpc call.
|
||||
// Returns empty list in case we don't need to issue the Allocate rpc call.
|
||||
func (h *HandlerImpl) devicesToAllocate(podUID, contName, resource string, required int) (sets.String, error) {
|
||||
h.Lock()
|
||||
defer h.Unlock()
|
||||
needed := required
|
||||
// Gets list of devices that have already been allocated.
|
||||
// This can happen if a container restarts for example.
|
||||
devices := h.podDevices.containerDevices(podUID, contName, resource)
|
||||
if devices != nil {
|
||||
glog.V(3).Infof("Found pre-allocated devices for resource %s container %q in Pod %q: %v", resource, contName, podUID, devices.List())
|
||||
needed = needed - devices.Len()
|
||||
// A pod's resource is not expected to change once admitted by the API server,
|
||||
// so just fail loudly here. We can revisit this part if this no longer holds.
|
||||
if needed != 0 {
|
||||
return nil, fmt.Errorf("pod %v container %v changed request for resource %v from %v to %v", podUID, contName, resource, devices.Len(), required)
|
||||
}
|
||||
}
|
||||
if needed == 0 {
|
||||
// No change, no work.
|
||||
return nil, nil
|
||||
}
|
||||
devices = sets.NewString()
|
||||
// Needs to allocate additional devices.
|
||||
if h.allocatedDevices[resource] == nil {
|
||||
h.allocatedDevices[resource] = sets.NewString()
|
||||
}
|
||||
// Gets Devices in use.
|
||||
devicesInUse := h.allocatedDevices[resource]
|
||||
// Gets a list of available devices.
|
||||
available := h.allDevices[resource].Difference(devicesInUse)
|
||||
if int(available.Len()) < needed {
|
||||
return nil, fmt.Errorf("requested number of devices unavailable for %s. Requested: %d, Available: %d", resource, needed, available.Len())
|
||||
}
|
||||
allocated := available.UnsortedList()[:needed]
|
||||
// Updates h.allocatedDevices with allocated devices to prevent them
|
||||
// from being allocated to other pods/containers, given that we are
|
||||
// not holding lock during the rpc call.
|
||||
for _, device := range allocated {
|
||||
h.allocatedDevices[resource].Insert(device)
|
||||
devices.Insert(device)
|
||||
}
|
||||
return devices, nil
|
||||
}
|
||||
|
||||
// allocateContainerResources attempts to allocate all of required device
|
||||
// plugin resources for the input container, issues an Allocate rpc request
|
||||
// for each new device resource requirement, processes their AllocateResponses,
|
||||
// and updates the cached containerDevices on success.
|
||||
func (h *HandlerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Container) error {
|
||||
podUID := string(pod.UID)
|
||||
contName := container.Name
|
||||
allocatedDevicesUpdated := false
|
||||
for k, v := range container.Resources.Limits {
|
||||
resource := string(k)
|
||||
needed := int(v.Value())
|
||||
glog.V(3).Infof("needs %d %s", needed, resource)
|
||||
if _, registeredResource := h.allDevices[resource]; !registeredResource {
|
||||
continue
|
||||
}
|
||||
// Updates allocatedDevices to garbage collect any stranded resources
|
||||
// before doing the device plugin allocation.
|
||||
if !allocatedDevicesUpdated {
|
||||
h.updateAllocatedDevices(h.activePods())
|
||||
allocatedDevicesUpdated = true
|
||||
}
|
||||
allocDevices, err := h.devicesToAllocate(podUID, contName, resource, needed)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if allocDevices == nil || len(allocDevices) <= 0 {
|
||||
continue
|
||||
}
|
||||
// devicePluginManager.Allocate involves RPC calls to device plugin, which
|
||||
// could be heavy-weight. Therefore we want to perform this operation outside
|
||||
// mutex lock. Note if Allcate call fails, we may leave container resources
|
||||
// partially allocated for the failed container. We rely on updateAllocatedDevices()
|
||||
// to garbage collect these resources later. Another side effect is that if
|
||||
// we have X resource A and Y resource B in total, and two containers, container1
|
||||
// and container2 both require X resource A and Y resource B. Both allocation
|
||||
// requests may fail if we serve them in mixed order.
|
||||
// TODO: may revisit this part later if we see inefficient resource allocation
|
||||
// in real use as the result of this. Should also consider to parallize device
|
||||
// plugin Allocate grpc calls if it becomes common that a container may require
|
||||
// resources from multiple device plugins.
|
||||
resp, err := h.devicePluginManager.Allocate(resource, allocDevices.UnsortedList())
|
||||
if err != nil {
|
||||
// In case of allocation failure, we want to restore h.allocatedDevices
|
||||
// to the actual allocated state from h.podDevices.
|
||||
h.Lock()
|
||||
h.allocatedDevices = h.podDevices.devices()
|
||||
h.Unlock()
|
||||
return err
|
||||
}
|
||||
|
||||
// Update internal cached podDevices state.
|
||||
h.Lock()
|
||||
h.podDevices.insert(podUID, contName, resource, allocDevices, resp)
|
||||
h.Unlock()
|
||||
}
|
||||
|
||||
// Checkpoints device to container allocation information.
|
||||
return h.writeCheckpoint()
|
||||
}
|
||||
|
||||
// Allocate attempts to allocate all of required device plugin resources,
|
||||
// and update Allocatable resources in nodeInfo if necessary
|
||||
func (h *HandlerImpl) Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
|
||||
pod := attrs.Pod
|
||||
// TODO: Reuse devices between init containers and regular containers.
|
||||
for _, container := range pod.Spec.InitContainers {
|
||||
if err := h.allocateContainerResources(pod, &container); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
for _, container := range pod.Spec.Containers {
|
||||
if err := h.allocateContainerResources(pod, &container); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// quick return if no pluginResources requested
|
||||
if _, podRequireDevicePluginResource := h.podDevices[string(pod.UID)]; !podRequireDevicePluginResource {
|
||||
return nil
|
||||
}
|
||||
|
||||
h.sanitizeNodeAllocatable(node)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// sanitizeNodeAllocatable scans through allocatedDevices in DevicePluginHandler
|
||||
// and if necessary, updates allocatableResource in nodeInfo to at least equal to
|
||||
// the allocated capacity. This allows pods that have already been scheduled on
|
||||
// the node to pass GeneralPredicates admission checking even upon device plugin failure.
|
||||
func (h *HandlerImpl) sanitizeNodeAllocatable(node *schedulercache.NodeInfo) {
|
||||
var newAllocatableResource *schedulercache.Resource
|
||||
allocatableResource := node.AllocatableResource()
|
||||
if allocatableResource.ScalarResources == nil {
|
||||
allocatableResource.ScalarResources = make(map[v1.ResourceName]int64)
|
||||
}
|
||||
for resource, devices := range h.allocatedDevices {
|
||||
needed := devices.Len()
|
||||
quant, ok := allocatableResource.ScalarResources[v1.ResourceName(resource)]
|
||||
if ok && int(quant) >= needed {
|
||||
continue
|
||||
}
|
||||
// Needs to update nodeInfo.AllocatableResource to make sure
|
||||
// NodeInfo.allocatableResource at least equal to the capacity already allocated.
|
||||
if newAllocatableResource == nil {
|
||||
newAllocatableResource = allocatableResource.Clone()
|
||||
}
|
||||
newAllocatableResource.ScalarResources[v1.ResourceName(resource)] = int64(needed)
|
||||
}
|
||||
if newAllocatableResource != nil {
|
||||
node.SetAllocatableResource(newAllocatableResource)
|
||||
}
|
||||
}
|
||||
|
||||
// GetDeviceRunContainerOptions checks whether we have cached containerDevices
|
||||
// for the passed-in <pod, container> and returns its DeviceRunContainerOptions
|
||||
// for the found one. An empty struct is returned in case no cached state is found.
|
||||
func (h *HandlerImpl) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) *DeviceRunContainerOptions {
|
||||
h.Lock()
|
||||
defer h.Unlock()
|
||||
return h.podDevices.deviceRunContainerOptions(string(pod.UID), container.Name)
|
||||
}
|
||||
|
||||
// updateAllocatedDevices gets a list of active pods and then frees any Devices that are bound to
|
||||
// terminated pods. Returns error on failure.
|
||||
func (h *HandlerImpl) updateAllocatedDevices(activePods []*v1.Pod) {
|
||||
h.Lock()
|
||||
defer h.Unlock()
|
||||
activePodUids := sets.NewString()
|
||||
for _, pod := range activePods {
|
||||
activePodUids.Insert(string(pod.UID))
|
||||
}
|
||||
allocatedPodUids := h.podDevices.pods()
|
||||
podsToBeRemoved := allocatedPodUids.Difference(activePodUids)
|
||||
if len(podsToBeRemoved) <= 0 {
|
||||
return
|
||||
}
|
||||
glog.V(5).Infof("pods to be removed: %v", podsToBeRemoved.List())
|
||||
h.podDevices.delete(podsToBeRemoved.List())
|
||||
// Regenerated allocatedDevices after we update pod allocation information.
|
||||
h.allocatedDevices = h.podDevices.devices()
|
||||
}
|
||||
|
||||
// Checkpoints device to container allocation information to disk.
|
||||
func (h *HandlerImpl) writeCheckpoint() error {
|
||||
h.Lock()
|
||||
data := h.podDevices.toCheckpointData()
|
||||
h.Unlock()
|
||||
|
||||
dataJSON, err := json.Marshal(data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
filepath := h.devicePluginManager.CheckpointFile()
|
||||
return ioutil.WriteFile(filepath, dataJSON, 0644)
|
||||
}
|
||||
|
||||
// Reads device to container allocation information from disk, and populates
|
||||
// h.allocatedDevices accordingly.
|
||||
func (h *HandlerImpl) readCheckpoint() error {
|
||||
filepath := h.devicePluginManager.CheckpointFile()
|
||||
content, err := ioutil.ReadFile(filepath)
|
||||
if err != nil && !os.IsNotExist(err) {
|
||||
return fmt.Errorf("failed to read checkpoint file %q: %v", filepath, err)
|
||||
}
|
||||
glog.V(2).Infof("Read checkpoint file %s\n", filepath)
|
||||
var data checkpointData
|
||||
if err := json.Unmarshal(content, &data); err != nil {
|
||||
return fmt.Errorf("failed to unmarshal checkpoint data: %v", err)
|
||||
}
|
||||
|
||||
h.Lock()
|
||||
defer h.Unlock()
|
||||
h.podDevices.fromCheckpointData(data)
|
||||
h.allocatedDevices = h.podDevices.devices()
|
||||
return nil
|
||||
}
|
|
@ -1,414 +0,0 @@
|
|||
/*
|
||||
Copyright 2017 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package deviceplugin
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apimachinery/pkg/util/uuid"
|
||||
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha"
|
||||
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
|
||||
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
|
||||
)
|
||||
|
||||
func TestUpdateCapacity(t *testing.T) {
|
||||
var expected = v1.ResourceList{}
|
||||
as := assert.New(t)
|
||||
verifyCapacityFunc := func(updates v1.ResourceList) {
|
||||
as.Equal(expected, updates)
|
||||
}
|
||||
testHandler, err := NewHandlerImpl(verifyCapacityFunc)
|
||||
as.NotNil(testHandler)
|
||||
as.Nil(err)
|
||||
|
||||
devs := []pluginapi.Device{
|
||||
{ID: "Device1", Health: pluginapi.Healthy},
|
||||
{ID: "Device2", Health: pluginapi.Healthy},
|
||||
{ID: "Device3", Health: pluginapi.Unhealthy},
|
||||
}
|
||||
|
||||
resourceName := "resource1"
|
||||
// Adds three devices for resource1, two healthy and one unhealthy.
|
||||
// Expects capacity for resource1 to be 2.
|
||||
expected[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(2), resource.DecimalSI)
|
||||
testHandler.devicePluginManagerMonitorCallback(resourceName, devs, []pluginapi.Device{}, []pluginapi.Device{})
|
||||
// Deletes an unhealthy device should NOT change capacity.
|
||||
testHandler.devicePluginManagerMonitorCallback(resourceName, []pluginapi.Device{}, []pluginapi.Device{}, []pluginapi.Device{devs[2]})
|
||||
// Updates a healthy device to unhealthy should reduce capacity by 1.
|
||||
expected[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(1), resource.DecimalSI)
|
||||
// Deletes a healthy device should reduce capacity by 1.
|
||||
expected[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(0), resource.DecimalSI)
|
||||
// Tests adding another resource.
|
||||
delete(expected, v1.ResourceName(resourceName))
|
||||
resourceName2 := "resource2"
|
||||
expected[v1.ResourceName(resourceName2)] = *resource.NewQuantity(int64(2), resource.DecimalSI)
|
||||
testHandler.devicePluginManagerMonitorCallback(resourceName2, devs, []pluginapi.Device{}, []pluginapi.Device{})
|
||||
}
|
||||
|
||||
type stringPairType struct {
|
||||
value1 string
|
||||
value2 string
|
||||
}
|
||||
|
||||
// DevicePluginManager stub to test device Allocation behavior.
|
||||
type DevicePluginManagerTestStub struct {
|
||||
// All data structs are keyed by resourceName+DevId
|
||||
devRuntimeDevices map[string][]stringPairType
|
||||
devRuntimeMounts map[string][]stringPairType
|
||||
devRuntimeEnvs map[string][]stringPairType
|
||||
}
|
||||
|
||||
func NewDevicePluginManagerTestStub() (*DevicePluginManagerTestStub, error) {
|
||||
return &DevicePluginManagerTestStub{
|
||||
devRuntimeDevices: make(map[string][]stringPairType),
|
||||
devRuntimeMounts: make(map[string][]stringPairType),
|
||||
devRuntimeEnvs: make(map[string][]stringPairType),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (m *DevicePluginManagerTestStub) Start() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *DevicePluginManagerTestStub) Devices() map[string][]pluginapi.Device {
|
||||
return make(map[string][]pluginapi.Device)
|
||||
}
|
||||
|
||||
func (m *DevicePluginManagerTestStub) Allocate(resourceName string, devIds []string) (*pluginapi.AllocateResponse, error) {
|
||||
resp := new(pluginapi.AllocateResponse)
|
||||
resp.Envs = make(map[string]string)
|
||||
for _, id := range devIds {
|
||||
key := resourceName + id
|
||||
fmt.Printf("Alloc device %v for resource %v\n", id, resourceName)
|
||||
for _, dev := range m.devRuntimeDevices[key] {
|
||||
fmt.Printf("Add dev %v %v\n", dev.value1, dev.value2)
|
||||
resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{
|
||||
ContainerPath: dev.value1,
|
||||
HostPath: dev.value2,
|
||||
Permissions: "mrw",
|
||||
})
|
||||
}
|
||||
for _, mount := range m.devRuntimeMounts[key] {
|
||||
fmt.Printf("Add mount %v %v\n", mount.value1, mount.value2)
|
||||
resp.Mounts = append(resp.Mounts, &pluginapi.Mount{
|
||||
ContainerPath: mount.value1,
|
||||
HostPath: mount.value2,
|
||||
ReadOnly: true,
|
||||
})
|
||||
}
|
||||
for _, env := range m.devRuntimeEnvs[key] {
|
||||
fmt.Printf("Add env %v %v\n", env.value1, env.value2)
|
||||
resp.Envs[env.value1] = env.value2
|
||||
}
|
||||
}
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (m *DevicePluginManagerTestStub) Stop() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *DevicePluginManagerTestStub) CheckpointFile() string {
|
||||
return "/tmp/device-plugin-checkpoint"
|
||||
}
|
||||
|
||||
func constructDevices(devices []string) sets.String {
|
||||
ret := sets.NewString()
|
||||
for _, dev := range devices {
|
||||
ret.Insert(dev)
|
||||
}
|
||||
return ret
|
||||
}
|
||||
|
||||
func constructAllocResp(devices, mounts, envs map[string]string) *pluginapi.AllocateResponse {
|
||||
resp := &pluginapi.AllocateResponse{}
|
||||
for k, v := range devices {
|
||||
resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{
|
||||
HostPath: k,
|
||||
ContainerPath: v,
|
||||
Permissions: "mrw",
|
||||
})
|
||||
}
|
||||
for k, v := range mounts {
|
||||
resp.Mounts = append(resp.Mounts, &pluginapi.Mount{
|
||||
ContainerPath: k,
|
||||
HostPath: v,
|
||||
ReadOnly: true,
|
||||
})
|
||||
}
|
||||
resp.Envs = make(map[string]string)
|
||||
for k, v := range envs {
|
||||
resp.Envs[k] = v
|
||||
}
|
||||
return resp
|
||||
}
|
||||
|
||||
func TestCheckpoint(t *testing.T) {
|
||||
resourceName1 := "domain1.com/resource1"
|
||||
resourceName2 := "domain2.com/resource2"
|
||||
|
||||
m, err := NewDevicePluginManagerTestStub()
|
||||
as := assert.New(t)
|
||||
as.Nil(err)
|
||||
|
||||
testHandler := &HandlerImpl{
|
||||
devicePluginManager: m,
|
||||
allDevices: make(map[string]sets.String),
|
||||
allocatedDevices: make(map[string]sets.String),
|
||||
podDevices: make(podDevices),
|
||||
}
|
||||
|
||||
testHandler.podDevices.insert("pod1", "con1", resourceName1,
|
||||
constructDevices([]string{"dev1", "dev2"}),
|
||||
constructAllocResp(map[string]string{"/dev/r1dev1": "/dev/r1dev1", "/dev/r1dev2": "/dev/r1dev2"},
|
||||
map[string]string{"/home/r1lib1": "/usr/r1lib1"}, map[string]string{}))
|
||||
testHandler.podDevices.insert("pod1", "con1", resourceName2,
|
||||
constructDevices([]string{"dev1", "dev2"}),
|
||||
constructAllocResp(map[string]string{"/dev/r2dev1": "/dev/r2dev1", "/dev/r2dev2": "/dev/r2dev2"},
|
||||
map[string]string{"/home/r2lib1": "/usr/r2lib1"},
|
||||
map[string]string{"r2devices": "dev1 dev2"}))
|
||||
testHandler.podDevices.insert("pod1", "con2", resourceName1,
|
||||
constructDevices([]string{"dev3"}),
|
||||
constructAllocResp(map[string]string{"/dev/r1dev3": "/dev/r1dev3"},
|
||||
map[string]string{"/home/r1lib1": "/usr/r1lib1"}, map[string]string{}))
|
||||
testHandler.podDevices.insert("pod2", "con1", resourceName1,
|
||||
constructDevices([]string{"dev4"}),
|
||||
constructAllocResp(map[string]string{"/dev/r1dev4": "/dev/r1dev4"},
|
||||
map[string]string{"/home/r1lib1": "/usr/r1lib1"}, map[string]string{}))
|
||||
|
||||
expectedPodDevices := testHandler.podDevices
|
||||
expectedAllocatedDevices := testHandler.podDevices.devices()
|
||||
|
||||
err = testHandler.writeCheckpoint()
|
||||
as.Nil(err)
|
||||
testHandler.podDevices = make(podDevices)
|
||||
err = testHandler.readCheckpoint()
|
||||
as.Nil(err)
|
||||
|
||||
as.Equal(len(expectedPodDevices), len(testHandler.podDevices))
|
||||
for podUID, containerDevices := range expectedPodDevices {
|
||||
for conName, resources := range containerDevices {
|
||||
for resource := range resources {
|
||||
as.True(reflect.DeepEqual(
|
||||
expectedPodDevices.containerDevices(podUID, conName, resource),
|
||||
testHandler.podDevices.containerDevices(podUID, conName, resource)))
|
||||
opts1 := expectedPodDevices.deviceRunContainerOptions(podUID, conName)
|
||||
opts2 := testHandler.podDevices.deviceRunContainerOptions(podUID, conName)
|
||||
as.Equal(len(opts1.Envs), len(opts2.Envs))
|
||||
as.Equal(len(opts1.Mounts), len(opts2.Mounts))
|
||||
as.Equal(len(opts1.Devices), len(opts2.Devices))
|
||||
}
|
||||
}
|
||||
}
|
||||
as.True(reflect.DeepEqual(expectedAllocatedDevices, testHandler.allocatedDevices))
|
||||
}
|
||||
|
||||
type activePodsStub struct {
|
||||
activePods []*v1.Pod
|
||||
}
|
||||
|
||||
func (a *activePodsStub) getActivePods() []*v1.Pod {
|
||||
return a.activePods
|
||||
}
|
||||
|
||||
func (a *activePodsStub) updateActivePods(newPods []*v1.Pod) {
|
||||
a.activePods = newPods
|
||||
}
|
||||
|
||||
func TestPodContainerDeviceAllocation(t *testing.T) {
|
||||
flag.Set("alsologtostderr", fmt.Sprintf("%t", true))
|
||||
var logLevel string
|
||||
flag.StringVar(&logLevel, "logLevel", "4", "test")
|
||||
flag.Lookup("v").Value.Set(logLevel)
|
||||
|
||||
resourceName1 := "domain1.com/resource1"
|
||||
resourceQuantity1 := *resource.NewQuantity(int64(2), resource.DecimalSI)
|
||||
devID1 := "dev1"
|
||||
devID2 := "dev2"
|
||||
resourceName2 := "domain2.com/resource2"
|
||||
resourceQuantity2 := *resource.NewQuantity(int64(1), resource.DecimalSI)
|
||||
devID3 := "dev3"
|
||||
devID4 := "dev4"
|
||||
|
||||
m, err := NewDevicePluginManagerTestStub()
|
||||
as := assert.New(t)
|
||||
as.Nil(err)
|
||||
monitorCallback := func(resourceName string, added, updated, deleted []pluginapi.Device) {}
|
||||
podsStub := activePodsStub{
|
||||
activePods: []*v1.Pod{},
|
||||
}
|
||||
cachedNode := &v1.Node{
|
||||
Status: v1.NodeStatus{
|
||||
Allocatable: v1.ResourceList{},
|
||||
},
|
||||
}
|
||||
nodeInfo := &schedulercache.NodeInfo{}
|
||||
nodeInfo.SetNode(cachedNode)
|
||||
|
||||
testHandler := &HandlerImpl{
|
||||
devicePluginManager: m,
|
||||
devicePluginManagerMonitorCallback: monitorCallback,
|
||||
allDevices: make(map[string]sets.String),
|
||||
allocatedDevices: make(map[string]sets.String),
|
||||
podDevices: make(podDevices),
|
||||
activePods: podsStub.getActivePods,
|
||||
}
|
||||
testHandler.allDevices[resourceName1] = sets.NewString()
|
||||
testHandler.allDevices[resourceName1].Insert(devID1)
|
||||
testHandler.allDevices[resourceName1].Insert(devID2)
|
||||
testHandler.allDevices[resourceName2] = sets.NewString()
|
||||
testHandler.allDevices[resourceName2].Insert(devID3)
|
||||
testHandler.allDevices[resourceName2].Insert(devID4)
|
||||
|
||||
m.devRuntimeDevices[resourceName1+devID1] = append(m.devRuntimeDevices[resourceName1+devID1], stringPairType{"/dev/aaa", "/dev/aaa"})
|
||||
m.devRuntimeDevices[resourceName1+devID1] = append(m.devRuntimeDevices[resourceName1+devID1], stringPairType{"/dev/bbb", "/dev/bbb"})
|
||||
m.devRuntimeDevices[resourceName1+devID2] = append(m.devRuntimeDevices[resourceName1+devID2], stringPairType{"/dev/ccc", "/dev/ccc"})
|
||||
m.devRuntimeMounts[resourceName1+devID1] = append(m.devRuntimeMounts[resourceName1+devID1], stringPairType{"/container_dir1/file1", "host_dir1/file1"})
|
||||
m.devRuntimeMounts[resourceName1+devID2] = append(m.devRuntimeMounts[resourceName1+devID2], stringPairType{"/container_dir1/file2", "host_dir1/file2"})
|
||||
m.devRuntimeEnvs[resourceName1+devID2] = append(m.devRuntimeEnvs[resourceName1+devID2], stringPairType{"key1", "val1"})
|
||||
m.devRuntimeEnvs[resourceName2+devID3] = append(m.devRuntimeEnvs[resourceName2+devID3], stringPairType{"key2", "val2"})
|
||||
m.devRuntimeEnvs[resourceName2+devID4] = append(m.devRuntimeEnvs[resourceName2+devID4], stringPairType{"key2", "val3"})
|
||||
|
||||
pod := &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
UID: uuid.NewUUID(),
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
Containers: []v1.Container{
|
||||
{
|
||||
Name: string(uuid.NewUUID()),
|
||||
Resources: v1.ResourceRequirements{
|
||||
Limits: v1.ResourceList{
|
||||
v1.ResourceName(resourceName1): resourceQuantity1,
|
||||
v1.ResourceName("cpu"): resourceQuantity1,
|
||||
v1.ResourceName(resourceName2): resourceQuantity2,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
podsStub.updateActivePods([]*v1.Pod{pod})
|
||||
err = testHandler.Allocate(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: pod})
|
||||
as.Nil(err)
|
||||
runContainerOpts := testHandler.GetDeviceRunContainerOptions(pod, &pod.Spec.Containers[0])
|
||||
as.Equal(len(runContainerOpts.Devices), 3)
|
||||
as.Equal(len(runContainerOpts.Mounts), 2)
|
||||
as.Equal(len(runContainerOpts.Envs), 2)
|
||||
|
||||
// Requesting to create a pod without enough resources should fail.
|
||||
as.Equal(2, testHandler.allocatedDevices[resourceName1].Len())
|
||||
failPod := &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
UID: uuid.NewUUID(),
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
Containers: []v1.Container{
|
||||
{
|
||||
Name: string(uuid.NewUUID()),
|
||||
Resources: v1.ResourceRequirements{
|
||||
Limits: v1.ResourceList{
|
||||
v1.ResourceName(resourceName1): resourceQuantity2,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
err = testHandler.Allocate(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: failPod})
|
||||
as.NotNil(err)
|
||||
runContainerOpts2 := testHandler.GetDeviceRunContainerOptions(failPod, &failPod.Spec.Containers[0])
|
||||
as.Nil(runContainerOpts2)
|
||||
|
||||
// Requesting to create a new pod with a single resourceName2 should succeed.
|
||||
newPod := &v1.Pod{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
UID: uuid.NewUUID(),
|
||||
},
|
||||
Spec: v1.PodSpec{
|
||||
Containers: []v1.Container{
|
||||
{
|
||||
Name: string(uuid.NewUUID()),
|
||||
Resources: v1.ResourceRequirements{
|
||||
Limits: v1.ResourceList{
|
||||
v1.ResourceName(resourceName2): resourceQuantity2,
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
err = testHandler.Allocate(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: newPod})
|
||||
as.Nil(err)
|
||||
runContainerOpts3 := testHandler.GetDeviceRunContainerOptions(newPod, &newPod.Spec.Containers[0])
|
||||
as.Equal(1, len(runContainerOpts3.Envs))
|
||||
}
|
||||
|
||||
func TestSanitizeNodeAllocatable(t *testing.T) {
|
||||
resourceName1 := "domain1.com/resource1"
|
||||
devID1 := "dev1"
|
||||
|
||||
resourceName2 := "domain2.com/resource2"
|
||||
devID2 := "dev2"
|
||||
|
||||
m, err := NewDevicePluginManagerTestStub()
|
||||
as := assert.New(t)
|
||||
as.Nil(err)
|
||||
monitorCallback := func(resourceName string, added, updated, deleted []pluginapi.Device) {}
|
||||
|
||||
testHandler := &HandlerImpl{
|
||||
devicePluginManager: m,
|
||||
devicePluginManagerMonitorCallback: monitorCallback,
|
||||
allDevices: make(map[string]sets.String),
|
||||
allocatedDevices: make(map[string]sets.String),
|
||||
podDevices: make(podDevices),
|
||||
}
|
||||
// require one of resource1 and one of resource2
|
||||
testHandler.allocatedDevices[resourceName1] = sets.NewString()
|
||||
testHandler.allocatedDevices[resourceName1].Insert(devID1)
|
||||
testHandler.allocatedDevices[resourceName2] = sets.NewString()
|
||||
testHandler.allocatedDevices[resourceName2].Insert(devID2)
|
||||
|
||||
cachedNode := &v1.Node{
|
||||
Status: v1.NodeStatus{
|
||||
Allocatable: v1.ResourceList{
|
||||
// has no resource1 and two of resource2
|
||||
v1.ResourceName(resourceName2): *resource.NewQuantity(int64(2), resource.DecimalSI),
|
||||
},
|
||||
},
|
||||
}
|
||||
nodeInfo := &schedulercache.NodeInfo{}
|
||||
nodeInfo.SetNode(cachedNode)
|
||||
|
||||
testHandler.sanitizeNodeAllocatable(nodeInfo)
|
||||
|
||||
allocatableScalarResources := nodeInfo.AllocatableResource().ScalarResources
|
||||
// allocatable in nodeInfo is less than needed, should update
|
||||
as.Equal(1, int(allocatableScalarResources[v1.ResourceName(resourceName1)]))
|
||||
// allocatable in nodeInfo is more than needed, should skip updating
|
||||
as.Equal(2, int(allocatableScalarResources[v1.ResourceName(resourceName2)]))
|
||||
}
|
|
@ -32,7 +32,15 @@ import (
// endpoint maps to a single registered device plugin. It is responsible
// for managing gRPC communications with the device plugin and caching
// device states reported by the device plugin.
type endpoint struct {
type endpoint interface {
run()
stop()
allocate(devs []string) (*pluginapi.AllocateResponse, error)
getDevices() []pluginapi.Device
callback(resourceName string, added, updated, deleted []pluginapi.Device)
}

type endpointImpl struct {
client pluginapi.DevicePluginClient
clientConn *grpc.ClientConn

@ -42,18 +50,18 @@ type endpoint struct {
devices map[string]pluginapi.Device
mutex sync.Mutex

callback MonitorCallback
cb monitorCallback
}

// newEndpoint creates a new endpoint for the given resourceName.
func newEndpoint(socketPath, resourceName string, devices map[string]pluginapi.Device, callback MonitorCallback) (*endpoint, error) {
func newEndpointImpl(socketPath, resourceName string, devices map[string]pluginapi.Device, callback monitorCallback) (*endpointImpl, error) {
client, c, err := dial(socketPath)
if err != nil {
glog.Errorf("Can't create new endpoint with path %s err %v", socketPath, err)
return nil, err
}

return &endpoint{
return &endpointImpl{
client: client,
clientConn: c,

@ -61,11 +69,15 @@ func newEndpoint(socketPath, resourceName string, devices map[string]pluginapi.D
resourceName: resourceName,

devices: devices,
callback: callback,
cb: callback,
}, nil
}

func (e *endpoint) getDevices() []pluginapi.Device {
func (e *endpointImpl) callback(resourceName string, added, updated, deleted []pluginapi.Device) {
e.cb(resourceName, added, updated, deleted)
}

func (e *endpointImpl) getDevices() []pluginapi.Device {
e.mutex.Lock()
defer e.mutex.Unlock()
var devs []pluginapi.Device

@ -81,11 +93,9 @@ func (e *endpoint) getDevices() []pluginapi.Device {
// blocks on receiving ListAndWatch gRPC stream updates. Each ListAndWatch
// stream update contains a new list of device states. listAndWatch compares the new
// device states with its cached states to get list of new, updated, and deleted devices.
// It then issues a callback to pass this information to the device_plugin_handler which
// It then issues a callback to pass this information to the device manager which
// will adjust the resource available information accordingly.
func (e *endpoint) run() {
glog.V(3).Infof("Starting ListAndWatch")

func (e *endpointImpl) run() {
stream, err := e.client.ListAndWatch(context.Background(), &pluginapi.Empty{})
if err != nil {
glog.Errorf(errListAndWatch, e.resourceName, err)

@ -162,13 +172,13 @@ func (e *endpoint) run() {
}

// allocate issues Allocate gRPC call to the device plugin.
func (e *endpoint) allocate(devs []string) (*pluginapi.AllocateResponse, error) {
func (e *endpointImpl) allocate(devs []string) (*pluginapi.AllocateResponse, error) {
return e.client.Allocate(context.Background(), &pluginapi.AllocateRequest{
DevicesIDs: devs,
})
}

func (e *endpoint) stop() {
func (e *endpointImpl) stop() {
e.clientConn.Close()
}

@ -87,7 +87,7 @@ func TestRun(t *testing.T) {
}

func TestGetDevices(t *testing.T) {
e := endpoint{
e := endpointImpl{
devices: map[string]pluginapi.Device{
"ADeviceId": {ID: "ADeviceId", Health: pluginapi.Healthy},
},

@ -96,19 +96,19 @@ func TestGetDevices(t *testing.T) {
require.Len(t, devs, 1)
}

func esetup(t *testing.T, devs []*pluginapi.Device, socket, resourceName string, callback MonitorCallback) (*Stub, *endpoint) {
func esetup(t *testing.T, devs []*pluginapi.Device, socket, resourceName string, callback monitorCallback) (*Stub, *endpointImpl) {
p := NewDevicePluginStub(devs, socket)

err := p.Start()
require.NoError(t, err)

e, err := newEndpoint(socket, "mock", make(map[string]pluginapi.Device), func(n string, a, u, r []pluginapi.Device) {})
e, err := newEndpointImpl(socket, "mock", make(map[string]pluginapi.Device), func(n string, a, u, r []pluginapi.Device) {})
require.NoError(t, err)

return p, e
}

func ecleanup(t *testing.T, p *Stub, e *endpoint) {
func ecleanup(t *testing.T, p *Stub, e *endpointImpl) {
p.Stop()
e.stop()
}
@ -17,7 +17,9 @@ limitations under the License.
|
|||
package deviceplugin
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
"os"
|
||||
"path/filepath"
|
||||
|
@ -28,27 +30,58 @@ import (
|
|||
"google.golang.org/grpc"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/resource"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
|
||||
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha"
|
||||
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
|
||||
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
|
||||
)
|
||||
|
||||
// ActivePodsFunc is a function that returns a list of pods to reconcile.
|
||||
type ActivePodsFunc func() []*v1.Pod
|
||||
|
||||
// monitorCallback is the function called when a device's health state changes,
|
||||
// or new devices are reported, or old devices are deleted.
|
||||
// Updated contains the most recent state of the Device.
|
||||
type monitorCallback func(resourceName string, added, updated, deleted []pluginapi.Device)
|
||||
|
||||
// ManagerImpl is the structure in charge of managing Device Plugins.
|
||||
type ManagerImpl struct {
|
||||
socketname string
|
||||
socketdir string
|
||||
|
||||
endpoints map[string]*endpoint // Key is ResourceName
|
||||
endpoints map[string]endpoint // Key is ResourceName
|
||||
mutex sync.Mutex
|
||||
|
||||
callback MonitorCallback
|
||||
|
||||
server *grpc.Server
|
||||
|
||||
// activePods is a method for listing active pods on the node
|
||||
// so the amount of pluginResources requested by existing pods
|
||||
// could be counted when updating allocated devices
|
||||
activePods ActivePodsFunc
|
||||
|
||||
// callback is used for updating devices' states in one time call.
|
||||
// e.g. a new device is advertised, two old devices are deleted and a running device fails.
|
||||
callback monitorCallback
|
||||
|
||||
// allDevices contains all of registered resourceNames and their exported device IDs.
|
||||
allDevices map[string]sets.String
|
||||
|
||||
// allocatedDevices contains allocated deviceIds, keyed by resourceName.
|
||||
allocatedDevices map[string]sets.String
|
||||
|
||||
// podDevices contains pod to allocated device mapping.
|
||||
podDevices podDevices
|
||||
}
|
||||
|
||||
// NewManagerImpl creates a new manager on the socket `socketPath`.
|
||||
// f is the callback that is called when a device becomes unhealthy.
|
||||
// socketPath is present for testing purposes in production this is pluginapi.KubeletSocket
|
||||
func NewManagerImpl(socketPath string, f MonitorCallback) (*ManagerImpl, error) {
|
||||
// NewManagerImpl creates a new manager. updateCapacityFunc is called to
|
||||
// update ContainerManager capacity when device capacity changes.
|
||||
func NewManagerImpl(updateCapacityFunc func(v1.ResourceList)) (*ManagerImpl, error) {
|
||||
return newManagerImpl(updateCapacityFunc, pluginapi.KubeletSocket)
|
||||
}
|
||||
|
||||
func newManagerImpl(updateCapacityFunc func(v1.ResourceList), socketPath string) (*ManagerImpl, error) {
|
||||
glog.V(2).Infof("Creating Device Plugin manager at %s", socketPath)
|
||||
|
||||
if socketPath == "" || !filepath.IsAbs(socketPath) {
|
||||
|
@ -56,13 +89,42 @@ func NewManagerImpl(socketPath string, f MonitorCallback) (*ManagerImpl, error)
|
|||
}
|
||||
|
||||
dir, file := filepath.Split(socketPath)
|
||||
return &ManagerImpl{
|
||||
endpoints: make(map[string]*endpoint),
|
||||
|
||||
manager := &ManagerImpl{
|
||||
endpoints: make(map[string]endpoint),
|
||||
socketname: file,
|
||||
socketdir: dir,
|
||||
callback: f,
|
||||
}, nil
|
||||
allDevices: make(map[string]sets.String),
|
||||
allocatedDevices: make(map[string]sets.String),
|
||||
podDevices: make(podDevices),
|
||||
}
|
||||
|
||||
manager.callback = func(resourceName string, added, updated, deleted []pluginapi.Device) {
|
||||
var capacity = v1.ResourceList{}
|
||||
kept := append(updated, added...)
|
||||
|
||||
manager.mutex.Lock()
|
||||
defer manager.mutex.Unlock()
|
||||
|
||||
if _, ok := manager.allDevices[resourceName]; !ok {
|
||||
manager.allDevices[resourceName] = sets.NewString()
|
||||
}
|
||||
// For now, Manager only keeps track of healthy devices.
|
||||
// We can revisit this later when the need comes to track unhealthy devices here.
|
||||
for _, dev := range kept {
|
||||
if dev.Health == pluginapi.Healthy {
|
||||
manager.allDevices[resourceName].Insert(dev.ID)
|
||||
} else {
|
||||
manager.allDevices[resourceName].Delete(dev.ID)
|
||||
}
|
||||
}
|
||||
for _, dev := range deleted {
|
||||
manager.allDevices[resourceName].Delete(dev.ID)
|
||||
}
|
||||
capacity[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(manager.allDevices[resourceName].Len()), resource.DecimalSI)
|
||||
updateCapacityFunc(capacity)
|
||||
}
|
||||
|
||||
return manager, nil
|
||||
}
|
||||
|
||||
func (m *ManagerImpl) removeContents(dir string) error {
|
||||
|
@ -77,7 +139,7 @@ func (m *ManagerImpl) removeContents(dir string) error {
|
|||
}
|
||||
for _, name := range names {
|
||||
filePath := filepath.Join(dir, name)
|
||||
if filePath == m.CheckpointFile() {
|
||||
if filePath == m.checkpointFile() {
|
||||
continue
|
||||
}
|
||||
stat, err := os.Stat(filePath)
|
||||
|
@ -101,15 +163,25 @@ const (
|
|||
kubeletDevicePluginCheckpoint = "kubelet_internal_checkpoint"
|
||||
)
|
||||
|
||||
// CheckpointFile returns device plugin checkpoint file path.
|
||||
func (m *ManagerImpl) CheckpointFile() string {
|
||||
// checkpointFile returns device plugin checkpoint file path.
|
||||
func (m *ManagerImpl) checkpointFile() string {
|
||||
return filepath.Join(m.socketdir, kubeletDevicePluginCheckpoint)
|
||||
}
|
||||
|
||||
// Start starts the Device Plugin Manager
|
||||
func (m *ManagerImpl) Start() error {
|
||||
// Start starts the Device Plugin Manager and initializes
// podDevices and allocatedDevices information from checkpointed state, and
// starts the device plugin registration service.
|
||||
func (m *ManagerImpl) Start(activePods ActivePodsFunc) error {
|
||||
glog.V(2).Infof("Starting Device Plugin manager")
|
||||
|
||||
m.activePods = activePods
|
||||
|
||||
// Loads in allocatedDevices information from disk.
|
||||
err := m.readCheckpoint()
|
||||
if err != nil {
|
||||
glog.Warningf("Continue after failing to read checkpoint file. Device allocation info may NOT be up-to-date. Err: %v", err)
|
||||
}
|
||||
|
||||
socketPath := filepath.Join(m.socketdir, m.socketname)
|
||||
os.MkdirAll(m.socketdir, 0755)
|
||||
|
||||
|
@ -130,6 +202,8 @@ func (m *ManagerImpl) Start() error {
|
|||
pluginapi.RegisterRegistrationServer(m.server, m)
|
||||
go m.server.Serve(s)
|
||||
|
||||
glog.V(2).Infof("Serving device plugin registration server on %q", socketPath)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -150,22 +224,27 @@ func (m *ManagerImpl) Devices() map[string][]pluginapi.Device {
|
|||
|
||||
// Allocate is the call that you can use to allocate a set of devices
|
||||
// from the registered device plugins.
|
||||
func (m *ManagerImpl) Allocate(resourceName string, devs []string) (*pluginapi.AllocateResponse, error) {
|
||||
|
||||
if len(devs) == 0 {
|
||||
return nil, nil
|
||||
func (m *ManagerImpl) Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
|
||||
pod := attrs.Pod
|
||||
// TODO: Reuse devices between init containers and regular containers.
|
||||
for _, container := range pod.Spec.InitContainers {
|
||||
if err := m.allocateContainerResources(pod, &container); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
for _, container := range pod.Spec.Containers {
|
||||
if err := m.allocateContainerResources(pod, &container); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
glog.V(3).Infof("Recieved allocation request for devices %v for device plugin %s",
|
||||
devs, resourceName)
|
||||
m.mutex.Lock()
|
||||
e, ok := m.endpoints[resourceName]
|
||||
m.mutex.Unlock()
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("Unknown Device Plugin %s", resourceName)
|
||||
// quick return if no pluginResources requested
|
||||
if _, podRequireDevicePluginResource := m.podDevices[string(pod.UID)]; !podRequireDevicePluginResource {
|
||||
return nil
|
||||
}
|
||||
|
||||
return e.allocate(devs)
|
||||
m.sanitizeNodeAllocatable(node)
|
||||
return nil
|
||||
}
|
||||
|
||||
// Register registers a device plugin.
|
||||
|
@ -211,12 +290,16 @@ func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
|
|||
if ok && old != nil {
|
||||
// Pass devices of previous endpoint into re-registered one,
|
||||
// to avoid potential orphaned devices upon re-registration
|
||||
existingDevs = old.devices
|
||||
devices := make(map[string]pluginapi.Device)
|
||||
for _, device := range old.getDevices() {
|
||||
devices[device.ID] = device
|
||||
}
|
||||
existingDevs = devices
|
||||
}
|
||||
m.mutex.Unlock()
|
||||
|
||||
socketPath := filepath.Join(m.socketdir, r.Endpoint)
|
||||
e, err := newEndpoint(socketPath, r.ResourceName, existingDevs, m.callback)
|
||||
e, err := newEndpointImpl(socketPath, r.ResourceName, existingDevs, m.callback)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to dial device plugin with request %v: %v", r, err)
|
||||
return
|
||||
|
@ -259,3 +342,212 @@ func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
|
|||
m.mutex.Unlock()
|
||||
}()
|
||||
}
|
||||
|
||||
// Checkpoints device to container allocation information to disk.
|
||||
func (m *ManagerImpl) writeCheckpoint() error {
|
||||
m.mutex.Lock()
|
||||
data := m.podDevices.toCheckpointData()
|
||||
m.mutex.Unlock()
|
||||
|
||||
dataJSON, err := json.Marshal(data)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
filepath := m.checkpointFile()
|
||||
return ioutil.WriteFile(filepath, dataJSON, 0644)
|
||||
}
|
||||
|
||||
// Reads device to container allocation information from disk, and populates
|
||||
// m.allocatedDevices accordingly.
|
||||
func (m *ManagerImpl) readCheckpoint() error {
|
||||
filepath := m.checkpointFile()
|
||||
content, err := ioutil.ReadFile(filepath)
|
||||
if err != nil && !os.IsNotExist(err) {
|
||||
return fmt.Errorf("failed to read checkpoint file %q: %v", filepath, err)
|
||||
}
|
||||
glog.V(2).Infof("Read checkpoint file %s\n", filepath)
|
||||
var data checkpointData
|
||||
if err := json.Unmarshal(content, &data); err != nil {
|
||||
return fmt.Errorf("failed to unmarshal checkpoint data: %v", err)
|
||||
}
|
||||
|
||||
m.mutex.Lock()
|
||||
defer m.mutex.Unlock()
|
||||
m.podDevices.fromCheckpointData(data)
|
||||
m.allocatedDevices = m.podDevices.devices()
|
||||
return nil
|
||||
}
|
||||
|
||||
// updateAllocatedDevices gets a list of active pods and then frees any Devices that are bound to
|
||||
// terminated pods. Returns error on failure.
|
||||
func (m *ManagerImpl) updateAllocatedDevices(activePods []*v1.Pod) {
|
||||
m.mutex.Lock()
|
||||
defer m.mutex.Unlock()
|
||||
activePodUids := sets.NewString()
|
||||
for _, pod := range activePods {
|
||||
activePodUids.Insert(string(pod.UID))
|
||||
}
|
||||
allocatedPodUids := m.podDevices.pods()
|
||||
podsToBeRemoved := allocatedPodUids.Difference(activePodUids)
|
||||
if len(podsToBeRemoved) <= 0 {
|
||||
return
|
||||
}
|
||||
glog.V(5).Infof("pods to be removed: %v", podsToBeRemoved.List())
|
||||
m.podDevices.delete(podsToBeRemoved.List())
|
||||
// Regenerate allocatedDevices after we update pod allocation information.
|
||||
m.allocatedDevices = m.podDevices.devices()
|
||||
}
|
||||
|
||||
// Returns list of device Ids we need to allocate with Allocate rpc call.
// Returns empty list in case we don't need to issue the Allocate rpc call.
func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, required int) (sets.String, error) {
    m.mutex.Lock()
    defer m.mutex.Unlock()
    needed := required
    // Gets list of devices that have already been allocated.
    // This can happen if a container restarts for example.
    devices := m.podDevices.containerDevices(podUID, contName, resource)
    if devices != nil {
        glog.V(3).Infof("Found pre-allocated devices for resource %s container %q in Pod %q: %v", resource, contName, podUID, devices.List())
        needed = needed - devices.Len()
        // A pod's resource is not expected to change once admitted by the API server,
        // so just fail loudly here. We can revisit this part if this no longer holds.
        if needed != 0 {
            return nil, fmt.Errorf("pod %v container %v changed request for resource %v from %v to %v", podUID, contName, resource, devices.Len(), required)
        }
    }
    if needed == 0 {
        // No change, no work.
        return nil, nil
    }
    devices = sets.NewString()
    // Needs to allocate additional devices.
    if m.allocatedDevices[resource] == nil {
        m.allocatedDevices[resource] = sets.NewString()
    }
    // Gets Devices in use.
    devicesInUse := m.allocatedDevices[resource]
    // Gets a list of available devices.
    available := m.allDevices[resource].Difference(devicesInUse)
    if int(available.Len()) < needed {
        return nil, fmt.Errorf("requested number of devices unavailable for %s. Requested: %d, Available: %d", resource, needed, available.Len())
    }
    allocated := available.UnsortedList()[:needed]
    // Updates m.allocatedDevices with allocated devices to prevent them
    // from being allocated to other pods/containers, given that we are
    // not holding lock during the rpc call.
    for _, device := range allocated {
        m.allocatedDevices[resource].Insert(device)
        devices.Insert(device)
    }
    return devices, nil
}
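
// Illustrative note (not part of this change, hypothetical numbers): if a restarted
// container already holds 2 devices of a resource and its limit is still 2, the
// pre-allocated set is found above, needed drops to 0 and devicesToAllocate returns
// nil, so no new Allocate RPC is issued; if the limit had changed, the "changed
// request" error above is returned instead.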
// allocateContainerResources attempts to allocate all of required device
// plugin resources for the input container, issues an Allocate rpc request
// for each new device resource requirement, processes their AllocateResponses,
// and updates the cached containerDevices on success.
func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Container) error {
    podUID := string(pod.UID)
    contName := container.Name
    allocatedDevicesUpdated := false
    for k, v := range container.Resources.Limits {
        resource := string(k)
        needed := int(v.Value())
        glog.V(3).Infof("needs %d %s", needed, resource)
        if _, registeredResource := m.allDevices[resource]; !registeredResource {
            continue
        }
        // Updates allocatedDevices to garbage collect any stranded resources
        // before doing the device plugin allocation.
        if !allocatedDevicesUpdated {
            m.updateAllocatedDevices(m.activePods())
            allocatedDevicesUpdated = true
        }
        allocDevices, err := m.devicesToAllocate(podUID, contName, resource, needed)
        if err != nil {
            return err
        }
        if allocDevices == nil || len(allocDevices) <= 0 {
            continue
        }
        // devicePluginManager.Allocate involves RPC calls to the device plugin, which
        // could be heavy-weight. Therefore we want to perform this operation outside
        // the mutex lock. Note that if the Allocate call fails, we may leave container
        // resources partially allocated for the failed container. We rely on
        // updateAllocatedDevices() to garbage collect these resources later. Another
        // side effect is that if we have X of resource A and Y of resource B in total,
        // and two containers, container1 and container2, both require X of resource A
        // and Y of resource B, then both allocation requests may fail if we serve them
        // in mixed order.
        // TODO: may revisit this part later if we see inefficient resource allocation
        // in real use as the result of this. Should also consider parallelizing the
        // device plugin Allocate grpc calls if it becomes common that a container may
        // require resources from multiple device plugins.
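        // As a concrete illustration of the caveat above (hypothetical numbers, not
        // part of this change): with 2 of resource A and 2 of resource B in total,
        // and container1 and container2 each requesting 2 of A and 2 of B, serving
        // container1's A request and container2's B request first leaves neither
        // container able to satisfy its remaining request, so both allocations fail.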
        m.mutex.Lock()
        e, ok := m.endpoints[resource]
        m.mutex.Unlock()
        if !ok {
            m.mutex.Lock()
            m.allocatedDevices = m.podDevices.devices()
            m.mutex.Unlock()
            return fmt.Errorf("Unknown Device Plugin %s", resource)
        }

        devs := allocDevices.UnsortedList()
        glog.V(3).Infof("Making allocation request for devices %v for device plugin %s", devs, resource)
        resp, err := e.allocate(devs)
        if err != nil {
            // In case of allocation failure, we want to restore m.allocatedDevices
            // to the actual allocated state from m.podDevices.
            m.mutex.Lock()
            m.allocatedDevices = m.podDevices.devices()
            m.mutex.Unlock()
            return err
        }

        // Update internal cached podDevices state.
        m.mutex.Lock()
        m.podDevices.insert(podUID, contName, resource, allocDevices, resp)
        m.mutex.Unlock()
    }

    // Checkpoints device to container allocation information.
    return m.writeCheckpoint()
}

// GetDeviceRunContainerOptions checks whether we have cached containerDevices
// for the passed-in <pod, container> and returns its DeviceRunContainerOptions
// for the found one. An empty struct is returned in case no cached state is found.
func (m *ManagerImpl) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) *DeviceRunContainerOptions {
    m.mutex.Lock()
    defer m.mutex.Unlock()
    return m.podDevices.deviceRunContainerOptions(string(pod.UID), container.Name)
}

// sanitizeNodeAllocatable scans through allocatedDevices in the device manager
// and, if necessary, updates allocatableResource in nodeInfo to be at least equal
// to the allocated capacity. This allows pods that have already been scheduled on
// the node to pass GeneralPredicates admission checking even upon device plugin failure.
func (m *ManagerImpl) sanitizeNodeAllocatable(node *schedulercache.NodeInfo) {
    var newAllocatableResource *schedulercache.Resource
    allocatableResource := node.AllocatableResource()
    if allocatableResource.ScalarResources == nil {
        allocatableResource.ScalarResources = make(map[v1.ResourceName]int64)
    }
    for resource, devices := range m.allocatedDevices {
        needed := devices.Len()
        quant, ok := allocatableResource.ScalarResources[v1.ResourceName(resource)]
        if ok && int(quant) >= needed {
            continue
        }
        // Needs to update nodeInfo.AllocatableResource to make sure
        // NodeInfo.allocatableResource is at least equal to the capacity already allocated.
        if newAllocatableResource == nil {
            newAllocatableResource = allocatableResource.Clone()
        }
        newAllocatableResource.ScalarResources[v1.ResourceName(resource)] = int64(needed)
    }
    if newAllocatableResource != nil {
        node.SetAllocatableResource(newAllocatableResource)
    }
}
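
// Illustrative note (not part of this diff): the exported Allocate entry point is
// expected to walk the admitted pod's containers, call allocateContainerResources
// for each of them, and finish with sanitizeNodeAllocatable on the supplied
// NodeInfo; that flow is what TestPodContainerDeviceAllocation further down in
// this change exercises end to end.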
@ -23,30 +23,35 @@ import (
    "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)

// HandlerStub provides a simple stub implementation for Handler.
type HandlerStub struct{}
// ManagerStub provides a simple stub implementation for the Device Manager.
type ManagerStub struct{}

// NewHandlerStub creates a HandlerStub.
func NewHandlerStub() (*HandlerStub, error) {
    return &HandlerStub{}, nil
// NewManagerStub creates a ManagerStub.
func NewManagerStub() (*ManagerStub, error) {
    return &ManagerStub{}, nil
}

// Start simply returns nil.
func (h *HandlerStub) Start(activePods ActivePodsFunc) error {
func (h *ManagerStub) Start(activePods ActivePodsFunc) error {
    return nil
}

// Stop simply returns nil.
func (h *ManagerStub) Stop() error {
    return nil
}

// Devices returns an empty map.
func (h *HandlerStub) Devices() map[string][]pluginapi.Device {
func (h *ManagerStub) Devices() map[string][]pluginapi.Device {
    return make(map[string][]pluginapi.Device)
}

// Allocate simply returns nil.
func (h *HandlerStub) Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
func (h *ManagerStub) Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
    return nil
}

// GetDeviceRunContainerOptions simply returns nil.
func (h *HandlerStub) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) *DeviceRunContainerOptions {
func (h *ManagerStub) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) *DeviceRunContainerOptions {
    return nil
}

@ -17,13 +17,23 @@ limitations under the License.
package deviceplugin

import (
    "flag"
    "fmt"
    "reflect"
    "sync/atomic"
    "testing"
    "time"

    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"

    "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/resource"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/util/sets"
    "k8s.io/apimachinery/pkg/util/uuid"
    pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha"
    "k8s.io/kubernetes/pkg/kubelet/lifecycle"
    "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)

const (
@ -33,10 +43,8 @@ const (
)

func TestNewManagerImpl(t *testing.T) {
    _, err := NewManagerImpl("", func(n string, a, u, r []pluginapi.Device) {})
    require.Error(t, err)

    _, err = NewManagerImpl(socketName, func(n string, a, u, r []pluginapi.Device) {})
    verifyCapacityFunc := func(updates v1.ResourceList) {}
    _, err := newManagerImpl(verifyCapacityFunc, socketName)
    require.NoError(t, err)
}

@ -72,6 +80,7 @@ func TestDevicePluginReRegistration(t *testing.T) {
    m, p1 := setup(t, devs, callback)
    p1.Register(socketName, testResourceName)
    // Wait for the first callback to be issued.

    <-callbackChan
    // Wait till the endpoint is added to the manager.
    for i := 0; i < 20; i++ {
@ -113,10 +122,17 @@ func TestDevicePluginReRegistration(t *testing.T) {

}

func setup(t *testing.T, devs []*pluginapi.Device, callback MonitorCallback) (Manager, *Stub) {
    m, err := NewManagerImpl(socketName, callback)
func setup(t *testing.T, devs []*pluginapi.Device, callback monitorCallback) (Manager, *Stub) {
    updateCapacity := func(v1.ResourceList) {}
    m, err := newManagerImpl(updateCapacity, socketName)
    require.NoError(t, err)
    err = m.Start()

    m.callback = callback

    activePods := func() []*v1.Pod {
        return []*v1.Pod{}
    }
    err = m.Start(activePods)
    require.NoError(t, err)

    p := NewDevicePluginStub(devs, pluginSocketName)
@ -130,3 +146,387 @@ func cleanup(t *testing.T, m Manager, p *Stub) {
    p.Stop()
    m.Stop()
}

func TestUpdateCapacity(t *testing.T) {
    var expected = v1.ResourceList{}
    as := assert.New(t)
    verifyCapacityFunc := func(updates v1.ResourceList) {
        as.Equal(expected, updates)
    }
    testManager, err := newManagerImpl(verifyCapacityFunc, socketName)
    as.NotNil(testManager)
    as.Nil(err)

    devs := []pluginapi.Device{
        {ID: "Device1", Health: pluginapi.Healthy},
        {ID: "Device2", Health: pluginapi.Healthy},
        {ID: "Device3", Health: pluginapi.Unhealthy},
    }

    resourceName := "resource1"
    // Adds three devices for resource1, two healthy and one unhealthy.
    // Expects capacity for resource1 to be 2.
    expected[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(2), resource.DecimalSI)
    testManager.callback(resourceName, devs, []pluginapi.Device{}, []pluginapi.Device{})
    // Deleting an unhealthy device should NOT change capacity.
    testManager.callback(resourceName, []pluginapi.Device{}, []pluginapi.Device{}, []pluginapi.Device{devs[2]})
    // Updating a healthy device to unhealthy should reduce capacity by 1.
    expected[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(1), resource.DecimalSI)
    // Deleting a healthy device should reduce capacity by 1.
    expected[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(0), resource.DecimalSI)
    // Tests adding another resource.
    delete(expected, v1.ResourceName(resourceName))
    resourceName2 := "resource2"
    expected[v1.ResourceName(resourceName2)] = *resource.NewQuantity(int64(2), resource.DecimalSI)
    testManager.callback(resourceName2, devs, []pluginapi.Device{}, []pluginapi.Device{})
}

type stringPairType struct {
    value1 string
    value2 string
}

func constructDevices(devices []string) sets.String {
    ret := sets.NewString()
    for _, dev := range devices {
        ret.Insert(dev)
    }
    return ret
}

func constructAllocResp(devices, mounts, envs map[string]string) *pluginapi.AllocateResponse {
    resp := &pluginapi.AllocateResponse{}
    for k, v := range devices {
        resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{
            HostPath:      k,
            ContainerPath: v,
            Permissions:   "mrw",
        })
    }
    for k, v := range mounts {
        resp.Mounts = append(resp.Mounts, &pluginapi.Mount{
            ContainerPath: k,
            HostPath:      v,
            ReadOnly:      true,
        })
    }
    resp.Envs = make(map[string]string)
    for k, v := range envs {
        resp.Envs[k] = v
    }
    return resp
}

func TestCheckpoint(t *testing.T) {
    resourceName1 := "domain1.com/resource1"
    resourceName2 := "domain2.com/resource2"

    testManager := &ManagerImpl{
        allDevices:       make(map[string]sets.String),
        allocatedDevices: make(map[string]sets.String),
        podDevices:       make(podDevices),
    }

    testManager.podDevices.insert("pod1", "con1", resourceName1,
        constructDevices([]string{"dev1", "dev2"}),
        constructAllocResp(map[string]string{"/dev/r1dev1": "/dev/r1dev1", "/dev/r1dev2": "/dev/r1dev2"},
            map[string]string{"/home/r1lib1": "/usr/r1lib1"}, map[string]string{}))
    testManager.podDevices.insert("pod1", "con1", resourceName2,
        constructDevices([]string{"dev1", "dev2"}),
        constructAllocResp(map[string]string{"/dev/r2dev1": "/dev/r2dev1", "/dev/r2dev2": "/dev/r2dev2"},
            map[string]string{"/home/r2lib1": "/usr/r2lib1"},
            map[string]string{"r2devices": "dev1 dev2"}))
    testManager.podDevices.insert("pod1", "con2", resourceName1,
        constructDevices([]string{"dev3"}),
        constructAllocResp(map[string]string{"/dev/r1dev3": "/dev/r1dev3"},
            map[string]string{"/home/r1lib1": "/usr/r1lib1"}, map[string]string{}))
    testManager.podDevices.insert("pod2", "con1", resourceName1,
        constructDevices([]string{"dev4"}),
        constructAllocResp(map[string]string{"/dev/r1dev4": "/dev/r1dev4"},
            map[string]string{"/home/r1lib1": "/usr/r1lib1"}, map[string]string{}))

    expectedPodDevices := testManager.podDevices
    expectedAllocatedDevices := testManager.podDevices.devices()

    err := testManager.writeCheckpoint()
    as := assert.New(t)

    as.Nil(err)
    testManager.podDevices = make(podDevices)
    err = testManager.readCheckpoint()
    as.Nil(err)

    as.Equal(len(expectedPodDevices), len(testManager.podDevices))
    for podUID, containerDevices := range expectedPodDevices {
        for conName, resources := range containerDevices {
            for resource := range resources {
                as.True(reflect.DeepEqual(
                    expectedPodDevices.containerDevices(podUID, conName, resource),
                    testManager.podDevices.containerDevices(podUID, conName, resource)))
                opts1 := expectedPodDevices.deviceRunContainerOptions(podUID, conName)
                opts2 := testManager.podDevices.deviceRunContainerOptions(podUID, conName)
                as.Equal(len(opts1.Envs), len(opts2.Envs))
                as.Equal(len(opts1.Mounts), len(opts2.Mounts))
                as.Equal(len(opts1.Devices), len(opts2.Devices))
            }
        }
    }
    as.True(reflect.DeepEqual(expectedAllocatedDevices, testManager.allocatedDevices))
}

type activePodsStub struct {
    activePods []*v1.Pod
}

func (a *activePodsStub) getActivePods() []*v1.Pod {
    return a.activePods
}

func (a *activePodsStub) updateActivePods(newPods []*v1.Pod) {
    a.activePods = newPods
}

type MockEndpoint struct {
    allocateFunc func(devs []string) (*pluginapi.AllocateResponse, error)
}

func (m *MockEndpoint) stop() {}
func (m *MockEndpoint) run()  {}

func (m *MockEndpoint) getDevices() []pluginapi.Device {
    return []pluginapi.Device{}
}

func (m *MockEndpoint) callback(resourceName string, added, updated, deleted []pluginapi.Device) {}

func (m *MockEndpoint) allocate(devs []string) (*pluginapi.AllocateResponse, error) {
    if m.allocateFunc != nil {
        return m.allocateFunc(devs)
    }
    return nil, nil
}

func TestPodContainerDeviceAllocation(t *testing.T) {
    flag.Set("alsologtostderr", fmt.Sprintf("%t", true))
    var logLevel string
    flag.StringVar(&logLevel, "logLevel", "4", "test")
    flag.Lookup("v").Value.Set(logLevel)

    resourceName1 := "domain1.com/resource1"
    resourceQuantity1 := *resource.NewQuantity(int64(2), resource.DecimalSI)
    devID1 := "dev1"
    devID2 := "dev2"
    resourceName2 := "domain2.com/resource2"
    resourceQuantity2 := *resource.NewQuantity(int64(1), resource.DecimalSI)
    devID3 := "dev3"
    devID4 := "dev4"

    as := require.New(t)
    monitorCallback := func(resourceName string, added, updated, deleted []pluginapi.Device) {}
    podsStub := activePodsStub{
        activePods: []*v1.Pod{},
    }
    cachedNode := &v1.Node{
        Status: v1.NodeStatus{
            Allocatable: v1.ResourceList{},
        },
    }
    nodeInfo := &schedulercache.NodeInfo{}
    nodeInfo.SetNode(cachedNode)

    testManager := &ManagerImpl{
        callback:         monitorCallback,
        allDevices:       make(map[string]sets.String),
        allocatedDevices: make(map[string]sets.String),
        endpoints:        make(map[string]endpoint),
        podDevices:       make(podDevices),
        activePods:       podsStub.getActivePods,
    }

    testManager.allDevices[resourceName1] = sets.NewString()
    testManager.allDevices[resourceName1].Insert(devID1)
    testManager.allDevices[resourceName1].Insert(devID2)
    testManager.allDevices[resourceName2] = sets.NewString()
    testManager.allDevices[resourceName2].Insert(devID3)
    testManager.allDevices[resourceName2].Insert(devID4)

    testManager.endpoints[resourceName1] = &MockEndpoint{
        allocateFunc: func(devs []string) (*pluginapi.AllocateResponse, error) {
            resp := new(pluginapi.AllocateResponse)
            resp.Envs = make(map[string]string)
            for _, dev := range devs {
                switch dev {
                case "dev1":
                    resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{
                        ContainerPath: "/dev/aaa",
                        HostPath:      "/dev/aaa",
                        Permissions:   "mrw",
                    })

                    resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{
                        ContainerPath: "/dev/bbb",
                        HostPath:      "/dev/bbb",
                        Permissions:   "mrw",
                    })

                    resp.Mounts = append(resp.Mounts, &pluginapi.Mount{
                        ContainerPath: "/container_dir1/file1",
                        HostPath:      "host_dir1/file1",
                        ReadOnly:      true,
                    })

                case "dev2":
                    resp.Devices = append(resp.Devices, &pluginapi.DeviceSpec{
                        ContainerPath: "/dev/ccc",
                        HostPath:      "/dev/ccc",
                        Permissions:   "mrw",
                    })

                    resp.Mounts = append(resp.Mounts, &pluginapi.Mount{
                        ContainerPath: "/container_dir1/file2",
                        HostPath:      "host_dir1/file2",
                        ReadOnly:      true,
                    })

                    resp.Envs["key1"] = "val1"
                }
            }
            return resp, nil
        },
    }

    testManager.endpoints[resourceName2] = &MockEndpoint{
        allocateFunc: func(devs []string) (*pluginapi.AllocateResponse, error) {
            resp := new(pluginapi.AllocateResponse)
            resp.Envs = make(map[string]string)
            for _, dev := range devs {
                switch dev {
                case "dev3":
                    resp.Envs["key2"] = "val2"

                case "dev4":
                    resp.Envs["key2"] = "val3"
                }
            }
            return resp, nil
        },
    }

    pod := &v1.Pod{
        ObjectMeta: metav1.ObjectMeta{
            UID: uuid.NewUUID(),
        },
        Spec: v1.PodSpec{
            Containers: []v1.Container{
                {
                    Name: string(uuid.NewUUID()),
                    Resources: v1.ResourceRequirements{
                        Limits: v1.ResourceList{
                            v1.ResourceName(resourceName1): resourceQuantity1,
                            v1.ResourceName("cpu"):         resourceQuantity1,
                            v1.ResourceName(resourceName2): resourceQuantity2,
                        },
                    },
                },
            },
        },
    }

    podsStub.updateActivePods([]*v1.Pod{pod})
    err := testManager.Allocate(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: pod})
    as.Nil(err)
    runContainerOpts := testManager.GetDeviceRunContainerOptions(pod, &pod.Spec.Containers[0])
    as.NotNil(runContainerOpts)
    as.Equal(len(runContainerOpts.Devices), 3)
    as.Equal(len(runContainerOpts.Mounts), 2)
    as.Equal(len(runContainerOpts.Envs), 2)

    // Requesting to create a pod without enough resources should fail.
    as.Equal(2, testManager.allocatedDevices[resourceName1].Len())
    failPod := &v1.Pod{
        ObjectMeta: metav1.ObjectMeta{
            UID: uuid.NewUUID(),
        },
        Spec: v1.PodSpec{
            Containers: []v1.Container{
                {
                    Name: string(uuid.NewUUID()),
                    Resources: v1.ResourceRequirements{
                        Limits: v1.ResourceList{
                            v1.ResourceName(resourceName1): resourceQuantity2,
                        },
                    },
                },
            },
        },
    }
    err = testManager.Allocate(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: failPod})
    as.NotNil(err)
    runContainerOpts2 := testManager.GetDeviceRunContainerOptions(failPod, &failPod.Spec.Containers[0])
    as.Nil(runContainerOpts2)

    // Requesting to create a new pod with a single resourceName2 should succeed.
    newPod := &v1.Pod{
        ObjectMeta: metav1.ObjectMeta{
            UID: uuid.NewUUID(),
        },
        Spec: v1.PodSpec{
            Containers: []v1.Container{
                {
                    Name: string(uuid.NewUUID()),
                    Resources: v1.ResourceRequirements{
                        Limits: v1.ResourceList{
                            v1.ResourceName(resourceName2): resourceQuantity2,
                        },
                    },
                },
            },
        },
    }
    err = testManager.Allocate(nodeInfo, &lifecycle.PodAdmitAttributes{Pod: newPod})
    as.Nil(err)
    runContainerOpts3 := testManager.GetDeviceRunContainerOptions(newPod, &newPod.Spec.Containers[0])
    as.Equal(1, len(runContainerOpts3.Envs))
}

func TestSanitizeNodeAllocatable(t *testing.T) {
    resourceName1 := "domain1.com/resource1"
    devID1 := "dev1"

    resourceName2 := "domain2.com/resource2"
    devID2 := "dev2"

    as := assert.New(t)
    monitorCallback := func(resourceName string, added, updated, deleted []pluginapi.Device) {}

    testManager := &ManagerImpl{
        callback:         monitorCallback,
        allDevices:       make(map[string]sets.String),
        allocatedDevices: make(map[string]sets.String),
        podDevices:       make(podDevices),
    }
    // require one of resource1 and one of resource2
    testManager.allocatedDevices[resourceName1] = sets.NewString()
    testManager.allocatedDevices[resourceName1].Insert(devID1)
    testManager.allocatedDevices[resourceName2] = sets.NewString()
    testManager.allocatedDevices[resourceName2].Insert(devID2)

    cachedNode := &v1.Node{
        Status: v1.NodeStatus{
            Allocatable: v1.ResourceList{
                // has no resource1 and two of resource2
                v1.ResourceName(resourceName2): *resource.NewQuantity(int64(2), resource.DecimalSI),
            },
        },
    }
    nodeInfo := &schedulercache.NodeInfo{}
    nodeInfo.SetNode(cachedNode)

    testManager.sanitizeNodeAllocatable(nodeInfo)

    allocatableScalarResources := nodeInfo.AllocatableResource().ScalarResources
    // allocatable in nodeInfo is less than needed, should update
    as.Equal(1, int(allocatableScalarResources[v1.ResourceName(resourceName1)]))
    // allocatable in nodeInfo is more than needed, should skip updating
    as.Equal(2, int(allocatableScalarResources[v1.ResourceName(resourceName2)]))
}

@ -116,6 +116,11 @@ func (pdev podDevices) toCheckpointData() checkpointData {
        for conName, resources := range containerDevices {
            for resource, devices := range resources {
                devIds := devices.deviceIds.UnsortedList()
                if devices.allocResp == nil {
                    glog.Errorf("Can't marshal allocResp for %v %v %v: allocation response is missing", podUID, conName, resource)
                    continue
                }

                allocResp, err := devices.allocResp.Marshal()
                if err != nil {
                    glog.Errorf("Can't marshal allocResp for %v %v %v: %v", podUID, conName, resource, err)

@ -17,34 +17,40 @@ limitations under the License.
package deviceplugin

import (
    "k8s.io/api/core/v1"
    pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha"
    kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
    "k8s.io/kubernetes/pkg/kubelet/lifecycle"
    "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)

// MonitorCallback is the function called when a device's health state changes,
// or new devices are reported, or old devices are deleted.
// Updated contains the most recent state of the Device.
type MonitorCallback func(resourceName string, added, updated, deleted []pluginapi.Device)
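
// For illustration only (hypothetical wiring, not part of this change): a callback
// of this shape can keep a running set of healthy device IDs for one resource and
// report its size as that resource's capacity, which is the behaviour exercised by
// TestUpdateCapacity in the test file above. The sketch assumes
// k8s.io/apimachinery/pkg/util/sets; a real callback would track this per resourceName.
//
//    healthy := sets.NewString()
//    callback := func(resourceName string, added, updated, deleted []pluginapi.Device) {
//        for _, d := range append(added, updated...) {
//            if d.Health == pluginapi.Healthy {
//                healthy.Insert(d.ID)
//            } else {
//                healthy.Delete(d.ID)
//            }
//        }
//        for _, d := range deleted {
//            healthy.Delete(d.ID)
//        }
//        // capacity for this resource is now healthy.Len()
//    }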

// Manager manages all the Device Plugins running on a node.
type Manager interface {
    // Start starts the gRPC Registration service.
    Start() error
    // Start starts device plugin registration service.
    Start(activePods ActivePodsFunc) error

    // Devices is the map of devices that have registered themselves
    // against the manager.
    // The map key is the ResourceName of the device plugins.
    Devices() map[string][]pluginapi.Device

    // Allocate takes resourceName and list of device Ids, and calls the
    // gRPC Allocate on the device plugin matching the resourceName.
    Allocate(string, []string) (*pluginapi.AllocateResponse, error)
    // Allocate configures and assigns devices to pods. The pods are provided
    // through the pod admission attributes in the attrs argument. From the
    // requested device resources, Allocate will communicate with the owning
    // device plugin to allow setup procedures to take place, and for the
    // device plugin to provide runtime settings to use the device (environment
    // variables, mount points and device files). The node object is provided
    // for the device manager to update the node capacity to reflect the
    // currently available devices.
    Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error

    // Stop stops the manager.
    Stop() error

    // Returns checkpoint file path.
    CheckpointFile() string
    // GetDeviceRunContainerOptions checks whether we have cached containerDevices
    // for the passed-in <pod, container> and returns its DeviceRunContainerOptions
    // for the found one. An empty struct is returned in case no cached state is found.
    GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) *DeviceRunContainerOptions
}

// DeviceRunContainerOptions contains the combined container runtime settings to consume its allocated devices.
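
To make the reworked `Manager` surface easier to follow, here is a minimal, hypothetical sketch of the call sequence a kubelet-side caller goes through, written against only the types visible in this diff (`Manager`, `ActivePodsFunc`, `DeviceRunContainerOptions`). The helper name `admitAndPrepare` and the collapsed single-function wiring are invented for illustration; in the real kubelet, `Start` runs once at container manager startup, `Allocate` runs at pod admission, and `GetDeviceRunContainerOptions` runs when a container is about to start.

```go
package deviceplugin

import (
    "k8s.io/api/core/v1"
    "k8s.io/kubernetes/pkg/kubelet/lifecycle"
    "k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)

// admitAndPrepare is an illustrative helper only: it chains the three Manager
// calls that a kubelet makes at different points of a pod's lifecycle.
func admitAndPrepare(m Manager, activePods ActivePodsFunc, node *schedulercache.NodeInfo, pod *v1.Pod) (*DeviceRunContainerOptions, error) {
    // Start the registration service and give the manager a source of active pods.
    if err := m.Start(activePods); err != nil {
        return nil, err
    }
    // At admission time, reserve devices for every container of the pod and let
    // the owning device plugins run their setup procedures.
    if err := m.Allocate(node, &lifecycle.PodAdmitAttributes{Pod: pod}); err != nil {
        return nil, err
    }
    // At container start, read back the cached devices, mounts and env vars.
    return m.GetDeviceRunContainerOptions(pod, &pod.Spec.Containers[0]), nil
}
```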