Merge pull request #27303 from derekwaynecarr/downward_api_node_defaults

Automatic merge from submit-queue

Downward api node defaults

Fixes #27062

Replaces https://github.com/kubernetes/kubernetes/pull/27107
pull/6/head
k8s-merge-robot 2016-06-14 22:53:03 -07:00 committed by GitHub
commit 922facd652
6 changed files with 363 additions and 3 deletions

View File

@ -20,7 +20,10 @@ import (
"strings" "strings"
"testing" "testing"
"github.com/stretchr/testify/assert"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
) )
func TestExtractFieldPathAsString(t *testing.T) { func TestExtractFieldPathAsString(t *testing.T) {
@ -115,3 +118,119 @@ func TestExtractFieldPathAsString(t *testing.T) {
} }
} }
} }
func getPod(cname, cpuRequest, cpuLimit, memoryRequest, memoryLimit string) *api.Pod {
resources := api.ResourceRequirements{
Limits: make(api.ResourceList),
Requests: make(api.ResourceList),
}
if cpuLimit != "" {
resources.Limits[api.ResourceCPU] = resource.MustParse(cpuLimit)
}
if memoryLimit != "" {
resources.Limits[api.ResourceMemory] = resource.MustParse(memoryLimit)
}
if cpuRequest != "" {
resources.Requests[api.ResourceCPU] = resource.MustParse(cpuRequest)
}
if memoryRequest != "" {
resources.Requests[api.ResourceMemory] = resource.MustParse(memoryRequest)
}
return &api.Pod{
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: cname,
Resources: resources,
},
},
},
}
}
func TestExtractResourceValue(t *testing.T) {
cases := []struct {
fs *api.ResourceFieldSelector
pod *api.Pod
cName string
expectedValue string
expectedError error
}{
{
fs: &api.ResourceFieldSelector{
Resource: "limits.cpu",
},
cName: "foo",
pod: getPod("foo", "", "9", "", ""),
expectedValue: "9",
},
{
fs: &api.ResourceFieldSelector{
Resource: "requests.cpu",
},
cName: "foo",
pod: getPod("foo", "", "", "", ""),
expectedValue: "0",
},
{
fs: &api.ResourceFieldSelector{
Resource: "requests.cpu",
},
cName: "foo",
pod: getPod("foo", "8", "", "", ""),
expectedValue: "8",
},
{
fs: &api.ResourceFieldSelector{
Resource: "requests.cpu",
},
cName: "foo",
pod: getPod("foo", "100m", "", "", ""),
expectedValue: "1",
},
{
fs: &api.ResourceFieldSelector{
Resource: "requests.cpu",
Divisor: resource.MustParse("100m"),
},
cName: "foo",
pod: getPod("foo", "1200m", "", "", ""),
expectedValue: "12",
},
{
fs: &api.ResourceFieldSelector{
Resource: "requests.memory",
},
cName: "foo",
pod: getPod("foo", "", "", "100Mi", ""),
expectedValue: "104857600",
},
{
fs: &api.ResourceFieldSelector{
Resource: "requests.memory",
Divisor: resource.MustParse("1Mi"),
},
cName: "foo",
pod: getPod("foo", "", "", "100Mi", "1Gi"),
expectedValue: "100",
},
{
fs: &api.ResourceFieldSelector{
Resource: "limits.memory",
},
cName: "foo",
pod: getPod("foo", "", "", "10Mi", "100Mi"),
expectedValue: "104857600",
},
}
as := assert.New(t)
for idx, tc := range cases {
actual, err := ExtractResourceValueByContainerName(tc.fs, tc.pod, tc.cName)
if tc.expectedError != nil {
as.Equal(tc.expectedError, err, "expected test case [%d] to fail with error %v; got %v", idx, tc.expectedError, err)
} else {
as.Nil(err, "expected test case [%d] to not return an error; got %v", idx, err)
as.Equal(tc.expectedValue, actual, "expected test case [%d] to return %q; got %q instead", idx, tc.expectedValue, actual)
}
}
}

View File

