Reset extended resources only when node is recreated.

k3s-v1.15.3
Richard Chen 2019-05-09 14:07:22 -07:00
parent 2fd1556eb3
commit c9f1b57b5b
11 changed files with 151 additions and 21 deletions

View File

@@ -21,7 +21,7 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
// TODO: Migrate kubelet to either use its own internal objects or client library.
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
internalapi "k8s.io/cri-api/pkg/apis"
podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/config"
@@ -104,6 +104,10 @@ type ContainerManager interface {
// GetDevices returns information about the devices assigned to pods and containers
GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices
+ // ShouldResetExtendedResourceCapacity returns whether the extended resources should be zeroed,
+ // due to node recreation.
+ ShouldResetExtendedResourceCapacity() bool
}
type NodeConfig struct {
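
For orientation, the method added above is the seam the rest of this commit hangs on: the device manager supplies the real answer, the cm stubs return a canned value, and the kubelet consults it while reconciling node status. A minimal sketch of the consumer side, assuming the standard import paths used elsewhere in this diff; zeroIfRecreated is a hypothetical helper, not part of the commit:

    package kubeletsketch

    import (
        v1 "k8s.io/api/core/v1"
        "k8s.io/apimachinery/pkg/api/resource"
        v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
        "k8s.io/kubernetes/pkg/kubelet/cm"
    )

    // zeroIfRecreated zeroes extended resource capacity only when the
    // container manager reports that the node has been recreated.
    func zeroIfRecreated(mgr cm.ContainerManager, node *v1.Node) bool {
        if !mgr.ShouldResetExtendedResourceCapacity() {
            return false // node was not recreated; leave capacity alone
        }
        updated := false
        for k := range node.Status.Capacity {
            if v1helper.IsExtendedResourceName(k) {
                node.Status.Capacity[k] = *resource.NewQuantity(0, resource.DecimalSI)
                node.Status.Allocatable[k] = *resource.NewQuantity(0, resource.DecimalSI)
                updated = true
            }
        }
        return updated
    }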

View File

@@ -34,7 +34,7 @@ import (
"github.com/opencontainers/runc/libcontainer/configs"
"k8s.io/klog"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
@@ -897,3 +897,7 @@ func (cm *containerManagerImpl) GetDevicePluginResourceCapacity() (v1.ResourceLi
func (cm *containerManagerImpl) GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices {
return cm.deviceManager.GetDevices(podUID, containerName)
}
+ func (cm *containerManagerImpl) ShouldResetExtendedResourceCapacity() bool {
+ return cm.deviceManager.ShouldResetExtendedResourceCapacity()
+ }

View File

@@ -17,7 +17,7 @@ limitations under the License.
package cm
import (
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/klog"
"k8s.io/apimachinery/pkg/api/resource"
@@ -32,7 +32,9 @@ import (
schedulernodeinfo "k8s.io/kubernetes/pkg/scheduler/nodeinfo"
)
- type containerManagerStub struct{}
+ type containerManagerStub struct {
+ shouldResetExtendedResourceCapacity bool
+ }
var _ ContainerManager = &containerManagerStub{}
@@ -110,6 +112,14 @@ func (cm *containerManagerStub) GetDevices(_, _ string) []*podresourcesapi.Conta
return nil
}
- func NewStubContainerManager() ContainerManager {
- return &containerManagerStub{}
+ func (cm *containerManagerStub) ShouldResetExtendedResourceCapacity() bool {
+ return cm.shouldResetExtendedResourceCapacity
+ }
+ func NewStubContainerManager() ContainerManager {
+ return &containerManagerStub{shouldResetExtendedResourceCapacity: false}
}
+ func NewStubContainerManagerWithExtendedResource(shouldResetExtendedResourceCapacity bool) ContainerManager {
+ return &containerManagerStub{shouldResetExtendedResourceCapacity: shouldResetExtendedResourceCapacity}
+ }
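
A short sketch of how the two constructors behave, mirroring the way the kubelet_node_status test at the end of this commit wires them; the enclosing package and function are illustrative:

    package kubeletsketch

    import "k8s.io/kubernetes/pkg/kubelet/cm"

    // stubBehavior contrasts the default stub with the configurable one.
    func stubBehavior() (bool, bool) {
        resetting := cm.NewStubContainerManagerWithExtendedResource(true)
        nonResetting := cm.NewStubContainerManager()
        return resetting.ShouldResetExtendedResourceCapacity(), // true
            nonResetting.ShouldResetExtendedResourceCapacity() // false
    }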

View File

@@ -24,7 +24,7 @@ package cm
import (
"fmt"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/tools/record"
@@ -171,3 +171,7 @@ func (cm *containerManagerImpl) GetPodCgroupRoot() string {
func (cm *containerManagerImpl) GetDevices(_, _ string) []*podresourcesapi.ContainerDevices {
return nil
}
+ func (cm *containerManagerImpl) ShouldResetExtendedResourceCapacity() bool {
+ return false
+ }

View File

@@ -14,6 +14,7 @@ go_library(
visibility = ["//visibility:public"],
deps = [
"//pkg/apis/core/v1/helper:go_default_library",
"//pkg/features:go_default_library",
"//pkg/kubelet/apis/deviceplugin/v1beta1:go_default_library",
"//pkg/kubelet/apis/pluginregistration/v1:go_default_library",
"//pkg/kubelet/apis/podresources/v1alpha1:go_default_library",
@@ -30,6 +31,7 @@ go_library(
"//staging/src/k8s.io/api/core/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/api/resource:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library",
"//vendor/google.golang.org/grpc:go_default_library",
"//vendor/k8s.io/klog:go_default_library",
],

View File

@@ -28,10 +28,12 @@ import (
"google.golang.org/grpc"
"k8s.io/klog"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
"k8s.io/apimachinery/pkg/util/sets"
utilfeature "k8s.io/apiserver/pkg/util/feature"
v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
"k8s.io/kubernetes/pkg/features"
pluginapi "k8s.io/kubernetes/pkg/kubelet/apis/deviceplugin/v1beta1"
podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
@@ -838,3 +840,17 @@ func (m *ManagerImpl) GetDevices(podUID, containerName string) []*podresourcesap
defer m.mutex.Unlock()
return m.podDevices.getContainerDevices(podUID, containerName)
}
+ // ShouldResetExtendedResourceCapacity returns whether the extended resources should be zeroed,
+ // depending on whether the node has been recreated. Absence of the checkpoint file strongly indicates
+ // the node has been recreated.
+ func (m *ManagerImpl) ShouldResetExtendedResourceCapacity() bool {
+ if utilfeature.DefaultFeatureGate.Enabled(features.DevicePlugins) {
+ checkpoints, err := m.checkpointManager.ListCheckpoints()
+ if err != nil {
+ return false
+ }
+ return len(checkpoints) == 0
+ }
+ return false
+ }
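
The heuristic reduces to: an empty device-manager checkpoint directory strongly suggests the node (and the directory with it) was recreated. A standalone sketch of that decision, minus the feature gate, using only the checkpointmanager calls exercised later in this diff; the directory path is illustrative:

    package main

    import (
        "fmt"

        "k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
    )

    // nodeLikelyRecreated reports true only when no checkpoints exist at all;
    // any error is treated conservatively as "do not reset".
    func nodeLikelyRecreated(dir string) bool {
        ckm, err := checkpointmanager.NewCheckpointManager(dir)
        if err != nil {
            return false
        }
        checkpoints, err := ckm.ListCheckpoints()
        if err != nil {
            return false
        }
        return len(checkpoints) == 0
    }

    func main() {
        fmt.Println(nodeLikelyRecreated("/var/lib/kubelet/device-plugins"))
    }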

View File

@@ -17,7 +17,7 @@ limitations under the License.
package devicemanager
import (
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/config"
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
@@ -67,3 +67,8 @@ func (h *ManagerStub) GetWatcherHandler() pluginwatcher.PluginHandler {
func (h *ManagerStub) GetDevices(_, _ string) []*podresourcesapi.ContainerDevices {
return nil
}
+ // ShouldResetExtendedResourceCapacity returns false
+ func (h *ManagerStub) ShouldResetExtendedResourceCapacity() bool {
+ return false
+ }

View File

@@ -27,7 +27,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
@@ -946,6 +946,45 @@ func TestDevicePreStartContainer(t *testing.T) {
as.Equal(len(runContainerOpts.Envs), len(expectedResp.Envs))
}
+ func TestResetExtendedResource(t *testing.T) {
+ as := assert.New(t)
+ tmpDir, err := ioutil.TempDir("", "checkpoint")
+ as.Nil(err)
+ ckm, err := checkpointmanager.NewCheckpointManager(tmpDir)
+ as.Nil(err)
+ testManager := &ManagerImpl{
+ endpoints: make(map[string]endpointInfo),
+ healthyDevices: make(map[string]sets.String),
+ unhealthyDevices: make(map[string]sets.String),
+ allocatedDevices: make(map[string]sets.String),
+ podDevices: make(podDevices),
+ checkpointManager: ckm,
+ }
+ extendedResourceName := "domain.com/resource"
+ testManager.podDevices.insert("pod", "con", extendedResourceName,
+ constructDevices([]string{"dev1"}),
+ constructAllocResp(map[string]string{"/dev/dev1": "/dev/dev1"},
+ map[string]string{"/home/lib1": "/usr/lib1"}, map[string]string{}))
+ testManager.healthyDevices[extendedResourceName] = sets.NewString()
+ testManager.healthyDevices[extendedResourceName].Insert("dev1")
+ // checkpoint is present, indicating node hasn't been recreated
+ err = testManager.writeCheckpoint()
+ as.Nil(err)
+ as.False(testManager.ShouldResetExtendedResourceCapacity())
+ // checkpoint is absent, representing node recreation
+ ckpts, err := ckm.ListCheckpoints()
+ as.Nil(err)
+ for _, ckpt := range ckpts {
+ err = ckm.RemoveCheckpoint(ckpt)
+ as.Nil(err)
+ }
+ as.True(testManager.ShouldResetExtendedResourceCapacity())
+ }
func allocateStubFunc() func(devs []string) (*pluginapi.AllocateResponse, error) {
return func(devs []string) (*pluginapi.AllocateResponse, error) {
resp := new(pluginapi.ContainerAllocateResponse)
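
The test above drives both sides of the heuristic: with a freshly written checkpoint on disk, ShouldResetExtendedResourceCapacity must report false; once every checkpoint is removed, it must report true. To run just this test, assuming the usual kubernetes source layout for the devicemanager package, something like go test -run TestResetExtendedResource ./pkg/kubelet/cm/devicemanager/ should do.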

View File

@@ -19,7 +19,7 @@ package devicemanager
import (
"time"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
podresourcesapi "k8s.io/kubernetes/pkg/kubelet/apis/podresources/v1alpha1"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
@@ -58,6 +58,11 @@ type Manager interface {
// GetDevices returns information about the devices assigned to pods and containers
GetDevices(podUID, containerName string) []*podresourcesapi.ContainerDevices
+ // ShouldResetExtendedResourceCapacity returns whether the extended resources should be reset,
+ // depending on checkpoint file availability. Absence of the checkpoint file strongly indicates
+ // the node has been recreated.
+ ShouldResetExtendedResourceCapacity() bool
}
// DeviceRunContainerOptions contains the combined container runtime settings to consume its allocated devices.

View File

@@ -26,7 +26,7 @@ import (
"k8s.io/klog"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
@@ -132,12 +132,15 @@ func (kl *Kubelet) tryRegisterWithAPIServer(node *v1.Node) bool {
// Zeros out extended resource capacity during reconciliation.
func (kl *Kubelet) reconcileExtendedResource(initialNode, node *v1.Node) bool {
requiresUpdate := false
- for k := range node.Status.Capacity {
- if v1helper.IsExtendedResourceName(k) {
- klog.Infof("Zero out resource %s capacity in existing node.", k)
- node.Status.Capacity[k] = *resource.NewQuantity(int64(0), resource.DecimalSI)
- node.Status.Allocatable[k] = *resource.NewQuantity(int64(0), resource.DecimalSI)
- requiresUpdate = true
+ // Check with the device manager to see if the node has been recreated, in which case
+ // extended resources should be zeroed until they are available again.
+ if kl.containerManager.ShouldResetExtendedResourceCapacity() {
+ for k := range node.Status.Capacity {
+ if v1helper.IsExtendedResourceName(k) {
+ klog.Infof("Zero out resource %s capacity in existing node.", k)
+ node.Status.Capacity[k] = *resource.NewQuantity(int64(0), resource.DecimalSI)
+ node.Status.Allocatable[k] = *resource.NewQuantity(int64(0), resource.DecimalSI)
+ requiresUpdate = true
+ }
+ }
+ }
return requiresUpdate
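
The behavioral change here is the new guard itself: previously the zeroing loop ran unconditionally on every reconciliation, so extended resource capacity advertised by device plugins was wiped whenever the kubelet re-registered against an existing node object, e.g. across an ordinary kubelet restart. With the guard, capacity is zeroed only when the device manager's missing checkpoint indicates the node really was recreated.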

View File

@@ -31,7 +31,7 @@ import (
"github.com/stretchr/testify/require"
cadvisorapi "github.com/google/cadvisor/info/v1"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/resource"
@@ -1737,17 +1737,21 @@ func TestUpdateDefaultLabels(t *testing.T) {
func TestReconcileExtendedResource(t *testing.T) {
testKubelet := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
testKubelet.kubelet.kubeClient = nil // ensure only the heartbeat client is used
+ testKubelet.kubelet.containerManager = cm.NewStubContainerManagerWithExtendedResource(true /* shouldResetExtendedResourceCapacity */)
+ testKubeletNoReset := newTestKubelet(t, false /* controllerAttachDetachEnabled */)
extendedResourceName1 := v1.ResourceName("test.com/resource1")
extendedResourceName2 := v1.ResourceName("test.com/resource2")
cases := []struct {
name string
+ testKubelet *TestKubelet
existingNode *v1.Node
expectedNode *v1.Node
needsUpdate bool
}{
{
name: "no update needed without extended resource",
name: "no update needed without extended resource",
testKubelet: testKubelet,
existingNode: &v1.Node{
Status: v1.NodeStatus{
Capacity: v1.ResourceList{
@@ -1779,7 +1783,41 @@
needsUpdate: false,
},
{
name: "extended resource capacity is zeroed",
name: "extended resource capacity is not zeroed due to presence of checkpoint file",
testKubelet: testKubelet,
existingNode: &v1.Node{
Status: v1.NodeStatus{
Capacity: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI),
v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI),
},
Allocatable: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI),
v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI),
},
},
},
expectedNode: &v1.Node{
Status: v1.NodeStatus{
Capacity: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI),
v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI),
},
Allocatable: v1.ResourceList{
v1.ResourceCPU: *resource.NewMilliQuantity(2000, resource.DecimalSI),
v1.ResourceMemory: *resource.NewQuantity(10E9, resource.BinarySI),
v1.ResourceEphemeralStorage: *resource.NewQuantity(5000, resource.BinarySI),
},
},
},
needsUpdate: false,
},
{
name: "extended resource capacity is zeroed",
testKubelet: testKubeletNoReset,
existingNode: &v1.Node{
Status: v1.NodeStatus{
Capacity: v1.ResourceList{