Running the resource predicate on the kubelet.

Added a check on the kubelet that verifies scheduled pods do not exceed the node's resource capacity. Related to #5207.
pull/6/head
Jerzy Szczepkowski 2015-03-16 13:50:00 +01:00
parent cbf57ee324
commit 5845f6ad48
8 changed files with 183 additions and 62 deletions

View File

@ -32,7 +32,6 @@ import (
"time"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/validation"
"github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
@ -47,6 +46,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/probe"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler"
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
utilErrors "github.com/GoogleCloudPlatform/kubernetes/pkg/util/errors"
@ -321,6 +321,9 @@ type Kubelet struct {
// Manager for images.
imageManager imageManager
// Cached MachineInfo returned by cadvisor.
machineInfo *cadvisorApi.MachineInfo
}
// getRootDir returns the full path to the directory under which kubelet can
@ -1477,7 +1480,7 @@ func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metric
kl.removeOrphanedStatuses(podFullNames)
// Filter out the rejected pods. They don't have running containers.
kl.handleHostPortConflicts(allPods)
kl.handleNotfittingPods(allPods)
var pods []api.Pod
for _, pod := range allPods {
status, ok := kl.getPodStatusFromCache(GetPodFullName(&pod))
@ -1636,14 +1639,44 @@ func getHostPortConflicts(pods []api.Pod) []api.Pod {
return conflicts
}
// handleHostPortConflicts handles pods that conflict on Port.HostPort values.
func (kl *Kubelet) handleHostPortConflicts(pods []api.Pod) {
func (kl *Kubelet) getPodsExceedingCapacity(pods []api.Pod) []api.Pod {
info, err := kl.GetCachedMachineInfo()
if err != nil {
glog.Errorf("error getting machine info: %v", err)
return []api.Pod{}
}
// Respect the pod creation order when resolving conflicts.
sort.Sort(podsByCreationTime(pods))
capacity := CapacityFromMachineInfo(info)
return scheduler.GetPodsExceedingCapacity(pods, capacity)
}
// handleNotfittingPods handles pods that do not fit on the node.
// Currently, conflicts on Port.HostPort values and pods that exceed node capacity are handled.
func (kl *Kubelet) handleNotfittingPods(pods []api.Pod) {
conflicts := getHostPortConflicts(pods)
conflictsMap := map[types.UID]bool{}
for _, pod := range conflicts {
kl.recorder.Eventf(&pod, "hostPortConflict", "Cannot start the pod due to host port conflict.")
kl.setPodStatusInCache(GetPodFullName(&pod), api.PodStatus{
Phase: api.PodFailed,
Message: "Pod cannot be started due to host port conflict"})
conflictsMap[pod.UID] = true
}
remainingPods := []api.Pod{}
for _, pod := range pods {
if !conflictsMap[pod.UID] {
remainingPods = append(remainingPods, pod)
}
}
conflicts = kl.getPodsExceedingCapacity(remainingPods)
for _, pod := range conflicts {
kl.recorder.Eventf(&pod, "capacityExceeded", "Cannot start the pod due to exceeded capacity.")
kl.setPodStatusInCache(GetPodFullName(&pod), api.PodStatus{
Phase: api.PodFailed,
Message: "Pod cannot be started due to exceeded capacity"})
}
}
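
A note on the two-pass structure above: port-conflicting pods are rejected first, and only the survivors are measured against node capacity, so a pod that loses a host port never consumes capacity budget. Both passes favor older pods (the tests below expect the newer pod to be rejected, and getPodsExceedingCapacity sorts by creation time before delegating to the scheduler helper). The following is a minimal, self-contained sketch of that filtering idiom with simplified types, illustrative only and not the kubelet's code:

package main

import "fmt"

type pod struct {
	uid  string
	port int
	mem  int64
}

func main() {
	pods := []pod{{uid: "old", port: 80, mem: 60}, {uid: "new", port: 80, mem: 60}, {uid: "other", port: 81, mem: 60}}

	// Pass 1: reject host-port conflicts; the earlier (older) pod keeps the port.
	rejected := map[string]bool{}
	takenPorts := map[int]bool{}
	for _, p := range pods {
		if takenPorts[p.port] {
			rejected[p.uid] = true
			continue
		}
		takenPorts[p.port] = true
	}

	// Pass 2: only survivors count against capacity (memory 100 here), so the
	// pod rejected in pass 1 does not consume any of the budget.
	var usedMem int64
	for _, p := range pods {
		if rejected[p.uid] {
			continue
		}
		if usedMem+p.mem > 100 {
			rejected[p.uid] = true
			continue
		}
		usedMem += p.mem
	}
	fmt.Println(rejected) // map[new:true other:true]: "new" lost the port, "other" exceeded capacity
}
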
@ -1867,20 +1900,13 @@ func (kl *Kubelet) tryUpdateNodeStatus() error {
// TODO: Post NotReady if we cannot get MachineInfo from cAdvisor. This needs to start
// cAdvisor locally, e.g. for test-cmd.sh, and in integration test.
info, err := kl.GetMachineInfo()
info, err := kl.GetCachedMachineInfo()
if err != nil {
glog.Errorf("error getting machine info: %v", err)
} else {
node.Status.NodeInfo.MachineID = info.MachineID
node.Status.NodeInfo.SystemUUID = info.SystemUUID
node.Spec.Capacity = api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(
int64(info.NumCores*1000),
resource.DecimalSI),
api.ResourceMemory: *resource.NewQuantity(
info.MemoryCapacity,
resource.BinarySI),
}
node.Spec.Capacity = CapacityFromMachineInfo(info)
}
newCondition := api.NodeCondition{
@ -2155,6 +2181,14 @@ func (kl *Kubelet) GetRootInfo(req *cadvisorApi.ContainerInfoRequest) (*cadvisor
return kl.cadvisor.ContainerInfo("/", req)
}
func (kl *Kubelet) GetMachineInfo() (*cadvisorApi.MachineInfo, error) {
return kl.cadvisor.MachineInfo()
// GetCachedMachineInfo assumes that the machine info can't change without a reboot
func (kl *Kubelet) GetCachedMachineInfo() (*cadvisorApi.MachineInfo, error) {
if kl.machineInfo == nil {
info, err := kl.cadvisor.MachineInfo()
if err != nil {
return nil, err
}
kl.machineInfo = info
}
return kl.machineInfo, nil
}
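
The cache above is filled lazily on first use and never invalidated, matching the comment that machine info can't change without a reboot; a cAdvisor error is not cached, so the next call retries, and the field is unguarded, so callers are assumed to be serialized. Below is a stand-alone sketch of the same lazy-cache pattern, with a mutex added for concurrent callers (the mutex is my addition, not something this diff does):

package main

import (
	"fmt"
	"sync"
)

type machineInfo struct {
	numCores       int
	memoryCapacity int64
}

type infoCache struct {
	mu    sync.Mutex
	info  *machineInfo
	fetch func() (*machineInfo, error) // stands in for kl.cadvisor.MachineInfo()
}

// get returns the cached info, fetching it on first use; like GetCachedMachineInfo,
// it does not cache a fetch error, so the next call retries.
func (c *infoCache) get() (*machineInfo, error) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.info == nil {
		info, err := c.fetch()
		if err != nil {
			return nil, err
		}
		c.info = info
	}
	return c.info, nil
}

func main() {
	calls := 0
	c := &infoCache{fetch: func() (*machineInfo, error) {
		calls++
		return &machineInfo{numCores: 4, memoryCapacity: 8 << 30}, nil
	}}
	c.get()
	info, _ := c.get()
	fmt.Println(info.numCores, calls) // 4 1: the second call hits the cache
}
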

View File

@ -412,6 +412,7 @@ var emptyPodUIDs map[types.UID]metrics.SyncPodType
func TestSyncPodsDoesNothing(t *testing.T) {
testKubelet := newTestKubelet(t)
testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
kubelet := testKubelet.kubelet
fakeDocker := testKubelet.fakeDocker
waitGroup := testKubelet.waitGroup
@ -454,6 +455,7 @@ func TestSyncPodsDoesNothing(t *testing.T) {
func TestSyncPodsWithTerminationLog(t *testing.T) {
testKubelet := newTestKubelet(t)
testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
kubelet := testKubelet.kubelet
fakeDocker := testKubelet.fakeDocker
waitGroup := testKubelet.waitGroup
@ -506,6 +508,7 @@ func matchString(t *testing.T, pattern, str string) bool {
func TestSyncPodsCreatesNetAndContainer(t *testing.T) {
testKubelet := newTestKubelet(t)
testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
kubelet := testKubelet.kubelet
fakeDocker := testKubelet.fakeDocker
waitGroup := testKubelet.waitGroup
@ -557,6 +560,7 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) {
func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) {
testKubelet := newTestKubelet(t)
testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
kubelet := testKubelet.kubelet
fakeDocker := testKubelet.fakeDocker
waitGroup := testKubelet.waitGroup
@ -604,6 +608,7 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) {
func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) {
testKubelet := newTestKubelet(t)
testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
kubelet := testKubelet.kubelet
fakeDocker := testKubelet.fakeDocker
waitGroup := testKubelet.waitGroup
@ -648,6 +653,7 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) {
func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) {
testKubelet := newTestKubelet(t)
testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
kubelet := testKubelet.kubelet
fakeDocker := testKubelet.fakeDocker
waitGroup := testKubelet.waitGroup
@ -708,6 +714,7 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) {
func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) {
testKubelet := newTestKubelet(t)
testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
kubelet := testKubelet.kubelet
fakeDocker := testKubelet.fakeDocker
waitGroup := testKubelet.waitGroup
@ -779,6 +786,7 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) {
func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) {
ready := false
testKubelet := newTestKubelet(t)
testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
kubelet := testKubelet.kubelet
fakeDocker := testKubelet.fakeDocker
kubelet.sourcesReady = func() bool { return ready }
@ -823,6 +831,7 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) {
func TestSyncPodsDeletes(t *testing.T) {
testKubelet := newTestKubelet(t)
testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
kubelet := testKubelet.kubelet
fakeDocker := testKubelet.fakeDocker
fakeDocker.ContainerList = []docker.APIContainers{
@ -1694,6 +1703,7 @@ func TestSyncPodEventHandlerFails(t *testing.T) {
func TestSyncPodsWithPullPolicy(t *testing.T) {
testKubelet := newTestKubelet(t)
testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
kubelet := testKubelet.kubelet
fakeDocker := testKubelet.fakeDocker
waitGroup := testKubelet.waitGroup
@ -2758,6 +2768,8 @@ func TestGetHostPortConflicts(t *testing.T) {
func TestHandlePortConflicts(t *testing.T) {
testKubelet := newTestKubelet(t)
kl := testKubelet.kubelet
testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
spec := api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}
pods := []api.Pod{
{
@ -2783,7 +2795,7 @@ func TestHandlePortConflicts(t *testing.T) {
// The newer pod should be rejected.
conflictedPodName := GetPodFullName(&pods[0])
kl.handleHostPortConflicts(pods)
kl.handleNotfittingPods(pods)
if len(kl.podStatuses) != 1 {
t.Fatalf("expected length of status map to be 1. Got map %#v.", kl.podStatuses)
}
@ -2807,15 +2819,76 @@ func TestHandlePortConflicts(t *testing.T) {
}
}
// Tests that we handle exceeded resources correctly by setting the failed status in the status map.
func TestHandleMemExceeded(t *testing.T) {
testKubelet := newTestKubelet(t)
kl := testKubelet.kubelet
testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{MemoryCapacity: 100}, nil)
spec := api.PodSpec{Containers: []api.Container{{Resources: api.ResourceRequirements{
Limits: api.ResourceList{
"memory": resource.MustParse("90"),
},
}}}}
pods := []api.Pod{
{
ObjectMeta: api.ObjectMeta{
UID: "123456789",
Name: "newpod",
Namespace: "foo",
},
Spec: spec,
},
{
ObjectMeta: api.ObjectMeta{
UID: "987654321",
Name: "oldpod",
Namespace: "foo",
},
Spec: spec,
},
}
// Make sure the Pods are in the reverse order of creation time.
pods[1].CreationTimestamp = util.NewTime(time.Now())
pods[0].CreationTimestamp = util.NewTime(time.Now().Add(1 * time.Second))
// The newer pod should be rejected.
notfittingPodName := GetPodFullName(&pods[0])
kl.handleNotfittingPods(pods)
if len(kl.podStatuses) != 1 {
t.Fatalf("expected length of status map to be 1. Got map %#v.", kl.podStatuses)
}
// Check pod status stored in the status map.
status, ok := kl.podStatuses[notfittingPodName]
if !ok {
t.Fatalf("status of pod %q is not found in the status map.", notfittingPodName)
}
if status.Phase != api.PodFailed {
t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase)
}
// Check if we can retrieve the pod status from GetPodStatus().
kl.pods = pods
status, err := kl.GetPodStatus(notfittingPodName, "")
if err != nil {
t.Fatalf("unable to retrieve pod status for pod %q: #v.", notfittingPodName, err)
}
if status.Phase != api.PodFailed {
t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase)
}
}
func TestPurgingObsoleteStatusMapEntries(t *testing.T) {
testKubelet := newTestKubelet(t)
testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
kl := testKubelet.kubelet
pods := []api.Pod{
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}},
{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}},
}
// Run once to populate the status map.
kl.handleHostPortConflicts(pods)
kl.handleNotfittingPods(pods)
if len(kl.podStatuses) != 1 {
t.Fatalf("expected length of status map to be 1. Got map %#v.", kl.podStatuses)
}
@ -3064,6 +3137,7 @@ func TestCreateMirrorPod(t *testing.T) {
func TestDeleteOrphanedMirrorPods(t *testing.T) {
testKubelet := newTestKubelet(t)
testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
kl := testKubelet.kubelet
manager := testKubelet.fakeMirrorManager
orphanedPodNames := []string{"pod1_ns", "pod2_ns"}

View File

@ -55,7 +55,7 @@ func (kl *Kubelet) runOnce(pods []api.Pod, retryDelay time.Duration) (results []
if kl.dockerPuller == nil {
kl.dockerPuller = dockertools.NewDockerPuller(kl.dockerClient, kl.pullQPS, kl.pullBurst)
}
kl.handleHostPortConflicts(pods)
kl.handleNotfittingPods(pods)
ch := make(chan RunPodResult)
for i := range pods {

View File

@ -24,8 +24,10 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/cadvisor"
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
docker "github.com/fsouza/go-dockerclient"
cadvisorApi "github.com/google/cadvisor/info/v1"
)
type listContainersResult struct {
@ -68,10 +70,14 @@ func (d *testDocker) InspectContainer(id string) (*docker.Container, error) {
}
func TestRunOnce(t *testing.T) {
cadvisor := &cadvisor.Mock{}
cadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
kb := &Kubelet{
rootDirectory: "/tmp/kubelet",
recorder: &record.FakeRecorder{},
cadvisor: cadvisor,
}
if err := kb.setupDataDirs(); err != nil {
t.Errorf("Failed to init data dirs: %v", err)
}

View File

@ -33,7 +33,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
"github.com/GoogleCloudPlatform/kubernetes/pkg/healthz"
"github.com/GoogleCloudPlatform/kubernetes/pkg/httplog"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
@ -50,7 +49,6 @@ import (
type Server struct {
host HostInterface
mux *http.ServeMux
machineInfo *cadvisorApi.MachineInfo
}
type TLSOptions struct {
@ -84,7 +82,7 @@ type HostInterface interface {
GetContainerInfo(podFullName string, uid types.UID, containerName string, req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error)
GetRootInfo(req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error)
GetDockerVersion() ([]uint, error)
GetMachineInfo() (*cadvisorApi.MachineInfo, error)
GetCachedMachineInfo() (*cadvisorApi.MachineInfo, error)
GetPods() ([]api.Pod, util.StringSet)
GetPodByName(namespace, name string) (*api.Pod, bool)
GetPodStatus(name string, uid types.UID) (api.PodStatus, error)
@ -329,33 +327,14 @@ func (s *Server) handleLogs(w http.ResponseWriter, req *http.Request) {
s.host.ServeLogs(w, req)
}
// getCachedMachineInfo assumes that the machine info can't change without a reboot
func (s *Server) getCachedMachineInfo() (*cadvisorApi.MachineInfo, error) {
if s.machineInfo == nil {
info, err := s.host.GetMachineInfo()
if err != nil {
return nil, err
}
s.machineInfo = info
}
return s.machineInfo, nil
}
// handleNodeInfoVersioned handles node info requests against the Kubelet.
func (s *Server) handleNodeInfoVersioned(w http.ResponseWriter, req *http.Request) {
info, err := s.getCachedMachineInfo()
info, err := s.host.GetCachedMachineInfo()
if err != nil {
s.error(w, err)
return
}
capacity := api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(
int64(info.NumCores*1000),
resource.DecimalSI),
api.ResourceMemory: *resource.NewQuantity(
info.MemoryCapacity,
resource.BinarySI),
}
capacity := CapacityFromMachineInfo(info)
data, err := json.Marshal(api.NodeInfo{
Capacity: capacity,
NodeSystemInfo: api.NodeSystemInfo{
@ -363,6 +342,7 @@ func (s *Server) handleNodeInfoVersioned(w http.ResponseWriter, req *http.Reques
SystemUUID: info.SystemUUID,
},
})
if err != nil {
s.error(w, err)
return
@ -373,7 +353,7 @@ func (s *Server) handleNodeInfoVersioned(w http.ResponseWriter, req *http.Reques
// handleSpec handles spec requests against the Kubelet.
func (s *Server) handleSpec(w http.ResponseWriter, req *http.Request) {
info, err := s.getCachedMachineInfo()
info, err := s.host.GetCachedMachineInfo()
if err != nil {
s.error(w, err)
return

View File

@ -76,7 +76,7 @@ func (fk *fakeKubelet) GetDockerVersion() ([]uint, error) {
return fk.dockerVersionFunc()
}
func (fk *fakeKubelet) GetMachineInfo() (*cadvisorApi.MachineInfo, error) {
func (fk *fakeKubelet) GetCachedMachineInfo() (*cadvisorApi.MachineInfo, error) {
return fk.machineInfoFunc()
}

View File

@ -17,10 +17,13 @@ limitations under the License.
package kubelet
import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
"github.com/GoogleCloudPlatform/kubernetes/pkg/capabilities"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/record"
"github.com/golang/glog"
cadvisorApi "github.com/google/cadvisor/info/v1"
)
// TODO: move this into pkg/capabilities
@ -40,3 +43,15 @@ func SetupEventSending(client *client.Client, hostname string) {
glog.Infof("Sending events to api server.")
record.StartRecording(client.Events(""))
}
func CapacityFromMachineInfo(info *cadvisorApi.MachineInfo) api.ResourceList {
c := api.ResourceList{
api.ResourceCPU: *resource.NewMilliQuantity(
int64(info.NumCores*1000),
resource.DecimalSI),
api.ResourceMemory: *resource.NewQuantity(
info.MemoryCapacity,
resource.BinarySI),
}
return c
}
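
A quick illustration of the conversion above, using the same packages the diff already imports: cores are reported as milli-CPU in decimal notation and memory bytes as a binary-SI quantity, so a 4-core machine with 8 GiB of RAM yields cpu=4000m and memory=8Gi. The helper is duplicated here only to keep the sketch self-contained:

package main

import (
	"fmt"

	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
	cadvisorApi "github.com/google/cadvisor/info/v1"
)

// capacityFromMachineInfo mirrors CapacityFromMachineInfo from this commit.
func capacityFromMachineInfo(info *cadvisorApi.MachineInfo) api.ResourceList {
	return api.ResourceList{
		api.ResourceCPU:    *resource.NewMilliQuantity(int64(info.NumCores*1000), resource.DecimalSI),
		api.ResourceMemory: *resource.NewQuantity(info.MemoryCapacity, resource.BinarySI),
	}
}

func main() {
	capacity := capacityFromMachineInfo(&cadvisorApi.MachineInfo{NumCores: 4, MemoryCapacity: 8 << 30})
	fmt.Println(capacity.Cpu().MilliValue()) // 4000
	fmt.Println(capacity.Memory().Value())   // 8589934592, i.e. 8Gi
}
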

View File

@ -22,7 +22,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/golang/glog"
)
type NodeInfo interface {
@ -102,6 +101,28 @@ func getResourceRequest(pod *api.Pod) resourceRequest {
return result
}
func GetPodsExceedingCapacity(pods []api.Pod, capacity api.ResourceList) []api.Pod {
exceedingPods := []api.Pod{}
totalMilliCPU := capacity.Cpu().MilliValue()
totalMemory := capacity.Memory().Value()
milliCPURequested := int64(0)
memoryRequested := int64(0)
for ix := range pods {
podRequest := getResourceRequest(&pods[ix])
fitsCPU := totalMilliCPU == 0 || (totalMilliCPU-milliCPURequested) >= podRequest.milliCPU
fitsMemory := totalMemory == 0 || (totalMemory-memoryRequested) >= podRequest.memory
if !fitsCPU || !fitsMemory {
// the pod doesn't fit
exceedingPods = append(exceedingPods, pods[ix])
} else {
// the pod fits
milliCPURequested += podRequest.milliCPU
memoryRequested += podRequest.memory
}
}
return exceedingPods
}
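
Two properties of the helper above are worth spelling out: it is order-sensitive (pods earlier in the slice claim capacity first), and a zero capacity value disables the corresponding check, which is why a MachineInfo with only MemoryCapacity set works in the kubelet test. Here is a test-style sketch of the behavior, mirroring the TestHandleMemExceeded scenario earlier in the diff (the test name below is mine, not part of the commit):

package scheduler

import (
	"testing"

	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
)

// With memory capacity 100 and two pods limited to 90 each, the first pod in the
// slice claims the capacity and only the second is reported as exceeding it.
func TestGetPodsExceedingCapacitySketch(t *testing.T) {
	spec := api.PodSpec{Containers: []api.Container{{Resources: api.ResourceRequirements{
		Limits: api.ResourceList{"memory": resource.MustParse("90")},
	}}}}
	pods := []api.Pod{
		{ObjectMeta: api.ObjectMeta{Name: "older"}, Spec: spec},
		{ObjectMeta: api.ObjectMeta{Name: "newer"}, Spec: spec},
	}
	capacity := api.ResourceList{"memory": resource.MustParse("100")}

	exceeding := GetPodsExceedingCapacity(pods, capacity)
	if len(exceeding) != 1 || exceeding[0].Name != "newer" {
		t.Fatalf("expected only the second pod to exceed capacity, got %#v", exceeding)
	}
	// capacity has no cpu entry, so totalMilliCPU is 0 and the CPU check is skipped.
}
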
// PodFitsResources calculates fit based on requested, rather than used resources
func (r *ResourceFit) PodFitsResources(pod api.Pod, existingPods []api.Pod, node string) (bool, error) {
podRequest := getResourceRequest(&pod)
@ -113,22 +134,13 @@ func (r *ResourceFit) PodFitsResources(pod api.Pod, existingPods []api.Pod, node
if err != nil {
return false, err
}
milliCPURequested := int64(0)
memoryRequested := int64(0)
for ix := range existingPods {
existingRequest := getResourceRequest(&existingPods[ix])
milliCPURequested += existingRequest.milliCPU
memoryRequested += existingRequest.memory
pods := make([]api.Pod, 0, len(existingPods)+1)
pods = append(pods, existingPods...)
pods = append(pods, pod)
if len(GetPodsExceedingCapacity(pods, info.Spec.Capacity)) > 0 {
return false, nil
}
totalMilliCPU := info.Spec.Capacity.Cpu().MilliValue()
totalMemory := info.Spec.Capacity.Memory().Value()
fitsCPU := totalMilliCPU == 0 || (totalMilliCPU-milliCPURequested) >= podRequest.milliCPU
fitsMemory := totalMemory == 0 || (totalMemory-memoryRequested) >= podRequest.memory
glog.V(3).Infof("Calculated fit: cpu: %v, memory %v", fitsCPU, fitsMemory)
return fitsCPU && fitsMemory, nil
return true, nil
}
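
One design note on the rewrite above: existingPods is copied into a fresh slice before the candidate pod is appended, because append on a slice with spare capacity would otherwise write into the caller's backing array. A small self-contained illustration of that hazard, using ints instead of pods:

package main

import "fmt"

func main() {
	backing := make([]int, 2, 3) // len 2, cap 3: one spare slot
	backing[0], backing[1] = 1, 2

	existing := backing[:2]
	_ = append(existing, 99) // reuses the spare slot instead of allocating

	fmt.Println(backing[:cap(backing)]) // [1 2 99]: the caller's array was modified

	// The safe pattern, as in PodFitsResources above: build a fresh slice, then append.
	safe := append([]int(nil), existing...)
	safe = append(safe, 99)
	fmt.Println(safe) // [1 2 99], with no effect on the caller's slice
}
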
func NewResourceFitPredicate(info NodeInfo) FitPredicate {