Fixes the races around devicemanager Allocate() and endpoint deletion.

There is a race in predicateAdmitHandler Admit() in which getNodeAnyWayFunc()
could return a Node with non-zero device plugin resource allocatable for a
non-existent endpoint. That race can happen when a device plugin fails,
but is more likely when kubelet restarts because, with the current registration
model, there is a time gap between kubelet restart and device plugin
re-registration. During this time window, even though devicemanager could
have removed the resource initially during the GetCapacity() call, Kubelet
may overwrite the device plugin resource capacity/allocatable with the
old value when a node update from the API server comes in later. This
could cause a pod to be started without the proper device runtime config set.

To solve this problem, introduce endpointStopGracePeriod. When a device
plugin fails, don't immediately remove the endpoint but set stopTime on
the endpoint instead. During kubelet restart, create endpoints with stopTime set
for every checkpointed registered resource. An endpoint is considered to be
in its stopGracePeriod if its stopTime is set. This allows us to track which
resources should be handled by devicemanager during the time gap.
When an endpoint's stopGracePeriod expires, we remove the endpoint and
its resource. This allows the resource to be exported through other channels
(e.g., by directly updating node status through the API server) if there is such
a use case. Currently endpointStopGracePeriod is set to 5 minutes.

Given that an endpoint is no longer immediately removed upon disconnection,
mark all its devices unhealthy so that we can signal the resource allocatable
change to the scheduler and avoid scheduling more pods to the node.
While a device plugin endpoint is in its stopGracePeriod, pods requesting the
corresponding resource will fail the admission handler.
pull/6/head
Jiaying Zhang 2018-03-05 13:54:43 -08:00
parent 79257fe611
commit 5514a1f4dd
5 changed files with 154 additions and 30 deletions

View File