@ -831,6 +831,9 @@ type Kubelet struct {
// should manage attachment/detachment of volumes scheduled to this node, // should manage attachment/detachment of volumes scheduled to this node,
// and disable kubelet from executing any attach/detach operations // and disable kubelet from executing any attach/detach operations
enableControllerAttachDetach bool enableControllerAttachDetach bool
// lastUpdatedNodeObject is a cached version of the node as last reported back to the api server.
lastUpdatedNodeObject atomic.Value
} }
// Validate given node IP belongs to the current host // Validate given node IP belongs to the current host
@ -1143,6 +1146,10 @@ func (kl *Kubelet) registerWithApiserver() {
glog.Errorf("Unable to construct api.Node object for kubelet: %v", err) glog.Errorf("Unable to construct api.Node object for kubelet: %v", err)
continue continue
} }
// Cache the node object.
kl.lastUpdatedNodeObject.Store(node)
glog.V(2).Infof("Attempting to register node %s", node.Name) glog.V(2).Infof("Attempting to register node %s", node.Name)
if _, err := kl.kubeClient.Core().Nodes().Create(node); err != nil { if _, err := kl.kubeClient.Core().Nodes().Create(node); err != nil {
if !apierrors.IsAlreadyExists(err) { if !apierrors.IsAlreadyExists(err) {
@ -1554,7 +1561,11 @@ func (kl *Kubelet) makeEnvironmentVariables(pod *api.Pod, container *api.Contain
return result, err return result, err
} }
case envVar.ValueFrom.ResourceFieldRef != nil: case envVar.ValueFrom.ResourceFieldRef != nil:
runtimeVal, err = containerResourceRuntimeValue(envVar.ValueFrom.ResourceFieldRef, pod, container) defaultedPod, defaultedContainer, err := kl.defaultPodLimitsForDownwardApi(pod, container)
if err != nil {
return result, err
}
runtimeVal, err = containerResourceRuntimeValue(envVar.ValueFrom.ResourceFieldRef, defaultedPod, defaultedContainer)
if err != nil { if err != nil {
return result, err return result, err
} }
@ -1894,7 +1905,12 @@ func (kl *Kubelet) syncPod(o syncPodOptions) error {
} }
// Mount volumes and update the volume manager // Mount volumes and update the volume manager
podVolumes, err := kl.mountExternalVolumes(pod) // Default limits for containers here to have downward API expose user-friendly limits to pods.
defaultedPod, _, err := kl.defaultPodLimitsForDownwardApi(pod, nil)
if err != nil {
return err
}
podVolumes, err := kl.mountExternalVolumes(defaultedPod)
if err != nil { if err != nil {
ref, errGetRef := api.GetReference(pod) ref, errGetRef := api.GetReference(pod)
if errGetRef == nil && ref != nil { if errGetRef == nil && ref != nil {
@ -3507,6 +3523,10 @@ func (kl *Kubelet) tryUpdateNodeStatus() error {
} }
// Update the current status on the API server // Update the current status on the API server
_, err = kl.kubeClient.Core().Nodes().UpdateStatus(node) _, err = kl.kubeClient.Core().Nodes().UpdateStatus(node)
if err == nil {
// store recently updated node information.
kl.lastUpdatedNodeObject.Store(node)
}
return err return err
} }

View File

@ -0,0 +1,84 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
import (
"fmt"
"k8s.io/kubernetes/pkg/api"
)
// defaultPodLimitsForDownwardApi copies the input pod, and optional container,
// and applies default resource limits. it returns a copy of the input pod,
// and a copy of the input container (if specified) with default limits
// applied. if a container has no limit specified, it will default the limit to
// the node capacity.
// TODO: if/when we have pod level resources, we need to update this function
// to use those limits instead of node capacity.
func (kl *Kubelet) defaultPodLimitsForDownwardApi(pod *api.Pod, container *api.Container) (*api.Pod, *api.Container, error) {
if pod == nil {
return nil, nil, fmt.Errorf("invalid input, pod cannot be nil")
}
lastUpdatedNodeObject := kl.lastUpdatedNodeObject.Load()
if lastUpdatedNodeObject == nil {
return nil, nil, fmt.Errorf("failed to find node object in cache, expected a non-nil object in the cache.")
}
capacity := lastUpdatedNodeObject.(*api.Node).Status.Capacity
podCopy, err := api.Scheme.Copy(pod)
if err != nil {
return nil, nil, fmt.Errorf("failed to perform a deep copy of pod object: %v", err)
}
outputPod, ok := podCopy.(*api.Pod)
if !ok {
return nil, nil, fmt.Errorf("unexpected type")
}
for idx := range outputPod.Spec.Containers {
mergeContainerResourceLimitsWithCapacity(&outputPod.Spec.Containers[idx], capacity)
}
var outputContainer *api.Container
if container != nil {
containerCopy, err := api.Scheme.DeepCopy(container)
if err != nil {
return nil, nil, fmt.Errorf("failed to perform a deep copy of container object: %v", err)
}
outputContainer, ok = containerCopy.(*api.Container)
if !ok {
return nil, nil, fmt.Errorf("unexpected type")
}
mergeContainerResourceLimitsWithCapacity(outputContainer, capacity)
}
return outputPod, outputContainer, nil
}
// mergeContainerResourceLimitsWithCapacity checks if a limit is applied for
// the container, and if not, it sets the limit based on the capacity.
func mergeContainerResourceLimitsWithCapacity(container *api.Container,
capacity api.ResourceList) {
if container.Resources.Limits == nil {
container.Resources.Limits = make(api.ResourceList)
}
for _, resource := range []api.ResourceName{api.ResourceCPU, api.ResourceMemory} {
if quantity, exists := container.Resources.Limits[resource]; !exists || quantity.IsZero() {
if cap, exists := capacity[resource]; exists {
container.Resources.Limits[resource] = *cap.Copy()
}
}
}
}

View File

@ -0,0 +1,89 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package kubelet
import (
"testing"
"github.com/stretchr/testify/assert"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/resource"
)
func TestPodResourceLimitsDefaulting(t *testing.T) {
tk := newTestKubelet(t)
node := &api.Node{
Status: api.NodeStatus{
Capacity: api.ResourceList{
api.ResourceCPU: resource.MustParse("10"),
api.ResourceMemory: resource.MustParse("10Gi"),
},
},
}
tk.kubelet.lastUpdatedNodeObject.Store(node)
cases := []struct {
pod *api.Pod
expected *api.Pod
}{
{
pod: getPod("0", "0"),
expected: getPod("10", "10Gi"),
},
{
pod: getPod("1", "0"),
expected: getPod("1", "10Gi"),
},
{
pod: getPod("", ""),
expected: getPod("10", "10Gi"),
},
{
pod: getPod("0", "1Mi"),
expected: getPod("10", "1Mi"),
},
}
as := assert.New(t)
for idx, tc := range cases {
actual, _, err := tk.kubelet.defaultPodLimitsForDownwardApi(tc.pod, nil)
as.Nil(err, "failed to default pod limits: %v", err)
as.Equal(tc.expected, actual, "test case [%d] failed. Expected: %+v, Got: %+v", idx, tc.expected, actual)
}
}
func getPod(cpuLimit, memoryLimit string) *api.Pod {
resources := api.ResourceRequirements{}
if cpuLimit != "" || memoryLimit != "" {
resources.Limits = make(api.ResourceList)
}
if cpuLimit != "" {
resources.Limits[api.ResourceCPU] = resource.MustParse(cpuLimit)
}
if memoryLimit != "" {
resources.Limits[api.ResourceMemory] = resource.MustParse(memoryLimit)
}
return &api.Pod{
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: "foo",
Resources: resources,
},
},
},
}
}

