Implement plugin manager - a controller that manages plugin registration/unregistration

k3s-v1.15.3
Tara Gu 2019-03-09 08:25:08 -05:00
parent 9577cbfb9f
commit 5e18554442
49 changed files with 2801 additions and 641 deletions

View File

@ -167,6 +167,7 @@ pkg/kubelet/dockershim/network/testing
pkg/kubelet/events
pkg/kubelet/lifecycle
pkg/kubelet/metrics
pkg/kubelet/pluginmanager/pluginwatcher
pkg/kubelet/pod/testing
pkg/kubelet/preemption
pkg/kubelet/prober
@ -180,7 +181,6 @@ pkg/kubelet/status
pkg/kubelet/status/testing
pkg/kubelet/sysctl
pkg/kubelet/types
pkg/kubelet/util/pluginwatcher
pkg/kubemark
pkg/master
pkg/master/controller/crdregistration

View File

@ -21,8 +21,8 @@ set -o pipefail
KUBE_ROOT="$(cd "$(dirname "${BASH_SOURCE[0]}")/../" && pwd -P)"
KUBELET_PLUGIN_REGISTRATION_V1ALPHA="${KUBE_ROOT}/pkg/kubelet/apis/pluginregistration/v1alpha1/"
KUBELET_PLUGIN_REGISTRATION_V1BETA="${KUBE_ROOT}/pkg/kubelet/apis/pluginregistration/v1beta1/"
KUBELET_EXAMPLE_PLUGIN_V1BETA1="${KUBE_ROOT}/pkg/kubelet/util/pluginwatcher/example_plugin_apis/v1beta1/"
KUBELET_EXAMPLE_PLUGIN_V1BETA2="${KUBE_ROOT}/pkg/kubelet/util/pluginwatcher/example_plugin_apis/v1beta2/"
KUBELET_EXAMPLE_PLUGIN_V1BETA1="${KUBE_ROOT}/pkg/kubelet/pluginmanager/pluginwatcher/example_plugin_apis/v1beta1/"
KUBELET_EXAMPLE_PLUGIN_V1BETA2="${KUBE_ROOT}/pkg/kubelet/pluginmanager/pluginwatcher/example_plugin_apis/v1beta2/"
source "${KUBE_ROOT}/hack/lib/protoc.sh"
kube::protoc::generate_proto "${KUBELET_PLUGIN_REGISTRATION_V1ALPHA}"

View File

