mirror of https://github.com/k3s-io/k3s
805 lines
29 KiB
Go
805 lines
29 KiB
Go
/*
|
|
Copyright 2017 The Kubernetes Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package devicemanager
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"net"
|
|
"os"
|
|
"path/filepath"
|
|
"sync"
|
|
"time"
|
|
|
|
"google.golang.org/grpc"
|
|
"k8s.io/klog"
|
|
|
|
"k8s.io/api/core/v1"
|
|
"k8s.io/apimachinery/pkg/api/resource"
|
|
"k8s.io/apimachinery/pkg/util/sets"
|
|
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
|
|
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
|
|
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
|
|
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager/errors"
|
|
"k8s.io/kubernetes/pkg/kubelet/cm/devicemanager/checkpoint"
|
|
"k8s.io/kubernetes/pkg/kubelet/config"
|
|
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
|
|
"k8s.io/kubernetes/pkg/kubelet/metrics"
|
|
watcher "k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher"
|
|
schedulercache "k8s.io/kubernetes/pkg/scheduler/cache"
|
|
)
|
|
|
|
// ActivePodsFunc is a function that returns a list of pods to reconcile.
|
|
type ActivePodsFunc func() []*v1.Pod
|
|
|
|
// monitorCallback is the function called when a device's health state changes,
|
|
// or new devices are reported, or old devices are deleted.
|
|
// Updated contains the most recent state of the Device.
|
|
type monitorCallback func(resourceName string, devices []pluginapi.Device)
|
|
|
|
// ManagerImpl is the structure in charge of managing Device Plugins.
|
|
type ManagerImpl struct {
|
|
socketname string
|
|
socketdir string
|
|
|
|
endpoints map[string]endpointInfo // Key is ResourceName
|
|
mutex sync.Mutex
|
|
|
|
server *grpc.Server
|
|
wg sync.WaitGroup
|
|
|
|
// activePods is a method for listing active pods on the node
|
|
// so the amount of pluginResources requested by existing pods
|
|
// could be counted when updating allocated devices
|
|
activePods ActivePodsFunc
|
|
|
|
// sourcesReady provides the readiness of kubelet configuration sources such as apiserver update readiness.
|
|
// We use it to determine when we can purge inactive pods from checkpointed state.
|
|
sourcesReady config.SourcesReady
|
|
|
|
// callback is used for updating devices' states in one time call.
|
|
// e.g. a new device is advertised, two old devices are deleted and a running device fails.
|
|
callback monitorCallback
|
|
|
|
// healthyDevices contains all of the registered healthy resourceNames and their exported device IDs.
|
|
healthyDevices map[string]sets.String
|
|
|
|
// unhealthyDevices contains all of the unhealthy devices and their exported device IDs.
|
|
unhealthyDevices map[string]sets.String
|
|
|
|
// allocatedDevices contains allocated deviceIds, keyed by resourceName.
|
|
allocatedDevices map[string]sets.String
|
|
|
|
// podDevices contains pod to allocated device mapping.
|
|
podDevices podDevices
|
|
checkpointManager checkpointmanager.CheckpointManager
|
|
}
|
|
|
|
type endpointInfo struct {
|
|
e endpoint
|
|
opts *pluginapi.DevicePluginOptions
|
|
}
|
|
|
|
type sourcesReadyStub struct{}
|
|
|
|
func (s *sourcesReadyStub) AddSource(source string) {}
|
|
func (s *sourcesReadyStub) AllReady() bool { return true }
|
|
|
|
// NewManagerImpl creates a new manager.
|
|
func NewManagerImpl() (*ManagerImpl, error) {
|
|
return newManagerImpl(pluginapi.KubeletSocket)
|
|
}
|
|
|
|
func newManagerImpl(socketPath string) (*ManagerImpl, error) {
|
|
klog.V(2).Infof("Creating Device Plugin manager at %s", socketPath)
|
|
|
|
if socketPath == "" || !filepath.IsAbs(socketPath) {
|
|
return nil, fmt.Errorf(errBadSocket+" %s", socketPath)
|
|
}
|
|
|
|
dir, file := filepath.Split(socketPath)
|
|
manager := &ManagerImpl{
|
|
endpoints: make(map[string]endpointInfo),
|
|
|
|
socketname: file,
|
|
socketdir: dir,
|
|
healthyDevices: make(map[string]sets.String),
|
|
unhealthyDevices: make(map[string]sets.String),
|
|
allocatedDevices: make(map[string]sets.String),
|
|
podDevices: make(podDevices),
|
|
}
|
|
manager.callback = manager.genericDeviceUpdateCallback
|
|
|
|
// The following structs are populated with real implementations in manager.Start()
|
|
// Before that, initializes them to perform no-op operations.
|
|
manager.activePods = func() []*v1.Pod { return []*v1.Pod{} }
|
|
manager.sourcesReady = &sourcesReadyStub{}
|
|
checkpointManager, err := checkpointmanager.NewCheckpointManager(dir)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to initialize checkpoint manager: %v", err)
|
|
}
|
|
manager.checkpointManager = checkpointManager
|
|
|
|
return manager, nil
|
|
}
|
|
|
|
func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, devices []pluginapi.Device) {
|
|
m.mutex.Lock()
|
|
m.healthyDevices[resourceName] = sets.NewString()
|
|
m.unhealthyDevices[resourceName] = sets.NewString()
|
|
for _, dev := range devices {
|
|
if dev.Health == pluginapi.Healthy {
|
|
m.healthyDevices[resourceName].Insert(dev.ID)
|
|
} else {
|
|
m.unhealthyDevices[resourceName].Insert(dev.ID)
|
|
}
|
|
}
|
|
m.mutex.Unlock()
|
|
m.writeCheckpoint()
|
|
}
|
|
|
|
func (m *ManagerImpl) removeContents(dir string) error {
|
|
d, err := os.Open(dir)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer d.Close()
|
|
names, err := d.Readdirnames(-1)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
for _, name := range names {
|
|
filePath := filepath.Join(dir, name)
|
|
if filePath == m.checkpointFile() {
|
|
continue
|
|
}
|
|
stat, err := os.Stat(filePath)
|
|
if err != nil {
|
|
klog.Errorf("Failed to stat file %s: %v", filePath, err)
|
|
continue
|
|
}
|
|
if stat.IsDir() {
|
|
continue
|
|
}
|
|
err = os.RemoveAll(filePath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// checkpointFile returns device plugin checkpoint file path.
|
|
func (m *ManagerImpl) checkpointFile() string {
|
|
return filepath.Join(m.socketdir, kubeletDeviceManagerCheckpoint)
|
|
}
|
|
|
|
// Start starts the Device Plugin Manager amd start initialization of
|
|
// podDevices and allocatedDevices information from checkpoint-ed state and
|
|
// starts device plugin registration service.
|
|
func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error {
|
|
klog.V(2).Infof("Starting Device Plugin manager")
|
|
|
|
m.activePods = activePods
|
|
m.sourcesReady = sourcesReady
|
|
|
|
// Loads in allocatedDevices information from disk.
|
|
err := m.readCheckpoint()
|
|
if err != nil {
|
|
klog.Warningf("Continue after failing to read checkpoint file. Device allocation info may NOT be up-to-date. Err: %v", err)
|
|
}
|
|
|
|
socketPath := filepath.Join(m.socketdir, m.socketname)
|
|
os.MkdirAll(m.socketdir, 0755)
|
|
|
|
// Removes all stale sockets in m.socketdir. Device plugins can monitor
|
|
// this and use it as a signal to re-register with the new Kubelet.
|
|
if err := m.removeContents(m.socketdir); err != nil {
|
|
klog.Errorf("Fail to clean up stale contents under %s: %v", m.socketdir, err)
|
|
}
|
|
|
|
s, err := net.Listen("unix", socketPath)
|
|
if err != nil {
|
|
klog.Errorf(errListenSocket+" %v", err)
|
|
return err
|
|
}
|
|
|
|
m.wg.Add(1)
|
|
m.server = grpc.NewServer([]grpc.ServerOption{}...)
|
|
|
|
pluginapi.RegisterRegistrationServer(m.server, m)
|
|
go func() {
|
|
defer m.wg.Done()
|
|
m.server.Serve(s)
|
|
}()
|
|
|
|
klog.V(2).Infof("Serving device plugin registration server on %q", socketPath)
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetWatcherHandler returns the plugin handler
|
|
func (m *ManagerImpl) GetWatcherHandler() watcher.PluginHandler {
|
|
if f, err := os.Create(m.socketdir + "DEPRECATION"); err != nil {
|
|
klog.Errorf("Failed to create deprecation file at %s", m.socketdir)
|
|
} else {
|
|
f.Close()
|
|
klog.V(4).Infof("created deprecation file %s", f.Name())
|
|
}
|
|
|
|
return watcher.PluginHandler(m)
|
|
}
|
|
|
|
// ValidatePlugin validates a plugin if the version is correct and the name has the format of an extended resource
|
|
func (m *ManagerImpl) ValidatePlugin(pluginName string, endpoint string, versions []string, foundInDeprecatedDir bool) error {
|
|
klog.V(2).Infof("Got Plugin %s at endpoint %s with versions %v", pluginName, endpoint, versions)
|
|
|
|
if !m.isVersionCompatibleWithPlugin(versions) {
|
|
return fmt.Errorf("manager version, %s, is not among plugin supported versions %v", pluginapi.Version, versions)
|
|
}
|
|
|
|
if !v1helper.IsExtendedResourceName(v1.ResourceName(pluginName)) {
|
|
return fmt.Errorf("invalid name of device plugin socket: %s", fmt.Sprintf(errInvalidResourceName, pluginName))
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// RegisterPlugin starts the endpoint and registers it
|
|
// TODO: Start the endpoint and wait for the First ListAndWatch call
|
|
// before registering the plugin
|
|
func (m *ManagerImpl) RegisterPlugin(pluginName string, endpoint string, versions []string) error {
|
|
klog.V(2).Infof("Registering Plugin %s at endpoint %s", pluginName, endpoint)
|
|
|
|
e, err := newEndpointImpl(endpoint, pluginName, m.callback)
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to dial device plugin with socketPath %s: %v", endpoint, err)
|
|
}
|
|
|
|
options, err := e.client.GetDevicePluginOptions(context.Background(), &pluginapi.Empty{})
|
|
if err != nil {
|
|
return fmt.Errorf("Failed to get device plugin options: %v", err)
|
|
}
|
|
|
|
m.registerEndpoint(pluginName, options, e)
|
|
go m.runEndpoint(pluginName, e)
|
|
|
|
return nil
|
|
}
|
|
|
|
// DeRegisterPlugin deregisters the plugin
|
|
// TODO work on the behavior for deregistering plugins
|
|
// e.g: Should we delete the resource
|
|
func (m *ManagerImpl) DeRegisterPlugin(pluginName string) {
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
|
|
// Note: This will mark the resource unhealthy as per the behavior
|
|
// in runEndpoint
|
|
if eI, ok := m.endpoints[pluginName]; ok {
|
|
eI.e.stop()
|
|
}
|
|
}
|
|
|
|
func (m *ManagerImpl) isVersionCompatibleWithPlugin(versions []string) bool {
|
|
// TODO(vikasc): Currently this is fine as we only have a single supported version. When we do need to support
|
|
// multiple versions in the future, we may need to extend this function to return a supported version.
|
|
// E.g., say kubelet supports v1beta1 and v1beta2, and we get v1alpha1 and v1beta1 from a device plugin,
|
|
// this function should return v1beta1
|
|
for _, version := range versions {
|
|
for _, supportedVersion := range pluginapi.SupportedVersions {
|
|
if version == supportedVersion {
|
|
return true
|
|
}
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// Allocate is the call that you can use to allocate a set of devices
|
|
// from the registered device plugins.
|
|
func (m *ManagerImpl) Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
|
|
pod := attrs.Pod
|
|
devicesToReuse := make(map[string]sets.String)
|
|
for _, container := range pod.Spec.InitContainers {
|
|
if err := m.allocateContainerResources(pod, &container, devicesToReuse); err != nil {
|
|
return err
|
|
}
|
|
m.podDevices.addContainerAllocatedResources(string(pod.UID), container.Name, devicesToReuse)
|
|
}
|
|
for _, container := range pod.Spec.Containers {
|
|
if err := m.allocateContainerResources(pod, &container, devicesToReuse); err != nil {
|
|
return err
|
|
}
|
|
m.podDevices.removeContainerAllocatedResources(string(pod.UID), container.Name, devicesToReuse)
|
|
}
|
|
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
|
|
// quick return if no pluginResources requested
|
|
if _, podRequireDevicePluginResource := m.podDevices[string(pod.UID)]; !podRequireDevicePluginResource {
|
|
return nil
|
|
}
|
|
|
|
m.sanitizeNodeAllocatable(node)
|
|
return nil
|
|
}
|
|
|
|
// Register registers a device plugin.
|
|
func (m *ManagerImpl) Register(ctx context.Context, r *pluginapi.RegisterRequest) (*pluginapi.Empty, error) {
|
|
klog.Infof("Got registration request from device plugin with resource name %q", r.ResourceName)
|
|
metrics.DevicePluginRegistrationCount.WithLabelValues(r.ResourceName).Inc()
|
|
var versionCompatible bool
|
|
for _, v := range pluginapi.SupportedVersions {
|
|
if r.Version == v {
|
|
versionCompatible = true
|
|
break
|
|
}
|
|
}
|
|
if !versionCompatible {
|
|
errorString := fmt.Sprintf(errUnsupportedVersion, r.Version, pluginapi.SupportedVersions)
|
|
klog.Infof("Bad registration request from device plugin with resource name %q: %s", r.ResourceName, errorString)
|
|
return &pluginapi.Empty{}, fmt.Errorf(errorString)
|
|
}
|
|
|
|
if !v1helper.IsExtendedResourceName(v1.ResourceName(r.ResourceName)) {
|
|
errorString := fmt.Sprintf(errInvalidResourceName, r.ResourceName)
|
|
klog.Infof("Bad registration request from device plugin: %s", errorString)
|
|
return &pluginapi.Empty{}, fmt.Errorf(errorString)
|
|
}
|
|
|
|
// TODO: for now, always accepts newest device plugin. Later may consider to
|
|
// add some policies here, e.g., verify whether an old device plugin with the
|
|
// same resource name is still alive to determine whether we want to accept
|
|
// the new registration.
|
|
go m.addEndpoint(r)
|
|
|
|
return &pluginapi.Empty{}, nil
|
|
}
|
|
|
|
// Stop is the function that can stop the gRPC server.
|
|
// Can be called concurrently, more than once, and is safe to call
|
|
// without a prior Start.
|
|
func (m *ManagerImpl) Stop() error {
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
for _, eI := range m.endpoints {
|
|
eI.e.stop()
|
|
}
|
|
|
|
if m.server == nil {
|
|
return nil
|
|
}
|
|
m.server.Stop()
|
|
m.wg.Wait()
|
|
m.server = nil
|
|
return nil
|
|
}
|
|
|
|
func (m *ManagerImpl) registerEndpoint(resourceName string, options *pluginapi.DevicePluginOptions, e endpoint) {
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
|
|
m.endpoints[resourceName] = endpointInfo{e: e, opts: options}
|
|
klog.V(2).Infof("Registered endpoint %v", e)
|
|
}
|
|
|
|
func (m *ManagerImpl) runEndpoint(resourceName string, e endpoint) {
|
|
e.run()
|
|
e.stop()
|
|
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
|
|
if old, ok := m.endpoints[resourceName]; ok && old.e == e {
|
|
m.markResourceUnhealthy(resourceName)
|
|
}
|
|
|
|
klog.V(2).Infof("Endpoint (%s, %v) became unhealthy", resourceName, e)
|
|
}
|
|
|
|
func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
|
|
new, err := newEndpointImpl(filepath.Join(m.socketdir, r.Endpoint), r.ResourceName, m.callback)
|
|
if err != nil {
|
|
klog.Errorf("Failed to dial device plugin with request %v: %v", r, err)
|
|
return
|
|
}
|
|
m.registerEndpoint(r.ResourceName, r.Options, new)
|
|
go func() {
|
|
m.runEndpoint(r.ResourceName, new)
|
|
}()
|
|
}
|
|
|
|
func (m *ManagerImpl) markResourceUnhealthy(resourceName string) {
|
|
klog.V(2).Infof("Mark all resources Unhealthy for resource %s", resourceName)
|
|
healthyDevices := sets.NewString()
|
|
if _, ok := m.healthyDevices[resourceName]; ok {
|
|
healthyDevices = m.healthyDevices[resourceName]
|
|
m.healthyDevices[resourceName] = sets.NewString()
|
|
}
|
|
if _, ok := m.unhealthyDevices[resourceName]; !ok {
|
|
m.unhealthyDevices[resourceName] = sets.NewString()
|
|
}
|
|
m.unhealthyDevices[resourceName] = m.unhealthyDevices[resourceName].Union(healthyDevices)
|
|
}
|
|
|
|
// GetCapacity is expected to be called when Kubelet updates its node status.
|
|
// The first returned variable contains the registered device plugin resource capacity.
|
|
// The second returned variable contains the registered device plugin resource allocatable.
|
|
// The third returned variable contains previously registered resources that are no longer active.
|
|
// Kubelet uses this information to update resource capacity/allocatable in its node status.
|
|
// After the call, device plugin can remove the inactive resources from its internal list as the
|
|
// change is already reflected in Kubelet node status.
|
|
// Note in the special case after Kubelet restarts, device plugin resource capacities can
|
|
// temporarily drop to zero till corresponding device plugins re-register. This is OK because
|
|
// cm.UpdatePluginResource() run during predicate Admit guarantees we adjust nodeinfo
|
|
// capacity for already allocated pods so that they can continue to run. However, new pods
|
|
// requiring device plugin resources will not be scheduled till device plugin re-registers.
|
|
func (m *ManagerImpl) GetCapacity() (v1.ResourceList, v1.ResourceList, []string) {
|
|
needsUpdateCheckpoint := false
|
|
var capacity = v1.ResourceList{}
|
|
var allocatable = v1.ResourceList{}
|
|
deletedResources := sets.NewString()
|
|
m.mutex.Lock()
|
|
for resourceName, devices := range m.healthyDevices {
|
|
eI, ok := m.endpoints[resourceName]
|
|
if (ok && eI.e.stopGracePeriodExpired()) || !ok {
|
|
// The resources contained in endpoints and (un)healthyDevices
|
|
// should always be consistent. Otherwise, we run with the risk
|
|
// of failing to garbage collect non-existing resources or devices.
|
|
if !ok {
|
|
klog.Errorf("unexpected: healthyDevices and endpoints are out of sync")
|
|
}
|
|
delete(m.endpoints, resourceName)
|
|
delete(m.healthyDevices, resourceName)
|
|
deletedResources.Insert(resourceName)
|
|
needsUpdateCheckpoint = true
|
|
} else {
|
|
capacity[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)
|
|
allocatable[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)
|
|
}
|
|
}
|
|
for resourceName, devices := range m.unhealthyDevices {
|
|
eI, ok := m.endpoints[resourceName]
|
|
if (ok && eI.e.stopGracePeriodExpired()) || !ok {
|
|
if !ok {
|
|
klog.Errorf("unexpected: unhealthyDevices and endpoints are out of sync")
|
|
}
|
|
delete(m.endpoints, resourceName)
|
|
delete(m.unhealthyDevices, resourceName)
|
|
deletedResources.Insert(resourceName)
|
|
needsUpdateCheckpoint = true
|
|
} else {
|
|
capacityCount := capacity[v1.ResourceName(resourceName)]
|
|
unhealthyCount := *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)
|
|
capacityCount.Add(unhealthyCount)
|
|
capacity[v1.ResourceName(resourceName)] = capacityCount
|
|
}
|
|
}
|
|
m.mutex.Unlock()
|
|
if needsUpdateCheckpoint {
|
|
m.writeCheckpoint()
|
|
}
|
|
return capacity, allocatable, deletedResources.UnsortedList()
|
|
}
|
|
|
|
// Checkpoints device to container allocation information to disk.
|
|
func (m *ManagerImpl) writeCheckpoint() error {
|
|
m.mutex.Lock()
|
|
registeredDevs := make(map[string][]string)
|
|
for resource, devices := range m.healthyDevices {
|
|
registeredDevs[resource] = devices.UnsortedList()
|
|
}
|
|
data := checkpoint.New(m.podDevices.toCheckpointData(),
|
|
registeredDevs)
|
|
m.mutex.Unlock()
|
|
err := m.checkpointManager.CreateCheckpoint(kubeletDeviceManagerCheckpoint, data)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to write checkpoint file %q: %v", kubeletDeviceManagerCheckpoint, err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Reads device to container allocation information from disk, and populates
|
|
// m.allocatedDevices accordingly.
|
|
func (m *ManagerImpl) readCheckpoint() error {
|
|
registeredDevs := make(map[string][]string)
|
|
devEntries := make([]checkpoint.PodDevicesEntry, 0)
|
|
cp := checkpoint.New(devEntries, registeredDevs)
|
|
err := m.checkpointManager.GetCheckpoint(kubeletDeviceManagerCheckpoint, cp)
|
|
if err != nil {
|
|
if err == errors.ErrCheckpointNotFound {
|
|
klog.Warningf("Failed to retrieve checkpoint for %q: %v", kubeletDeviceManagerCheckpoint, err)
|
|
return nil
|
|
}
|
|
return err
|
|
}
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
podDevices, registeredDevs := cp.GetData()
|
|
m.podDevices.fromCheckpointData(podDevices)
|
|
m.allocatedDevices = m.podDevices.devices()
|
|
for resource := range registeredDevs {
|
|
// During start up, creates empty healthyDevices list so that the resource capacity
|
|
// will stay zero till the corresponding device plugin re-registers.
|
|
m.healthyDevices[resource] = sets.NewString()
|
|
m.unhealthyDevices[resource] = sets.NewString()
|
|
m.endpoints[resource] = endpointInfo{e: newStoppedEndpointImpl(resource), opts: nil}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// updateAllocatedDevices gets a list of active pods and then frees any Devices that are bound to
|
|
// terminated pods. Returns error on failure.
|
|
func (m *ManagerImpl) updateAllocatedDevices(activePods []*v1.Pod) {
|
|
if !m.sourcesReady.AllReady() {
|
|
return
|
|
}
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
activePodUids := sets.NewString()
|
|
for _, pod := range activePods {
|
|
activePodUids.Insert(string(pod.UID))
|
|
}
|
|
allocatedPodUids := m.podDevices.pods()
|
|
podsToBeRemoved := allocatedPodUids.Difference(activePodUids)
|
|
if len(podsToBeRemoved) <= 0 {
|
|
return
|
|
}
|
|
klog.V(3).Infof("pods to be removed: %v", podsToBeRemoved.List())
|
|
m.podDevices.delete(podsToBeRemoved.List())
|
|
// Regenerated allocatedDevices after we update pod allocation information.
|
|
m.allocatedDevices = m.podDevices.devices()
|
|
}
|
|
|
|
// Returns list of device Ids we need to allocate with Allocate rpc call.
|
|
// Returns empty list in case we don't need to issue the Allocate rpc call.
|
|
func (m *ManagerImpl) devicesToAllocate(podUID, contName, resource string, required int, reusableDevices sets.String) (sets.String, error) {
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
needed := required
|
|
// Gets list of devices that have already been allocated.
|
|
// This can happen if a container restarts for example.
|
|
devices := m.podDevices.containerDevices(podUID, contName, resource)
|
|
if devices != nil {
|
|
klog.V(3).Infof("Found pre-allocated devices for resource %s container %q in Pod %q: %v", resource, contName, podUID, devices.List())
|
|
needed = needed - devices.Len()
|
|
// A pod's resource is not expected to change once admitted by the API server,
|
|
// so just fail loudly here. We can revisit this part if this no longer holds.
|
|
if needed != 0 {
|
|
return nil, fmt.Errorf("pod %q container %q changed request for resource %q from %d to %d", podUID, contName, resource, devices.Len(), required)
|
|
}
|
|
}
|
|
if needed == 0 {
|
|
// No change, no work.
|
|
return nil, nil
|
|
}
|
|
klog.V(3).Infof("Needs to allocate %d %q for pod %q container %q", needed, resource, podUID, contName)
|
|
// Needs to allocate additional devices.
|
|
if _, ok := m.healthyDevices[resource]; !ok {
|
|
return nil, fmt.Errorf("can't allocate unregistered device %s", resource)
|
|
}
|
|
devices = sets.NewString()
|
|
// Allocates from reusableDevices list first.
|
|
for device := range reusableDevices {
|
|
devices.Insert(device)
|
|
needed--
|
|
if needed == 0 {
|
|
return devices, nil
|
|
}
|
|
}
|
|
// Needs to allocate additional devices.
|
|
if m.allocatedDevices[resource] == nil {
|
|
m.allocatedDevices[resource] = sets.NewString()
|
|
}
|
|
// Gets Devices in use.
|
|
devicesInUse := m.allocatedDevices[resource]
|
|
// Gets a list of available devices.
|
|
available := m.healthyDevices[resource].Difference(devicesInUse)
|
|
if int(available.Len()) < needed {
|
|
return nil, fmt.Errorf("requested number of devices unavailable for %s. Requested: %d, Available: %d", resource, needed, available.Len())
|
|
}
|
|
allocated := available.UnsortedList()[:needed]
|
|
// Updates m.allocatedDevices with allocated devices to prevent them
|
|
// from being allocated to other pods/containers, given that we are
|
|
// not holding lock during the rpc call.
|
|
for _, device := range allocated {
|
|
m.allocatedDevices[resource].Insert(device)
|
|
devices.Insert(device)
|
|
}
|
|
return devices, nil
|
|
}
|
|
|
|
// allocateContainerResources attempts to allocate all of required device
|
|
// plugin resources for the input container, issues an Allocate rpc request
|
|
// for each new device resource requirement, processes their AllocateResponses,
|
|
// and updates the cached containerDevices on success.
|
|
func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Container, devicesToReuse map[string]sets.String) error {
|
|
podUID := string(pod.UID)
|
|
contName := container.Name
|
|
allocatedDevicesUpdated := false
|
|
// Extended resources are not allowed to be overcommitted.
|
|
// Since device plugin advertises extended resources,
|
|
// therefore Requests must be equal to Limits and iterating
|
|
// over the Limits should be sufficient.
|
|
for k, v := range container.Resources.Limits {
|
|
resource := string(k)
|
|
needed := int(v.Value())
|
|
klog.V(3).Infof("needs %d %s", needed, resource)
|
|
if !m.isDevicePluginResource(resource) {
|
|
continue
|
|
}
|
|
// Updates allocatedDevices to garbage collect any stranded resources
|
|
// before doing the device plugin allocation.
|
|
if !allocatedDevicesUpdated {
|
|
m.updateAllocatedDevices(m.activePods())
|
|
allocatedDevicesUpdated = true
|
|
}
|
|
allocDevices, err := m.devicesToAllocate(podUID, contName, resource, needed, devicesToReuse[resource])
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if allocDevices == nil || len(allocDevices) <= 0 {
|
|
continue
|
|
}
|
|
|
|
startRPCTime := time.Now()
|
|
// Manager.Allocate involves RPC calls to device plugin, which
|
|
// could be heavy-weight. Therefore we want to perform this operation outside
|
|
// mutex lock. Note if Allocate call fails, we may leave container resources
|
|
// partially allocated for the failed container. We rely on updateAllocatedDevices()
|
|
// to garbage collect these resources later. Another side effect is that if
|
|
// we have X resource A and Y resource B in total, and two containers, container1
|
|
// and container2 both require X resource A and Y resource B. Both allocation
|
|
// requests may fail if we serve them in mixed order.
|
|
// TODO: may revisit this part later if we see inefficient resource allocation
|
|
// in real use as the result of this. Should also consider to parallize device
|
|
// plugin Allocate grpc calls if it becomes common that a container may require
|
|
// resources from multiple device plugins.
|
|
m.mutex.Lock()
|
|
eI, ok := m.endpoints[resource]
|
|
m.mutex.Unlock()
|
|
if !ok {
|
|
m.mutex.Lock()
|
|
m.allocatedDevices = m.podDevices.devices()
|
|
m.mutex.Unlock()
|
|
return fmt.Errorf("Unknown Device Plugin %s", resource)
|
|
}
|
|
|
|
devs := allocDevices.UnsortedList()
|
|
// TODO: refactor this part of code to just append a ContainerAllocationRequest
|
|
// in a passed in AllocateRequest pointer, and issues a single Allocate call per pod.
|
|
klog.V(3).Infof("Making allocation request for devices %v for device plugin %s", devs, resource)
|
|
resp, err := eI.e.allocate(devs)
|
|
metrics.DevicePluginAllocationLatency.WithLabelValues(resource).Observe(metrics.SinceInMicroseconds(startRPCTime))
|
|
if err != nil {
|
|
// In case of allocation failure, we want to restore m.allocatedDevices
|
|
// to the actual allocated state from m.podDevices.
|
|
m.mutex.Lock()
|
|
m.allocatedDevices = m.podDevices.devices()
|
|
m.mutex.Unlock()
|
|
return err
|
|
}
|
|
|
|
if len(resp.ContainerResponses) == 0 {
|
|
return fmt.Errorf("No containers return in allocation response %v", resp)
|
|
}
|
|
|
|
// Update internal cached podDevices state.
|
|
m.mutex.Lock()
|
|
m.podDevices.insert(podUID, contName, resource, allocDevices, resp.ContainerResponses[0])
|
|
m.mutex.Unlock()
|
|
}
|
|
|
|
// Checkpoints device to container allocation information.
|
|
return m.writeCheckpoint()
|
|
}
|
|
|
|
// GetDeviceRunContainerOptions checks whether we have cached containerDevices
|
|
// for the passed-in <pod, container> and returns its DeviceRunContainerOptions
|
|
// for the found one. An empty struct is returned in case no cached state is found.
|
|
func (m *ManagerImpl) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Container) (*DeviceRunContainerOptions, error) {
|
|
podUID := string(pod.UID)
|
|
contName := container.Name
|
|
for k := range container.Resources.Limits {
|
|
resource := string(k)
|
|
if !m.isDevicePluginResource(resource) {
|
|
continue
|
|
}
|
|
err := m.callPreStartContainerIfNeeded(podUID, contName, resource)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
m.mutex.Lock()
|
|
defer m.mutex.Unlock()
|
|
return m.podDevices.deviceRunContainerOptions(string(pod.UID), container.Name), nil
|
|
}
|
|
|
|
// callPreStartContainerIfNeeded issues PreStartContainer grpc call for device plugin resource
|
|
// with PreStartRequired option set.
|
|
func (m *ManagerImpl) callPreStartContainerIfNeeded(podUID, contName, resource string) error {
|
|
m.mutex.Lock()
|
|
eI, ok := m.endpoints[resource]
|
|
if !ok {
|
|
m.mutex.Unlock()
|
|
return fmt.Errorf("endpoint not found in cache for a registered resource: %s", resource)
|
|
}
|
|
|
|
if eI.opts == nil || !eI.opts.PreStartRequired {
|
|
m.mutex.Unlock()
|
|
klog.V(4).Infof("Plugin options indicate to skip PreStartContainer for resource: %s", resource)
|
|
return nil
|
|
}
|
|
|
|
devices := m.podDevices.containerDevices(podUID, contName, resource)
|
|
if devices == nil {
|
|
m.mutex.Unlock()
|
|
return fmt.Errorf("no devices found allocated in local cache for pod %s, container %s, resource %s", podUID, contName, resource)
|
|
}
|
|
|
|
m.mutex.Unlock()
|
|
devs := devices.UnsortedList()
|
|
klog.V(4).Infof("Issuing an PreStartContainer call for container, %s, of pod %s", contName, podUID)
|
|
_, err := eI.e.preStartContainer(devs)
|
|
if err != nil {
|
|
return fmt.Errorf("device plugin PreStartContainer rpc failed with err: %v", err)
|
|
}
|
|
// TODO: Add metrics support for init RPC
|
|
return nil
|
|
}
|
|
|
|
// sanitizeNodeAllocatable scans through allocatedDevices in the device manager
|
|
// and if necessary, updates allocatableResource in nodeInfo to at least equal to
|
|
// the allocated capacity. This allows pods that have already been scheduled on
|
|
// the node to pass GeneralPredicates admission checking even upon device plugin failure.
|
|
func (m *ManagerImpl) sanitizeNodeAllocatable(node *schedulercache.NodeInfo) {
|
|
var newAllocatableResource *schedulercache.Resource
|
|
allocatableResource := node.AllocatableResource()
|
|
if allocatableResource.ScalarResources == nil {
|
|
allocatableResource.ScalarResources = make(map[v1.ResourceName]int64)
|
|
}
|
|
for resource, devices := range m.allocatedDevices {
|
|
needed := devices.Len()
|
|
quant, ok := allocatableResource.ScalarResources[v1.ResourceName(resource)]
|
|
if ok && int(quant) >= needed {
|
|
continue
|
|
}
|
|
// Needs to update nodeInfo.AllocatableResource to make sure
|
|
// NodeInfo.allocatableResource at least equal to the capacity already allocated.
|
|
if newAllocatableResource == nil {
|
|
newAllocatableResource = allocatableResource.Clone()
|
|
}
|
|
newAllocatableResource.ScalarResources[v1.ResourceName(resource)] = int64(needed)
|
|
}
|
|
if newAllocatableResource != nil {
|
|
node.SetAllocatableResource(newAllocatableResource)
|
|
}
|
|
}
|
|
|
|
func (m *ManagerImpl) isDevicePluginResource(resource string) bool {
|
|
_, registeredResource := m.healthyDevices[resource]
|
|
_, allocatedResource := m.allocatedDevices[resource]
|
|
// Return true if this is either an active device plugin resource or
|
|
// a resource we have previously allocated.
|
|
if registeredResource || allocatedResource {
|
|
return true
|
|
}
|
|
return false
|
|
}
|