diff --git a/pkg/kubelet/cm/devicemanager/endpoint.go b/pkg/kubelet/cm/devicemanager/endpoint.go
index 6f57a482ef..26b3952d04 100644
--- a/pkg/kubelet/cm/devicemanager/endpoint.go
+++ b/pkg/kubelet/cm/devicemanager/endpoint.go
@@ -39,6 +39,8 @@ type endpoint interface {
 	preStartContainer(devs []string) (*pluginapi.PreStartContainerResponse, error)
 	getDevices() []pluginapi.Device
 	callback(resourceName string, added, updated, deleted []pluginapi.Device)
+	isStopped() bool
+	stopGracePeriodExpired() bool
 }
 
 type endpointImpl struct {
@@ -47,6 +49,7 @@ type endpointImpl struct {
 
 	socketPath   string
 	resourceName string
+	stopTime     time.Time
 
 	devices map[string]pluginapi.Device
 	mutex   sync.Mutex
@@ -55,6 +58,7 @@ type endpointImpl struct {
 }
 
 // newEndpoint creates a new endpoint for the given resourceName.
+// This is to be used during normal device plugin registration.
 func newEndpointImpl(socketPath, resourceName string, devices map[string]pluginapi.Device, callback monitorCallback) (*endpointImpl, error) {
 	client, c, err := dial(socketPath)
 	if err != nil {
@@ -74,6 +78,16 @@ func newEndpointImpl(socketPath, resourceName string, devices map[string]plugina
 	}, nil
 }
 
+// newStoppedEndpointImpl creates a new endpoint for the given resourceName with stopTime set.
+// This is to be used during Kubelet restart, before the actual device plugin re-registers.
+func newStoppedEndpointImpl(resourceName string, devices map[string]pluginapi.Device) *endpointImpl {
+	return &endpointImpl{
+		resourceName: resourceName,
+		devices:      devices,
+		stopTime:     time.Now(),
+	}
+}
+
 func (e *endpointImpl) callback(resourceName string, added, updated, deleted []pluginapi.Device) {
 	e.cb(resourceName, added, updated, deleted)
 }
@@ -176,8 +190,30 @@ func (e *endpointImpl) run() {
 	}
 }
 
+func (e *endpointImpl) isStopped() bool {
+	e.mutex.Lock()
+	defer e.mutex.Unlock()
+	return !e.stopTime.IsZero()
+}
+
+func (e *endpointImpl) stopGracePeriodExpired() bool {
+	e.mutex.Lock()
+	defer e.mutex.Unlock()
+	return !e.stopTime.IsZero() && time.Since(e.stopTime) > endpointStopGracePeriod
+}
+
+// used for testing only
+func (e *endpointImpl) setStopTime(t time.Time) {
+	e.mutex.Lock()
+	defer e.mutex.Unlock()
+	e.stopTime = t
+}
+
 // allocate issues Allocate gRPC call to the device plugin.
 func (e *endpointImpl) allocate(devs []string) (*pluginapi.AllocateResponse, error) {
+	if e.isStopped() {
+		return nil, fmt.Errorf(errEndpointStopped, e)
+	}
 	return e.client.Allocate(context.Background(), &pluginapi.AllocateRequest{
 		ContainerRequests: []*pluginapi.ContainerAllocateRequest{
 			{DevicesIDs: devs},
@@ -187,6 +223,9 @@ func (e *endpointImpl) allocate(devs []string) (*pluginapi.AllocateResponse, err
 
 // preStartContainer issues PreStartContainer gRPC call to the device plugin.
 func (e *endpointImpl) preStartContainer(devs []string) (*pluginapi.PreStartContainerResponse, error) {
+	if e.isStopped() {
+		return nil, fmt.Errorf(errEndpointStopped, e)
+	}
 	ctx, cancel := context.WithTimeout(context.Background(), pluginapi.KubeletPreStartContainerRPCTimeoutInSecs*time.Second)
 	defer cancel()
 	return e.client.PreStartContainer(ctx, &pluginapi.PreStartContainerRequest{
@@ -195,7 +234,12 @@ func (e *endpointImpl) preStartContainer(devs []string) (*pluginapi.PreStartCont
 }
 
 func (e *endpointImpl) stop() {
-	e.clientConn.Close()
+	e.mutex.Lock()
+	defer e.mutex.Unlock()
+	if e.clientConn != nil {
+		e.clientConn.Close()
+	}
+	e.stopTime = time.Now()
 }
 
 // dial establishes the gRPC communication with the registered device plugin. https://godoc.org/google.golang.org/grpc#Dial
diff --git a/pkg/kubelet/cm/devicemanager/manager.go b/pkg/kubelet/cm/devicemanager/manager.go
index ce2093edf8..c8cc006589 100644
--- a/pkg/kubelet/cm/devicemanager/manager.go
+++ b/pkg/kubelet/cm/devicemanager/manager.go
@@ -377,18 +377,28 @@ func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
 	go func() {
 		e.run()
 		e.stop()
-
 		m.mutex.Lock()
 		if old, ok := m.endpoints[r.ResourceName]; ok && old == e {
-			glog.V(2).Infof("Delete resource for endpoint %v", e)
-			delete(m.endpoints, r.ResourceName)
+			m.markResourceUnhealthy(r.ResourceName)
 		}
-		glog.V(2).Infof("Unregistered endpoint %v", e)
 		m.mutex.Unlock()
 	}()
 }
 
+func (m *ManagerImpl) markResourceUnhealthy(resourceName string) {
+	glog.V(2).Infof("Mark all resources Unhealthy for resource %s", resourceName)
+	healthyDevices := sets.NewString()
+	if _, ok := m.healthyDevices[resourceName]; ok {
+		healthyDevices = m.healthyDevices[resourceName]
+		m.healthyDevices[resourceName] = sets.NewString()
+	}
+	if _, ok := m.unhealthyDevices[resourceName]; !ok {
+		m.unhealthyDevices[resourceName] = sets.NewString()
+	}
+	m.unhealthyDevices[resourceName] = m.unhealthyDevices[resourceName].Union(healthyDevices)
+}
+
 // GetCapacity is expected to be called when Kubelet updates its node status.
 // The first returned variable contains the registered device plugin resource capacity.
 // The second returned variable contains the registered device plugin resource allocatable.
@@ -405,12 +415,20 @@ func (m *ManagerImpl) GetCapacity() (v1.ResourceList, v1.ResourceList, []string)
 	needsUpdateCheckpoint := false
 	var capacity = v1.ResourceList{}
 	var allocatable = v1.ResourceList{}
-	var deletedResources []string
+	deletedResources := sets.NewString()
 	m.mutex.Lock()
 	for resourceName, devices := range m.healthyDevices {
-		if _, ok := m.endpoints[resourceName]; !ok {
+		e, ok := m.endpoints[resourceName]
+		if (ok && e.stopGracePeriodExpired()) || !ok {
+			// The resources contained in endpoints and (un)healthyDevices
+			// should always be consistent. Otherwise, we run the risk
+			// of failing to garbage collect non-existing resources or devices.
+			if !ok {
+				glog.Errorf("unexpected: healthyDevices and endpoints are out of sync")
+			}
+			delete(m.endpoints, resourceName)
 			delete(m.healthyDevices, resourceName)
-			deletedResources = append(deletedResources, resourceName)
+			deletedResources.Insert(resourceName)
 			needsUpdateCheckpoint = true
 		} else {
 			capacity[v1.ResourceName(resourceName)] = *resource.NewQuantity(int64(devices.Len()), resource.DecimalSI)
@@ -418,17 +436,14 @@ func (m *ManagerImpl) GetCapacity() (v1.ResourceList, v1.ResourceList, []string)
 		}
 	}
 	for resourceName, devices := range m.unhealthyDevices {
-		if _, ok := m.endpoints[resourceName]; !ok {
+		e, ok := m.endpoints[resourceName]
+		if (ok && e.stopGracePeriodExpired()) || !ok {
+			if !ok {
+				glog.Errorf("unexpected: unhealthyDevices and endpoints are out of sync")
+			}
+			delete(m.endpoints, resourceName)
 			delete(m.unhealthyDevices, resourceName)
-			alreadyDeleted := false
-			for _, name := range deletedResources {
-				if name == resourceName {
-					alreadyDeleted = true
-				}
-			}
-			if !alreadyDeleted {
-				deletedResources = append(deletedResources, resourceName)
-			}
+			deletedResources.Insert(resourceName)
 			needsUpdateCheckpoint = true
 		} else {
 			capacityCount := capacity[v1.ResourceName(resourceName)]
@@ -441,7 +456,7 @@ func (m *ManagerImpl) GetCapacity() (v1.ResourceList, v1.ResourceList, []string)
 	if needsUpdateCheckpoint {
 		m.writeCheckpoint()
 	}
-	return capacity, allocatable, deletedResources
+	return capacity, allocatable, deletedResources.UnsortedList()
 }
 
 // checkpointData struct is used to store pod to device allocation information
@@ -495,12 +510,12 @@ func (m *ManagerImpl) readCheckpoint() error {
 	defer m.mutex.Unlock()
 	m.podDevices.fromCheckpointData(data.PodDeviceEntries)
 	m.allocatedDevices = m.podDevices.devices()
-	for resource, devices := range data.RegisteredDevices {
-		// TODO: Support Checkpointing for unhealthy devices as well
+	for resource := range data.RegisteredDevices {
+		// During startup, create an empty healthyDevices list so that the resource capacity
+		// will stay zero until the corresponding device plugin re-registers.
 		m.healthyDevices[resource] = sets.NewString()
-		for _, dev := range devices {
-			m.healthyDevices[resource].Insert(dev)
-		}
+		m.unhealthyDevices[resource] = sets.NewString()
+		m.endpoints[resource] = newStoppedEndpointImpl(resource, make(map[string]pluginapi.Device))
 	}
 	return nil
 }
@@ -688,6 +703,8 @@ func (m *ManagerImpl) GetDeviceRunContainerOptions(pod *v1.Pod, container *v1.Co
 	return m.podDevices.deviceRunContainerOptions(string(pod.UID), container.Name), nil
 }
 
+// callPreStartContainerIfNeeded issues a PreStartContainer gRPC call for a device plugin resource
+// with the PreStartRequired option set.
 func (m *ManagerImpl) callPreStartContainerIfNeeded(podUID, contName, resource string) error {
 	m.mutex.Lock()
 	opts, ok := m.pluginOpts[resource]
diff --git a/pkg/kubelet/cm/devicemanager/manager_test.go b/pkg/kubelet/cm/devicemanager/manager_test.go
index 643b965387..b0ef2857fd 100644
--- a/pkg/kubelet/cm/devicemanager/manager_test.go
+++ b/pkg/kubelet/cm/devicemanager/manager_test.go
@@ -192,7 +192,8 @@ func TestUpdateCapacityAllocatable(t *testing.T) {
 	// Adds three devices for resource1, two healthy and one unhealthy.
 	// Expects capacity for resource1 to be 2.
 	resourceName1 := "domain1.com/resource1"
-	testManager.endpoints[resourceName1] = &endpointImpl{devices: make(map[string]pluginapi.Device)}
+	e1 := &endpointImpl{devices: make(map[string]pluginapi.Device)}
+	testManager.endpoints[resourceName1] = e1
 	callback(resourceName1, devs, []pluginapi.Device{}, []pluginapi.Device{})
 	capacity, allocatable, removedResources := testManager.GetCapacity()
 	resource1Capacity, ok := capacity[v1.ResourceName(resourceName1)]
@@ -240,7 +241,8 @@ func TestUpdateCapacityAllocatable(t *testing.T) {
 
 	// Tests adding another resource.
 	resourceName2 := "resource2"
-	testManager.endpoints[resourceName2] = &endpointImpl{devices: make(map[string]pluginapi.Device)}
+	e2 := &endpointImpl{devices: make(map[string]pluginapi.Device)}
+	testManager.endpoints[resourceName2] = e2
 	callback(resourceName2, devs, []pluginapi.Device{}, []pluginapi.Device{})
 	capacity, allocatable, removedResources = testManager.GetCapacity()
 	as.Equal(2, len(capacity))
@@ -252,9 +254,9 @@ func TestUpdateCapacityAllocatable(t *testing.T) {
 	as.Equal(int64(2), resource2Allocatable.Value())
 	as.Equal(0, len(removedResources))
 
-	// Removes resourceName1 endpoint. Verifies testManager.GetCapacity() reports that resourceName1
+	// Expires resourceName1 endpoint. Verifies testManager.GetCapacity() reports that resourceName1
 	// is removed from capacity and it no longer exists in healthyDevices after the call.
-	delete(testManager.endpoints, resourceName1)
+	e1.setStopTime(time.Now().Add(-1*endpointStopGracePeriod - time.Duration(10)*time.Second))
 	capacity, allocatable, removed := testManager.GetCapacity()
 	as.Equal([]string{resourceName1}, removed)
 	_, ok = capacity[v1.ResourceName(resourceName1)]
@@ -266,9 +268,49 @@ func TestUpdateCapacityAllocatable(t *testing.T) {
 	as.False(ok)
 	_, ok = testManager.unhealthyDevices[resourceName1]
 	as.False(ok)
-	fmt.Println("removed: ", removed)
-	as.Equal(1, len(removed))
+	_, ok = testManager.endpoints[resourceName1]
+	as.False(ok)
+	as.Equal(1, len(testManager.endpoints))
+	// Stops resourceName2 endpoint. Verifies its stopTime is set, allocate and
+	// preStartContainer calls return errors.
+	e2.stop()
+	as.False(e2.stopTime.IsZero())
+	_, err = e2.allocate([]string{"Device1"})
+	as.True(reflect.DeepEqual(err, fmt.Errorf(errEndpointStopped, e2)))
+	_, err = e2.preStartContainer([]string{"Device1"})
+	as.True(reflect.DeepEqual(err, fmt.Errorf(errEndpointStopped, e2)))
+	// Marks resourceName2 unhealthy and verifies its capacity/allocatable are
+	// correctly updated.
+	testManager.markResourceUnhealthy(resourceName2)
+	capacity, allocatable, removed = testManager.GetCapacity()
+	val, ok = capacity[v1.ResourceName(resourceName2)]
+	as.True(ok)
+	as.Equal(int64(3), val.Value())
+	val, ok = allocatable[v1.ResourceName(resourceName2)]
+	as.True(ok)
+	as.Equal(int64(0), val.Value())
+	as.Empty(removed)
+	// Writes and re-reads checkpoints. Verifies we create a stopped endpoint
+	// for resourceName2, its capacity is set to zero, and we still consider
+	// it as a DevicePlugin resource. This makes sure any pod that was scheduled
+	// during the time of propagating capacity change to the scheduler will be
+	// properly rejected instead of being incorrectly started.
+	err = testManager.writeCheckpoint()
+	as.Nil(err)
+	testManager.healthyDevices = make(map[string]sets.String)
+	testManager.unhealthyDevices = make(map[string]sets.String)
+	err = testManager.readCheckpoint()
+	as.Nil(err)
+	as.Equal(1, len(testManager.endpoints))
+	_, ok = testManager.endpoints[resourceName2]
+	as.True(ok)
+	capacity, allocatable, removed = testManager.GetCapacity()
+	val, ok = capacity[v1.ResourceName(resourceName2)]
+	as.True(ok)
+	as.Equal(int64(0), val.Value())
+	as.Empty(removed)
+	as.True(testManager.isDevicePluginResource(resourceName2))
 }
 
 func constructDevices(devices []string) sets.String {
@@ -312,7 +354,9 @@ func TestCheckpoint(t *testing.T) {
 	defer os.RemoveAll(tmpDir)
 	testManager := &ManagerImpl{
 		socketdir:        tmpDir,
+		endpoints:        make(map[string]endpoint),
 		healthyDevices:   make(map[string]sets.String),
+		unhealthyDevices: make(map[string]sets.String),
 		allocatedDevices: make(map[string]sets.String),
 		podDevices:       make(podDevices),
 	}
@@ -414,6 +458,10 @@ func (m *MockEndpoint) allocate(devs []string) (*pluginapi.AllocateResponse, err
 	return nil, nil
 }
 
+func (m *MockEndpoint) isStopped() bool { return false }
+
+func (m *MockEndpoint) stopGracePeriodExpired() bool { return false }
+
 func makePod(limits v1.ResourceList) *v1.Pod {
 	return &v1.Pod{
 		ObjectMeta: metav1.ObjectMeta{
@@ -437,6 +485,7 @@ func getTestManager(tmpDir string, activePods ActivePodsFunc, testRes []TestReso
 		socketdir:        tmpDir,
 		callback:         monitorCallback,
 		healthyDevices:   make(map[string]sets.String),
+		unhealthyDevices: make(map[string]sets.String),
 		allocatedDevices: make(map[string]sets.String),
 		endpoints:        make(map[string]endpoint),
 		pluginOpts:       opts,
diff --git a/pkg/kubelet/cm/devicemanager/types.go b/pkg/kubelet/cm/devicemanager/types.go
index d069484ccb..e1d07ef5fa 100644
--- a/pkg/kubelet/cm/devicemanager/types.go
+++ b/pkg/kubelet/cm/devicemanager/types.go
@@ -17,6 +17,8 @@ limitations under the License.
 package devicemanager
 
 import (
+	"time"
+
 	"k8s.io/api/core/v1"
 	pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
 	"k8s.io/kubernetes/pkg/kubelet/config"
@@ -86,6 +88,8 @@ const (
 	errInvalidResourceName = "the ResourceName %q is invalid"
 	// errEmptyResourceName is the error raised when the resource name field is empty
 	errEmptyResourceName = "invalid Empty ResourceName"
+	// errEndpointStopped indicates that the endpoint has been stopped
+	errEndpointStopped = "endpoint %v has been stopped"
 
 	// errBadSocket is the error raised when the registry socket path is not absolute
 	errBadSocket = "bad socketPath, must be an absolute path:"
@@ -96,3 +100,9 @@ const (
 	// errListAndWatch is the error raised when ListAndWatch ended unsuccessfully
 	errListAndWatch = "listAndWatch ended unexpectedly for device plugin %s with error %v"
 )
+
+// endpointStopGracePeriod indicates the grace period after an endpoint is stopped
+// because its device plugin fails. DeviceManager keeps the stopped endpoint in its
+// cache during this grace period to cover the time gap for the capacity change to
+// take effect.
+const endpointStopGracePeriod = time.Duration(5) * time.Minute
diff --git a/test/e2e_node/gpu_device_plugin.go b/test/e2e_node/gpu_device_plugin.go
index 29521c7813..6984520168 100644
--- a/test/e2e_node/gpu_device_plugin.go
+++ b/test/e2e_node/gpu_device_plugin.go
@@ -92,6 +92,10 @@ var _ = framework.KubeDescribe("NVIDIA GPU Device Plugin [Feature:GPUDevicePlugi
 
 		By("Restarting Kubelet and creating another pod")
 		restartKubelet()
+		framework.WaitForAllNodesSchedulable(f.ClientSet, framework.TestContext.NodeSchedulableTimeout)
+		Eventually(func() bool {
+			return framework.NumberOfNVIDIAGPUs(getLocalNode(f)) > 0
+		}, 10*time.Second, framework.Poll).Should(BeTrue())
 		p2 := f.PodClient().CreateSync(makeBusyboxPod(framework.NVIDIAGPUResourceName, podRECMD))
 
 		By("Checking that pods got a different GPU")
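
The core of the endpoint.go change is stop-time bookkeeping: stop() now records a timestamp instead of tearing the endpoint down, allocate() and preStartContainer() fail fast while the endpoint is stopped, and stopGracePeriodExpired() only reports true once the 5-minute grace period has elapsed. A minimal, self-contained sketch of that idea (illustrative only; fakeEndpoint is a hypothetical stand-in, not the kubelet's endpointImpl):

package main

import (
	"fmt"
	"sync"
	"time"
)

// endpointStopGracePeriod mirrors the 5-minute constant added in types.go.
const endpointStopGracePeriod = 5 * time.Minute

// fakeEndpoint shows only the stop-time bookkeeping introduced by this patch.
type fakeEndpoint struct {
	mu       sync.Mutex
	stopTime time.Time
}

// stop records when the plugin went away instead of discarding the endpoint.
func (e *fakeEndpoint) stop() {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.stopTime = time.Now()
}

// isStopped is what allocate/preStartContainer would check to fail fast.
func (e *fakeEndpoint) isStopped() bool {
	e.mu.Lock()
	defer e.mu.Unlock()
	return !e.stopTime.IsZero()
}

// stopGracePeriodExpired is what GetCapacity would check before garbage
// collecting the resource.
func (e *fakeEndpoint) stopGracePeriodExpired() bool {
	e.mu.Lock()
	defer e.mu.Unlock()
	return !e.stopTime.IsZero() && time.Since(e.stopTime) > endpointStopGracePeriod
}

func main() {
	e := &fakeEndpoint{}
	e.stop()
	fmt.Println(e.isStopped())              // true: RPCs should now return an error
	fmt.Println(e.stopGracePeriodExpired()) // false: still inside the grace period

	// Simulate an expired grace period, as the unit test does via setStopTime.
	e.mu.Lock()
	e.stopTime = time.Now().Add(-endpointStopGracePeriod - 10*time.Second)
	e.mu.Unlock()
	fmt.Println(e.stopGracePeriodExpired()) // true: the resource may now be removed
}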
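
On the manager side, markResourceUnhealthy keeps capacity stable while zeroing allocatable: every device ID in the healthy set is moved to the unhealthy set rather than being forgotten. A rough sketch of that set movement using plain maps (the real code uses k8s.io/apimachinery's sets.String; the names below are illustrative):

package main

import "fmt"

// moveToUnhealthy mimics markResourceUnhealthy: every device that was healthy
// for the resource becomes unhealthy, so capacity (healthy + unhealthy) is
// unchanged while allocatable (healthy only) drops to zero.
func moveToUnhealthy(healthy, unhealthy map[string]map[string]bool, resource string) {
	if unhealthy[resource] == nil {
		unhealthy[resource] = map[string]bool{}
	}
	for dev := range healthy[resource] {
		unhealthy[resource][dev] = true
	}
	healthy[resource] = map[string]bool{}
}

func main() {
	healthy := map[string]map[string]bool{"resource2": {"Dev1": true, "Dev2": true}}
	unhealthy := map[string]map[string]bool{"resource2": {"Dev3": true}}

	moveToUnhealthy(healthy, unhealthy, "resource2")

	capacity := len(healthy["resource2"]) + len(unhealthy["resource2"])
	allocatable := len(healthy["resource2"])
	// Matches the expectations in TestUpdateCapacityAllocatable: capacity 3, allocatable 0.
	fmt.Println(capacity, allocatable)
}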
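
Finally, the readCheckpoint change ties the two together across a kubelet restart: every previously registered resource comes back with an empty healthy set and a stopped endpoint, so it still counts as a device plugin resource (pods requesting it are rejected rather than silently started without devices) until the plugin re-registers or the grace period expires. A hedged sketch of that restart flow, with an in-memory checkpoint standing in for the real file-based one (all names hypothetical):

package main

import (
	"fmt"
	"time"
)

// stoppedEndpoint is a placeholder for what newStoppedEndpointImpl returns.
type stoppedEndpoint struct{ stopTime time.Time }

type manager struct {
	healthyDevices map[string]map[string]bool
	endpoints      map[string]*stoppedEndpoint
}

// restoreFromCheckpoint mimics the readCheckpoint change: only resource names are
// restored, each with zero capacity and a stopped endpoint, until the device
// plugin re-registers or the grace period runs out.
func restoreFromCheckpoint(registered []string) *manager {
	m := &manager{
		healthyDevices: map[string]map[string]bool{},
		endpoints:      map[string]*stoppedEndpoint{},
	}
	for _, r := range registered {
		m.healthyDevices[r] = map[string]bool{} // capacity stays zero for now
		m.endpoints[r] = &stoppedEndpoint{stopTime: time.Now()}
	}
	return m
}

// isDevicePluginResource reports whether the manager still knows the resource,
// which is what keeps pods requesting it from starting without devices.
func (m *manager) isDevicePluginResource(name string) bool {
	_, ok := m.endpoints[name]
	return ok
}

func main() {
	m := restoreFromCheckpoint([]string{"nvidia.com/gpu"})
	fmt.Println(m.isDevicePluginResource("nvidia.com/gpu")) // true: requests are rejected, not ignored
	fmt.Println(len(m.healthyDevices["nvidia.com/gpu"]))    // 0: no capacity until re-registration
}

This is also why the e2e change waits for framework.NumberOfNVIDIAGPUs to report a positive count after restarting the kubelet before creating the second pod: capacity only returns once the plugin has re-registered.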