@ -70,6 +70,8 @@ go_library(
"//pkg/kubelet/nodestatus:go_default_library",
"//pkg/kubelet/oom:go_default_library",
"//pkg/kubelet/pleg:go_default_library",
"//pkg/kubelet/pluginmanager:go_default_library",
"//pkg/kubelet/pluginmanager/cache:go_default_library",
"//pkg/kubelet/pod:go_default_library",
"//pkg/kubelet/preemption:go_default_library",
"//pkg/kubelet/prober:go_default_library",
@ -90,7 +92,6 @@ go_library(
"//pkg/kubelet/util:go_default_library",
"//pkg/kubelet/util/format:go_default_library",
"//pkg/kubelet/util/manager:go_default_library",
"//pkg/kubelet/util/pluginwatcher:go_default_library",
"//pkg/kubelet/util/queue:go_default_library",
"//pkg/kubelet/util/sliceutils:go_default_library",
"//pkg/kubelet/volumemanager:go_default_library",
@ -189,6 +190,7 @@ go_test(
"//pkg/kubelet/network/dns:go_default_library",
"//pkg/kubelet/nodestatus:go_default_library",
"//pkg/kubelet/pleg:go_default_library",
"//pkg/kubelet/pluginmanager:go_default_library",
"//pkg/kubelet/pod:go_default_library",
"//pkg/kubelet/pod/testing:go_default_library",
"//pkg/kubelet/prober/results:go_default_library",
@ -296,6 +298,7 @@ filegroup(
"//pkg/kubelet/nodestatus:all-srcs",
"//pkg/kubelet/oom:all-srcs",
"//pkg/kubelet/pleg:all-srcs",
"//pkg/kubelet/pluginmanager:all-srcs",
"//pkg/kubelet/pod:all-srcs",
"//pkg/kubelet/preemption:all-srcs",
"//pkg/kubelet/prober:all-srcs",

View File

@ -32,8 +32,8 @@ go_library(
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/eviction/api:go_default_library",
"//pkg/kubelet/lifecycle:go_default_library",
"//pkg/kubelet/pluginmanager/cache:go_default_library",
"//pkg/kubelet/status:go_default_library",
"//pkg/kubelet/util/pluginwatcher:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",

View File

@ -28,8 +28,8 @@ import (
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
evictionapi "k8s.io/kubernetes/pkg/kubelet/eviction/api"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
"k8s.io/kubernetes/pkg/kubelet/status"
"k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
"fmt"
@ -100,7 +100,7 @@ type ContainerManager interface {
// GetPluginRegistrationHandler returns a plugin registration handler
// The pluginwatcher's Handlers allow to have a single module for handling
// registration.
GetPluginRegistrationHandler() pluginwatcher.PluginHandler
GetPluginRegistrationHandler() cache.PluginHandler
// GetDevices returns information about the devices assigned to pods and containers
GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices

View File

@ -52,10 +52,10 @@ import (
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
"k8s.io/kubernetes/pkg/kubelet/qos"
"k8s.io/kubernetes/pkg/kubelet/stats/pidlimit"
"k8s.io/kubernetes/pkg/kubelet/status"
"k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
"k8s.io/kubernetes/pkg/util/mount"
"k8s.io/kubernetes/pkg/util/oom"
@ -620,7 +620,7 @@ func (cm *containerManagerImpl) Start(node *v1.Node,
return nil
}
func (cm *containerManagerImpl) GetPluginRegistrationHandler() pluginwatcher.PluginHandler {
func (cm *containerManagerImpl) GetPluginRegistrationHandler() cache.PluginHandler {
return cm.deviceManager.GetWatcherHandler()
}

View File

@ -27,8 +27,8 @@ import (
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
"k8s.io/kubernetes/pkg/kubelet/status"
"k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)
@ -80,7 +80,7 @@ func (cm *containerManagerStub) GetCapacity() v1.ResourceList {
return c
}
func (cm *containerManagerStub) GetPluginRegistrationHandler() pluginwatcher.PluginHandler {
func (cm *containerManagerStub) GetPluginRegistrationHandler() cache.PluginHandler {
return nil
}

View File

@ -37,8 +37,8 @@ import (
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
"k8s.io/kubernetes/pkg/kubelet/status"
"k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
"k8s.io/kubernetes/pkg/util/mount"
)
@ -140,7 +140,7 @@ func (cm *containerManagerImpl) GetCapacity() v1.ResourceList {
return cm.capacity
}
func (cm *containerManagerImpl) GetPluginRegistrationHandler() pluginwatcher.PluginHandler {
func (cm *containerManagerImpl) GetPluginRegistrationHandler() cache.PluginHandler {
return nil
}

View File

@ -25,7 +25,7 @@ go_library(
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/lifecycle:go_default_library",
"//pkg/kubelet/metrics:go_default_library",
"//pkg/kubelet/util/pluginwatcher:go_default_library",
"//pkg/kubelet/pluginmanager/cache:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//pkg/util/selinux:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
@ -48,14 +48,17 @@ go_test(
"//pkg/kubelet/apis/deviceplugin/v1beta1:go_default_library",
"//pkg/kubelet/apis/pluginregistration/v1:go_default_library",
"//pkg/kubelet/checkpointmanager:go_default_library",
"//pkg/kubelet/config:go_default_library",
"//pkg/kubelet/lifecycle:go_default_library",
"//pkg/kubelet/util/pluginwatcher:go_default_library",
"//pkg/kubelet/pluginmanager:go_default_library",
"//pkg/scheduler/nodeinfo:go_default_library",
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library",
],

View File

@ -42,7 +42,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/metrics"
watcher "k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
"k8s.io/kubernetes/pkg/util/selinux"
)
@ -242,7 +242,7 @@ func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.Sourc
}
// GetWatcherHandler returns the plugin handler
func (m *ManagerImpl) GetWatcherHandler() watcher.PluginHandler {
func (m *ManagerImpl) GetWatcherHandler() cache.PluginHandler {
if f, err := os.Create(m.socketdir + "DEPRECATION"); err != nil {
klog.Errorf("Failed to create deprecation file at %s", m.socketdir)
} else {
@ -250,7 +250,7 @@ func (m *ManagerImpl) GetWatcherHandler() watcher.PluginHandler {
klog.V(4).Infof("created deprecation file %s", f.Name())
}
return watcher.PluginHandler(m)
return cache.PluginHandler(m)
}
// ValidatePlugin validates a plugin if the version is correct and the name has the format of an extended resource

View File

@ -21,7 +21,7 @@ import (
podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)
@ -59,7 +59,7 @@ func (h *ManagerStub) GetCapacity() (v1.ResourceList, v1.ResourceList, []string)
}
// GetWatcherHandler returns plugin watcher interface
func (h *ManagerStub) GetWatcherHandler() pluginwatcher.PluginHandler {
func (h *ManagerStub) GetWatcherHandler() cache.PluginHandler {
return nil
}

View File

@ -32,11 +32,14 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/record"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
watcherapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
"k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)
@ -51,6 +54,7 @@ func tmpSocketDir() (socketDir, socketName, pluginSocketName string, err error)
}
socketName = socketDir + "/server.sock"
pluginSocketName = socketDir + "/device-plugin.sock"
os.MkdirAll(socketDir, 0755)
return
}
@ -68,17 +72,17 @@ func TestNewManagerImplStart(t *testing.T) {
require.NoError(t, err)
defer os.RemoveAll(socketDir)
m, _, p := setup(t, []*pluginapi.Device{}, func(n string, d []pluginapi.Device) {}, socketName, pluginSocketName)
cleanup(t, m, p, nil)
cleanup(t, m, p)
// Stop should tolerate being called more than once.
cleanup(t, m, p, nil)
cleanup(t, m, p)
}
func TestNewManagerImplStartProbeMode(t *testing.T) {
socketDir, socketName, pluginSocketName, err := tmpSocketDir()
require.NoError(t, err)
defer os.RemoveAll(socketDir)
m, _, p, w := setupInProbeMode(t, []*pluginapi.Device{}, func(n string, d []pluginapi.Device) {}, socketName, pluginSocketName)
cleanup(t, m, p, w)
m, _, p, _ := setupInProbeMode(t, []*pluginapi.Device{}, func(n string, d []pluginapi.Device) {}, socketName, pluginSocketName)
cleanup(t, m, p)
}
// Tests that the device plugin manager correctly handles registration and re-registration by
@ -144,7 +148,7 @@ func TestDevicePluginReRegistration(t *testing.T) {
require.Equal(t, int64(1), resourceAllocatable.Value(), "Devices of plugin previously registered should be removed.")
p2.Stop()
p3.Stop()
cleanup(t, m, p1, nil)
cleanup(t, m, p1)
}
}
@ -165,7 +169,7 @@ func TestDevicePluginReRegistrationProbeMode(t *testing.T) {
{ID: "Dev3", Health: pluginapi.Healthy},
}
m, ch, p1, w := setupInProbeMode(t, devs, nil, socketName, pluginSocketName)
m, ch, p1, _ := setupInProbeMode(t, devs, nil, socketName, pluginSocketName)
// Wait for the first callback to be issued.
select {
@ -213,7 +217,7 @@ func TestDevicePluginReRegistrationProbeMode(t *testing.T) {
require.Equal(t, int64(1), resourceAllocatable.Value(), "Devices of previous registered should be removed")
p2.Stop()
p3.Stop()
cleanup(t, m, p1, w)
cleanup(t, m, p1)
}
func setupDeviceManager(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string) (Manager, <-chan interface{}) {
@ -247,12 +251,21 @@ func setupDevicePlugin(t *testing.T, devs []*pluginapi.Device, pluginSocketName
return p
}
func setupPluginWatcher(pluginSocketName string, m Manager) *pluginwatcher.Watcher {
w := pluginwatcher.NewWatcher(filepath.Dir(pluginSocketName), "" /* deprecatedSockDir */)
w.AddHandler(watcherapi.DevicePlugin, m.GetWatcherHandler())
w.Start()
func setupPluginManager(t *testing.T, pluginSocketName string, m Manager) pluginmanager.PluginManager {
pluginManager := pluginmanager.NewPluginManager(
filepath.Dir(pluginSocketName), /* sockDir */
"", /* deprecatedSockDir */
&record.FakeRecorder{},
)
return w
runPluginManager(pluginManager)
pluginManager.AddHandler(watcherapi.DevicePlugin, m.GetWatcherHandler())
return pluginManager
}
func runPluginManager(pluginManager pluginmanager.PluginManager) {
sourcesReady := config.NewSourcesReady(func(_ sets.String) bool { return true })
go pluginManager.Run(sourcesReady, wait.NeverStop)
}
func setup(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, <-chan interface{}, *Stub) {
@ -261,19 +274,16 @@ func setup(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, soc
return m, updateChan, p
}
func setupInProbeMode(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, <-chan interface{}, *Stub, *pluginwatcher.Watcher) {
func setupInProbeMode(t *testing.T, devs []*pluginapi.Device, callback monitorCallback, socketName string, pluginSocketName string) (Manager, <-chan interface{}, *Stub, pluginmanager.PluginManager) {
m, updateChan := setupDeviceManager(t, devs, callback, socketName)
w := setupPluginWatcher(pluginSocketName, m)
pm := setupPluginManager(t, pluginSocketName, m)
p := setupDevicePlugin(t, devs, pluginSocketName)
return m, updateChan, p, w
return m, updateChan, p, pm
}
func cleanup(t *testing.T, m Manager, p *Stub, w *pluginwatcher.Watcher) {
func cleanup(t *testing.T, m Manager, p *Stub) {
p.Stop()
m.Stop()
if w != nil {
require.NoError(t, w.Stop())
}
}
func TestUpdateCapacityAllocatable(t *testing.T) {

View File

@ -24,7 +24,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
watcher "k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)
@ -54,7 +54,7 @@ type Manager interface {
// GetCapacity returns the amount of available device plugin resource capacity, resource allocatable
// and inactive device plugin resources previously registered on the node.
GetCapacity() (v1.ResourceList, v1.ResourceList, []string)
GetWatcherHandler() watcher.PluginHandler
GetWatcherHandler() cache.PluginHandler
// GetDevices returns information about the devices assigned to pods and containers
GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices

View File

@ -80,6 +80,8 @@ import (
"k8s.io/kubernetes/pkg/kubelet/nodelease"
oomwatcher "k8s.io/kubernetes/pkg/kubelet/oom"
"k8s.io/kubernetes/pkg/kubelet/pleg"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager"
plugincache "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
"k8s.io/kubernetes/pkg/kubelet/preemption"
"k8s.io/kubernetes/pkg/kubelet/prober"
@ -98,7 +100,6 @@ import (
"k8s.io/kubernetes/pkg/kubelet/util"
"k8s.io/kubernetes/pkg/kubelet/util/format"
"k8s.io/kubernetes/pkg/kubelet/util/manager"
"k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher"
"k8s.io/kubernetes/pkg/kubelet/util/queue"
"k8s.io/kubernetes/pkg/kubelet/util/sliceutils"
"k8s.io/kubernetes/pkg/kubelet/volumemanager"
@ -785,9 +786,10 @@ func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
return nil, err
}
if klet.enablePluginsWatcher {
klet.pluginWatcher = pluginwatcher.NewWatcher(
klet.pluginManager = pluginmanager.NewPluginManager(
klet.getPluginsRegistrationDir(), /* sockDir */
klet.getPluginsDir(), /* deprecatedSockDir */
kubeDeps.Recorder,
)
}
@ -1201,10 +1203,9 @@ type Kubelet struct {
// This can be useful for debugging volume related issues.
keepTerminatedPodVolumes bool // DEPRECATED
// pluginwatcher is a utility for Kubelet to register different types of node-level plugins
// such as device plugins or CSI plugins. It discovers plugins by monitoring inotify events under the
// directory returned by kubelet.getPluginsDir()
pluginWatcher *pluginwatcher.Watcher
// pluginmanager runs a set of asynchronous loops that figure out which
// plugins need to be registered/unregistered based on this node and makes it so.
pluginManager pluginmanager.PluginManager
// This flag sets a maximum number of images to report in the node status.
nodeStatusMaxImages int32
@ -1376,15 +1377,12 @@ func (kl *Kubelet) initializeRuntimeDependentModules() {
kl.containerLogManager.Start()
if kl.enablePluginsWatcher {
// Adding Registration Callback function for CSI Driver
kl.pluginWatcher.AddHandler(pluginwatcherapi.CSIPlugin, pluginwatcher.PluginHandler(csi.PluginHandler))
kl.pluginManager.AddHandler(pluginwatcherapi.CSIPlugin, plugincache.PluginHandler(csi.PluginHandler))
// Adding Registration Callback function for Device Manager
kl.pluginWatcher.AddHandler(pluginwatcherapi.DevicePlugin, kl.containerManager.GetPluginRegistrationHandler())
// Start the plugin watcher
klog.V(4).Infof("starting watcher")
if err := kl.pluginWatcher.Start(); err != nil {
kl.recorder.Eventf(kl.nodeRef, v1.EventTypeWarning, events.KubeletSetupFailed, err.Error())
klog.Fatalf("failed to start Plugin Watcher. err: %v", err)
}
kl.pluginManager.AddHandler(pluginwatcherapi.DevicePlugin, kl.containerManager.GetPluginRegistrationHandler())
// Start the plugin manager
klog.V(4).Infof("starting plugin manager")
go kl.pluginManager.Run(kl.sourcesReady, wait.NeverStop)
}
}

View File

@ -27,7 +27,7 @@ import (
cadvisorapi "github.com/google/cadvisor/info/v1"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
@ -53,6 +53,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/logs"
"k8s.io/kubernetes/pkg/kubelet/network/dns"
"k8s.io/kubernetes/pkg/kubelet/pleg"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager"
kubepod "k8s.io/kubernetes/pkg/kubelet/pod"
podtest "k8s.io/kubernetes/pkg/kubelet/pod/testing"
proberesults "k8s.io/kubernetes/pkg/kubelet/prober/results"
@ -326,6 +327,11 @@ func newTestKubeletWithImageList(
false, /* experimentalCheckNodeCapabilitiesBeforeMount*/
false /* keepTerminatedPodVolumes */)
kubelet.pluginManager = pluginmanager.NewPluginManager(
kubelet.getPluginsRegistrationDir(), /* sockDir */
kubelet.getPluginsDir(), /* deprecatedSockDir */
kubelet.recorder,
)
kubelet.setNodeStatusFuncs = kubelet.defaultNodeStatusFuncs()
// enable active deadline handler

View File

@ -0,0 +1,56 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["plugin_manager.go"],
importpath = "k8s.io/kubernetes/pkg/kubelet/pluginmanager",
visibility = ["//visibility:public"],
deps = [
"//pkg/kubelet/config:go_default_library",
"//pkg/kubelet/pluginmanager/cache:go_default_library",
"//pkg/kubelet/pluginmanager/metrics:go_default_library",
"//pkg/kubelet/pluginmanager/operationexecutor:go_default_library",
"//pkg/kubelet/pluginmanager/pluginwatcher:go_default_library",
"//pkg/kubelet/pluginmanager/reconciler:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//pkg/kubelet/pluginmanager/cache:all-srcs",
"//pkg/kubelet/pluginmanager/metrics:all-srcs",
"//pkg/kubelet/pluginmanager/operationexecutor:all-srcs",
"//pkg/kubelet/pluginmanager/pluginwatcher:all-srcs",
"//pkg/kubelet/pluginmanager/reconciler:all-srcs",
],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)
go_test(
name = "go_default_test",
srcs = ["plugin_manager_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/kubelet/apis/pluginregistration/v1:go_default_library",
"//pkg/kubelet/config:go_default_library",
"//pkg/kubelet/pluginmanager/cache:go_default_library",
"//pkg/kubelet/pluginmanager/pluginwatcher:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library",
],
)

View File

@ -0,0 +1,5 @@
# See the OWNERS docs at https://go.k8s.io/owners
approvers:
- saad-ali
- taragu

36
pkg/kubelet/pluginmanager/cache/BUILD vendored Normal file
View File

@ -0,0 +1,36 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"actual_state_of_world.go",
"desired_state_of_world.go",
"types.go",
],
importpath = "k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache",
visibility = ["//visibility:public"],
deps = ["//vendor/k8s.io/klog:go_default_library"],
)
go_test(
name = "go_default_test",
srcs = [
"actual_state_of_world_test.go",
"desired_state_of_world_test.go",
],
embed = [":go_default_library"],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,127 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Package cache implements data structures used by the kubelet plugin manager to
keep track of registered plugins.
*/
package cache
import (
"fmt"
"sync"
"time"
"k8s.io/klog"
)
// ActualStateOfWorld defines a set of thread-safe operations for the kubelet
// plugin manager's actual state of the world cache.
// This cache contains a map of socket file path to plugin information of
// all plugins attached to this node.
type ActualStateOfWorld interface {
// GetRegisteredPlugins generates and returns a list of plugins
// that are successfully registered plugins in the current actual state of world.
GetRegisteredPlugins() []PluginInfo
// AddPlugin add the given plugin in the cache.
// An error will be returned if socketPath of the PluginInfo object is empty.
// Note that this is different from desired world cache's AddOrUpdatePlugin
// because for the actual state of world cache, there won't be a scenario where
// we need to update an existing plugin if the timestamps don't match. This is
// because the plugin should have been unregistered in the reconciller and therefore
// removed from the actual state of world cache first before adding it back into
// the actual state of world cache again with the new timestamp
AddPlugin(pluginInfo PluginInfo) error
// RemovePlugin deletes the plugin with the given socket path from the actual
// state of world.
// If a plugin does not exist with the given socket path, this is a no-op.
RemovePlugin(socketPath string)
// PluginExists checks if the given plugin exists in the current actual
// state of world cache with the correct timestamp
PluginExistsWithCorrectTimestamp(pluginInfo PluginInfo) bool
}
// NewActualStateOfWorld returns a new instance of ActualStateOfWorld
func NewActualStateOfWorld() ActualStateOfWorld {
return &actualStateOfWorld{
socketFileToInfo: make(map[string]PluginInfo),
}
}
type actualStateOfWorld struct {
// socketFileToInfo is a map containing the set of successfully registered plugins
// The keys are plugin socket file paths. The values are PluginInfo objects
socketFileToInfo map[string]PluginInfo
sync.RWMutex
}
var _ ActualStateOfWorld = &actualStateOfWorld{}
// PluginInfo holds information of a plugin
type PluginInfo struct {
SocketPath string
FoundInDeprecatedDir bool
Timestamp time.Time
}
func (asw *actualStateOfWorld) AddPlugin(pluginInfo PluginInfo) error {
asw.Lock()
defer asw.Unlock()
if pluginInfo.SocketPath == "" {
return fmt.Errorf("Socket path is empty")
}
if _, ok := asw.socketFileToInfo[pluginInfo.SocketPath]; ok {
klog.V(2).Infof("Plugin (Path %s) exists in actual state cache", pluginInfo.SocketPath)
}
asw.socketFileToInfo[pluginInfo.SocketPath] = pluginInfo
return nil
}
func (asw *actualStateOfWorld) RemovePlugin(socketPath string) {
asw.Lock()
defer asw.Unlock()
if _, ok := asw.socketFileToInfo[socketPath]; ok {
delete(asw.socketFileToInfo, socketPath)
}
}
func (asw *actualStateOfWorld) GetRegisteredPlugins() []PluginInfo {
asw.RLock()
defer asw.RUnlock()
currentPlugins := []PluginInfo{}
for _, pluginInfo := range asw.socketFileToInfo {
currentPlugins = append(currentPlugins, pluginInfo)
}
return currentPlugins
}
func (asw *actualStateOfWorld) PluginExistsWithCorrectTimestamp(pluginInfo PluginInfo) bool {
asw.RLock()
defer asw.RUnlock()
// We need to check both if the socket file path exists, and the timestamp
// matches the given plugin (from the desired state cache) timestamp
actualStatePlugin, exists := asw.socketFileToInfo[pluginInfo.SocketPath]
return exists && (actualStatePlugin.Timestamp == pluginInfo.Timestamp)
}

View File

@ -0,0 +1,140 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"testing"
"time"
)
// Calls AddPlugin() to add a plugin
// Verifies newly added plugin exists in GetRegisteredPlugins()
// Verifies PluginExistsWithCorrectTimestamp returns true for the plugin
func Test_ASW_AddPlugin_Positive_NewPlugin(t *testing.T) {
pluginInfo := PluginInfo{
SocketPath: "/var/lib/kubelet/device-plugins/test-plugin.sock",
FoundInDeprecatedDir: false,
Timestamp: time.Now(),
}
asw := NewActualStateOfWorld()
err := asw.AddPlugin(pluginInfo)
// Assert
if err != nil {
t.Fatalf("AddPlugin failed. Expected: <no error> Actual: <%v>", err)
}
// Get registered plugins and check the newly added plugin is there
aswPlugins := asw.GetRegisteredPlugins()
if len(aswPlugins) != 1 {
t.Fatalf("Actual state of world length should be one but it's %d", len(aswPlugins))
}
if aswPlugins[0] != pluginInfo {
t.Fatalf("Expected\n%v\nin actual state of world, but got\n%v\n", pluginInfo, aswPlugins[0])
}
// Check PluginExistsWithCorrectTimestamp returns true
if !asw.PluginExistsWithCorrectTimestamp(pluginInfo) {
t.Fatalf("PluginExistsWithCorrectTimestamp returns false for plugin that should be registered")
}
}
// Calls AddPlugin() to add an empty string for socket path
// Verifies the plugin does not exist in GetRegisteredPlugins()
// Verifies PluginExistsWithCorrectTimestamp returns false
func Test_ASW_AddPlugin_Negative_EmptySocketPath(t *testing.T) {
asw := NewActualStateOfWorld()
pluginInfo := PluginInfo{
SocketPath: "",
FoundInDeprecatedDir: false,
Timestamp: time.Now(),
}
err := asw.AddPlugin(pluginInfo)
// Assert
if err == nil || err.Error() != "Socket path is empty" {
t.Fatalf("AddOrUpdatePlugin failed. Expected: <Socket path is empty> Actual: <%v>", err)
}
// Get registered plugins and check the newly added plugin is there
aswPlugins := asw.GetRegisteredPlugins()
if len(aswPlugins) != 0 {
t.Fatalf("Actual state of world length should be zero but it's %d", len(aswPlugins))
}
// Check PluginExistsWithCorrectTimestamp returns false
if asw.PluginExistsWithCorrectTimestamp(pluginInfo) {
t.Fatalf("PluginExistsWithCorrectTimestamp returns true for plugin that's not registered")
}
}
// Calls RemovePlugin() to remove a plugin
// Verifies newly removed plugin no longer exists in GetRegisteredPlugins()
// Verifies PluginExistsWithCorrectTimestamp returns false
func Test_ASW_RemovePlugin_Positive(t *testing.T) {
// First, add a plugin
asw := NewActualStateOfWorld()
pluginInfo := PluginInfo{
SocketPath: "/var/lib/kubelet/device-plugins/test-plugin.sock",
FoundInDeprecatedDir: false,
Timestamp: time.Now(),
}
err := asw.AddPlugin(pluginInfo)
// Assert
if err != nil {
t.Fatalf("AddPlugin failed. Expected: <no error> Actual: <%v>", err)
}
// Try removing this plugin
asw.RemovePlugin(pluginInfo.SocketPath)
// Get registered plugins and check the newly added plugin is not there
aswPlugins := asw.GetRegisteredPlugins()
if len(aswPlugins) != 0 {
t.Fatalf("Actual state of world length should be zero but it's %d", len(aswPlugins))
}
// Check PluginExistsWithCorrectTimestamp returns false
if asw.PluginExistsWithCorrectTimestamp(pluginInfo) {
t.Fatalf("PluginExistsWithCorrectTimestamp returns true for the removed plugin")
}
}
// Verifies PluginExistsWithCorrectTimestamp returns false for an existing
// plugin with the wrong timestamp
func Test_ASW_PluginExistsWithCorrectTimestamp_Negative_WrongTimestamp(t *testing.T) {
// First, add a plugin
asw := NewActualStateOfWorld()
pluginInfo := PluginInfo{
SocketPath: "/var/lib/kubelet/device-plugins/test-plugin.sock",
FoundInDeprecatedDir: false,
Timestamp: time.Now(),
}
err := asw.AddPlugin(pluginInfo)
// Assert
if err != nil {
t.Fatalf("AddPlugin failed. Expected: <no error> Actual: <%v>", err)
}
newerPlugin := PluginInfo{
SocketPath: "/var/lib/kubelet/device-plugins/test-plugin.sock",
FoundInDeprecatedDir: false,
Timestamp: time.Now(),
}
// Check PluginExistsWithCorrectTimestamp returns false
if asw.PluginExistsWithCorrectTimestamp(newerPlugin) {
t.Fatalf("PluginExistsWithCorrectTimestamp returns true for a plugin with newer timestamp")
}
}

View File

@ -0,0 +1,172 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
/*
Package cache implements data structures used by the kubelet plugin manager to
keep track of registered plugins.
*/
package cache
import (
"fmt"
"sync"
"time"
"k8s.io/klog"
)
// DesiredStateOfWorld defines a set of thread-safe operations for the kubelet
// plugin manager's desired state of the world cache.
// This cache contains a map of socket file path to plugin information of
// all plugins attached to this node.
type DesiredStateOfWorld interface {
// AddOrUpdatePlugin add the given plugin in the cache if it doesn't already exist.
// If it does exist in the cache, then the timestamp and foundInDeprecatedDir of the PluginInfo object in the cache will be updated.
// An error will be returned if socketPath is empty.
AddOrUpdatePlugin(socketPath string, foundInDeprecatedDir bool) error
// RemovePlugin deletes the plugin with the given socket path from the desired
// state of world.
// If a plugin does not exist with the given socket path, this is a no-op.
RemovePlugin(socketPath string)
// GetPluginsToRegister generates and returns a list of plugins
// in the current desired state of world.
GetPluginsToRegister() []PluginInfo
// PluginExists checks if the given socket path exists in the current desired
// state of world cache
PluginExists(socketPath string) bool
}
// NewDesiredStateOfWorld returns a new instance of DesiredStateOfWorld.
func NewDesiredStateOfWorld() DesiredStateOfWorld {
return &desiredStateOfWorld{
socketFileToInfo: make(map[string]PluginInfo),
}
}
type desiredStateOfWorld struct {
// socketFileToInfo is a map containing the set of successfully registered plugins
// The keys are plugin socket file paths. The values are PluginInfo objects
socketFileToInfo map[string]PluginInfo
sync.RWMutex
}
var _ DesiredStateOfWorld = &desiredStateOfWorld{}
// Generate a detailed error msg for logs
func generatePluginMsgDetailed(prefixMsg, suffixMsg, socketPath, details string) (detailedMsg string) {
return fmt.Sprintf("%v for plugin at %q %v %v", prefixMsg, socketPath, details, suffixMsg)
}
// Generate a simplified error msg for events and a detailed error msg for logs
func generatePluginMsg(prefixMsg, suffixMsg, socketPath, details string) (simpleMsg, detailedMsg string) {
simpleMsg = fmt.Sprintf("%v for plugin at %q %v", prefixMsg, socketPath, suffixMsg)
return simpleMsg, generatePluginMsgDetailed(prefixMsg, suffixMsg, socketPath, details)
}
// GenerateMsgDetailed returns detailed msgs for plugins to register
// that can be used in logs.
// The msg format follows the pattern "<prefixMsg> <plugin details> <suffixMsg>"
func (plugin *PluginInfo) GenerateMsgDetailed(prefixMsg, suffixMsg string) (detailedMsg string) {
detailedStr := fmt.Sprintf("(plugin details: %v)", plugin)
return generatePluginMsgDetailed(prefixMsg, suffixMsg, plugin.SocketPath, detailedStr)
}
// GenerateMsg returns simple and detailed msgs for plugins to register
// that is user friendly and a detailed msg that can be used in logs.
// The msg format follows the pattern "<prefixMsg> <plugin details> <suffixMsg>".
func (plugin *PluginInfo) GenerateMsg(prefixMsg, suffixMsg string) (simpleMsg, detailedMsg string) {
detailedStr := fmt.Sprintf("(plugin details: %v)", plugin)
return generatePluginMsg(prefixMsg, suffixMsg, plugin.SocketPath, detailedStr)
}
// GenerateErrorDetailed returns detailed errors for plugins to register
// that can be used in logs.
// The msg format follows the pattern "<prefixMsg> <plugin details>: <err> ",
func (plugin *PluginInfo) GenerateErrorDetailed(prefixMsg string, err error) (detailedErr error) {
return fmt.Errorf(plugin.GenerateMsgDetailed(prefixMsg, errSuffix(err)))
}
// GenerateError returns simple and detailed errors for plugins to register
// that is user friendly and a detailed error that can be used in logs.
// The msg format follows the pattern "<prefixMsg> <plugin details>: <err> ".
func (plugin *PluginInfo) GenerateError(prefixMsg string, err error) (simpleErr, detailedErr error) {
simpleMsg, detailedMsg := plugin.GenerateMsg(prefixMsg, errSuffix(err))
return fmt.Errorf(simpleMsg), fmt.Errorf(detailedMsg)
}
// Generates an error string with the format ": <err>" if err exists
func errSuffix(err error) string {
errStr := ""
if err != nil {
errStr = fmt.Sprintf(": %v", err)
}
return errStr
}
func (dsw *desiredStateOfWorld) AddOrUpdatePlugin(socketPath string, foundInDeprecatedDir bool) error {
dsw.Lock()
defer dsw.Unlock()
if socketPath == "" {
return fmt.Errorf("Socket path is empty")
}
if _, ok := dsw.socketFileToInfo[socketPath]; ok {
klog.V(2).Infof("Plugin (Path %s) exists in actual state cache, timestamp will be updated", socketPath)
}
// Update the the PluginInfo object.
// Note that we only update the timestamp in the desired state of world, not the actual state of world
// because in the reconciler, we need to check if the plugin in the actual state of world is the same
// version as the plugin in the desired state of world
dsw.socketFileToInfo[socketPath] = PluginInfo{
SocketPath: socketPath,
FoundInDeprecatedDir: foundInDeprecatedDir,
Timestamp: time.Now(),
}
return nil
}
func (dsw *desiredStateOfWorld) RemovePlugin(socketPath string) {
dsw.Lock()
defer dsw.Unlock()
if _, ok := dsw.socketFileToInfo[socketPath]; ok {
delete(dsw.socketFileToInfo, socketPath)
}
}
func (dsw *desiredStateOfWorld) GetPluginsToRegister() []PluginInfo {
dsw.RLock()
defer dsw.RUnlock()
pluginsToRegister := []PluginInfo{}
for _, pluginInfo := range dsw.socketFileToInfo {
pluginsToRegister = append(pluginsToRegister, pluginInfo)
}
return pluginsToRegister
}
func (dsw *desiredStateOfWorld) PluginExists(socketPath string) bool {
dsw.RLock()
defer dsw.RUnlock()
_, exists := dsw.socketFileToInfo[socketPath]
return exists
}

View File

@ -0,0 +1,142 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"testing"
)
// Calls AddOrUpdatePlugin() to add a plugin
// Verifies newly added plugin exists in GetPluginsToRegister()
// Verifies newly added plugin returns true for PluginExists()
func Test_DSW_AddOrUpdatePlugin_Positive_NewPlugin(t *testing.T) {
dsw := NewDesiredStateOfWorld()
socketPath := "/var/lib/kubelet/device-plugins/test-plugin.sock"
err := dsw.AddOrUpdatePlugin(socketPath, false /* foundInDeprecatedDir */)
// Assert
if err != nil {
t.Fatalf("AddOrUpdatePlugin failed. Expected: <no error> Actual: <%v>", err)
}
// Get pluginsToRegister and check the newly added plugin is there
dswPlugins := dsw.GetPluginsToRegister()
if len(dswPlugins) != 1 {
t.Fatalf("Desired state of world length should be one but it's %d", len(dswPlugins))
}
if dswPlugins[0].SocketPath != socketPath {
t.Fatalf("Expected\n%s\nin desired state of world, but got\n%v\n", socketPath, dswPlugins[0])
}
// Check PluginExists returns true
if !dsw.PluginExists(socketPath) {
t.Fatalf("PluginExists returns false for the newly added plugin")
}
}
// Calls AddOrUpdatePlugin() to update timestamp of an existing plugin
// Verifies the timestamp the existing plugin is updated
// Verifies newly added plugin returns true for PluginExists()
func Test_DSW_AddOrUpdatePlugin_Positive_ExistingPlugin(t *testing.T) {
dsw := NewDesiredStateOfWorld()
socketPath := "/var/lib/kubelet/device-plugins/test-plugin.sock"
// Adding the plugin for the first time
err := dsw.AddOrUpdatePlugin(socketPath, false /* foundInDeprecatedDir */)
if err != nil {
t.Fatalf("AddOrUpdatePlugin failed. Expected: <no error> Actual: <%v>", err)
}
// Get pluginsToRegister and check the newly added plugin is there, and get the old timestamp
dswPlugins := dsw.GetPluginsToRegister()
if len(dswPlugins) != 1 {
t.Fatalf("Desired state of world length should be one but it's %d", len(dswPlugins))
}
if dswPlugins[0].SocketPath != socketPath {
t.Fatalf("Expected\n%s\nin desired state of world, but got\n%v\n", socketPath, dswPlugins[0])
}
oldTimestamp := dswPlugins[0].Timestamp
// Adding the plugin again so that the timestamp will be updated
err = dsw.AddOrUpdatePlugin(socketPath, false /* foundInDeprecatedDir */)
if err != nil {
t.Fatalf("AddOrUpdatePlugin failed. Expected: <no error> Actual: <%v>", err)
}
newDswPlugins := dsw.GetPluginsToRegister()
if len(newDswPlugins) != 1 {
t.Fatalf("Desired state of world length should be one but it's %d", len(newDswPlugins))
}
if newDswPlugins[0].SocketPath != socketPath {
t.Fatalf("Expected\n%s\nin desired state of world, but got\n%v\n", socketPath, newDswPlugins[0])
}
// Verify that the new timestamp is newer than the old timestamp
if !newDswPlugins[0].Timestamp.After(oldTimestamp) {
t.Fatal("New timestamp is not newer than the old timestamp", newDswPlugins[0].Timestamp, oldTimestamp)
}
}
// Calls AddOrUpdatePlugin() to add an empty string for socket path
// Verifies the plugin does not exist in GetPluginsToRegister() after AddOrUpdatePlugin()
// Verifies the plugin returns false for PluginExists()
func Test_DSW_AddOrUpdatePlugin_Negative_PluginMissingInfo(t *testing.T) {
dsw := NewDesiredStateOfWorld()
socketPath := ""
err := dsw.AddOrUpdatePlugin(socketPath, false /* foundInDeprecatedDir */)
// Assert
if err == nil || err.Error() != "Socket path is empty" {
t.Fatalf("AddOrUpdatePlugin failed. Expected: <Socket path is empty> Actual: <%v>", err)
}
// Get pluginsToRegister and check the newly added plugin is there
dswPlugins := dsw.GetPluginsToRegister()
if len(dswPlugins) != 0 {
t.Fatalf("Desired state of world length should be zero but it's %d", len(dswPlugins))
}
// Check PluginExists returns false
if dsw.PluginExists(socketPath) {
t.Fatalf("PluginExists returns true for the plugin that should not have been registered")
}
}
// Calls RemovePlugin() to remove a plugin
// Verifies newly removed plugin no longer exists in GetPluginsToRegister()
// Verifies newly removed plugin returns false for PluginExists()
func Test_DSW_RemovePlugin_Positive(t *testing.T) {
// First, add a plugin
dsw := NewDesiredStateOfWorld()
socketPath := "/var/lib/kubelet/device-plugins/test-plugin.sock"
err := dsw.AddOrUpdatePlugin(socketPath, false /* foundInDeprecatedDir */)
// Assert
if err != nil {
t.Fatalf("AddOrUpdatePlugin failed. Expected: <no error> Actual: <%v>", err)
}
// Try removing this plugin
dsw.RemovePlugin(socketPath)
// Get pluginsToRegister and check the newly added plugin is there
dswPlugins := dsw.GetPluginsToRegister()
if len(dswPlugins) != 0 {
t.Fatalf("Desired state of world length should be zero but it's %d", len(dswPlugins))
}
// Check PluginExists returns false
if dsw.PluginExists(socketPath) {
t.Fatalf("PluginExists returns true for the removed plugin")
}
}

View File

@ -1,5 +1,5 @@
/*
Copyright 2018 The Kubernetes Authors.
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package pluginwatcher
package cache
// PluginHandler is an interface a client of the pluginwatcher API needs to implement in
// order to consume plugins
@ -44,7 +44,6 @@ package pluginwatcher
// registers at foo.com/foo-1.9.9
//
// DeRegistration: When ReRegistration happens only the deletion of the new socket will trigger a DeRegister call
type PluginHandler interface {
// Validate returns an error if the information provided by
// the potential plugin is erroneous (unsupported version, ...)

View File

@ -0,0 +1,34 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["metrics.go"],
importpath = "k8s.io/kubernetes/pkg/kubelet/pluginmanager/metrics",
visibility = ["//visibility:public"],
deps = [
"//pkg/kubelet/pluginmanager/cache:go_default_library",
"//vendor/github.com/prometheus/client_golang/prometheus:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["metrics_test.go"],
embed = [":go_default_library"],
deps = ["//pkg/kubelet/pluginmanager/cache:go_default_library"],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,104 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"sync"
"github.com/prometheus/client_golang/prometheus"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
)
const (
pluginNameNotAvailable = "N/A"
// Metric keys for Plugin Manager.
pluginManagerTotalPlugins = "plugin_manager_total_plugins"
)
var (
registerMetrics sync.Once
totalPluginsDesc = prometheus.NewDesc(
pluginManagerTotalPlugins,
"Number of plugins in Plugin Manager",
[]string{"socket_path", "state"},
nil,
)
)
// pluginCount is a map of maps used as a counter.
type pluginCount map[string]map[string]int64
func (pc pluginCount) add(state, pluginName string) {
count, ok := pc[state]
if !ok {
count = map[string]int64{}
}
count[pluginName]++
pc[state] = count
}
// Register registers Plugin Manager metrics.
func Register(asw cache.ActualStateOfWorld, dsw cache.DesiredStateOfWorld) {
registerMetrics.Do(func() {
prometheus.MustRegister(&totalPluginsCollector{asw, dsw})
})
}
type totalPluginsCollector struct {
asw cache.ActualStateOfWorld
dsw cache.DesiredStateOfWorld
}
var _ prometheus.Collector = &totalPluginsCollector{}
// Describe implements the prometheus.Collector interface.
func (c *totalPluginsCollector) Describe(ch chan<- *prometheus.Desc) {
ch <- totalPluginsDesc
}
// Collect implements the prometheus.Collector interface.
func (c *totalPluginsCollector) Collect(ch chan<- prometheus.Metric) {
for stateName, pluginCount := range c.getPluginCount() {
for socketPath, count := range pluginCount {
metric, err := prometheus.NewConstMetric(totalPluginsDesc,
prometheus.GaugeValue,
float64(count),
socketPath,
stateName)
if err != nil {
klog.Warningf("Failed to create metric : %v", err)
}
ch <- metric
}
}
}
func (c *totalPluginsCollector) getPluginCount() pluginCount {
counter := make(pluginCount)
for _, registeredPlugin := range c.asw.GetRegisteredPlugins() {
socketPath := registeredPlugin.SocketPath
counter.add("actual_state_of_world", socketPath)
}
for _, pluginToRegister := range c.dsw.GetPluginsToRegister() {
socketPath := pluginToRegister.SocketPath
counter.add("desired_state_of_world", socketPath)
}
return counter
}

View File

@ -0,0 +1,74 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"fmt"
"testing"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
)
func TestMetricCollection(t *testing.T) {
dsw := cache.NewDesiredStateOfWorld()
asw := cache.NewActualStateOfWorld()
fakePlugin := cache.PluginInfo{
SocketPath: fmt.Sprintf("fake/path/plugin.sock"),
FoundInDeprecatedDir: false,
}
// Add one plugin to DesiredStateOfWorld
err := dsw.AddOrUpdatePlugin(fakePlugin.SocketPath, fakePlugin.FoundInDeprecatedDir)
if err != nil {
t.Fatalf("AddOrUpdatePlugin failed. Expected: <no error> Actual: <%v>", err)
}
// Add one plugin to ActualStateOfWorld
err = asw.AddPlugin(fakePlugin)
if err != nil {
t.Fatalf("AddOrUpdatePlugin failed. Expected: <no error> Actual: <%v>", err)
}
metricCollector := &totalPluginsCollector{asw, dsw}
// Check if getPluginCount returns correct data
count := metricCollector.getPluginCount()
if len(count) != 2 {
t.Errorf("getPluginCount failed. Expected <2> states, got <%d>", len(count))
}
dswCount, ok := count["desired_state_of_world"]
if !ok {
t.Errorf("getPluginCount failed. Expected <desired_state_of_world>, got nothing")
}
fakePluginCount := dswCount["fake/path/plugin.sock"]
if fakePluginCount != 1 {
t.Errorf("getPluginCount failed. Expected <1> fake/path/plugin.sock in DesiredStateOfWorld, got <%d>",
fakePluginCount)
}
aswCount, ok := count["actual_state_of_world"]
if !ok {
t.Errorf("getPluginCount failed. Expected <actual_state_of_world>, got nothing")
}
fakePluginCount = aswCount["fake/path/plugin.sock"]
if fakePluginCount != 1 {
t.Errorf("getPluginCount failed. Expected <1> fake/path/plugin.sock in ActualStateOfWorld, got <%d>",
fakePluginCount)
}
}

View File

@ -0,0 +1,40 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = [
"operation_executor.go",
"operation_generator.go",
],
importpath = "k8s.io/kubernetes/pkg/kubelet/pluginmanager/operationexecutor",
visibility = ["//visibility:public"],
deps = [
"//pkg/kubelet/apis/pluginregistration/v1:go_default_library",
"//pkg/kubelet/pluginmanager/cache:go_default_library",
"//pkg/util/goroutinemap:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//vendor/github.com/pkg/errors:go_default_library",
"//vendor/google.golang.org/grpc:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["operation_executor_test.go"],
embed = [":go_default_library"],
deps = ["//pkg/kubelet/pluginmanager/cache:go_default_library"],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,117 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package operationexecutor implements interfaces that enable execution of
// register and unregister operations with a
// goroutinemap so that more than one operation is never triggered
// on the same plugin.
package operationexecutor
import (
"time"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
"k8s.io/kubernetes/pkg/util/goroutinemap"
)
// OperationExecutor defines a set of operations for registering and unregistering
// a plugin that are executed with a NewGoRoutineMap which
// prevents more than one operation from being triggered on the same socket path.
//
// These operations should be idempotent (for example, RegisterPlugin should
// still succeed if the plugin is already registered, etc.). However,
// they depend on the plugin handlers (for each plugin type) to implement this
// behavior.
//
// Once an operation completes successfully, the actualStateOfWorld is updated
// to indicate the plugin is registered/unregistered.
//
// Once the operation is started, since it is executed asynchronously,
// errors are simply logged and the goroutine is terminated without updating
// actualStateOfWorld.
type OperationExecutor interface {
// RegisterPlugin registers the given plugin using the a handler in the plugin handler map.
// It then updates the actual state of the world to reflect that.
RegisterPlugin(socketPath string, foundInDeprecatedDir bool, timestamp time.Time, pluginHandlers map[string]cache.PluginHandler, actualStateOfWorld ActualStateOfWorldUpdater) error
// UnregisterPlugin deregisters the given plugin using a handler in the given plugin handler map.
// It then updates the actual state of the world to reflect that.
UnregisterPlugin(socketPath string, pluginHandlers map[string]cache.PluginHandler, actualStateOfWorld ActualStateOfWorldUpdater) error
}
// NewOperationExecutor returns a new instance of OperationExecutor.
func NewOperationExecutor(
operationGenerator OperationGenerator) OperationExecutor {
return &operationExecutor{
pendingOperations: goroutinemap.NewGoRoutineMap(true /* exponentialBackOffOnError */),
operationGenerator: operationGenerator,
}
}
// ActualStateOfWorldUpdater defines a set of operations updating the actual
// state of the world cache after successful registeration/deregistration.
type ActualStateOfWorldUpdater interface {
// AddPlugin add the given plugin in the cache if no existing plugin
// in the cache has the same socket path.
// An error will be returned if socketPath is empty.
AddPlugin(pluginInfo cache.PluginInfo) error
// RemovePlugin deletes the plugin with the given socket path from the actual
// state of world.
// If a plugin does not exist with the given socket path, this is a no-op.
RemovePlugin(socketPath string)
}
type operationExecutor struct {
// pendingOperations keeps track of pending attach and detach operations so
// multiple operations are not started on the same volume
pendingOperations goroutinemap.GoRoutineMap
// operationGenerator is an interface that provides implementations for
// generating volume function
operationGenerator OperationGenerator
}
var _ OperationExecutor = &operationExecutor{}
func (oe *operationExecutor) IsOperationPending(socketPath string) bool {
return oe.pendingOperations.IsOperationPending(socketPath)
}
func (oe *operationExecutor) RegisterPlugin(
socketPath string,
foundInDeprecatedDir bool,
timestamp time.Time,
pluginHandlers map[string]cache.PluginHandler,
actualStateOfWorld ActualStateOfWorldUpdater) error {
generatedOperation :=
oe.operationGenerator.GenerateRegisterPluginFunc(socketPath, foundInDeprecatedDir, timestamp, pluginHandlers, actualStateOfWorld)
return oe.pendingOperations.Run(
socketPath, generatedOperation)
}
func (oe *operationExecutor) UnregisterPlugin(
socketPath string,
pluginHandlers map[string]cache.PluginHandler,
actualStateOfWorld ActualStateOfWorldUpdater) error {
generatedOperation :=
oe.operationGenerator.GenerateUnregisterPluginFunc(socketPath, pluginHandlers, actualStateOfWorld)
return oe.pendingOperations.Run(
socketPath, generatedOperation)
}

View File

@ -0,0 +1,175 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package operationexecutor
import (
"fmt"
"io/ioutil"
"strconv"
"testing"
"time"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
)
const (
numPluginsToRegister = 2
numPluginsToUnregister = 2
)
var _ OperationGenerator = &fakeOperationGenerator{}
var socketDir string
func init() {
d, err := ioutil.TempDir("", "operation_executor_test")
if err != nil {
panic(fmt.Sprintf("Could not create a temp directory: %s", d))
}
socketDir = d
}
func TestOperationExecutor_RegisterPlugin_ConcurrentRegisterPlugin(t *testing.T) {
ch, quit, oe := setup()
for i := 0; i < numPluginsToRegister; i++ {
socketPath := fmt.Sprintf("%s/plugin-%d.sock", socketDir, i)
oe.RegisterPlugin(socketPath, false /* foundInDeprecatedDir */, time.Now(), nil /* plugin handlers */, nil /* actual state of the world updator */)
}
if !isOperationRunConcurrently(ch, quit, numPluginsToRegister) {
t.Fatalf("Unable to start register operations in Concurrent for plugins")
}
}
func TestOperationExecutor_RegisterPlugin_SerialRegisterPlugin(t *testing.T) {
ch, quit, oe := setup()
socketPath := fmt.Sprintf("%s/plugin-serial.sock", socketDir)
for i := 0; i < numPluginsToRegister; i++ {
oe.RegisterPlugin(socketPath, false /* foundInDeprecatedDir */, time.Now(), nil /* plugin handlers */, nil /* actual state of the world updator */)
}
if !isOperationRunSerially(ch, quit) {
t.Fatalf("Unable to start register operations serially for plugins")
}
}
func TestOperationExecutor_UnregisterPlugin_ConcurrentUnregisterPlugin(t *testing.T) {
ch, quit, oe := setup()
for i := 0; i < numPluginsToUnregister; i++ {
socketPath := "socket-path" + strconv.Itoa(i)
oe.UnregisterPlugin(socketPath, nil /* plugin handlers */, nil /* actual state of the world updator */)
}
if !isOperationRunConcurrently(ch, quit, numPluginsToUnregister) {
t.Fatalf("Unable to start unregister operations in Concurrent for plugins")
}
}
func TestOperationExecutor_UnregisterPlugin_SerialUnregisterPlugin(t *testing.T) {
ch, quit, oe := setup()
socketPath := fmt.Sprintf("%s/plugin-serial.sock", socketDir)
for i := 0; i < numPluginsToUnregister; i++ {
oe.UnregisterPlugin(socketPath, nil /* plugin handlers */, nil /* actual state of the world updator */)
}
if !isOperationRunSerially(ch, quit) {
t.Fatalf("Unable to start unregister operations serially for plugins")
}
}
type fakeOperationGenerator struct {
ch chan interface{}
quit chan interface{}
}
func newFakeOperationGenerator(ch chan interface{}, quit chan interface{}) OperationGenerator {
return &fakeOperationGenerator{
ch: ch,
quit: quit,
}
}
func (fopg *fakeOperationGenerator) GenerateRegisterPluginFunc(
socketPath string,
foundInDeprecatedDir bool,
timestamp time.Time,
pluginHandlers map[string]cache.PluginHandler,
actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error {
opFunc := func() error {
startOperationAndBlock(fopg.ch, fopg.quit)
return nil
}
return opFunc
}
func (fopg *fakeOperationGenerator) GenerateUnregisterPluginFunc(
socketPath string,
pluginHandlers map[string]cache.PluginHandler,
actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error {
opFunc := func() error {
startOperationAndBlock(fopg.ch, fopg.quit)
return nil
}
return opFunc
}
func isOperationRunSerially(ch <-chan interface{}, quit chan<- interface{}) bool {
defer close(quit)
numOperationsStarted := 0
loop:
for {
select {
case <-ch:
numOperationsStarted++
if numOperationsStarted > 1 {
return false
}
case <-time.After(5 * time.Second):
break loop
}
}
return true
}
func isOperationRunConcurrently(ch <-chan interface{}, quit chan<- interface{}, numOperationsToRun int) bool {
defer close(quit)
numOperationsStarted := 0
loop:
for {
select {
case <-ch:
numOperationsStarted++
if numOperationsStarted == numOperationsToRun {
return true
}
case <-time.After(5 * time.Second):
break loop
}
}
return false
}
func setup() (chan interface{}, chan interface{}, OperationExecutor) {
ch, quit := make(chan interface{}), make(chan interface{})
return ch, quit, NewOperationExecutor(newFakeOperationGenerator(ch, quit))
}
// This function starts by writing to ch and blocks on the quit channel
// until it is closed by the currently running test
func startOperationAndBlock(ch chan<- interface{}, quit <-chan interface{}) {
ch <- nil
<-quit
}

View File

@ -0,0 +1,204 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package operationexecutor implements interfaces that enable execution of
// register and unregister operations with a
// goroutinemap so that more than one operation is never triggered
// on the same plugin.
package operationexecutor
import (
"context"
"fmt"
"net"
"time"
"github.com/pkg/errors"
"google.golang.org/grpc"
"k8s.io/client-go/tools/record"
registerapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
)
const (
dialTimeoutDuration = 10 * time.Second
notifyTimeoutDuration = 5 * time.Second
)
var _ OperationGenerator = &operationGenerator{}
type operationGenerator struct {
// recorder is used to record events in the API server
recorder record.EventRecorder
}
// NewOperationGenerator is returns instance of operationGenerator
func NewOperationGenerator(recorder record.EventRecorder) OperationGenerator {
return &operationGenerator{
recorder: recorder,
}
}
// OperationGenerator interface that extracts out the functions from operation_executor to make it dependency injectable
type OperationGenerator interface {
// Generates the RegisterPlugin function needed to perform the registration of a plugin
GenerateRegisterPluginFunc(
socketPath string,
foundInDeprecatedDir bool,
timestamp time.Time,
pluginHandlers map[string]cache.PluginHandler,
actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error
// Generates the UnregisterPlugin function needed to perform the unregistration of a plugin
GenerateUnregisterPluginFunc(
socketPath string,
pluginHandlers map[string]cache.PluginHandler,
actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error
}
func (og *operationGenerator) GenerateRegisterPluginFunc(
socketPath string,
foundInDeprecatedDir bool,
timestamp time.Time,
pluginHandlers map[string]cache.PluginHandler,
actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error {
registerPluginFunc := func() error {
client, conn, err := dial(socketPath, dialTimeoutDuration)
if err != nil {
return fmt.Errorf("RegisterPlugin error -- dial failed at socket %s, err: %v", socketPath, err)
}
defer conn.Close()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
infoResp, err := client.GetInfo(ctx, &registerapi.InfoRequest{})
if err != nil {
return fmt.Errorf("RegisterPlugin error -- failed to get plugin info using RPC GetInfo at socket %s, err: %v", socketPath, err)
}
handler, ok := pluginHandlers[infoResp.Type]
if !ok {
if err := og.notifyPlugin(client, false, fmt.Sprintf("RegisterPlugin error -- no handler registered for plugin type: %s at socket %s", infoResp.Type, socketPath)); err != nil {
return fmt.Errorf("RegisterPlugin error -- failed to send error at socket %s, err: %v", socketPath, err)
}
return fmt.Errorf("RegisterPlugin error -- no handler registered for plugin type: %s at socket %s", infoResp.Type, socketPath)
}
if infoResp.Endpoint == "" {
infoResp.Endpoint = socketPath
}
if err := handler.ValidatePlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions, foundInDeprecatedDir); err != nil {
if err = og.notifyPlugin(client, false, fmt.Sprintf("RegisterPlugin error -- plugin validation failed with err: %v", err)); err != nil {
return fmt.Errorf("RegisterPlugin error -- failed to send error at socket %s, err: %v", socketPath, err)
}
return fmt.Errorf("RegisterPlugin error -- pluginHandler.ValidatePluginFunc failed")
}
// We add the plugin to the actual state of world cache before calling a plugin consumer's Register handle
// so that if we receive a delete event during Register Plugin, we can process it as a DeRegister call.
actualStateOfWorldUpdater.AddPlugin(cache.PluginInfo{
SocketPath: socketPath,
FoundInDeprecatedDir: foundInDeprecatedDir,
Timestamp: timestamp,
})
if err := handler.RegisterPlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions); err != nil {
return og.notifyPlugin(client, false, fmt.Sprintf("RegisterPlugin error -- plugin registration failed with err: %v", err))
}
// Notify is called after register to guarantee that even if notify throws an error Register will always be called after validate
if err := og.notifyPlugin(client, true, ""); err != nil {
return fmt.Errorf("RegisterPlugin error -- failed to send registration status at socket %s, err: %v", socketPath, err)
}
return nil
}
return registerPluginFunc
}
func (og *operationGenerator) GenerateUnregisterPluginFunc(
socketPath string,
pluginHandlers map[string]cache.PluginHandler,
actualStateOfWorldUpdater ActualStateOfWorldUpdater) func() error {
unregisterPluginFunc := func() error {
client, conn, err := dial(socketPath, dialTimeoutDuration)
if err != nil {
return fmt.Errorf("UnregisterPlugin error -- dial failed at socket %s, err: %v", socketPath, err)
}
defer conn.Close()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
infoResp, err := client.GetInfo(ctx, &registerapi.InfoRequest{})
if err != nil {
return fmt.Errorf("UnregisterPlugin error -- failed to get plugin info using RPC GetInfo at socket %s, err: %v", socketPath, err)
}
handler, ok := pluginHandlers[infoResp.Type]
if !ok {
return fmt.Errorf("UnregisterPlugin error -- no handler registered for plugin type: %s at socket %s", infoResp.Type, socketPath)
}
// We remove the plugin to the actual state of world cache before calling a plugin consumer's Unregister handle
// so that if we receive a register event during Register Plugin, we can process it as a Register call.
actualStateOfWorldUpdater.RemovePlugin(socketPath)
handler.DeRegisterPlugin(infoResp.Name)
return nil
}
return unregisterPluginFunc
}
func (og *operationGenerator) notifyPlugin(client registerapi.RegistrationClient, registered bool, errStr string) error {
ctx, cancel := context.WithTimeout(context.Background(), notifyTimeoutDuration)
defer cancel()
status := &registerapi.RegistrationStatus{
PluginRegistered: registered,
Error: errStr,
}
if _, err := client.NotifyRegistrationStatus(ctx, status); err != nil {
return errors.Wrap(err, errStr)
}
if errStr != "" {
return errors.New(errStr)
}
return nil
}
// Dial establishes the gRPC communication with the picked up plugin socket. https://godoc.org/google.golang.org/grpc#Dial
func dial(unixSocketPath string, timeout time.Duration) (registerapi.RegistrationClient, *grpc.ClientConn, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
c, err := grpc.DialContext(ctx, unixSocketPath, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout("unix", addr, timeout)
}),
)
if err != nil {
return nil, nil, fmt.Errorf("failed to dial socket %s, err: %v", unixSocketPath, err)
}
return registerapi.NewRegistrationClient(c), c, nil
}

View File

@ -0,0 +1,126 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package pluginmanager
import (
"time"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/record"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/metrics"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/operationexecutor"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/reconciler"
)
// PluginManager runs a set of asynchronous loops that figure out which plugins
// need to be registered/deregistered and makes it so.
type PluginManager interface {
// Starts the plugin manager and all the asynchronous loops that it controls
Run(sourcesReady config.SourcesReady, stopCh <-chan struct{})
// AddHandler adds the given plugin handler for a specific plugin type, which
// will be added to the actual state of world cache so that it can be passed to
// the desired state of world cache in order to be used during plugin
// registration/deregistration
AddHandler(pluginType string, pluginHandler cache.PluginHandler)
}
const (
// loopSleepDuration is the amount of time the reconciler loop waits
// between successive executions
loopSleepDuration = 1 * time.Second
)
// NewPluginManager returns a new concrete instance implementing the
// PluginManager interface.
func NewPluginManager(
sockDir string,
deprecatedSockDir string,
recorder record.EventRecorder) PluginManager {
asw := cache.NewActualStateOfWorld()
dsw := cache.NewDesiredStateOfWorld()
reconciler := reconciler.NewReconciler(
operationexecutor.NewOperationExecutor(
operationexecutor.NewOperationGenerator(
recorder,
),
),
loopSleepDuration,
dsw,
asw,
)
pm := &pluginManager{
desiredStateOfWorldPopulator: pluginwatcher.NewWatcher(
sockDir,
deprecatedSockDir,
dsw,
),
reconciler: reconciler,
desiredStateOfWorld: dsw,
actualStateOfWorld: asw,
}
return pm
}
// pluginManager implements the PluginManager interface
type pluginManager struct {
// desiredStateOfWorldPopulator (the plugin watcher) runs an asynchronous
// periodic loop to populate the desiredStateOfWorld.
desiredStateOfWorldPopulator *pluginwatcher.Watcher
// reconciler runs an asynchronous periodic loop to reconcile the
// desiredStateOfWorld with the actualStateOfWorld by triggering register
// and unregister operations using the operationExecutor.
reconciler reconciler.Reconciler
// actualStateOfWorld is a data structure containing the actual state of
// the world according to the manager: i.e. which plugins are registered.
// The data structure is populated upon successful completion of register
// and unregister actions triggered by the reconciler.
actualStateOfWorld cache.ActualStateOfWorld
// desiredStateOfWorld is a data structure containing the desired state of
// the world according to the plugin manager: i.e. what plugins are registered.
// The data structure is populated by the desired state of the world
// populator (plugin watcher).
desiredStateOfWorld cache.DesiredStateOfWorld
}
var _ PluginManager = &pluginManager{}
func (pm *pluginManager) Run(sourcesReady config.SourcesReady, stopCh <-chan struct{}) {
defer runtime.HandleCrash()
pm.desiredStateOfWorldPopulator.Start(stopCh)
klog.V(2).Infof("The desired_state_of_world populator (plugin watcher) starts")
klog.Infof("Starting Kubelet Plugin Manager")
go pm.reconciler.Run(stopCh)
metrics.Register(pm.actualStateOfWorld, pm.desiredStateOfWorld)
<-stopCh
klog.Infof("Shutting down Kubelet Plugin Manager")
}
func (pm *pluginManager) AddHandler(pluginType string, handler cache.PluginHandler) {
pm.reconciler.AddHandler(pluginType, handler)
}

View File

@ -0,0 +1,194 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package pluginmanager
import (
"fmt"
"io/ioutil"
"os"
"sync"
"testing"
"time"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/record"
pluginwatcherapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1"
registerapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1"
"k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher"
)
const (
testHostname = "test-hostname"
)
var (
socketDir string
deprecatedSocketDir string
supportedVersions = []string{"v1beta1", "v1beta2"}
)
// fake cache.PluginHandler
type PluginHandler interface {
ValidatePlugin(pluginName string, endpoint string, versions []string, foundInDeprecatedDir bool) error
RegisterPlugin(pluginName, endpoint string, versions []string) error
DeRegisterPlugin(pluginName string)
}
type fakePluginHandler struct {
validatePluginCalled bool
registerPluginCalled bool
deregisterPluginCalled bool
sync.RWMutex
}
func newFakePluginHandler() *fakePluginHandler {
return &fakePluginHandler{
validatePluginCalled: false,
registerPluginCalled: false,
deregisterPluginCalled: false,
}
}
// ValidatePlugin is a fake method
func (f *fakePluginHandler) ValidatePlugin(pluginName string, endpoint string, versions []string, foundInDeprecatedDir bool) error {
f.Lock()
defer f.Unlock()
f.validatePluginCalled = true
return nil
}
// RegisterPlugin is a fake method
func (f *fakePluginHandler) RegisterPlugin(pluginName, endpoint string, versions []string) error {
f.Lock()
defer f.Unlock()
f.registerPluginCalled = true
return nil
}
// DeRegisterPlugin is a fake method
func (f *fakePluginHandler) DeRegisterPlugin(pluginName string) {
f.Lock()
defer f.Unlock()
f.deregisterPluginCalled = true
return
}
func init() {
d, err := ioutil.TempDir("", "plugin_manager_test")
if err != nil {
panic(fmt.Sprintf("Could not create a temp directory: %s", d))
}
d2, err := ioutil.TempDir("", "deprecateddir_plugin_manager_test")
if err != nil {
panic(fmt.Sprintf("Could not create a temp directory: %s", d))
}
socketDir = d
deprecatedSocketDir = d2
}
func cleanup(t *testing.T) {
require.NoError(t, os.RemoveAll(socketDir))
require.NoError(t, os.RemoveAll(deprecatedSocketDir))
os.MkdirAll(socketDir, 0755)
os.MkdirAll(deprecatedSocketDir, 0755)
}
func newWatcher(
t *testing.T, testDeprecatedDir bool,
desiredStateOfWorldCache cache.DesiredStateOfWorld) *pluginwatcher.Watcher {
depSocketDir := ""
if testDeprecatedDir {
depSocketDir = deprecatedSocketDir
}
w := pluginwatcher.NewWatcher(socketDir, depSocketDir, desiredStateOfWorldCache)
require.NoError(t, w.Start(wait.NeverStop))
return w
}
func waitForRegistration(t *testing.T, fakePluginHandler *fakePluginHandler) {
err := retryWithExponentialBackOff(
time.Duration(500*time.Millisecond),
func() (bool, error) {
fakePluginHandler.Lock()
defer fakePluginHandler.Unlock()
if fakePluginHandler.validatePluginCalled && fakePluginHandler.registerPluginCalled {
return true, nil
}
return false, nil
},
)
if err != nil {
t.Fatalf("Timed out waiting for plugin to be added to actual state of world cache.")
}
}
func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.ConditionFunc) error {
backoff := wait.Backoff{
Duration: initialDuration,
Factor: 3,
Jitter: 0,
Steps: 6,
}
return wait.ExponentialBackoff(backoff, fn)
}
func TestPluginRegistration(t *testing.T) {
defer cleanup(t)
pluginManager := newTestPluginManager(socketDir, deprecatedSocketDir)
// Start the plugin manager
stopChan := make(chan struct{})
defer close(stopChan)
go func() {
sourcesReady := config.NewSourcesReady(func(_ sets.String) bool { return true })
pluginManager.Run(sourcesReady, stopChan)
}()
// Add handler for device plugin
fakeHandler := newFakePluginHandler()
pluginManager.AddHandler(pluginwatcherapi.DevicePlugin, fakeHandler)
// Add a new plugin
socketPath := fmt.Sprintf("%s/plugin.sock", socketDir)
pluginName := "example-plugin"
p := pluginwatcher.NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
require.NoError(t, p.Serve("v1beta1", "v1beta2"))
// Verify that the plugin is registered
waitForRegistration(t, fakeHandler)
}
func newTestPluginManager(
sockDir string,
deprecatedSockDir string) PluginManager {
pm := NewPluginManager(
sockDir,
deprecatedSockDir,
&record.FakeRecorder{},
)
return pm
}

View File

@ -6,17 +6,16 @@ go_library(
"example_handler.go",
"example_plugin.go",
"plugin_watcher.go",
"types.go",
],
importpath = "k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher",
importpath = "k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher",
visibility = ["//visibility:public"],
deps = [
"//pkg/kubelet/apis/pluginregistration/v1:go_default_library",
"//pkg/kubelet/util/pluginwatcher/example_plugin_apis/v1beta1:go_default_library",
"//pkg/kubelet/util/pluginwatcher/example_plugin_apis/v1beta2:go_default_library",
"//pkg/kubelet/pluginmanager/cache:go_default_library",
"//pkg/kubelet/pluginmanager/pluginwatcher/example_plugin_apis/v1beta1:go_default_library",
"//pkg/kubelet/pluginmanager/pluginwatcher/example_plugin_apis/v1beta2:go_default_library",
"//pkg/util/filesystem:go_default_library",
"//vendor/github.com/fsnotify/fsnotify:go_default_library",
"//vendor/github.com/pkg/errors:go_default_library",
"//vendor/golang.org/x/net/context:go_default_library",
"//vendor/google.golang.org/grpc:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
@ -29,6 +28,7 @@ go_test(
embed = [":go_default_library"],
deps = [
"//pkg/kubelet/apis/pluginregistration/v1:go_default_library",
"//pkg/kubelet/pluginmanager/cache:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
@ -46,8 +46,8 @@ filegroup(
name = "all-srcs",
srcs = [
":package-srcs",
"//pkg/kubelet/util/pluginwatcher/example_plugin_apis/v1beta1:all-srcs",
"//pkg/kubelet/util/pluginwatcher/example_plugin_apis/v1beta2:all-srcs",
"//pkg/kubelet/pluginmanager/pluginwatcher/example_plugin_apis/v1beta1:all-srcs",
"//pkg/kubelet/pluginmanager/pluginwatcher/example_plugin_apis/v1beta2:all-srcs",
],
tags = ["automanaged"],
visibility = ["//visibility:public"],

View File

@ -22,14 +22,24 @@ This socket filename should not start with a '.' as it will be ignored.
For any discovered plugin, kubelet will issue a Registration.GetInfo gRPC call
to get plugin type, name, endpoint and supported service API versions.
Kubelet will then go through a plugin initialization phase where it will issue
Plugin specific calls (e.g: DevicePlugin::GetDevicePluginOptions).
If any of the following steps in registration fails, on retry registration will
start from scratch:
- Registration.GetInfo is called against socket.
- Validate is called against internal plugin type handler.
- Register is called against internal plugin type handler.
- NotifyRegistrationStatus is called against socket to indicate registration result.
During plugin initialization phase, Kubelet will issue Plugin specific calls
(e.g: DevicePlugin::GetDevicePluginOptions).
Once Kubelet determines that it is ready to use your plugin it will issue a
Registration.NotifyRegistrationStatus gRPC call.
If the plugin removes its socket from the PluginDir this will be interpreted
as a plugin Deregistration
as a plugin Deregistration. If any of the following steps in deregistration fails,
on retry deregistration will start from scratch:
- Registration.GetInfo is called against socket.
- DeRegisterPlugin is called against internal plugin type handler.
## gRPC Service Overview

View File

@ -19,15 +19,18 @@ package pluginwatcher
import (
"errors"
"fmt"
"net"
"reflect"
"sync"
"time"
"golang.org/x/net/context"
"google.golang.org/grpc"
"k8s.io/klog"
v1beta1 "k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher/example_plugin_apis/v1beta1"
v1beta2 "k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher/example_plugin_apis/v1beta2"
registerapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1"
v1beta1 "k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher/example_plugin_apis/v1beta1"
v1beta2 "k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher/example_plugin_apis/v1beta2"
)
type exampleHandler struct {
@ -152,3 +155,21 @@ func (p *exampleHandler) DecreasePluginCount(pluginName string) (old int, ok boo
return v, ok
}
// Dial establishes the gRPC communication with the picked up plugin socket. https://godoc.org/google.golang.org/grpc#Dial
func dial(unixSocketPath string, timeout time.Duration) (registerapi.RegistrationClient, *grpc.ClientConn, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
c, err := grpc.DialContext(ctx, unixSocketPath, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout("unix", addr, timeout)
}),
)
if err != nil {
return nil, nil, fmt.Errorf("failed to dial socket %s, err: %v", unixSocketPath, err)
}
return registerapi.NewRegistrationClient(c), c, nil
}

View File

@ -29,8 +29,9 @@ import (
"k8s.io/klog"
registerapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1"
v1beta1 "k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher/example_plugin_apis/v1beta1"
v1beta2 "k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher/example_plugin_apis/v1beta2"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
v1beta1 "k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher/example_plugin_apis/v1beta1"
v1beta2 "k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher/example_plugin_apis/v1beta2"
)
// examplePlugin is a sample plugin to work with plugin watcher
@ -86,6 +87,14 @@ func NewTestExamplePlugin(pluginName string, pluginType string, endpoint string,
}
}
// GetPluginInfo returns a PluginInfo object
func GetPluginInfo(plugin *examplePlugin, foundInDeprecatedDir bool) cache.PluginInfo {
return cache.PluginInfo{
SocketPath: plugin.endpoint,
FoundInDeprecatedDir: foundInDeprecatedDir,
}
}
// GetInfo is the RPC invoked by plugin watcher
func (e *examplePlugin) GetInfo(ctx context.Context, req *registerapi.InfoRequest) (*registerapi.PluginInfo, error) {
return &registerapi.PluginInfo{

View File

@ -3,7 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["api.pb.go"],
importpath = "k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher/example_plugin_apis/v1beta1",
importpath = "k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher/example_plugin_apis/v1beta1",
visibility = ["//visibility:public"],
deps = [
"//vendor/github.com/gogo/protobuf/gogoproto:go_default_library",

View File

@ -3,7 +3,7 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library")
go_library(
name = "go_default_library",
srcs = ["api.pb.go"],
importpath = "k8s.io/kubernetes/pkg/kubelet/util/pluginwatcher/example_plugin_apis/v1beta2",
importpath = "k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher/example_plugin_apis/v1beta2",
visibility = ["//visibility:public"],
deps = [
"//vendor/github.com/gogo/protobuf/gogoproto:go_default_library",

View File

@ -0,0 +1,254 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package pluginwatcher
import (
"fmt"
"os"
"path/filepath"
"strings"
"time"
"github.com/fsnotify/fsnotify"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
utilfs "k8s.io/kubernetes/pkg/util/filesystem"
)
// Watcher is the plugin watcher
type Watcher struct {
path string
deprecatedPath string
fs utilfs.Filesystem
fsWatcher *fsnotify.Watcher
stopped chan struct{}
desiredStateOfWorld cache.DesiredStateOfWorld
}
// NewWatcher provides a new watcher
// deprecatedSockDir refers to a pre-GA directory that was used by older plugins
// for socket registration. New plugins should not use this directory.
func NewWatcher(sockDir string, deprecatedSockDir string, desiredStateOfWorld cache.DesiredStateOfWorld) *Watcher {
return &Watcher{
path: sockDir,
deprecatedPath: deprecatedSockDir,
fs: &utilfs.DefaultFs{},
desiredStateOfWorld: desiredStateOfWorld,
}
}
// Start watches for the creation and deletion of plugin sockets at the path
func (w *Watcher) Start(stopCh <-chan struct{}) error {
klog.V(2).Infof("Plugin Watcher Start at %s", w.path)
w.stopped = make(chan struct{})
// Creating the directory to be watched if it doesn't exist yet,
// and walks through the directory to discover the existing plugins.
if err := w.init(); err != nil {
return err
}
fsWatcher, err := fsnotify.NewWatcher()
if err != nil {
return fmt.Errorf("failed to start plugin fsWatcher, err: %v", err)
}
w.fsWatcher = fsWatcher
// Traverse plugin dir and add filesystem watchers before starting the plugin processing goroutine.
if err := w.traversePluginDir(w.path); err != nil {
klog.Errorf("failed to traverse plugin socket path %q, err: %v", w.path, err)
}
// Traverse deprecated plugin dir, if specified.
if len(w.deprecatedPath) != 0 {
if err := w.traversePluginDir(w.deprecatedPath); err != nil {
klog.Errorf("failed to traverse deprecated plugin socket path %q, err: %v", w.deprecatedPath, err)
}
}
go func(fsWatcher *fsnotify.Watcher) {
defer close(w.stopped)
for {
select {
case event := <-fsWatcher.Events:
//TODO: Handle errors by taking corrective measures
if event.Op&fsnotify.Create == fsnotify.Create {
err := w.handleCreateEvent(event)
if err != nil {
klog.Errorf("error %v when handling create event: %s", err, event)
}
} else if event.Op&fsnotify.Remove == fsnotify.Remove {
err := w.handleDeleteEvent(event)
if err != nil {
klog.Errorf("error %v when handling delete event: %s", err, event)
}
}
continue
case err := <-fsWatcher.Errors:
if err != nil {
klog.Errorf("fsWatcher received error: %v", err)
}
continue
case <-stopCh:
// In case of plugin watcher being stopped by plugin manager, stop
// probing the creation/deletion of plugin sockets.
// Also give all pending go routines a chance to complete
select {
case <-w.stopped:
case <-time.After(11 * time.Second):
klog.Errorf("timeout on stopping watcher")
}
w.fsWatcher.Close()
return
}
}
}(fsWatcher)
return nil
}
func (w *Watcher) init() error {
klog.V(4).Infof("Ensuring Plugin directory at %s ", w.path)
if err := w.fs.MkdirAll(w.path, 0755); err != nil {
return fmt.Errorf("error (re-)creating root %s: %v", w.path, err)
}
return nil
}
// Walks through the plugin directory discover any existing plugin sockets.
// Ignore all errors except root dir not being walkable
func (w *Watcher) traversePluginDir(dir string) error {
return w.fs.Walk(dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
if path == dir {
return fmt.Errorf("error accessing path: %s error: %v", path, err)
}
klog.Errorf("error accessing path: %s error: %v", path, err)
return nil
}
switch mode := info.Mode(); {
case mode.IsDir():
if w.containsBlacklistedDir(path) {
return filepath.SkipDir
}
if err := w.fsWatcher.Add(path); err != nil {
return fmt.Errorf("failed to watch %s, err: %v", path, err)
}
case mode&os.ModeSocket != 0:
event := fsnotify.Event{
Name: path,
Op: fsnotify.Create,
}
//TODO: Handle errors by taking corrective measures
if err := w.handleCreateEvent(event); err != nil {
klog.Errorf("error %v when handling create event: %s", err, event)
}
default:
klog.V(5).Infof("Ignoring file %s with mode %v", path, mode)
}
return nil
})
}
// Handle filesystem notify event.
// Files names:
// - MUST NOT start with a '.'
func (w *Watcher) handleCreateEvent(event fsnotify.Event) error {
klog.V(6).Infof("Handling create event: %v", event)
if w.containsBlacklistedDir(event.Name) {
return nil
}
fi, err := os.Stat(event.Name)
if err != nil {
return fmt.Errorf("stat file %s failed: %v", event.Name, err)
}
if strings.HasPrefix(fi.Name(), ".") {
klog.V(5).Infof("Ignoring file (starts with '.'): %s", fi.Name())
return nil
}
if !fi.IsDir() {
if fi.Mode()&os.ModeSocket == 0 {
klog.V(5).Infof("Ignoring non socket file %s", fi.Name())
return nil
}
return w.handlePluginRegistration(event.Name)
}
return w.traversePluginDir(event.Name)
}
func (w *Watcher) handlePluginRegistration(socketPath string) error {
//TODO: Implement rate limiting to mitigate any DOS kind of attacks.
// Update desired state of world list of plugins
// If the socket path does exist in the desired world cache, there's still
// a possibility that it has been deleted and recreated again before it is
// removed from the desired world cache, so we still need to call AddOrUpdatePlugin
// in this case to update the timestamp
klog.V(2).Infof("Adding socket path or updating timestamp %s to desired state cache", socketPath)
err := w.desiredStateOfWorld.AddOrUpdatePlugin(socketPath, w.foundInDeprecatedDir(socketPath))
if err != nil {
return fmt.Errorf("error adding socket path %s or updating timestamp to desired state cache: %v", socketPath, err)
}
return nil
}
func (w *Watcher) handleDeleteEvent(event fsnotify.Event) error {
klog.V(6).Infof("Handling delete event: %v", event)
socketPath := event.Name
klog.V(2).Infof("Removing socket path %s from desired state cache", socketPath)
w.desiredStateOfWorld.RemovePlugin(socketPath)
return nil
}
// While deprecated dir is supported, to add extra protection around #69015
// we will explicitly blacklist kubernetes.io directory.
func (w *Watcher) containsBlacklistedDir(path string) bool {
return strings.HasPrefix(path, w.deprecatedPath+"/kubernetes.io/") ||
path == w.deprecatedPath+"/kubernetes.io"
}
func (w *Watcher) foundInDeprecatedDir(socketPath string) bool {
if len(w.deprecatedPath) != 0 {
if socketPath == w.deprecatedPath {
return true
}
deprecatedPath := w.deprecatedPath
if !strings.HasSuffix(deprecatedPath, "/") {
deprecatedPath = deprecatedPath + "/"
}
if strings.HasPrefix(socketPath, deprecatedPath) {
return true
}
}
return false
}

View File

@ -26,10 +26,10 @@ import (
"time"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog"
registerapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
)
var (
@ -68,128 +68,211 @@ func cleanup(t *testing.T) {
os.MkdirAll(deprecatedSocketDir, 0755)
}
func waitForRegistration(
t *testing.T,
socketPath string,
dsw cache.DesiredStateOfWorld) {
err := retryWithExponentialBackOff(
time.Duration(500*time.Millisecond),
func() (bool, error) {
if dsw.PluginExists(socketPath) {
return true, nil
}
return false, nil
},
)
if err != nil {
t.Fatalf("Timed out waiting for plugin to be added to desired state of world cache:\n%s.", socketPath)
}
}
func waitForUnregistration(
t *testing.T,
socketPath string,
dsw cache.DesiredStateOfWorld) {
err := retryWithExponentialBackOff(
time.Duration(500*time.Millisecond),
func() (bool, error) {
if !dsw.PluginExists(socketPath) {
return true, nil
}
return false, nil
},
)
if err != nil {
t.Fatalf("Timed out waiting for plugin to be unregistered:\n%s.", socketPath)
}
}
func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.ConditionFunc) error {
backoff := wait.Backoff{
Duration: initialDuration,
Factor: 3,
Jitter: 0,
Steps: 6,
}
return wait.ExponentialBackoff(backoff, fn)
}
func TestPluginRegistration(t *testing.T) {
defer cleanup(t)
hdlr := NewExampleHandler(supportedVersions, false /* permitDeprecatedDir */)
w := newWatcherWithHandler(t, hdlr, false /* testDeprecatedDir */)
defer func() { require.NoError(t, w.Stop()) }()
dsw := cache.NewDesiredStateOfWorld()
newWatcher(t, false /* testDeprecatedDir */, dsw, wait.NeverStop)
for i := 0; i < 10; i++ {
socketPath := fmt.Sprintf("%s/plugin-%d.sock", socketDir, i)
pluginName := fmt.Sprintf("example-plugin-%d", i)
hdlr.AddPluginName(pluginName)
p := NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
require.NoError(t, p.Serve("v1beta1", "v1beta2"))
require.True(t, waitForEvent(t, exampleEventValidate, hdlr.EventChan(p.pluginName)))
require.True(t, waitForEvent(t, exampleEventRegister, hdlr.EventChan(p.pluginName)))
pluginInfo := GetPluginInfo(p, false /* testDeprecatedDir */)
waitForRegistration(t, pluginInfo.SocketPath, dsw)
require.True(t, waitForPluginRegistrationStatus(t, p.registrationStatus))
// Check the desired state for plugins
dswPlugins := dsw.GetPluginsToRegister()
if len(dswPlugins) != 1 {
t.Fatalf("TestPluginRegistration: desired state of world length should be 1 but it's %d", len(dswPlugins))
}
// Stop the plugin; the plugin should be removed from the desired state of world cache
require.NoError(t, p.Stop())
require.True(t, waitForEvent(t, exampleEventDeRegister, hdlr.EventChan(p.pluginName)))
// The following doesn't work when running the unit tests locally: event.Op of plugin watcher won't pick up the delete event
waitForUnregistration(t, pluginInfo.SocketPath, dsw)
dswPlugins = dsw.GetPluginsToRegister()
if len(dswPlugins) != 0 {
t.Fatalf("TestPluginRegistration: desired state of world length should be 0 but it's %d", len(dswPlugins))
}
}
}
func TestPluginRegistrationDeprecated(t *testing.T) {
defer cleanup(t)
hdlr := NewExampleHandler(supportedVersions, true /* permitDeprecatedDir */)
w := newWatcherWithHandler(t, hdlr, true /* testDeprecatedDir */)
defer func() { require.NoError(t, w.Stop()) }()
dsw := cache.NewDesiredStateOfWorld()
newWatcher(t, true /* testDeprecatedDir */, dsw, wait.NeverStop)
// Test plugins in deprecated dir
for i := 0; i < 10; i++ {
endpoint := fmt.Sprintf("%s/dep-plugin-%d.sock", deprecatedSocketDir, i)
pluginName := fmt.Sprintf("dep-example-plugin-%d", i)
hdlr.AddPluginName(pluginName)
p := NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, endpoint, supportedVersions...)
require.NoError(t, p.Serve("v1beta1", "v1beta2"))
require.True(t, waitForEvent(t, exampleEventValidate, hdlr.EventChan(p.pluginName)))
require.True(t, waitForEvent(t, exampleEventRegister, hdlr.EventChan(p.pluginName)))
pluginInfo := GetPluginInfo(p, true /* testDeprecatedDir */)
waitForRegistration(t, pluginInfo.SocketPath, dsw)
require.True(t, waitForPluginRegistrationStatus(t, p.registrationStatus))
// Check the desired state for plugins
dswPlugins := dsw.GetPluginsToRegister()
if len(dswPlugins) != i+1 {
t.Fatalf("TestPluginRegistrationDeprecated: desired state of world length should be %d but it's %d", i+1, len(dswPlugins))
}
}
}
require.NoError(t, p.Stop())
require.True(t, waitForEvent(t, exampleEventDeRegister, hdlr.EventChan(p.pluginName)))
func TestPluginRegistrationSameName(t *testing.T) {
defer cleanup(t)
dsw := cache.NewDesiredStateOfWorld()
newWatcher(t, false /* testDeprecatedDir */, dsw, wait.NeverStop)
// Make 10 plugins with the same name and same type but different socket path;
// all 10 should be in desired state of world cache
pluginName := "dep-example-plugin"
for i := 0; i < 10; i++ {
socketPath := fmt.Sprintf("%s/plugin-%d.sock", socketDir, i)
p := NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
require.NoError(t, p.Serve("v1beta1", "v1beta2"))
pluginInfo := GetPluginInfo(p, false /* testDeprecatedDir */)
waitForRegistration(t, pluginInfo.SocketPath, dsw)
// Check the desired state for plugins
dswPlugins := dsw.GetPluginsToRegister()
if len(dswPlugins) != i+1 {
t.Fatalf("TestPluginRegistrationSameName: desired state of world length should be %d but it's %d", i+1, len(dswPlugins))
}
}
}
func TestPluginReRegistration(t *testing.T) {
defer cleanup(t)
pluginName := fmt.Sprintf("example-plugin")
hdlr := NewExampleHandler(supportedVersions, false /* permitDeprecatedDir */)
w := newWatcherWithHandler(t, hdlr, false /* testDeprecatedDir */)
defer func() { require.NoError(t, w.Stop()) }()
plugins := make([]*examplePlugin, 10)
for i := 0; i < 10; i++ {
socketPath := fmt.Sprintf("%s/plugin-%d.sock", socketDir, i)
hdlr.AddPluginName(pluginName)
dsw := cache.NewDesiredStateOfWorld()
newWatcher(t, false /* testDeprecatedDir */, dsw, wait.NeverStop)
// Create a plugin first, we are then going to remove the plugin, update the plugin with a different name
// and recreate it.
socketPath := fmt.Sprintf("%s/plugin-reregistration.sock", socketDir)
pluginName := "reregister-plugin"
p := NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
require.NoError(t, p.Serve("v1beta1", "v1beta2"))
pluginInfo := GetPluginInfo(p, false /* testDeprecatedDir */)
lastTimestamp := time.Now()
waitForRegistration(t, pluginInfo.SocketPath, dsw)
require.True(t, waitForEvent(t, exampleEventValidate, hdlr.EventChan(p.pluginName)))
require.True(t, waitForEvent(t, exampleEventRegister, hdlr.EventChan(p.pluginName)))
// Remove this plugin, then recreate it again with a different name for 10 times
// The updated plugin should be in the desired state of world cache
for i := 0; i < 10; i++ {
// Stop the plugin; the plugin should be removed from the desired state of world cache
// The plugin removel doesn't work when running the unit tests locally: event.Op of plugin watcher won't pick up the delete event
require.NoError(t, p.Stop())
waitForUnregistration(t, pluginInfo.SocketPath, dsw)
require.True(t, waitForPluginRegistrationStatus(t, p.registrationStatus))
// Add the plugin again
pluginName := fmt.Sprintf("dep-example-plugin-%d", i)
p := NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
require.NoError(t, p.Serve("v1beta1", "v1beta2"))
waitForRegistration(t, pluginInfo.SocketPath, dsw)
plugins[i] = p
// Check the dsw cache. The updated plugin should be the only plugin in it
dswPlugins := dsw.GetPluginsToRegister()
if len(dswPlugins) != 1 {
t.Fatalf("TestPluginReRegistration: desired state of world length should be 1 but it's %d", len(dswPlugins))
}
plugins[len(plugins)-1].Stop()
require.True(t, waitForEvent(t, exampleEventDeRegister, hdlr.EventChan(pluginName)))
close(hdlr.EventChan(pluginName))
for i := 0; i < len(plugins)-1; i++ {
plugins[i].Stop()
if !dswPlugins[0].Timestamp.After(lastTimestamp) {
t.Fatalf("TestPluginReRegistration: for plugin %s timestamp of plugin is not updated", pluginName)
}
lastTimestamp = dswPlugins[0].Timestamp
}
}
func TestPluginRegistrationAtKubeletStart(t *testing.T) {
defer cleanup(t)
hdlr := NewExampleHandler(supportedVersions, false /* permitDeprecatedDir */)
plugins := make([]*examplePlugin, 10)
for i := 0; i < len(plugins); i++ {
socketPath := fmt.Sprintf("%s/plugin-%d.sock", socketDir, i)
pluginName := fmt.Sprintf("example-plugin-%d", i)
hdlr.AddPluginName(pluginName)
p := NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
require.NoError(t, p.Serve("v1beta1", "v1beta2"))
defer func(p *examplePlugin) { require.NoError(t, p.Stop()) }(p)
defer func(p *examplePlugin) {
require.NoError(t, p.Stop())
}(p)
plugins[i] = p
}
dsw := cache.NewDesiredStateOfWorld()
newWatcher(t, false /* testDeprecatedDir */, dsw, wait.NeverStop)
var wg sync.WaitGroup
for i := 0; i < len(plugins); i++ {
wg.Add(1)
go func(p *examplePlugin) {
defer wg.Done()
require.True(t, waitForEvent(t, exampleEventValidate, hdlr.EventChan(p.pluginName)))
require.True(t, waitForEvent(t, exampleEventRegister, hdlr.EventChan(p.pluginName)))
require.True(t, waitForPluginRegistrationStatus(t, p.registrationStatus))
pluginInfo := GetPluginInfo(p, false /* testDeprecatedDir */)
// Validate that the plugin is in the desired state cache
waitForRegistration(t, pluginInfo.SocketPath, dsw)
}(plugins[i])
}
w := newWatcherWithHandler(t, hdlr, false /* testDeprecatedDir */)
defer func() { require.NoError(t, w.Stop()) }()
c := make(chan struct{})
go func() {
defer close(c)
@ -204,64 +287,11 @@ func TestPluginRegistrationAtKubeletStart(t *testing.T) {
}
}
func TestPluginRegistrationFailureWithUnsupportedVersion(t *testing.T) {
defer cleanup(t)
pluginName := fmt.Sprintf("example-plugin")
socketPath := socketDir + "/plugin.sock"
hdlr := NewExampleHandler(supportedVersions, false /* permitDeprecatedDir */)
hdlr.AddPluginName(pluginName)
w := newWatcherWithHandler(t, hdlr, false /* testDeprecatedDir */)
defer func() { require.NoError(t, w.Stop()) }()
// Advertise v1beta3 but don't serve anything else than the plugin service
p := NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, "v1beta3")
require.NoError(t, p.Serve())
defer func() { require.NoError(t, p.Stop()) }()
require.True(t, waitForEvent(t, exampleEventValidate, hdlr.EventChan(p.pluginName)))
require.False(t, waitForPluginRegistrationStatus(t, p.registrationStatus))
}
func TestPlugiRegistrationFailureWithUnsupportedVersionAtKubeletStart(t *testing.T) {
defer cleanup(t)
pluginName := fmt.Sprintf("example-plugin")
socketPath := socketDir + "/plugin.sock"
// Advertise v1beta3 but don't serve anything else than the plugin service
p := NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, "v1beta3")
require.NoError(t, p.Serve())
defer func() { require.NoError(t, p.Stop()) }()
hdlr := NewExampleHandler(supportedVersions, false /* permitDeprecatedDir */)
hdlr.AddPluginName(pluginName)
c := make(chan struct{})
go func() {
defer close(c)
require.True(t, waitForEvent(t, exampleEventValidate, hdlr.EventChan(p.pluginName)))
require.False(t, waitForPluginRegistrationStatus(t, p.registrationStatus))
}()
w := newWatcherWithHandler(t, hdlr, false /* testDeprecatedDir */)
defer func() { require.NoError(t, w.Stop()) }()
select {
case <-c:
return
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("Timeout while waiting for the plugin registration status")
}
}
func waitForPluginRegistrationStatus(t *testing.T, statusChan chan registerapi.RegistrationStatus) bool {
select {
case status := <-statusChan:
return status.PluginRegistered
case <-time.After(10 * time.Second):
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("Timed out while waiting for registration status")
}
return false
@ -278,15 +308,13 @@ func waitForEvent(t *testing.T, expected examplePluginEvent, eventChan chan exam
return false
}
func newWatcherWithHandler(t *testing.T, hdlr PluginHandler, testDeprecatedDir bool) *Watcher {
func newWatcher(t *testing.T, testDeprecatedDir bool, desiredStateOfWorldCache cache.DesiredStateOfWorld, stopCh <-chan struct{}) *Watcher {
depSocketDir := ""
if testDeprecatedDir {
depSocketDir = deprecatedSocketDir
}
w := NewWatcher(socketDir, depSocketDir)
w.AddHandler(registerapi.DevicePlugin, hdlr)
require.NoError(t, w.Start())
w := NewWatcher(socketDir, depSocketDir, desiredStateOfWorldCache)
require.NoError(t, w.Start(stopCh))
return w
}
@ -356,7 +384,7 @@ func TestFoundInDeprecatedDir(t *testing.T) {
for _, tc := range testCases {
// Arrange & Act
watcher := NewWatcher(tc.sockDir, tc.deprecatedSockDir)
watcher := NewWatcher(tc.sockDir, tc.deprecatedSockDir, cache.NewDesiredStateOfWorld())
actualFoundInDeprecatedDir := watcher.foundInDeprecatedDir(tc.socketPath)
@ -480,7 +508,7 @@ func TestContainsBlacklistedDir(t *testing.T) {
for _, tc := range testCases {
// Arrange & Act
watcher := NewWatcher(tc.sockDir, tc.deprecatedSockDir)
watcher := NewWatcher(tc.sockDir, tc.deprecatedSockDir, cache.NewDesiredStateOfWorld())
actual := watcher.containsBlacklistedDir(tc.path)

View File

@ -0,0 +1,45 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "go_default_library",
srcs = ["reconciler.go"],
importpath = "k8s.io/kubernetes/pkg/kubelet/pluginmanager/reconciler",
visibility = ["//visibility:public"],
deps = [
"//pkg/kubelet/pluginmanager/cache:go_default_library",
"//pkg/kubelet/pluginmanager/operationexecutor:go_default_library",
"//pkg/util/goroutinemap:go_default_library",
"//pkg/util/goroutinemap/exponentialbackoff:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],
)
go_test(
name = "go_default_test",
srcs = ["reconciler_test.go"],
embed = [":go_default_library"],
deps = [
"//pkg/kubelet/apis/pluginregistration/v1:go_default_library",
"//pkg/kubelet/pluginmanager/cache:go_default_library",
"//pkg/kubelet/pluginmanager/operationexecutor:go_default_library",
"//pkg/kubelet/pluginmanager/pluginwatcher:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
"//staging/src/k8s.io/client-go/tools/record:go_default_library",
"//vendor/github.com/stretchr/testify/require:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
visibility = ["//visibility:public"],
)

View File

@ -0,0 +1,160 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package reconciler implements interfaces that attempt to reconcile the
// desired state of the world with the actual state of the world by triggering
// relevant actions (register/deregister plugins).
package reconciler
import (
"sync"
"time"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/operationexecutor"
"k8s.io/kubernetes/pkg/util/goroutinemap"
"k8s.io/kubernetes/pkg/util/goroutinemap/exponentialbackoff"
)
// Reconciler runs a periodic loop to reconcile the desired state of the world
// with the actual state of the world by triggering register and unregister
// operations.
type Reconciler interface {
// Starts running the reconciliation loop which executes periodically, checks
// if plugins that should be registered are register and plugins that should be
// unregistered are unregistered. If not, it will trigger register/unregister
// operations to rectify.
Run(stopCh <-chan struct{})
// AddHandler adds the given plugin handler for a specific plugin type
AddHandler(pluginType string, pluginHandler cache.PluginHandler)
}
// NewReconciler returns a new instance of Reconciler.
//
// loopSleepDuration - the amount of time the reconciler loop sleeps between
// successive executions
// syncDuration - the amount of time the syncStates sleeps between
// successive executions
// operationExecutor - used to trigger register/unregister operations safely
// (prevents more than one operation from being triggered on the same
// socket path)
// desiredStateOfWorld - cache containing the desired state of the world
// actualStateOfWorld - cache containing the actual state of the world
func NewReconciler(
operationExecutor operationexecutor.OperationExecutor,
loopSleepDuration time.Duration,
desiredStateOfWorld cache.DesiredStateOfWorld,
actualStateOfWorld cache.ActualStateOfWorld) Reconciler {
return &reconciler{
operationExecutor: operationExecutor,
loopSleepDuration: loopSleepDuration,
desiredStateOfWorld: desiredStateOfWorld,
actualStateOfWorld: actualStateOfWorld,
handlers: make(map[string]cache.PluginHandler),
}
}
type reconciler struct {
operationExecutor operationexecutor.OperationExecutor
loopSleepDuration time.Duration
desiredStateOfWorld cache.DesiredStateOfWorld
actualStateOfWorld cache.ActualStateOfWorld
handlers map[string]cache.PluginHandler
sync.RWMutex
}
var _ Reconciler = &reconciler{}
func (rc *reconciler) Run(stopCh <-chan struct{}) {
wait.Until(func() {
rc.reconcile()
},
rc.loopSleepDuration,
stopCh)
}
func (rc *reconciler) AddHandler(pluginType string, pluginHandler cache.PluginHandler) {
rc.Lock()
defer rc.Unlock()
rc.handlers[pluginType] = pluginHandler
}
func (rc *reconciler) getHandlers() map[string]cache.PluginHandler {
rc.Lock()
defer rc.Unlock()
return rc.handlers
}
func (rc *reconciler) reconcile() {
// Unregisterations are triggered before registrations
// Ensure plugins that should be unregistered are unregistered.
for _, registeredPlugin := range rc.actualStateOfWorld.GetRegisteredPlugins() {
unregisterPlugin := false
if !rc.desiredStateOfWorld.PluginExists(registeredPlugin.SocketPath) {
unregisterPlugin = true
} else {
// We also need to unregister the plugins that exist in both actual state of world
// and desired state of world cache, but the timestamps don't match.
// Iterate through desired state of world plugins and see if there's any plugin
// with the same socket path but different timestamp.
for _, dswPlugin := range rc.desiredStateOfWorld.GetPluginsToRegister() {
if dswPlugin.SocketPath == registeredPlugin.SocketPath && dswPlugin.Timestamp != registeredPlugin.Timestamp {
klog.V(5).Infof(registeredPlugin.GenerateMsgDetailed("An updated version of plugin has been found, unregistering the plugin first before reregistering", ""))
unregisterPlugin = true
break
}
}
}
if unregisterPlugin {
klog.V(5).Infof(registeredPlugin.GenerateMsgDetailed("Starting operationExecutor.UnregisterPlugin", ""))
err := rc.operationExecutor.UnregisterPlugin(registeredPlugin.SocketPath, rc.getHandlers(), rc.actualStateOfWorld)
if err != nil &&
!goroutinemap.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore goroutinemap.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
// Log all other errors.
klog.Errorf(registeredPlugin.GenerateErrorDetailed("operationExecutor.UnregisterPlugin failed", err).Error())
}
if err == nil {
klog.V(1).Infof(registeredPlugin.GenerateMsgDetailed("operationExecutor.UnregisterPlugin started", ""))
}
}
}
// Ensure plugins that should be registered are registered
for _, pluginToRegister := range rc.desiredStateOfWorld.GetPluginsToRegister() {
if !rc.actualStateOfWorld.PluginExistsWithCorrectTimestamp(pluginToRegister) {
klog.V(5).Infof(pluginToRegister.GenerateMsgDetailed("Starting operationExecutor.RegisterPlugin", ""))
err := rc.operationExecutor.RegisterPlugin(pluginToRegister.SocketPath, pluginToRegister.FoundInDeprecatedDir, pluginToRegister.Timestamp, rc.getHandlers(), rc.actualStateOfWorld)
if err != nil &&
!goroutinemap.IsAlreadyExists(err) &&
!exponentialbackoff.IsExponentialBackoff(err) {
// Ignore goroutinemap.IsAlreadyExists and exponentialbackoff.IsExponentialBackoff errors, they are expected.
klog.Errorf(pluginToRegister.GenerateErrorDetailed("operationExecutor.RegisterPlugin failed", err).Error())
}
if err == nil {
klog.V(1).Infof(pluginToRegister.GenerateMsgDetailed("operationExecutor.RegisterPlugin started", ""))
}
}
}
}

View File

@ -0,0 +1,320 @@
/*
Copyright 2019 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package reconciler
import (
"fmt"
"io/ioutil"
"os"
"testing"
"time"
"github.com/stretchr/testify/require"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/record"
pluginwatcherapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1"
registerapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/cache"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/operationexecutor"
"k8s.io/kubernetes/pkg/kubelet/pluginmanager/pluginwatcher"
)
const (
// reconcilerLoopSleepDuration is the amount of time the reconciler loop
// waits between successive executions
reconcilerLoopSleepDuration time.Duration = 1 * time.Nanosecond
)
var (
socketDir string
supportedVersions = []string{"v1beta1", "v1beta2"}
)
func init() {
d, err := ioutil.TempDir("", "reconciler_test")
if err != nil {
panic(fmt.Sprintf("Could not create a temp directory: %s", d))
}
socketDir = d
}
func cleanup(t *testing.T) {
require.NoError(t, os.RemoveAll(socketDir))
os.MkdirAll(socketDir, 0755)
}
func runReconciler(reconciler Reconciler) {
go reconciler.Run(wait.NeverStop)
}
func waitForRegistration(
t *testing.T,
socketPath string,
previousTimestamp time.Time,
asw cache.ActualStateOfWorld) {
err := retryWithExponentialBackOff(
time.Duration(500*time.Millisecond),
func() (bool, error) {
registeredPlugins := asw.GetRegisteredPlugins()
for _, plugin := range registeredPlugins {
if plugin.SocketPath == socketPath && plugin.Timestamp.After(previousTimestamp) {
return true, nil
}
}
return false, nil
},
)
if err != nil {
t.Fatalf("Timed out waiting for plugin to be registered:\n%s.", socketPath)
}
}
func waitForUnregistration(
t *testing.T,
socketPath string,
asw cache.ActualStateOfWorld) {
err := retryWithExponentialBackOff(
time.Duration(500*time.Millisecond),
func() (bool, error) {
registeredPlugins := asw.GetRegisteredPlugins()
for _, plugin := range registeredPlugins {
if plugin.SocketPath == socketPath {
return false, nil
}
}
return true, nil
},
)
if err != nil {
t.Fatalf("Timed out waiting for plugin to be unregistered:\n%s.", socketPath)
}
}
func retryWithExponentialBackOff(initialDuration time.Duration, fn wait.ConditionFunc) error {
backoff := wait.Backoff{
Duration: initialDuration,
Factor: 3,
Jitter: 0,
Steps: 6,
}
return wait.ExponentialBackoff(backoff, fn)
}
type DummyImpl struct {
dummy string
}
func NewDummyImpl() *DummyImpl {
return &DummyImpl{}
}
// ValidatePlugin is a dummy implementation
func (d *DummyImpl) ValidatePlugin(pluginName string, endpoint string, versions []string, foundInDeprecatedDir bool) error {
return nil
}
// RegisterPlugin is a dummy implementation
func (d *DummyImpl) RegisterPlugin(pluginName string, endpoint string, versions []string) error {
return nil
}
// DeRegisterPlugin is a dummy implementation
func (d *DummyImpl) DeRegisterPlugin(pluginName string) {
}
// Calls Run()
// Verifies that asw and dsw have no plugins
func Test_Run_Positive_DoNothing(t *testing.T) {
defer cleanup(t)
dsw := cache.NewDesiredStateOfWorld()
asw := cache.NewActualStateOfWorld()
fakeRecorder := &record.FakeRecorder{}
oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
fakeRecorder,
))
reconciler := NewReconciler(
oex,
reconcilerLoopSleepDuration,
dsw,
asw,
)
// Act
runReconciler(reconciler)
// Get dsw and asw plugins; they should both be empty
if len(asw.GetRegisteredPlugins()) != 0 {
t.Fatalf("Test_Run_Positive_DoNothing: actual state of world should be empty but it's not")
}
if len(dsw.GetPluginsToRegister()) != 0 {
t.Fatalf("Test_Run_Positive_DoNothing: desired state of world should be empty but it's not")
}
}
// Populates desiredStateOfWorld cache with one plugin.
// Calls Run()
// Verifies the actual state of world contains that plugin
func Test_Run_Positive_Register(t *testing.T) {
defer cleanup(t)
dsw := cache.NewDesiredStateOfWorld()
asw := cache.NewActualStateOfWorld()
di := NewDummyImpl()
fakeRecorder := &record.FakeRecorder{}
oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
fakeRecorder,
))
reconciler := NewReconciler(
oex,
reconcilerLoopSleepDuration,
dsw,
asw,
)
reconciler.AddHandler(pluginwatcherapi.DevicePlugin, cache.PluginHandler(di))
// Start the reconciler to fill ASW.
stopChan := make(chan struct{})
defer close(stopChan)
go reconciler.Run(stopChan)
socketPath := fmt.Sprintf("%s/plugin.sock", socketDir)
pluginName := fmt.Sprintf("example-plugin")
p := pluginwatcher.NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
require.NoError(t, p.Serve("v1beta1", "v1beta2"))
timestampBeforeRegistration := time.Now()
dsw.AddOrUpdatePlugin(socketPath, false /* foundInDeprecatedDir */)
waitForRegistration(t, socketPath, timestampBeforeRegistration, asw)
// Get asw plugins; it should contain the added plugin
aswPlugins := asw.GetRegisteredPlugins()
if len(aswPlugins) != 1 {
t.Fatalf("Test_Run_Positive_Register: actual state of world length should be one but it's %d", len(aswPlugins))
}
if aswPlugins[0].SocketPath != socketPath {
t.Fatalf("Test_Run_Positive_Register: expected\n%s\nin actual state of world, but got\n%v\n", socketPath, aswPlugins[0])
}
}
// Populates desiredStateOfWorld cache with one plugin
// Calls Run()
// Verifies there is one plugin now in actual state of world.
// Deletes plugin from desired state of world.
// Verifies that plugin no longer exists in actual state of world.
func Test_Run_Positive_RegisterThenUnregister(t *testing.T) {
defer cleanup(t)
dsw := cache.NewDesiredStateOfWorld()
asw := cache.NewActualStateOfWorld()
di := NewDummyImpl()
fakeRecorder := &record.FakeRecorder{}
oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
fakeRecorder,
))
reconciler := NewReconciler(
oex,
reconcilerLoopSleepDuration,
dsw,
asw,
)
reconciler.AddHandler(pluginwatcherapi.DevicePlugin, cache.PluginHandler(di))
// Start the reconciler to fill ASW.
stopChan := make(chan struct{})
defer close(stopChan)
go reconciler.Run(stopChan)
socketPath := fmt.Sprintf("%s/plugin.sock", socketDir)
pluginName := fmt.Sprintf("example-plugin")
p := pluginwatcher.NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
require.NoError(t, p.Serve("v1beta1", "v1beta2"))
timestampBeforeRegistration := time.Now()
dsw.AddOrUpdatePlugin(socketPath, false /* foundInDeprecatedDir */)
waitForRegistration(t, socketPath, timestampBeforeRegistration, asw)
// Get asw plugins; it should contain the added plugin
aswPlugins := asw.GetRegisteredPlugins()
if len(aswPlugins) != 1 {
t.Fatalf("Test_Run_Positive_RegisterThenUnregister: actual state of world length should be one but it's %d", len(aswPlugins))
}
if aswPlugins[0].SocketPath != socketPath {
t.Fatalf("Test_Run_Positive_RegisterThenUnregister: expected\n%s\nin actual state of world, but got\n%v\n", socketPath, aswPlugins[0])
}
dsw.RemovePlugin(socketPath)
waitForUnregistration(t, socketPath, asw)
// Get asw plugins; it should no longer contain the added plugin
aswPlugins = asw.GetRegisteredPlugins()
if len(aswPlugins) != 0 {
t.Fatalf("Test_Run_Positive_RegisterThenUnregister: actual state of world length should be zero but it's %d", len(aswPlugins))
}
}
// Populates desiredStateOfWorld cache with one plugin
// Calls Run()
// Then update the timestamp of the plugin
// Verifies that the plugin is reregistered.
// Verifies the plugin with updated timestamp now in actual state of world.
func Test_Run_Positive_ReRegister(t *testing.T) {
defer cleanup(t)
dsw := cache.NewDesiredStateOfWorld()
asw := cache.NewActualStateOfWorld()
di := NewDummyImpl()
fakeRecorder := &record.FakeRecorder{}
oex := operationexecutor.NewOperationExecutor(operationexecutor.NewOperationGenerator(
fakeRecorder,
))
reconciler := NewReconciler(
oex,
reconcilerLoopSleepDuration,
dsw,
asw,
)
reconciler.AddHandler(pluginwatcherapi.DevicePlugin, cache.PluginHandler(di))
// Start the reconciler to fill ASW.
stopChan := make(chan struct{})
defer close(stopChan)
go reconciler.Run(stopChan)
socketPath := fmt.Sprintf("%s/plugin2.sock", socketDir)
pluginName := fmt.Sprintf("example-plugin2")
p := pluginwatcher.NewTestExamplePlugin(pluginName, registerapi.DevicePlugin, socketPath, supportedVersions...)
require.NoError(t, p.Serve("v1beta1", "v1beta2"))
timestampBeforeRegistration := time.Now()
dsw.AddOrUpdatePlugin(socketPath, false /* foundInDeprecatedDir */)
waitForRegistration(t, socketPath, timestampBeforeRegistration, asw)
timeStampBeforeReRegistration := time.Now()
// Add the plugin again to update the timestamp
dsw.AddOrUpdatePlugin(socketPath, false /* foundInDeprecatedDir */)
// This should trigger a deregistration and a regitration
// The process of unregistration and reregistration can happen so fast that
// we are not able to catch it with waitForUnregistration, so here we are checking
// the plugin has an updated timestamp.
waitForRegistration(t, socketPath, timeStampBeforeReRegistration, asw)
// Get asw plugins; it should contain the added plugin
aswPlugins := asw.GetRegisteredPlugins()
if len(aswPlugins) != 1 {
t.Fatalf("Test_Run_Positive_RegisterThenUnregister: actual state of world length should be one but it's %d", len(aswPlugins))
}
if aswPlugins[0].SocketPath != socketPath {
t.Fatalf("Test_Run_Positive_RegisterThenUnregister: expected\n%s\nin actual state of world, but got\n%v\n", socketPath, aswPlugins[0])
}
}

View File

@ -81,7 +81,6 @@ filegroup(
"//pkg/kubelet/util/ioutils:all-srcs",
"//pkg/kubelet/util/logreduction:all-srcs",
"//pkg/kubelet/util/manager:all-srcs",
"//pkg/kubelet/util/pluginwatcher:all-srcs",
"//pkg/kubelet/util/queue:all-srcs",
"//pkg/kubelet/util/sliceutils:all-srcs",
"//pkg/kubelet/util/store:all-srcs",

View File

@ -1,451 +0,0 @@
/*
Copyright 2018 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package pluginwatcher
import (
"fmt"
"net"
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/fsnotify/fsnotify"
"github.com/pkg/errors"
"golang.org/x/net/context"
"google.golang.org/grpc"
"k8s.io/klog"
registerapi "k8s.io/kubernetes/pkg/kubelet/apis/pluginregistration/v1"
utilfs "k8s.io/kubernetes/pkg/util/filesystem"
)
// Watcher is the plugin watcher
type Watcher struct {
path string
deprecatedPath string
stopCh chan struct{}
stopped chan struct{}
fs utilfs.Filesystem
fsWatcher *fsnotify.Watcher
mutex sync.Mutex
handlers map[string]PluginHandler
plugins map[string]pathInfo
pluginsPool map[string]map[string]*sync.Mutex // map[pluginType][pluginName]
}
type pathInfo struct {
pluginType string
pluginName string
}
// NewWatcher provides a new watcher
// deprecatedSockDir refers to a pre-GA directory that was used by older plugins
// for socket registration. New plugins should not use this directory.
func NewWatcher(sockDir string, deprecatedSockDir string) *Watcher {
return &Watcher{
path: sockDir,
deprecatedPath: deprecatedSockDir,
fs: &utilfs.DefaultFs{},
handlers: make(map[string]PluginHandler),
plugins: make(map[string]pathInfo),
pluginsPool: make(map[string]map[string]*sync.Mutex),
}
}
func (w *Watcher) AddHandler(pluginType string, handler PluginHandler) {
w.mutex.Lock()
defer w.mutex.Unlock()
w.handlers[pluginType] = handler
}
func (w *Watcher) getHandler(pluginType string) (PluginHandler, bool) {
w.mutex.Lock()
defer w.mutex.Unlock()
h, ok := w.handlers[pluginType]
return h, ok
}
// Start watches for the creation of plugin sockets at the path
func (w *Watcher) Start() error {
klog.V(2).Infof("Plugin Watcher Start at %s", w.path)
w.stopCh = make(chan struct{})
w.stopped = make(chan struct{})
// Creating the directory to be watched if it doesn't exist yet,
// and walks through the directory to discover the existing plugins.
if err := w.init(); err != nil {
return err
}
fsWatcher, err := fsnotify.NewWatcher()
if err != nil {
return fmt.Errorf("failed to start plugin fsWatcher, err: %v", err)
}
w.fsWatcher = fsWatcher
// Traverse plugin dir and add filesystem watchers before starting the plugin processing goroutine.
if err := w.traversePluginDir(w.path); err != nil {
w.fsWatcher.Close()
return fmt.Errorf("failed to traverse plugin socket path %q, err: %v", w.path, err)
}
// Traverse deprecated plugin dir, if specified.
if len(w.deprecatedPath) != 0 {
if err := w.traversePluginDir(w.deprecatedPath); err != nil {
w.fsWatcher.Close()
return fmt.Errorf("failed to traverse deprecated plugin socket path %q, err: %v", w.deprecatedPath, err)
}
}
go func() {
defer close(w.stopped)
for {
select {
case event := <-fsWatcher.Events:
//TODO: Handle errors by taking corrective measures
if event.Op&fsnotify.Create == fsnotify.Create {
err := w.handleCreateEvent(event)
if err != nil {
klog.Errorf("error %v when handling create event: %s", err, event)
}
} else if event.Op&fsnotify.Remove == fsnotify.Remove {
err := w.handleDeleteEvent(event)
if err != nil {
klog.Errorf("error %v when handling delete event: %s", err, event)
}
}
case err := <-fsWatcher.Errors:
if err != nil {
klog.Errorf("fsWatcher received error: %v", err)
}
case <-w.stopCh:
return
}
}
}()
return nil
}
// Stop stops probing the creation of plugin sockets at the path
func (w *Watcher) Stop() error {
close(w.stopCh)
select {
case <-w.stopped:
case <-time.After(11 * time.Second):
return fmt.Errorf("timeout on stopping watcher")
}
w.fsWatcher.Close()
return nil
}
func (w *Watcher) init() error {
klog.V(4).Infof("Ensuring Plugin directory at %s ", w.path)
if err := w.fs.MkdirAll(w.path, 0755); err != nil {
return fmt.Errorf("error (re-)creating root %s: %v", w.path, err)
}
return nil
}
// Walks through the plugin directory discover any existing plugin sockets.
// Goroutines started here will be waited for in Stop() before cleaning up.
// Ignore all errors except root dir not being walkable
func (w *Watcher) traversePluginDir(dir string) error {
return w.fs.Walk(dir, func(path string, info os.FileInfo, err error) error {
if err != nil {
if path == dir {
return fmt.Errorf("error accessing path: %s error: %v", path, err)
}
klog.Errorf("error accessing path: %s error: %v", path, err)
return nil
}
switch mode := info.Mode(); {
case mode.IsDir():
if w.containsBlacklistedDir(path) {
return filepath.SkipDir
}
if err := w.fsWatcher.Add(path); err != nil {
return fmt.Errorf("failed to watch %s, err: %v", path, err)
}
case mode&os.ModeSocket != 0:
event := fsnotify.Event{
Name: path,
Op: fsnotify.Create,
}
//TODO: Handle errors by taking corrective measures
if err := w.handleCreateEvent(event); err != nil {
klog.Errorf("error %v when handling create event: %s", err, event)
}
default:
klog.V(5).Infof("Ignoring file %s with mode %v", path, mode)
}
return nil
})
}
// Handle filesystem notify event.
// Files names:
// - MUST NOT start with a '.'
func (w *Watcher) handleCreateEvent(event fsnotify.Event) error {
klog.V(6).Infof("Handling create event: %v", event)
if w.containsBlacklistedDir(event.Name) {
return nil
}
fi, err := os.Stat(event.Name)
if err != nil {
return fmt.Errorf("stat file %s failed: %v", event.Name, err)
}
if strings.HasPrefix(fi.Name(), ".") {
klog.V(5).Infof("Ignoring file (starts with '.'): %s", fi.Name())
return nil
}
if !fi.IsDir() {
if fi.Mode()&os.ModeSocket == 0 {
klog.V(5).Infof("Ignoring non socket file %s", fi.Name())
return nil
}
return w.handlePluginRegistration(event.Name)
}
return w.traversePluginDir(event.Name)
}
func (w *Watcher) handlePluginRegistration(socketPath string) error {
//TODO: Implement rate limiting to mitigate any DOS kind of attacks.
client, conn, err := dial(socketPath, 10*time.Second)
if err != nil {
return fmt.Errorf("dial failed at socket %s, err: %v", socketPath, err)
}
defer conn.Close()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
infoResp, err := client.GetInfo(ctx, &registerapi.InfoRequest{})
if err != nil {
return fmt.Errorf("failed to get plugin info using RPC GetInfo at socket %s, err: %v", socketPath, err)
}
handler, ok := w.getHandler(infoResp.Type)
if !ok {
return w.notifyPlugin(client, false, fmt.Sprintf("no handler registered for plugin type: %s at socket %s", infoResp.Type, socketPath))
}
// ReRegistration: We want to handle multiple plugins registering at the same time with the same name sequentially.
// See the state machine for more information.
// This is done by using a Lock for each plugin with the same name and type
pool := w.getPluginPool(infoResp.Type, infoResp.Name)
pool.Lock()
defer pool.Unlock()
if infoResp.Endpoint == "" {
infoResp.Endpoint = socketPath
}
foundInDeprecatedDir := w.foundInDeprecatedDir(socketPath)
// calls handler callback to verify registration request
if err := handler.ValidatePlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions, foundInDeprecatedDir); err != nil {
return w.notifyPlugin(client, false, fmt.Sprintf("plugin validation failed with err: %v", err))
}
// We add the plugin to the pluginwatcher's map before calling a plugin consumer's Register handle
// so that if we receive a delete event during Register Plugin, we can process it as a DeRegister call.
w.registerPlugin(socketPath, infoResp.Type, infoResp.Name)
if err := handler.RegisterPlugin(infoResp.Name, infoResp.Endpoint, infoResp.SupportedVersions); err != nil {
return w.notifyPlugin(client, false, fmt.Sprintf("plugin registration failed with err: %v", err))
}
// Notify is called after register to guarantee that even if notify throws an error Register will always be called after validate
if err := w.notifyPlugin(client, true, ""); err != nil {
return fmt.Errorf("failed to send registration status at socket %s, err: %v", socketPath, err)
}
return nil
}
func (w *Watcher) handleDeleteEvent(event fsnotify.Event) error {
klog.V(6).Infof("Handling delete event: %v", event)
plugin, ok := w.getPlugin(event.Name)
if !ok {
klog.V(5).Infof("could not find plugin for deleted file %s", event.Name)
return nil
}
// You should not get a Deregister call while registering a plugin
pool := w.getPluginPool(plugin.pluginType, plugin.pluginName)
pool.Lock()
defer pool.Unlock()
// ReRegisteration: When waiting for the lock a plugin with the same name (not socketPath) could have registered
// In that case, we don't want to issue a DeRegister call for that plugin
// When ReRegistering, the new plugin will have removed the current mapping (map[socketPath] = plugin) and replaced
// it with it's own socketPath.
if _, ok = w.getPlugin(event.Name); !ok {
klog.V(2).Infof("A newer plugin watcher has been registered for plugin %v, dropping DeRegister call", plugin)
return nil
}
h, ok := w.getHandler(plugin.pluginType)
if !ok {
return fmt.Errorf("could not find handler %s for plugin %s at path %s", plugin.pluginType, plugin.pluginName, event.Name)
}
klog.V(2).Infof("DeRegistering plugin %v at path %s", plugin, event.Name)
w.deRegisterPlugin(event.Name, plugin.pluginType, plugin.pluginName)
h.DeRegisterPlugin(plugin.pluginName)
return nil
}
func (w *Watcher) registerPlugin(socketPath, pluginType, pluginName string) {
w.mutex.Lock()
defer w.mutex.Unlock()
// Reregistration case, if this plugin is already in the map, remove it
// This will prevent handleDeleteEvent to issue a DeRegister call
for path, info := range w.plugins {
if info.pluginType != pluginType || info.pluginName != pluginName {
continue
}
delete(w.plugins, path)
break
}
w.plugins[socketPath] = pathInfo{
pluginType: pluginType,
pluginName: pluginName,
}
}
func (w *Watcher) deRegisterPlugin(socketPath, pluginType, pluginName string) {
w.mutex.Lock()
defer w.mutex.Unlock()
delete(w.plugins, socketPath)
delete(w.pluginsPool[pluginType], pluginName)
}
func (w *Watcher) getPlugin(socketPath string) (pathInfo, bool) {
w.mutex.Lock()
defer w.mutex.Unlock()
plugin, ok := w.plugins[socketPath]
return plugin, ok
}
func (w *Watcher) getPluginPool(pluginType, pluginName string) *sync.Mutex {
w.mutex.Lock()
defer w.mutex.Unlock()
if _, ok := w.pluginsPool[pluginType]; !ok {
w.pluginsPool[pluginType] = make(map[string]*sync.Mutex)
}
if _, ok := w.pluginsPool[pluginType][pluginName]; !ok {
w.pluginsPool[pluginType][pluginName] = &sync.Mutex{}
}
return w.pluginsPool[pluginType][pluginName]
}
func (w *Watcher) notifyPlugin(client registerapi.RegistrationClient, registered bool, errStr string) error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
status := &registerapi.RegistrationStatus{
PluginRegistered: registered,
Error: errStr,
}
if _, err := client.NotifyRegistrationStatus(ctx, status); err != nil {
return errors.Wrap(err, errStr)
}
if errStr != "" {
return errors.New(errStr)
}
return nil
}
// Dial establishes the gRPC communication with the picked up plugin socket. https://godoc.org/google.golang.org/grpc#Dial
func dial(unixSocketPath string, timeout time.Duration) (registerapi.RegistrationClient, *grpc.ClientConn, error) {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
c, err := grpc.DialContext(ctx, unixSocketPath, grpc.WithInsecure(), grpc.WithBlock(),
grpc.WithDialer(func(addr string, timeout time.Duration) (net.Conn, error) {
return net.DialTimeout("unix", addr, timeout)
}),
)
if err != nil {
return nil, nil, fmt.Errorf("failed to dial socket %s, err: %v", unixSocketPath, err)
}
return registerapi.NewRegistrationClient(c), c, nil
}
// While deprecated dir is supported, to add extra protection around #69015
// we will explicitly blacklist kubernetes.io directory.
func (w *Watcher) containsBlacklistedDir(path string) bool {
return strings.HasPrefix(path, w.deprecatedPath+"/kubernetes.io/") ||
path == w.deprecatedPath+"/kubernetes.io"
}
func (w *Watcher) foundInDeprecatedDir(socketPath string) bool {
if len(w.deprecatedPath) != 0 {
if socketPath == w.deprecatedPath {
return true
}
deprecatedPath := w.deprecatedPath
if !strings.HasSuffix(deprecatedPath, "/") {
deprecatedPath = deprecatedPath + "/"
}
if strings.HasPrefix(socketPath, deprecatedPath) {
return true
}
}
return false
}