View File

@ -278,7 +278,7 @@ func newTestKubeletWithImageList(t *testing.T, imageList []kubecontainer.Image)
} }
kubelet.evictionManager = evictionManager kubelet.evictionManager = evictionManager
kubelet.AddPodAdmitHandler(evictionAdmitHandler) kubelet.AddPodAdmitHandler(evictionAdmitHandler)
kubelet.lastUpdatedNodeObject.Store(&api.Node{})
return &TestKubelet{kubelet, fakeRuntime, mockCadvisor, fakeKubeClient, fakeMirrorClient, fakeClock, nil} return &TestKubelet{kubelet, fakeRuntime, mockCadvisor, fakeKubeClient, fakeMirrorClient, fakeClock, nil}
} }

View File

@ -128,6 +128,50 @@ var _ = framework.KubeDescribe("Downward API", func() {
testDownwardAPI(f, podName, env, expectations) testDownwardAPI(f, podName, env, expectations)
}) })
It("should provide default limits.cpu/memory from node capacity", func() {
podName := "downward-api-" + string(util.NewUUID())
env := []api.EnvVar{
{
Name: "CPU_LIMIT",
ValueFrom: &api.EnvVarSource{
ResourceFieldRef: &api.ResourceFieldSelector{
Resource: "limits.cpu",
},
},
},
{
Name: "MEMORY_LIMIT",
ValueFrom: &api.EnvVarSource{
ResourceFieldRef: &api.ResourceFieldSelector{
Resource: "limits.memory",
},
},
},
}
expectations := []string{
fmt.Sprintf("CPU_LIMIT=[1-9]"),
fmt.Sprintf("MEMORY_LIMIT=[1-9]"),
}
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: podName,
Labels: map[string]string{"name": podName},
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: "dapi-container",
Image: "gcr.io/google_containers/busybox:1.24",
Command: []string{"sh", "-c", "env"},
Env: env,
},
},
RestartPolicy: api.RestartPolicyNever,
},
}
testDownwardAPIUsingPod(f, pod, env, expectations)
})
}) })
func testDownwardAPI(f *framework.Framework, podName string, env []api.EnvVar, expectations []string) { func testDownwardAPI(f *framework.Framework, podName string, env []api.EnvVar, expectations []string) {
@ -159,5 +203,9 @@ func testDownwardAPI(f *framework.Framework, podName string, env []api.EnvVar, e
}, },
} }
testDownwardAPIUsingPod(f, pod, env, expectations)
}
func testDownwardAPIUsingPod(f *framework.Framework, pod *api.Pod, env []api.EnvVar, expectations []string) {
f.TestContainerOutputRegexp("downward api env vars", pod, 0, expectations) f.TestContainerOutputRegexp("downward api env vars", pod, 0, expectations)
} }