unnecessary functions cleanup for deviceplugin

pull/6/head
lichuqiang 2017-09-08 12:14:15 +08:00
parent 191ff804ea
commit fd8b04649e
11 changed files with 68 additions and 166 deletions

View File

@ -86,7 +86,7 @@ type DevicePluginHandler interface {
// Start starts device plugin registration service.
Start() error
// Devices returns all of registered devices keyed by resourceName.
Devices() map[string][]*pluginapi.Device
Devices() map[string][]pluginapi.Device
// Allocate attempts to allocate all of required extended resources for
// the input container, issues an Allocate rpc request for each of such
// resources, and returns their AllocateResponses on success.
@ -114,7 +114,7 @@ func NewDevicePluginHandlerImpl(updateCapacityFunc func(v1.ResourceList)) (*Devi
allocatedDevices: make(map[string]podDevices),
}
deviceManagerMonitorCallback := func(resourceName string, added, updated, deleted []*pluginapi.Device) {
deviceManagerMonitorCallback := func(resourceName string, added, updated, deleted []pluginapi.Device) {
var capacity = v1.ResourceList{}
kept := append(updated, added...)
if _, ok := handler.allDevices[resourceName]; !ok {
@ -155,7 +155,7 @@ func (h *DevicePluginHandlerImpl) Start() error {
return h.devicePluginManager.Start()
}
func (h *DevicePluginHandlerImpl) Devices() map[string][]*pluginapi.Device {
func (h *DevicePluginHandlerImpl) Devices() map[string][]pluginapi.Device {
return h.devicePluginManager.Devices()
}

View File

@ -32,8 +32,8 @@ func (h *DevicePluginHandlerStub) Start() error {
return nil
}
func (h *DevicePluginHandlerStub) Devices() map[string][]*pluginapi.Device {
return make(map[string][]*pluginapi.Device)
func (h *DevicePluginHandlerStub) Devices() map[string][]pluginapi.Device {
return make(map[string][]pluginapi.Device)
}
func (h *DevicePluginHandlerStub) Allocate(pod *v1.Pod, container *v1.Container, activePods []*v1.Pod) ([]*pluginapi.AllocateResponse, error) {

View File

@ -41,7 +41,7 @@ func TestUpdateCapacity(t *testing.T) {
as.NotNil(testDevicePluginHandler)
as.Nil(err)
devs := []*pluginapi.Device{
devs := []pluginapi.Device{
{ID: "Device1", Health: pluginapi.Healthy},
{ID: "Device2", Health: pluginapi.Healthy},
{ID: "Device3", Health: pluginapi.Unhealthy},
@ -51,9 +51,9 @@ func TestUpdateCapacity(t *testing.T) {
// Adds three devices for resource1, two healthy and one unhealthy.
// Expects capacity for resource1 to be 2.
expected[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(2), resource.DecimalSI)
testDevicePluginHandler.devicePluginManagerMonitorCallback(resourceName, devs, []*pluginapi.Device{}, []*pluginapi.Device{})
testDevicePluginHandler.devicePluginManagerMonitorCallback(resourceName, devs, []pluginapi.Device{}, []pluginapi.Device{})
// Deletes an unhealthy device should NOT change capacity.
testDevicePluginHandler.devicePluginManagerMonitorCallback(resourceName, []*pluginapi.Device{}, []*pluginapi.Device{}, []*pluginapi.Device{devs[2]})
testDevicePluginHandler.devicePluginManagerMonitorCallback(resourceName, []pluginapi.Device{}, []pluginapi.Device{}, []pluginapi.Device{devs[2]})
// Updates a healthy device to unhealthy should reduce capacity by 1.
expected[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(1), resource.DecimalSI)
// Deletes a healthy device should reduce capacity by 1.
@ -62,7 +62,7 @@ func TestUpdateCapacity(t *testing.T) {
delete(expected, v1.ResourceName(resourceName))
resourceName2 := "resource2"
expected[v1.ResourceName(resourceName2)] = *resource.NewQuantity(int64(2), resource.DecimalSI)
testDevicePluginHandler.devicePluginManagerMonitorCallback(resourceName2, devs, []*pluginapi.Device{}, []*pluginapi.Device{})
testDevicePluginHandler.devicePluginManagerMonitorCallback(resourceName2, devs, []pluginapi.Device{}, []pluginapi.Device{})
}
type stringPairType struct {
@ -90,8 +90,8 @@ func (m *DevicePluginManagerTestStub) Start() error {
return nil
}
func (m *DevicePluginManagerTestStub) Devices() map[string][]*pluginapi.Device {
return make(map[string][]*pluginapi.Device)
func (m *DevicePluginManagerTestStub) Devices() map[string][]pluginapi.Device {
return make(map[string][]pluginapi.Device)
}
func (m *DevicePluginManagerTestStub) Allocate(resourceName string, devIds []string) (*pluginapi.AllocateResponse, error) {
@ -181,7 +181,7 @@ func TestPodContainerDeviceAllocation(t *testing.T) {
m, err := NewDevicePluginManagerTestStub()
as := assert.New(t)
as.Nil(err)
monitorCallback := func(resourceName string, added, updated, deleted []*pluginapi.Device) {}
monitorCallback := func(resourceName string, added, updated, deleted []pluginapi.Device) {}
testDevicePluginHandler := &DevicePluginHandlerImpl{
devicePluginManager: m,

View File

@ -13,7 +13,6 @@ go_library(
"endpoint.go",
"manager.go",
"types.go",
"utils.go",
],
importpath = "k8s.io/kubernetes/pkg/kubelet/deviceplugin",
deps = [
@ -44,7 +43,6 @@ go_test(
srcs = [
"endpoint_test.go",
"manager_test.go",
"utils_test.go",
],
importpath = "k8s.io/kubernetes/pkg/kubelet/deviceplugin",
library = ":go_default_library",

View File

@ -38,7 +38,7 @@ type endpoint struct {
socketPath string
resourceName string
devices map[string]*pluginapi.Device
devices map[string]pluginapi.Device
mutex sync.Mutex
callback MonitorCallback
@ -71,10 +71,16 @@ func newEndpoint(socketPath, resourceName string, callback MonitorCallback) (*en
}, nil
}
func (e *endpoint) getDevices() []*pluginapi.Device {
func (e *endpoint) getDevices() []pluginapi.Device {
e.mutex.Lock()
defer e.mutex.Unlock()
return copyDevices(e.devices)
var devs []pluginapi.Device
for _, d := range e.devices {
devs = append(devs, d)
}
return devs
}
// list initializes ListAndWatch gRPC call for the device plugin and gets the
@ -94,11 +100,11 @@ func (e *endpoint) list() (pluginapi.DevicePlugin_ListAndWatchClient, error) {
return nil, err
}
devices := make(map[string]*pluginapi.Device)
var added, updated, deleted []*pluginapi.Device
devices := make(map[string]pluginapi.Device)
var added, updated, deleted []pluginapi.Device
for _, d := range devs.Devices {
devices[d.ID] = d
added = append(added, cloneDevice(d))
devices[d.ID] = *d
added = append(added, *d)
}
e.mutex.Lock()
@ -118,11 +124,11 @@ func (e *endpoint) list() (pluginapi.DevicePlugin_ListAndWatchClient, error) {
func (e *endpoint) listAndWatch(stream pluginapi.DevicePlugin_ListAndWatchClient) {
glog.V(3).Infof("Starting ListAndWatch")
devices := make(map[string]*pluginapi.Device)
devices := make(map[string]pluginapi.Device)
e.mutex.Lock()
for _, d := range e.devices {
devices[d.ID] = cloneDevice(d)
devices[d.ID] = d
}
e.mutex.Unlock()
@ -137,7 +143,7 @@ func (e *endpoint) listAndWatch(stream pluginapi.DevicePlugin_ListAndWatchClient
glog.V(2).Infof("State pushed for device plugin %s", e.resourceName)
newDevs := make(map[string]*pluginapi.Device)
var added, updated []*pluginapi.Device
var added, updated []pluginapi.Device
for _, d := range devs {
dOld, ok := devices[d.ID]
@ -146,8 +152,8 @@ func (e *endpoint) listAndWatch(stream pluginapi.DevicePlugin_ListAndWatchClient
if !ok {
glog.V(2).Infof("New device for Endpoint %s: %v", e.resourceName, d)
devices[d.ID] = d
added = append(added, cloneDevice(d))
devices[d.ID] = *d
added = append(added, *d)
continue
}
@ -162,11 +168,11 @@ func (e *endpoint) listAndWatch(stream pluginapi.DevicePlugin_ListAndWatchClient
glog.V(2).Infof("Device %s is now Healthy", d.ID)
}
devices[d.ID] = d
updated = append(updated, cloneDevice(d))
devices[d.ID] = *d
updated = append(updated, *d)
}
var deleted []*pluginapi.Device
var deleted []pluginapi.Device
for id, d := range devices {
if _, ok := newDevs[id]; ok {
continue
@ -174,7 +180,7 @@ func (e *endpoint) listAndWatch(stream pluginapi.DevicePlugin_ListAndWatchClient
glog.Errorf("Device %s was deleted", d.ID)
deleted = append(deleted, cloneDevice(d))
deleted = append(deleted, d)
delete(devices, id)
}

View File

@ -37,7 +37,7 @@ func TestNewEndpoint(t *testing.T) {
{ID: "ADeviceId", Health: pluginapi.Healthy},
}
p, e := esetup(t, devs, socket, "mock", func(n string, a, u, r []*pluginapi.Device) {})
p, e := esetup(t, devs, socket, "mock", func(n string, a, u, r []pluginapi.Device) {})
defer ecleanup(t, p, e)
}
@ -48,7 +48,7 @@ func TestList(t *testing.T) {
{ID: "ADeviceId", Health: pluginapi.Healthy},
}
p, e := esetup(t, devs, socket, "mock", func(n string, a, u, r []*pluginapi.Device) {})
p, e := esetup(t, devs, socket, "mock", func(n string, a, u, r []pluginapi.Device) {})
defer ecleanup(t, p, e)
_, err := e.list()
@ -79,7 +79,7 @@ func TestListAndWatch(t *testing.T) {
{ID: "AThirdDeviceId", Health: pluginapi.Healthy},
}
p, e := esetup(t, devs, socket, "mock", func(n string, a, u, r []*pluginapi.Device) {
p, e := esetup(t, devs, socket, "mock", func(n string, a, u, r []pluginapi.Device) {
require.Len(t, a, 1)
require.Len(t, u, 1)
require.Len(t, r, 1)
@ -114,13 +114,23 @@ func TestListAndWatch(t *testing.T) {
}
func TestGetDevices(t *testing.T) {
e := endpoint{
devices: map[string]pluginapi.Device{
"ADeviceId": {ID: "ADeviceId", Health: pluginapi.Healthy},
},
}
devs := e.getDevices()
require.Len(t, devs, 1)
}
func esetup(t *testing.T, devs []*pluginapi.Device, socket, resourceName string, callback MonitorCallback) (*Stub, *endpoint) {
p := NewDevicePluginStub(devs, socket)
err := p.Start()
require.NoError(t, err)
e, err := newEndpoint(socket, "mock", func(n string, a, u, r []*pluginapi.Device) {})
e, err := newEndpoint(socket, "mock", func(n string, a, u, r []pluginapi.Device) {})
require.NoError(t, err)
return p, e

View File

@ -27,6 +27,8 @@ import (
"golang.org/x/net/context"
"google.golang.org/grpc"
"k8s.io/api/core/v1"
v1helper "k8s.io/kubernetes/pkg/api/v1/helper"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha"
)
@ -94,9 +96,14 @@ func (m *ManagerImpl) removeContents(dir string) error {
return nil
}
const (
// defaultCheckpoint is the file name of device plugin checkpoint
defaultCheckpoint = "kubelet_internal_checkpoint"
)
// CheckpointFile returns device plugin checkpoint file path.
func (m *ManagerImpl) CheckpointFile() string {
return filepath.Join(m.socketdir, "kubelet_internal_checkpoint")
return filepath.Join(m.socketdir, defaultCheckpoint)
}
// Start starts the Device Plugin Manager
@ -112,11 +119,6 @@ func (m *ManagerImpl) Start() error {
glog.Errorf("Fail to clean up stale contents under %s: %+v", m.socketdir, err)
}
if err := os.Remove(socketPath); err != nil && !os.IsNotExist(err) {
glog.Errorf(errRemoveSocket+" %+v", err)
return err
}
s, err := net.Listen("unix", socketPath)
if err != nil {
glog.Errorf(errListenSocket+" %+v", err)
@ -133,11 +135,11 @@ func (m *ManagerImpl) Start() error {
// Devices is the map of devices that are known by the Device
// Plugin manager with the kind of the devices as key
func (m *ManagerImpl) Devices() map[string][]*pluginapi.Device {
func (m *ManagerImpl) Devices() map[string][]pluginapi.Device {
m.mutex.Lock()
defer m.mutex.Unlock()
devs := make(map[string][]*pluginapi.Device)
devs := make(map[string][]pluginapi.Device)
for k, e := range m.endpoints {
glog.V(3).Infof("Endpoint: %+v: %+v", k, e)
devs[k] = e.getDevices()
@ -174,8 +176,8 @@ func (m *ManagerImpl) Register(ctx context.Context,
return &pluginapi.Empty{}, fmt.Errorf(errUnsuportedVersion)
}
if err := IsResourceNameValid(r.ResourceName); err != nil {
return &pluginapi.Empty{}, err
if !v1helper.IsExtendedResourceName(v1.ResourceName(r.ResourceName)) {
return &pluginapi.Empty{}, fmt.Errorf(errInvalidResourceName, r.ResourceName)
}
// TODO: for now, always accepts newest device plugin. Later may consider to
@ -231,7 +233,7 @@ func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
glog.V(2).Infof("Delete resource for endpoint %v", e)
delete(m.endpoints, r.ResourceName)
// Issues callback to delete all of devices.
e.callback(e.resourceName, []*pluginapi.Device{}, []*pluginapi.Device{}, e.getDevices())
e.callback(e.resourceName, []pluginapi.Device{}, []pluginapi.Device{}, e.getDevices())
}
glog.V(2).Infof("Unregistered endpoint %v", e)
m.mutex.Unlock()

View File

@ -33,15 +33,15 @@ const (
)
func TestNewManagerImpl(t *testing.T) {
_, err := NewManagerImpl("", func(n string, a, u, r []*pluginapi.Device) {})
_, err := NewManagerImpl("", func(n string, a, u, r []pluginapi.Device) {})
require.Error(t, err)
_, err = NewManagerImpl(socketName, func(n string, a, u, r []*pluginapi.Device) {})
_, err = NewManagerImpl(socketName, func(n string, a, u, r []pluginapi.Device) {})
require.NoError(t, err)
}
func TestNewManagerImplStart(t *testing.T) {
m, p := setup(t, []*pluginapi.Device{}, func(n string, a, u, r []*pluginapi.Device) {})
m, p := setup(t, []*pluginapi.Device{}, func(n string, a, u, r []pluginapi.Device) {})
cleanup(t, m, p)
}
@ -58,7 +58,7 @@ func TestDevicePluginReRegistration(t *testing.T) {
callbackChan := make(chan int)
var stopping int32
stopping = 0
callback := func(n string, a, u, r []*pluginapi.Device) {
callback := func(n string, a, u, r []pluginapi.Device) {
// Should be called twice, one for each plugin registration, till we are stopping.
if callbackCount > 1 && atomic.LoadInt32(&stopping) <= 0 {
t.FailNow()

View File

@ -23,7 +23,7 @@ import (
// MonitorCallback is the function called when a device's health state changes,
// or new devices are reported, or old devices are deleted.
// Updated contains the most recent state of the Device.
type MonitorCallback func(resourceName string, added, updated, deleted []*pluginapi.Device)
type MonitorCallback func(resourceName string, added, updated, deleted []pluginapi.Device)
// Manager manages all the Device Plugins running on a node.
type Manager interface {
@ -33,7 +33,7 @@ type Manager interface {
// Devices is the map of devices that have registered themselves
// against the manager.
// The map key is the ResourceName of the device plugins.
Devices() map[string][]*pluginapi.Device
Devices() map[string][]pluginapi.Device
// Allocate takes resourceName and list of device Ids, and calls the
// gRPC Allocate on the device plugin matching the resourceName.
@ -59,7 +59,7 @@ const (
errDevicePluginAlreadyExists = "another device plugin already registered this Resource Name"
// errInvalidResourceName is the error raised when a device plugin is registering
// itself with an invalid ResourceName
errInvalidResourceName = "the ResourceName is invalid"
errInvalidResourceName = "the ResourceName %s is invalid"
// errEmptyResourceName is the error raised when the resource name field is empty
errEmptyResourceName = "invalid Empty ResourceName"

View File

@ -1,61 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package deviceplugin
import (
"fmt"
"k8s.io/api/core/v1"
v1helper "k8s.io/kubernetes/pkg/api/v1/helper"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha"
)
func cloneDevice(d *pluginapi.Device) *pluginapi.Device {
return &pluginapi.Device{
ID: d.ID,
Health: d.Health,
}
}
func copyDevices(devs map[string]*pluginapi.Device) []*pluginapi.Device {
var clones []*pluginapi.Device
for _, d := range devs {
clones = append(clones, cloneDevice(d))
}
return clones
}
// IsResourceNameValid returns an error if the resource is invalid or is not an
// extended resource name.
func IsResourceNameValid(resourceName string) error {
if resourceName == "" {
return fmt.Errorf(errEmptyResourceName)
}
if !IsDeviceName(v1.ResourceName(resourceName)) {
return fmt.Errorf(errInvalidResourceName)
}
return nil
}
// IsDeviceName returns whether the ResourceName points to an extended resource
// name exported by a device plugin.
func IsDeviceName(k v1.ResourceName) bool {
return v1helper.IsExtendedResourceName(k)
}

View File

@ -1,53 +0,0 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package deviceplugin
import (
"testing"
"github.com/stretchr/testify/require"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1alpha"
)
func TestCloneDevice(t *testing.T) {
d := cloneDevice(&pluginapi.Device{ID: "ADeviceId", Health: pluginapi.Healthy})
require.Equal(t, d.ID, "ADeviceId")
require.Equal(t, d.Health, pluginapi.Healthy)
}
func TestCopyDevices(t *testing.T) {
d := map[string]*pluginapi.Device{
"ADeviceId": {ID: "ADeviceId", Health: pluginapi.Healthy},
}
devs := copyDevices(d)
require.Len(t, devs, 1)
}
func TestIsResourceName(t *testing.T) {
require.NotNil(t, IsResourceNameValid(""))
require.NotNil(t, IsResourceNameValid("cpu"))
require.NotNil(t, IsResourceNameValid("name1"))
require.NotNil(t, IsResourceNameValid("alpha.kubernetes.io/name1"))
require.NotNil(t, IsResourceNameValid("beta.kubernetes.io/name1"))
require.NotNil(t, IsResourceNameValid("kubernetes.io/name1"))
require.Nil(t, IsResourceNameValid("domain1.io/name1"))
require.Nil(t, IsResourceNameValid("alpha.domain1.io/name1"))
require.Nil(t, IsResourceNameValid("beta.domain1.io/name1"))
}