Fix default resource limits (node capacities) for downward api volumes

pull/6/head
Avesh Agarwal 2016-07-26 18:04:03 -04:00 committed by Avesh
parent 431e7ce1ab
commit 52a60fe3be
11 changed files with 127 additions and 31 deletions

View File

@ -582,3 +582,7 @@ func (adc *attachDetachController) GetHostIP() (net.IP, error) {
func (adc *attachDetachController) GetRootContext() string {
return ""
}
func (adc *attachDetachController) GetNodeAllocatable() (api.ResourceList, error) {
return api.ResourceList{}, nil
}

View File

@ -80,3 +80,7 @@ func (ctrl *PersistentVolumeController) GetHostIP() (net.IP, error) {
func (ctrl *PersistentVolumeController) GetRootContext() string {
return ""
}
func (ctrl *PersistentVolumeController) GetNodeAllocatable() (api.ResourceList, error) {
return api.ResourceList{}, nil
}

View File

@ -75,6 +75,29 @@ func ExtractResourceValueByContainerName(fs *api.ResourceFieldSelector, pod *api
return ExtractContainerResourceValue(fs, container)
}
// ExtractResourceValueByContainerNameAndNodeAllocatable extracts the value of a resource
// by providing container name and node allocatable
func ExtractResourceValueByContainerNameAndNodeAllocatable(fs *api.ResourceFieldSelector, pod *api.Pod, containerName string, nodeAllocatable api.ResourceList) (string, error) {
realContainer, err := findContainerInPod(pod, containerName)
if err != nil {
return "", err
}
containerCopy, err := api.Scheme.DeepCopy(realContainer)
if err != nil {
return "", fmt.Errorf("failed to perform a deep copy of container object: %v", err)
}
container, ok := containerCopy.(*api.Container)
if !ok {
return "", fmt.Errorf("unexpected type returned from deep copy of container object")
}
MergeContainerResourceLimits(container, nodeAllocatable)
return ExtractContainerResourceValue(fs, container)
}
// ExtractContainerResourceValue extracts the value of a resource
// in an already known container
func ExtractContainerResourceValue(fs *api.ResourceFieldSelector, container *api.Container) (string, error) {
@ -122,3 +145,19 @@ func convertResourceMemoryToString(memory *resource.Quantity, divisor resource.Q
m := int64(math.Ceil(float64(memory.Value()) / float64(divisor.Value())))
return strconv.FormatInt(m, 10), nil
}
// MergeContainerResourceLimits checks if a limit is applied for
// the container, and if not, it sets the limit to the passed resource list.
func MergeContainerResourceLimits(container *api.Container,
allocatable api.ResourceList) {
if container.Resources.Limits == nil {
container.Resources.Limits = make(api.ResourceList)
}
for _, resource := range []api.ResourceName{api.ResourceCPU, api.ResourceMemory} {
if quantity, exists := container.Resources.Limits[resource]; !exists || quantity.IsZero() {
if cap, exists := allocatable[resource]; exists {
container.Resources.Limits[resource] = *cap.Copy()
}
}
}
}

View File

@ -1637,11 +1637,7 @@ func (kl *Kubelet) syncPod(o syncPodOptions) error {
}
// Wait for volumes to attach/mount
defaultedPod, _, err := kl.defaultPodLimitsForDownwardApi(pod, nil)
if err != nil {
return err
}
if err := kl.volumeManager.WaitForAttachAndMount(defaultedPod); err != nil {
if err := kl.volumeManager.WaitForAttachAndMount(pod); err != nil {
kl.recorder.Eventf(pod, api.EventTypeWarning, events.FailedMountVolume, "Unable to mount volumes for pod %q: %v", format.Pod(pod), err)
glog.Errorf("Unable to mount volumes for pod %q: %v; skipping pod", format.Pod(pod), err)
return err

View File

@ -20,15 +20,16 @@ import (
"fmt"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/fieldpath"
)
// defaultPodLimitsForDownwardApi copies the input pod, and optional container,
// and applies default resource limits. it returns a copy of the input pod,
// and a copy of the input container (if specified) with default limits
// applied. if a container has no limit specified, it will default the limit to
// the node capacity.
// the node allocatable.
// TODO: if/when we have pod level resources, we need to update this function
// to use those limits instead of node capacity.
// to use those limits instead of node allocatable.
func (kl *Kubelet) defaultPodLimitsForDownwardApi(pod *api.Pod, container *api.Container) (*api.Pod, *api.Container, error) {
if pod == nil {
return nil, nil, fmt.Errorf("invalid input, pod cannot be nil")
@ -38,7 +39,7 @@ func (kl *Kubelet) defaultPodLimitsForDownwardApi(pod *api.Pod, container *api.C
if err != nil {
return nil, nil, fmt.Errorf("failed to find node object, expected a node")
}
capacity := node.Status.Capacity
allocatable := node.Status.Allocatable
podCopy, err := api.Scheme.Copy(pod)
if err != nil {
@ -49,7 +50,7 @@ func (kl *Kubelet) defaultPodLimitsForDownwardApi(pod *api.Pod, container *api.C
return nil, nil, fmt.Errorf("unexpected type returned from deep copy of pod object")
}
for idx := range outputPod.Spec.Containers {
mergeContainerResourceLimitsWithCapacity(&outputPod.Spec.Containers[idx], capacity)
fieldpath.MergeContainerResourceLimits(&outputPod.Spec.Containers[idx], allocatable)
}
var outputContainer *api.Container
@ -62,23 +63,7 @@ func (kl *Kubelet) defaultPodLimitsForDownwardApi(pod *api.Pod, container *api.C
if !ok {
return nil, nil, fmt.Errorf("unexpected type returned from deep copy of container object")
}
mergeContainerResourceLimitsWithCapacity(outputContainer, capacity)
fieldpath.MergeContainerResourceLimits(outputContainer, allocatable)
}
return outputPod, outputContainer, nil
}
// mergeContainerResourceLimitsWithCapacity checks if a limit is applied for
// the container, and if not, it sets the limit based on the capacity.
func mergeContainerResourceLimitsWithCapacity(container *api.Container,
capacity api.ResourceList) {
if container.Resources.Limits == nil {
container.Resources.Limits = make(api.ResourceList)
}
for _, resource := range []api.ResourceName{api.ResourceCPU, api.ResourceMemory} {
if quantity, exists := container.Resources.Limits[resource]; !exists || quantity.IsZero() {
if cap, exists := capacity[resource]; exists {
container.Resources.Limits[resource] = *cap.Copy()
}
}
}
}

View File

@ -25,6 +25,7 @@ import (
cadvisorapiv2 "github.com/google/cadvisor/info/v2"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
)
func TestPodResourceLimitsDefaulting(t *testing.T) {
@ -38,25 +39,37 @@ func TestPodResourceLimitsDefaulting(t *testing.T) {
}, nil)
tk.fakeCadvisor.On("ImagesFsInfo").Return(cadvisorapiv2.FsInfo{}, nil)
tk.fakeCadvisor.On("RootFsInfo").Return(cadvisorapiv2.FsInfo{}, nil)
tk.kubelet.reservation = kubetypes.Reservation{
Kubernetes: api.ResourceList{
api.ResourceCPU: resource.MustParse("3"),
api.ResourceMemory: resource.MustParse("4Gi"),
},
System: api.ResourceList{
api.ResourceCPU: resource.MustParse("1"),
api.ResourceMemory: resource.MustParse("2Gi"),
},
}
cases := []struct {
pod *api.Pod
expected *api.Pod
}{
{
pod: getPod("0", "0"),
expected: getPod("10", "10Gi"),
expected: getPod("6", "4Gi"),
},
{
pod: getPod("1", "0"),
expected: getPod("1", "10Gi"),
expected: getPod("1", "4Gi"),
},
{
pod: getPod("", ""),
expected: getPod("10", "10Gi"),
expected: getPod("6", "4Gi"),
},
{
pod: getPod("0", "1Mi"),
expected: getPod("10", "1Mi"),
expected: getPod("6", "1Mi"),
},
}
as := assert.New(t)

View File

@ -125,6 +125,14 @@ func (kvh *kubeletVolumeHost) GetHostIP() (net.IP, error) {
return kvh.kubelet.GetHostIP()
}
func (kvh *kubeletVolumeHost) GetNodeAllocatable() (api.ResourceList, error) {
node, err := kvh.kubelet.getNodeAnyWay()
if err != nil {
return nil, fmt.Errorf("error retrieving node: %v", err)
}
return node.Status.Allocatable, nil
}
func (kvh *kubeletVolumeHost) GetRootContext() string {
rootContext, err := kvh.kubelet.getRootDirContext()
if err != nil {

View File

@ -206,7 +206,10 @@ func (d *downwardAPIVolume) collectData() (map[string][]byte, error) {
}
} else if fileInfo.ResourceFieldRef != nil {
containerName := fileInfo.ResourceFieldRef.ContainerName
if values, err := fieldpath.ExtractResourceValueByContainerName(fileInfo.ResourceFieldRef, d.pod, containerName); err != nil {
nodeAllocatable, err := d.plugin.host.GetNodeAllocatable()
if err != nil {
errlist = append(errlist, err)
} else if values, err := fieldpath.ExtractResourceValueByContainerNameAndNodeAllocatable(fileInfo.ResourceFieldRef, d.pod, containerName, nodeAllocatable); err != nil {
glog.Errorf("Unable to extract field %s: %s", fileInfo.ResourceFieldRef.Resource, err.Error())
errlist = append(errlist, err)
} else {

View File

@ -214,6 +214,9 @@ type VolumeHost interface {
// mounts correctly. It will be replaced and expanded on by future
// SecurityContext work.
GetRootContext() string
// Returns node allocatable
GetNodeAllocatable() (api.ResourceList, error)
}
// VolumePluginMgr tracks registered plugins.

View File

@ -127,6 +127,10 @@ func (f *fakeVolumeHost) GetRootContext() string {
return f.rootContext
}
func (f *fakeVolumeHost) GetNodeAllocatable() (api.ResourceList, error) {
return api.ResourceList{}, nil
}
func ProbeVolumePlugins(config VolumeConfig) []VolumePlugin {
if _, ok := config.OtherAttributes["fake-property"]; ok {
return []VolumePlugin{

View File

@ -163,6 +163,20 @@ var _ = framework.KubeDescribe("Downward API volume", func() {
})
})
It("should provide node allocatable (cpu) as default cpu limit if the limit is not set", func() {
podName := "downwardapi-volume-" + string(uuid.NewUUID())
pod := downwardAPIVolumeForDefaultContainerResources(podName, "/etc/cpu_limit")
f.TestContainerOutputRegexp("downward API volume plugin", pod, 0, []string{"[1-9]"})
})
It("should provide node allocatable (memory) as default memory limit if the limit is not set", func() {
podName := "downwardapi-volume-" + string(uuid.NewUUID())
pod := downwardAPIVolumeForDefaultContainerResources(podName, "/etc/memory_limit")
f.TestContainerOutputRegexp("downward API volume plugin", pod, 0, []string{"[1-9]"})
})
})
func downwardAPIVolumePodForSimpleTest(name string, filePath string) *api.Pod {
@ -192,6 +206,12 @@ func downwardAPIVolumeForContainerResources(name string, filePath string) *api.P
return pod
}
func downwardAPIVolumeForDefaultContainerResources(name string, filePath string) *api.Pod {
pod := downwardAPIVolumeBasePod(name, nil, nil)
pod.Spec.Containers = downwardAPIVolumeDefaultBaseContainer("client-container", filePath)
return pod
}
func downwardAPIVolumeBaseContainers(name, filePath string) []api.Container {
return []api.Container{
{
@ -220,6 +240,23 @@ func downwardAPIVolumeBaseContainers(name, filePath string) []api.Container {
}
func downwardAPIVolumeDefaultBaseContainer(name, filePath string) []api.Container {
return []api.Container{
{
Name: name,
Image: "gcr.io/google_containers/mounttest:0.6",
Command: []string{"/mt", "--file_content=" + filePath},
VolumeMounts: []api.VolumeMount{
{
Name: "podinfo",
MountPath: "/etc",
},
},
},
}
}
func downwardAPIVolumePodForUpdateTest(name string, labels, annotations map[string]string, filePath string) *api.Pod {
pod := downwardAPIVolumeBasePod(name, labels, annotations)