@ -39,6 +39,8 @@ type endpoint interface {
preStartContainer(devs []string) (*pluginapi.PreStartContainerResponse, error) preStartContainer(devs []string) (*pluginapi.PreStartContainerResponse, error)
getDevices() []pluginapi.Device getDevices() []pluginapi.Device
callback(resourceName string, added, updated, deleted []pluginapi.Device) callback(resourceName string, added, updated, deleted []pluginapi.Device)
isStopped() bool
stopGracePeriodExpired() bool
} }
type endpointImpl struct { type endpointImpl struct {
@ -47,6 +49,7 @@ type endpointImpl struct {
socketPath string socketPath string
resourceName string resourceName string
stopTime time.Time
devices map[string]pluginapi.Device devices map[string]pluginapi.Device
mutex sync.Mutex mutex sync.Mutex
@ -55,6 +58,7 @@ type endpointImpl struct {
} }
// newEndpoint creates a new endpoint for the given resourceName. // newEndpoint creates a new endpoint for the given resourceName.
// This is to be used during normal device plugin registration.
func newEndpointImpl(socketPath, resourceName string, devices map[string]pluginapi.Device, callback monitorCallback) (*endpointImpl, error) { func newEndpointImpl(socketPath, resourceName string, devices map[string]pluginapi.Device, callback monitorCallback) (*endpointImpl, error) {
client, c, err := dial(socketPath) client, c, err := dial(socketPath)
if err != nil { if err != nil {
@ -74,6 +78,16 @@ func newEndpointImpl(socketPath, resourceName string, devices map[string]plugina
}, nil }, nil
} }
// newStoppedEndpointImpl builds an endpoint for resourceName that is already
// marked as stopped: its stopTime is initialized to the current time.
// Only resourceName, devices and stopTime are populated; every other field of
// the returned endpointImpl is left at its zero value.
// This is to be used during Kubelet restart, before the actual device plugin re-registers.
func newStoppedEndpointImpl(resourceName string, devices map[string]pluginapi.Device) *endpointImpl {
	stopped := new(endpointImpl)
	stopped.resourceName = resourceName
	stopped.devices = devices
	stopped.stopTime = time.Now()
	return stopped
}
func (e *endpointImpl) callback(resourceName string, added, updated, deleted []pluginapi.Device) { func (e *endpointImpl) callback(resourceName string, added, updated, deleted []pluginapi.Device) {
e.cb(resourceName, added, updated, deleted) e.cb(resourceName, added, updated, deleted)
} }
@ -176,8 +190,30 @@ func (e *endpointImpl) run() {
} }
} }
// isStopped reports whether a stop time has been recorded for the endpoint,
// i.e. whether stopTime is non-zero. The field is read under e.mutex.
func (e *endpointImpl) isStopped() bool {
	e.mutex.Lock()
	stopped := !e.stopTime.IsZero()
	e.mutex.Unlock()
	return stopped
}
// stopGracePeriodExpired reports whether the endpoint has a stop time set and
// more than endpointStopGracePeriod has elapsed since that time.
// An endpoint whose stopTime is zero is never considered expired.
func (e *endpointImpl) stopGracePeriodExpired() bool {
	e.mutex.Lock()
	defer e.mutex.Unlock()

	if e.stopTime.IsZero() {
		return false
	}
	return time.Since(e.stopTime) > endpointStopGracePeriod
}
// setStopTime overwrites the endpoint's stopTime with t while holding e.mutex.
// Used for testing only.
func (e *endpointImpl) setStopTime(t time.Time) {
	e.mutex.Lock()
	e.stopTime = t
	e.mutex.Unlock()
}
// allocate issues Allocate gRPC call to the device plugin. // allocate issues Allocate gRPC call to the device plugin.
func (e *endpointImpl) allocate(devs []string) (*pluginapi.AllocateResponse, error) { func (e *endpointImpl) allocate(devs []string) (*pluginapi.AllocateResponse, error) {
if e.isStopped() {
return nil, fmt.Errorf(errEndpointStopped, e)
}
return e.client.Allocate(context.Background(), &pluginapi.AllocateRequest{ return e.client.Allocate(context.Background(), &pluginapi.AllocateRequest{
ContainerRequests: []*pluginapi.ContainerAllocateRequest{ ContainerRequests: []*pluginapi.ContainerAllocateRequest{
{DevicesIDs: devs}, {DevicesIDs: devs},
@ -187,6 +223,9 @@ func (e *endpointImpl) allocate(devs []string) (*pluginapi.AllocateResponse, err
// preStartContainer issues PreStartContainer gRPC call to the device plugin. // preStartContainer issues PreStartContainer gRPC call to the device plugin.
func (e *endpointImpl) preStartContainer(devs []string) (*pluginapi.PreStartContainerResponse, error) { func (e *endpointImpl) preStartContainer(devs []string) (*pluginapi.PreStartContainerResponse, error) {
if e.isStopped() {
return nil, fmt.Errorf(errEndpointStopped, e)
}
ctx, cancel := context.WithTimeout(context.Background(), pluginapi.KubeletPreStartContainerRPCTimeoutInSecs*time.Second) ctx, cancel := context.WithTimeout(context.Background(), pluginapi.KubeletPreStartContainerRPCTimeoutInSecs*time.Second)
defer cancel() defer cancel()
return e.client.PreStartContainer(ctx, &pluginapi.PreStartContainerRequest{ return e.client.PreStartContainer(ctx, &pluginapi.PreStartContainerRequest{
@ -195,7 +234,12 @@ func (e *endpointImpl) preStartContainer(devs []string) (*pluginapi.PreStartCont
} }
func (e *endpointImpl) stop() { func (e *endpointImpl) stop() {
e.clientConn.Close() e.mutex.Lock()
defer e.mutex.Unlock()
if e.clientConn != nil {
e.clientConn.Close()
}
e.stopTime = time.Now()
} }
// dial establishes the gRPC communication with the registered device plugin. https://godoc.org/google.golang.org/grpc#Dial // dial establishes the gRPC communication with the registered device plugin. https://godoc.org/google.golang.org/grpc#Dial

View File

@ -377,18 +377,28 @@ func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
go func() { go func() {
e.run() e.run()
e.stop() e.stop()
m.mutex.Lock() m.mutex.Lock()
if old, ok := m.endpoints[r.ResourceName]; ok && old == e { if old, ok := m.endpoints[r.ResourceName]; ok && old == e {
glog.V(2).Infof("Delete resource for endpoint %v", e) m.markResourceUnhealthy(r.ResourceName)
delete(m.endpoints, r.ResourceName)
} }
glog.V(2).Infof("Unregistered endpoint %v", e) glog.V(2).Infof("Unregistered endpoint %v", e)
m.mutex.Unlock() m.mutex.Unlock()
}() }()
} }
// markResourceUnhealthy moves every device currently recorded as healthy for
// resourceName into the resource's unhealthy set.
//
// If the resource has an entry in m.healthyDevices, that entry is replaced by
// an empty set; a resource without such an entry gets none created. The
// m.unhealthyDevices entry is always (re)assigned to the union of its previous
// contents and the devices that were healthy.
//
// The function takes no lock itself.
// NOTE(review): presumably callers are expected to hold m.mutex — confirm.
func (m *ManagerImpl) markResourceUnhealthy(resourceName string) {
	glog.V(2).Infof("Mark all resources Unhealthy for resource %s", resourceName)

	previouslyHealthy, tracked := m.healthyDevices[resourceName]
	if tracked {
		// Keep the key but drop all devices from the healthy set.
		m.healthyDevices[resourceName] = sets.NewString()
	} else {
		previouslyHealthy = sets.NewString()
	}

	alreadyUnhealthy, found := m.unhealthyDevices[resourceName]
	if !found {
		alreadyUnhealthy = sets.NewString()
	}
	m.unhealthyDevices[resourceName] = alreadyUnhealthy.Union(previouslyHealthy)
}
// GetCapacity is expected to be called when Kubelet updates its node status. // GetCapacity is expected to be called when Kubelet updates its node status.
// The first returned variable contains the registered device plugin resource capacity. // The first returned variable contains the registered device plugin resource capacity.
// The second returned variable contains the registered device plugin resource allocatable. // The second returned variable contains the registered device plugin resource allocatable.
@ -405,12 +415,20 @@ func (m *ManagerImpl) GetCapacity() (v1.ResourceList, v1.ResourceList, []string)
needsUpdateCheckpoint := false needsUpdateCheckpoint := false
var capacity = v1.ResourceList{} var capacity = v1.ResourceList{}
var allocatable = v1.ResourceList{} var allocatable = v1.ResourceList{}
var deletedResources []string deletedResources := sets.NewString()
m.mutex.Lock() m.mutex.Lock()
for resourceName, devices := range m.healthyDevices { for resourceName, devices := range m.healthyDevices {
if _, ok := m.endpoints[resourceName]; !ok { e, ok := m.endpoints[resourceName]
if (ok && e.stopGracePeriodExpired()) || !ok {
// The resources contained in endpoints and (un)healthyDevices
// should always be consistent. Otherwise, we run with the risk
// of failing to garbage collect non-existing resources or devices.
if !ok {
glog.Errorf("unexpected: healthyDevices and endpoints are out of sync")
}
delete(m.endpoints, resourceName)
delete(m.healthyDevices, resourceName) delete(m.healthyDevices, resourceName)
deletedResources = append(deletedResources, resourceName) deletedResources.Insert(resourceName)
needsUpdateCheckpoint = true needsUpdateCheckpoint = true
} else { } else {
capacity[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI) capacity[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)
@ -418,17 +436,14 @@ func (m *ManagerImpl) GetCapacity() (v1.ResourceList, v1.ResourceList, []string)
} }
} }
for resourceName, devices := range m.unhealthyDevices { for resourceName, devices := range m.unhealthyDevices {
if _, ok := m.endpoints[resourceName]; !ok { e, ok := m.endpoints[resourceName]
if (ok && e.stopGracePeriodExpired()) || !ok {
if !ok {
glog.Errorf("unexpected: unhealthyDevices and endpoints are out of sync")
}
delete(m.endpoints, resourceName)
delete(m.unhealthyDevices, resourceName) delete(m.unhealthyDevices, resourceName)
alreadyDeleted := false deletedResources.Insert(resourceName)
for _, name := range deletedResources {
if name == resourceName {
alreadyDeleted = true
}
}
if !alreadyDeleted {
deletedResources = append(deletedResources, resourceName)
}
needsUpdateCheckpoint = true needsUpdateCheckpoint = true
} else { } else {
capacityCount := capacity[v1.ResourceName(resourceName)] capacityCount := capacity[v1.ResourceName(resourceName)]
@ -441,7 +456,7 @@ func (m *ManagerImpl) GetCapacity() (v1.ResourceList, v1.ResourceList, []string)
if needsUpdateCheckpoint { if needsUpdateCheckpoint {
m.writeCheckpoint() m.writeCheckpoint()
} }
return capacity, allocatable, deletedResources return capacity, allocatable, deletedResources.UnsortedList()
} }
// checkpointData struct is used to store pod to device allocation information // checkpointData struct is used to store pod to device allocation information
@ -495,12 +510,12 @@ func (m *ManagerImpl) readCheckpoint() error {
defer m.mutex.Unlock() defer m.mutex.Unlock()
m.podDevices.fromCheckpointData(data.PodDeviceEntries) m.podDevices.fromCheckpointData(data.PodDeviceEntries)
m.allocatedDevices = m.podDevices.devices() m.allocatedDevices = m.podDevices.devices()
for resource, devices := range data.RegisteredDevices { for resource := range data.RegisteredDevices {
// TODO: Support Checkpointing for unhealthy devices as well // During start up, creates empty healthyDevices list so that the resource capacity
// will stay zero till the corresponding device plugin re-registers.
m.healthyDevices[resource] = sets.NewString() m.healthyDevices[resource] = sets.NewString()
for _, dev := range devices { m.unhealthyDevices[resource] = sets.NewString()
m.healthyDevices[resource].Insert(dev) m.endpoints[resource] = newStoppedEndpointImpl(resource, make(map[string]pluginapi.Device))
}
} }
return nil return nil
} }
@ -688,6 +703,8 @@ func (m *ManagerImpl) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Co
return m.podDevices.deviceRunContainerOptions(string(pod.UID), container.Name), nil return m.podDevices.deviceRunContainerOptions(string(pod.UID), container.Name), nil
} }
// callPreStartContainerIfNeeded issues PreStartContainer grpc call for device plugin resource
// with PreStartRequired option set.
func (m *ManagerImpl) callPreStartContainerIfNeeded(podUID, contName, resource string) error { func (m *ManagerImpl) callPreStartContainerIfNeeded(podUID, contName, resource string) error {
m.mutex.Lock() m.mutex.Lock()
opts, ok := m.pluginOpts[resource] opts, ok := m.pluginOpts[resource]

View File

@ -192,7 +192,8 @@ func TestUpdateCapacityAllocatable(t *testing.T) {
// Adds three devices for resource1, two healthy and one unhealthy. // Adds three devices for resource1, two healthy and one unhealthy.
// Expects capacity for resource1 to be 2. // Expects capacity for resource1 to be 2.
resourceName1 := "domain1.com/resource1" resourceName1 := "domain1.com/resource1"
testManager.endpoints[resourceName1] = &endpointImpl{devices: make(map[string]pluginapi.Device)} e1 := &endpointImpl{devices: make(map[string]pluginapi.Device)}
testManager.endpoints[resourceName1] = e1
callback(resourceName1, devs, []pluginapi.Device{}, []pluginapi.Device{}) callback(resourceName1, devs, []pluginapi.Device{}, []pluginapi.Device{})
capacity, allocatable, removedResources := testManager.GetCapacity() capacity, allocatable, removedResources := testManager.GetCapacity()
resource1Capacity, ok := capacity[v1.ResourceName(resourceName1)] resource1Capacity, ok := capacity[v1.ResourceName(resourceName1)]
@ -240,7 +241,8 @@ func TestUpdateCapacityAllocatable(t *testing.T) {
// Tests adding another resource. // Tests adding another resource.
resourceName2 := "resource2" resourceName2 := "resource2"
testManager.endpoints[resourceName2] = &endpointImpl{devices: make(map[string]pluginapi.Device)} e2 := &endpointImpl{devices: make(map[string]pluginapi.Device)}
testManager.endpoints[resourceName2] = e2
callback(resourceName2, devs, []pluginapi.Device{}, []pluginapi.Device{}) callback(resourceName2, devs, []pluginapi.Device{}, []pluginapi.Device{})
capacity, allocatable, removedResources = testManager.GetCapacity() capacity, allocatable, removedResources = testManager.GetCapacity()
as.Equal(2, len(capacity)) as.Equal(2, len(capacity))
@ -252,9 +254,9 @@ func TestUpdateCapacityAllocatable(t *testing.T) {
as.Equal(int64(2), resource2Allocatable.Value()) as.Equal(int64(2), resource2Allocatable.Value())
as.Equal(0, len(removedResources)) as.Equal(0, len(removedResources))
// Removes resourceName1 endpoint. Verifies testManager.GetCapacity() reports that resourceName1 // Expires resourceName1 endpoint. Verifies testManager.GetCapacity() reports that resourceName1
// is removed from capacity and it no longer exists in healthyDevices after the call. // is removed from capacity and it no longer exists in healthyDevices after the call.
delete(testManager.endpoints, resourceName1) e1.setStopTime(time.Now().Add(-1*endpointStopGracePeriod - time.Duration(10)*time.Second))
capacity, allocatable, removed := testManager.GetCapacity() capacity, allocatable, removed := testManager.GetCapacity()
as.Equal([]string{resourceName1}, removed) as.Equal([]string{resourceName1}, removed)
_, ok = capacity[v1.ResourceName(resourceName1)] _, ok = capacity[v1.ResourceName(resourceName1)]
@ -266,9 +268,49 @@ func TestUpdateCapacityAllocatable(t *testing.T) {
as.False(ok) as.False(ok)
_, ok = testManager.unhealthyDevices[resourceName1] _, ok = testManager.unhealthyDevices[resourceName1]
as.False(ok) as.False(ok)
fmt.Println("removed: ", removed) _, ok = testManager.endpoints[resourceName1]
as.Equal(1, len(removed)) as.False(ok)
as.Equal(1, len(testManager.endpoints))
// Stops resourceName2 endpoint. Verifies its stopTime is set, allocate and
// preStartContainer calls return errors.
e2.stop()
as.False(e2.stopTime.IsZero())
_, err = e2.allocate([]string{"Device1"})
reflect.DeepEqual(err, fmt.Errorf(errEndpointStopped, e2))
_, err = e2.preStartContainer([]string{"Device1"})
reflect.DeepEqual(err, fmt.Errorf(errEndpointStopped, e2))
// Marks resourceName2 unhealthy and verifies its capacity/allocatable are
// correctly updated.
testManager.markResourceUnhealthy(resourceName2)
capacity, allocatable, removed = testManager.GetCapacity()
val, ok = capacity[v1.ResourceName(resourceName2)]
as.True(ok)
as.Equal(int64(3), val.Value())
val, ok = allocatable[v1.ResourceName(resourceName2)]
as.True(ok)
as.Equal(int64(0), val.Value())
as.Empty(removed)
// Writes and re-reads checkpoints. Verifies we create a stopped endpoint
// for resourceName2, its capacity is set to zero, and we still consider
// it as a DevicePlugin resource. This makes sure any pod that was scheduled
// during the time of propagating capacity change to the scheduler will be
// properly rejected instead of being incorrectly started.
err = testManager.writeCheckpoint()
as.Nil(err)
testManager.healthyDevices = make(map[string]sets.String)
testManager.unhealthyDevices = make(map[string]sets.String)
err = testManager.readCheckpoint()
as.Nil(err)
as.Equal(1, len(testManager.endpoints))
_, ok = testManager.endpoints[resourceName2]
as.True(ok)
capacity, allocatable, removed = testManager.GetCapacity()
val, ok = capacity[v1.ResourceName(resourceName2)]
as.True(ok)
as.Equal(int64(0), val.Value())
as.Empty(removed)
as.True(testManager.isDevicePluginResource(resourceName2))
} }
func constructDevices(devices []string) sets.String { func constructDevices(devices []string) sets.String {
@ -312,7 +354,9 @@ func TestCheckpoint(t *testing.T) {
defer os.RemoveAll(tmpDir) defer os.RemoveAll(tmpDir)
testManager := &ManagerImpl{ testManager := &ManagerImpl{
socketdir: tmpDir, socketdir: tmpDir,
endpoints: make(map[string]endpoint),
healthyDevices: make(map[string]sets.String), healthyDevices: make(map[string]sets.String),
unhealthyDevices: make(map[string]sets.String),
allocatedDevices: make(map[string]sets.String), allocatedDevices: make(map[string]sets.String),
podDevices: make(podDevices), podDevices: make(podDevices),
} }
@ -414,6 +458,10 @@ func (m *MockEndpoint) allocate(devs []string) (*pluginapi.AllocateResponse, err
return nil, nil return nil, nil
} }
// isStopped always reports false for the mock endpoint.
func (m *MockEndpoint) isStopped() bool {
	return false
}
// stopGracePeriodExpired always reports false for the mock endpoint.
func (m *MockEndpoint) stopGracePeriodExpired() bool {
	return false
}
func makePod(limits v1.ResourceList) *v1.Pod { func makePod(limits v1.ResourceList) *v1.Pod {
return &v1.Pod{ return &v1.Pod{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
@ -437,6 +485,7 @@ func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestReso
socketdir: tmpDir, socketdir: tmpDir,
callback: monitorCallback, callback: monitorCallback,
healthyDevices: make(map[string]sets.String), healthyDevices: make(map[string]sets.String),
unhealthyDevices: make(map[string]sets.String),
allocatedDevices: make(map[string]sets.String), allocatedDevices: make(map[string]sets.String),
endpoints: make(map[string]endpoint), endpoints: make(map[string]endpoint),
pluginOpts: opts, pluginOpts: opts,

View File

@ -17,6 +17,8 @@ limitations under the License.
package devicemanager package devicemanager
import ( import (
"time"
"k8s.io/api/core/v1" "k8s.io/api/core/v1"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1" pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
"k8s.io/kubernetes/pkg/kubelet/config" "k8s.io/kubernetes/pkg/kubelet/config"
@ -86,6 +88,8 @@ const (
errInvalidResourceName = "the ResourceName %q is invalid" errInvalidResourceName = "the ResourceName %q is invalid"
// errEmptyResourceName is the error raised when the resource name field is empty // errEmptyResourceName is the error raised when the resource name field is empty
errEmptyResourceName = "invalid Empty ResourceName" errEmptyResourceName = "invalid Empty ResourceName"
// errEndpointStopped indicates that the endpoint has been stopped
errEndpointStopped = "endpoint %v has been stopped"
// errBadSocket is the error raised when the registry socket path is not absolute // errBadSocket is the error raised when the registry socket path is not absolute
errBadSocket = "bad socketPath, must be an absolute path:" errBadSocket = "bad socketPath, must be an absolute path:"
@ -96,3 +100,9 @@ const (
// errListAndWatch is the error raised when ListAndWatch ended unsuccessfully // errListAndWatch is the error raised when ListAndWatch ended unsuccessfully
errListAndWatch = "listAndWatch ended unexpectedly for device plugin %s with error %v" errListAndWatch = "listAndWatch ended unexpectedly for device plugin %s with error %v"
) )
// endpointStopGracePeriod is how long a stopped endpoint (one whose device
// plugin has failed) is kept in DeviceManager's cache before it is dropped.
// Keeping it around for this period covers the time gap needed for the
// capacity change to take effect.
const endpointStopGracePeriod = 5 * time.Minute

View File

@ -92,6 +92,10 @@ var _ = framework.KubeDescribe("NVIDIA GPU Device Plugin [Feature:GPUDevicePlugi
By("Restarting Kubelet and creating another pod") By("Restarting Kubelet and creating another pod")
restartKubelet() restartKubelet()
framework.WaitForAllNodesSchedulable(f.ClientSet, framework.TestContext.NodeSchedulableTimeout)
Eventually(func() bool {
return framework.NumberOfNVIDIAGPUs(getLocalNode(f)) > 0
}, 10*time.Second, framework.Poll).Should(BeTrue())
p2 := f.PodClient().CreateSync(makeBusyboxPod(framework.NVIDIAGPUResourceName, podRECMD)) p2 := f.PodClient().CreateSync(makeBusyboxPod(framework.NVIDIAGPUResourceName, podRECMD))
By("Checking that pods got a different GPU") By("Checking that pods got a different GPU")