diff --git a/pkg/client/fake_minions.go b/pkg/client/fake_minions.go
index d3e195673d..57593edbf0 100644
--- a/pkg/client/fake_minions.go
+++ b/pkg/client/fake_minions.go
@@ -19,6 +19,9 @@ package client
 import (
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
 )
 
 // FakeNodes implements MinionInterface. Meant to be embedded into a struct to get a default
@@ -56,3 +59,8 @@ func (c *FakeNodes) Update(minion *api.Node) (*api.Node, error) {
 	c.Fake.Actions = append(c.Fake.Actions, FakeAction{Action: "update-minion", Value: minion})
 	return &api.Node{}, nil
 }
+
+func (c *FakeNodes) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) {
+	c.Fake.Actions = append(c.Fake.Actions, FakeAction{Action: "watch-minions", Value: resourceVersion})
+	return c.Fake.Watch, c.Fake.Err
+}
diff --git a/pkg/client/minions.go b/pkg/client/minions.go
index 760296761b..eb25744409 100644
--- a/pkg/client/minions.go
+++ b/pkg/client/minions.go
@@ -21,6 +21,9 @@ import (
 	"fmt"
 
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
 )
 
 type NodesInterface interface {
@@ -33,6 +36,7 @@ type NodeInterface interface {
 	List() (*api.NodeList, error)
 	Delete(name string) error
 	Update(*api.Node) (*api.Node, error)
+	Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error)
 }
 
 // nodes implements NodesInterface
@@ -94,3 +98,15 @@ func (c *nodes) Update(minion *api.Node) (*api.Node, error) {
 	err := c.r.Put().Resource(c.resourceName()).Name(minion.Name).Body(minion).Do().Into(result)
 	return result, err
 }
+
+// Watch returns a watch.Interface that watches the requested nodes.
+func (c *nodes) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) {
+	return c.r.Get().
+		Prefix("watch").
+		Namespace(api.NamespaceAll).
+		Resource(c.resourceName()).
+		Param("resourceVersion", resourceVersion).
+		LabelsSelectorParam(api.LabelSelectorQueryParam(c.r.APIVersion()), label).
+		FieldsSelectorParam(api.FieldSelectorQueryParam(c.r.APIVersion()), field).
+		Watch()
+}
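The new NodeInterface.Watch returns a generic watch.Interface, so callers consume it like any other resource watch: drain ResultChan() and switch on the event type. A minimal consumption sketch, illustrative only; watchNodes is a hypothetical helper and is not part of this change:

package example

import (
	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"

	"github.com/golang/glog"
)

// watchNodes shows how a caller might consume the new Nodes().Watch() method:
// start the watch from a known resourceVersion and react to node events.
func watchNodes(c client.Interface, resourceVersion string) error {
	w, err := c.Nodes().Watch(labels.Everything(), fields.Everything(), resourceVersion)
	if err != nil {
		return err
	}
	defer w.Stop()
	for event := range w.ResultChan() {
		node, ok := event.Object.(*api.Node)
		if !ok {
			continue // skip unexpected object types
		}
		switch event.Type {
		case watch.Added, watch.Modified:
			glog.Infof("node %s added/updated", node.Name)
		case watch.Deleted:
			glog.Infof("node %s deleted", node.Name)
		}
	}
	return nil
}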
diff --git a/pkg/cloudprovider/controller/nodecontroller_test.go b/pkg/cloudprovider/controller/nodecontroller_test.go
index 899e71c055..e46ca1ad68 100644
--- a/pkg/cloudprovider/controller/nodecontroller_test.go
+++ b/pkg/cloudprovider/controller/nodecontroller_test.go
@@ -30,8 +30,11 @@ import (
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/api/resource"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
 	fake_cloud "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/fake"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/probe"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
 )
 
 // FakeNodeHandler is a fake implementation of NodesInterface and NodeInterface. It
@@ -114,6 +117,10 @@ func (m *FakeNodeHandler) Update(node *api.Node) (*api.Node, error) {
 	return node, nil
 }
 
+func (m *FakeNodeHandler) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) {
+	return nil, nil
+}
+
 // FakeKubeletClient is a fake implementation of KubeletClient.
 type FakeKubeletClient struct {
 	Status probe.Result
diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index 9d2cf33a92..c9d1412636 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -170,6 +170,27 @@ func NewMainKubelet(
 	}
 	serviceLister := &cache.StoreToServiceLister{serviceStore}
+	serviceStore = cache.NewStore(cache.MetaNamespaceKeyFunc)
+	if kubeClient != nil {
+		// TODO: cache.NewListWatchFromClient is limited as it takes a client implementation rather
+		// than an interface. There is no way to construct a list+watcher using resource name.
+		listWatch := &cache.ListWatch{
+			// TODO: currently, we are watching all nodes. To make it more efficient,
+			// we should be watching only a node with Name equal to kubelet's Hostname.
+			// To make it possible, we need to add field selector to ListFunc and WatchFunc,
+			// and selection by field needs to be implemented in WatchMinions function in pkg/registry/etcd.
+			ListFunc: func() (runtime.Object, error) {
+				return kubeClient.Nodes().List()
+			},
+			WatchFunc: func(resourceVersion string) (watch.Interface, error) {
+				return kubeClient.Nodes().Watch(
+					labels.Everything(), fields.Everything(), resourceVersion)
+			},
+		}
+		cache.NewReflector(listWatch, &api.Service{}, serviceStore, 0).Run()
+	}
+	nodeLister := &cache.StoreToNodeLister{serviceStore}
+
 	containerGC, err := newContainerGC(dockerClient, containerGCPolicy)
 	if err != nil {
 		return nil, err
 	}
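Note that the reflector above reuses the serviceStore variable (reassigned to a fresh store just before) and passes &api.Service{} as its expected type, even though its list and watch functions return nodes. If that is unintended, the same wiring with a dedicated store variable and &api.Node{} would look like the sketch below. This is illustrative only, built from the same cache helpers the hunk already uses; newNodeLister is a hypothetical name:

package example

import (
	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)

// newNodeLister builds a node cache backed by its own store, with &api.Node{}
// as the reflector's expected type. Resync period 0 mirrors the snippet above.
func newNodeLister(kubeClient client.Interface) *cache.StoreToNodeLister {
	nodeStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
	lw := &cache.ListWatch{
		ListFunc: func() (runtime.Object, error) {
			return kubeClient.Nodes().List()
		},
		WatchFunc: func(resourceVersion string) (watch.Interface, error) {
			return kubeClient.Nodes().Watch(labels.Everything(), fields.Everything(), resourceVersion)
		},
	}
	cache.NewReflector(lw, &api.Node{}, nodeStore, 0).Run()
	return &cache.StoreToNodeLister{nodeStore}
}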
@@ -197,6 +218,7 @@ func NewMainKubelet(
 		clusterDomain:          clusterDomain,
 		clusterDNS:             clusterDNS,
 		serviceLister:          serviceLister,
+		nodeLister:             nodeLister,
 		masterServiceNamespace: masterServiceNamespace,
 		prober:                 newProbeHolder(),
 		readiness:              newReadinessStates(),
@@ -244,6 +266,11 @@ type serviceLister interface {
 	List() (api.ServiceList, error)
 }
 
+type nodeLister interface {
+	List() (machines api.NodeList, err error)
+	GetNodeInfo(id string) (*api.Node, error)
+}
+
 // Kubelet is the main kubelet implementation.
 type Kubelet struct {
 	hostname string
@@ -306,6 +333,7 @@ type Kubelet struct {
 
 	masterServiceNamespace string
 	serviceLister          serviceLister
+	nodeLister             nodeLister
 
 	// Volume plugins.
 	volumePluginMgr volume.VolumePluginMgr
@@ -477,6 +505,20 @@ func (kl *Kubelet) listPodsFromDisk() ([]types.UID, error) {
 	return pods, nil
 }
 
+func (kl *Kubelet) GetNode() (*api.Node, error) {
+	l, err := kl.nodeLister.List()
+	if err != nil {
+		return nil, errors.New("cannot list nodes")
+	}
+	host := kl.GetHostname()
+	for _, n := range l.Items {
+		if n.Name == host {
+			return &n, nil
+		}
+	}
+	return nil, fmt.Errorf("node %v not found", host)
+}
+
 // Starts garbage collection theads.
 func (kl *Kubelet) StartGarbageCollection() {
 	go util.Forever(func() {
@@ -1501,7 +1543,7 @@ func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metric
 	kl.removeOrphanedStatuses(podFullNames)
 	// Filter out the rejected pod. They don't have running containers.
-	kl.handleNotfittingPods(allPods)
+	kl.handleNotFittingPods(allPods)
 	var pods []api.Pod
 	for _, pod := range allPods {
 		status, ok := kl.getPodStatusFromCache(GetPodFullName(&pod))
@@ -1647,9 +1689,8 @@ func (s podsByCreationTime) Less(i, j int) bool {
 	return s[i].CreationTimestamp.Before(s[j].CreationTimestamp)
 }
 
-// getHostPortConflicts detects pods with conflicted host ports and return them.
-func getHostPortConflicts(pods []api.Pod) []api.Pod {
-	conflicts := []api.Pod{}
+// checkHostPortConflicts detects pods with conflicting host ports.
+func checkHostPortConflicts(pods []api.Pod) (fitting []api.Pod, notFitting []api.Pod) {
 	ports := map[int]bool{}
 	extract := func(p *api.ContainerPort) int { return p.HostPort }
 
@@ -1660,48 +1701,65 @@ func getHostPortConflicts(pods []api.Pod) []api.Pod {
 		pod := &pods[i]
 		if errs := validation.AccumulateUniquePorts(pod.Spec.Containers, ports, extract); len(errs) != 0 {
 			glog.Errorf("Pod %q: HostPort is already allocated, ignoring: %v", GetPodFullName(pod), errs)
-			conflicts = append(conflicts, *pod)
+			notFitting = append(notFitting, *pod)
 			continue
 		}
+		fitting = append(fitting, *pod)
 	}
-
-	return conflicts
+	return
 }
 
-func (kl *Kubelet) getPodsExceedingCapacity(pods []api.Pod) []api.Pod {
+// checkCapacityExceeded detects pods that exceed the node's resources.
+func (kl *Kubelet) checkCapacityExceeded(pods []api.Pod) (fitting []api.Pod, notFitting []api.Pod) {
 	info, err := kl.GetCachedMachineInfo()
 	if err != nil {
 		glog.Error("error getting machine info: %v", err)
-		return []api.Pod{}
+		return pods, []api.Pod{}
 	}
 
 	// Respect the pod creation order when resolving conflicts.
 	sort.Sort(podsByCreationTime(pods))
 
 	capacity := CapacityFromMachineInfo(info)
-	return scheduler.GetPodsExceedingCapacity(pods, capacity)
+	return scheduler.CheckPodsExceedingCapacity(pods, capacity)
+}
+
+// checkNodeSelectorMatching detects pods that do not match the node's labels.
+func (kl *Kubelet) checkNodeSelectorMatching(pods []api.Pod) (fitting []api.Pod, notFitting []api.Pod) {
+	node, err := kl.GetNode()
+	if err != nil {
+		glog.Errorf("error getting node: %v", err)
+		return pods, []api.Pod{}
+	}
+	for _, pod := range pods {
+		if !scheduler.PodMatchesNodeLabels(&pod, node) {
+			notFitting = append(notFitting, pod)
+			continue
+		}
+		fitting = append(fitting, pod)
+	}
+	return
 }
 
 // handleNotfittingPods handles pods that do not fit on the node.
-// Currently conflicts on Port.HostPort values and exceeding node capacity are handled.
-func (kl *Kubelet) handleNotfittingPods(pods []api.Pod) {
-	conflicts := getHostPortConflicts(pods)
-	conflictsMap := map[types.UID]bool{}
-	for _, pod := range conflicts {
+// Currently conflicts on Port.HostPort values, node selector mismatches and exceeded node capacity are handled.
+func (kl *Kubelet) handleNotFittingPods(pods []api.Pod) {
+	fitting, notFitting := checkHostPortConflicts(pods)
+	for _, pod := range notFitting {
 		kl.recorder.Eventf(&pod, "hostPortConflict", "Cannot start the pod due to host port conflict.")
 		kl.setPodStatusInCache(GetPodFullName(&pod), api.PodStatus{
 			Phase:   api.PodFailed,
 			Message: "Pod cannot be started due to host port conflict"})
-		conflictsMap[pod.UID] = true
 	}
-	remainingPods := []api.Pod{}
-	for _, pod := range pods {
-		if !conflictsMap[pod.UID] {
-			remainingPods = append(remainingPods, pod)
-		}
+	fitting, notFitting = kl.checkNodeSelectorMatching(fitting)
+	for _, pod := range notFitting {
+		kl.recorder.Eventf(&pod, "nodeSelectorMismatching", "Cannot start the pod due to node selector mismatch.")
+		kl.setPodStatusInCache(GetPodFullName(&pod), api.PodStatus{
+			Phase:   api.PodFailed,
+			Message: "Pod cannot be started due to node selector mismatch"})
 	}
-	conflicts = kl.getPodsExceedingCapacity(remainingPods)
-	for _, pod := range conflicts {
+	fitting, notFitting = kl.checkCapacityExceeded(fitting)
+	for _, pod := range notFitting {
 		kl.recorder.Eventf(&pod, "capacityExceeded", "Cannot start the pod due to exceeded capacity.")
 		kl.setPodStatusInCache(GetPodFullName(&pod), api.PodStatus{
 			Phase:   api.PodFailed,
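handleNotFittingPods now threads the pod list through three checks, each returning a (fitting, notFitting) split and passing only the fitting pods on to the next check. The pattern in isolation, as a standalone sketch (admitCheck and runChecks are hypothetical names, not kubelet APIs):

package example

import "github.com/GoogleCloudPlatform/kubernetes/pkg/api"

// admitCheck is the shape shared by checkHostPortConflicts, checkNodeSelectorMatching
// and checkCapacityExceeded: split pods into those that fit and those that do not.
type admitCheck func(pods []api.Pod) (fitting []api.Pod, notFitting []api.Pod)

// runChecks applies the checks in order, collecting every rejected pod; only
// pods accepted by all checks come back as fitting.
func runChecks(pods []api.Pod, checks ...admitCheck) (fitting []api.Pod, rejected []api.Pod) {
	fitting = pods
	for _, check := range checks {
		var notFitting []api.Pod
		fitting, notFitting = check(fitting)
		rejected = append(rejected, notFitting...)
	}
	return fitting, rejected
}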
diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go
index 7d19852656..1e179f474c 100644
--- a/pkg/kubelet/kubelet_test.go
+++ b/pkg/kubelet/kubelet_test.go
@@ -18,6 +18,7 @@ package kubelet
 
 import (
 	"bytes"
+	"errors"
 	"fmt"
 	"io"
 	"io/ioutil"
@@ -95,6 +96,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
 	kubelet.sourcesReady = func() bool { return true }
 	kubelet.masterServiceNamespace = api.NamespaceDefault
 	kubelet.serviceLister = testServiceLister{}
+	kubelet.nodeLister = testNodeLister{}
 	kubelet.readiness = newReadinessStates()
 	kubelet.recorder = fakeRecorder
 	kubelet.podStatuses = map[string]api.PodStatus{}
@@ -1812,6 +1814,20 @@ func (ls testServiceLister) List() (api.ServiceList, error) {
 	}, nil
 }
 
+type testNodeLister struct {
+	nodes []api.Node
+}
+
+func (ls testNodeLister) GetNodeInfo(id string) (*api.Node, error) {
+	return nil, errors.New("not implemented")
+}
+
+func (ls testNodeLister) List() (api.NodeList, error) {
+	return api.NodeList{
+		Items: ls.nodes,
+	}, nil
+}
+
 func TestMakeEnvironmentVariables(t *testing.T) {
 	services := []api.Service{
 		{
@@ -2792,7 +2808,7 @@ func TestGetHostPortConflicts(t *testing.T) {
 		{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 83}}}}}},
 	}
 	// Pods should not cause any conflict.
-	conflicts := getHostPortConflicts(pods)
+	_, conflicts := checkHostPortConflicts(pods)
 	if len(conflicts) != 0 {
 		t.Errorf("expected no conflicts, Got %#v", conflicts)
 	}
@@ -2802,7 +2818,7 @@ func TestGetHostPortConflicts(t *testing.T) {
 		Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 81}}}}},
 	}
 	pods = append(pods, expected)
-	if actual := getHostPortConflicts(pods); !reflect.DeepEqual(actual, []api.Pod{expected}) {
+	if _, actual := checkHostPortConflicts(pods); !reflect.DeepEqual(actual, []api.Pod{expected}) {
 		t.Errorf("expected %#v, Got %#v", expected, actual)
 	}
 }
@@ -2838,7 +2854,7 @@ func TestHandlePortConflicts(t *testing.T) {
 	// The newer pod should be rejected.
 	conflictedPodName := GetPodFullName(&pods[0])
 
-	kl.handleNotfittingPods(pods)
+	kl.handleNotFittingPods(pods)
 	if len(kl.podStatuses) != 1 {
 		t.Fatalf("expected length of status map to be 1. Got map %#v.", kl.podStatuses)
 	}
@@ -2862,6 +2878,59 @@ func TestHandlePortConflicts(t *testing.T) {
 	}
 }
 
+// Tests that we handle a non-matching node selector correctly by setting the failed status in the status map.
+func TestHandleNodeSelector(t *testing.T) {
+	testKubelet := newTestKubelet(t)
+	kl := testKubelet.kubelet
+	kl.nodeLister = testNodeLister{nodes: []api.Node{
+		{ObjectMeta: api.ObjectMeta{Name: "testnode", Labels: map[string]string{"key": "B"}}},
+	}}
+	testKubelet.fakeCadvisor.On("MachineInfo").Return(&cadvisorApi.MachineInfo{}, nil)
+	pods := []api.Pod{
+		{
+			ObjectMeta: api.ObjectMeta{
+				UID:       "123456789",
+				Name:      "podA",
+				Namespace: "foo",
+			},
+			Spec: api.PodSpec{NodeSelector: map[string]string{"key": "A"}},
+		},
+		{
+			ObjectMeta: api.ObjectMeta{
+				UID:       "987654321",
+				Name:      "podB",
+				Namespace: "foo",
+			},
+			Spec: api.PodSpec{NodeSelector: map[string]string{"key": "B"}},
+		},
+	}
+	// The first pod should be rejected.
+	notfittingPodName := GetPodFullName(&pods[0])
+
+	kl.handleNotFittingPods(pods)
+	if len(kl.podStatuses) != 1 {
+		t.Fatalf("expected length of status map to be 1. Got map %#v.", kl.podStatuses)
+	}
+	// Check pod status stored in the status map.
+	status, ok := kl.podStatuses[notfittingPodName]
+	if !ok {
+		t.Fatalf("status of pod %q is not found in the status map.", notfittingPodName)
+	}
+	if status.Phase != api.PodFailed {
+		t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase)
+	}
+
+	// Check if we can retrieve the pod status from GetPodStatus().
+	kl.pods = pods
+	status, err := kl.GetPodStatus(notfittingPodName, "")
+	if err != nil {
+		t.Fatalf("unable to retrieve pod status for pod %q: %#v.", notfittingPodName, err)
+	}
+	if status.Phase != api.PodFailed {
+		t.Fatalf("expected pod status %q. Got %q.", api.PodFailed, status.Phase)
+	}
+}
+
 // Tests that we handle exceeded resources correctly by setting the failed status in status map.
 func TestHandleMemExceeded(t *testing.T) {
 	testKubelet := newTestKubelet(t)
@@ -2897,7 +2966,7 @@ func TestHandleMemExceeded(t *testing.T) {
 	// The newer pod should be rejected.
 	notfittingPodName := GetPodFullName(&pods[0])
 
-	kl.handleNotfittingPods(pods)
+	kl.handleNotFittingPods(pods)
 	if len(kl.podStatuses) != 1 {
 		t.Fatalf("expected length of status map to be 1. Got map %#v.", kl.podStatuses)
 	}
@@ -2931,7 +3000,7 @@ func TestPurgingObsoleteStatusMapEntries(t *testing.T) {
 		{Spec: api.PodSpec{Containers: []api.Container{{Ports: []api.ContainerPort{{HostPort: 80}}}}}},
 	}
 	// Run once to populate the status map.
-	kl.handleNotfittingPods(pods)
+	kl.handleNotFittingPods(pods)
 	if len(kl.podStatuses) != 1 {
 		t.Fatalf("expected length of status map to be 1. Got map %#v.", kl.podStatuses)
 	}
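testNodeLister stubs GetNodeInfo with an error, which is sufficient here because the kubelet only reaches the lister through List() in GetNode. If a test ever needs GetNodeInfo, a lookup over the same slice would mirror GetNode; a hypothetical sketch:

package example

import (
	"fmt"

	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
)

// findNode mirrors Kubelet.GetNode over a plain slice; a testNodeLister.GetNodeInfo
// implementation could delegate to it. Not part of this change.
func findNode(nodes []api.Node, name string) (*api.Node, error) {
	for i := range nodes {
		if nodes[i].Name == name {
			return &nodes[i], nil
		}
	}
	return nil, fmt.Errorf("node %v not found", name)
}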
diff --git a/pkg/kubelet/runonce.go b/pkg/kubelet/runonce.go
index 7f7fb8afb8..9149d4cdc6 100644
--- a/pkg/kubelet/runonce.go
+++ b/pkg/kubelet/runonce.go
@@ -55,7 +55,7 @@ func (kl *Kubelet) runOnce(pods []api.Pod, retryDelay time.Duration) (results []
 	if kl.dockerPuller == nil {
 		kl.dockerPuller = dockertools.NewDockerPuller(kl.dockerClient, kl.pullQPS, kl.pullBurst)
 	}
-	kl.handleNotfittingPods(pods)
+	kl.handleNotFittingPods(pods)
 
 	ch := make(chan RunPodResult)
 	for i := range pods {
diff --git a/pkg/kubelet/runonce_test.go b/pkg/kubelet/runonce_test.go
index b7a6941525..0d50a9a80c 100644
--- a/pkg/kubelet/runonce_test.go
+++ b/pkg/kubelet/runonce_test.go
@@ -78,6 +78,7 @@ func TestRunOnce(t *testing.T) {
 		recorder:    &record.FakeRecorder{},
 		cadvisor:    cadvisor,
 		podStatuses: make(map[string]api.PodStatus),
+		nodeLister:  testNodeLister{},
 	}
 	kb.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
 
diff --git a/pkg/master/rest_to_nodes.go b/pkg/master/rest_to_nodes.go
index 08e0b27fde..c6af8d7bba 100644
--- a/pkg/master/rest_to_nodes.go
+++ b/pkg/master/rest_to_nodes.go
@@ -24,6 +24,7 @@ import (
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/fields"
 	"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
+	"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
 )
 
 // RESTStorageToNodes will take a RESTStorage object and return a client interface
@@ -92,3 +93,8 @@ func (n *nodeAdaptor) Delete(name string) error {
 func (n *nodeAdaptor) Update(minion *api.Node) (*api.Node, error) {
 	return nil, errors.New("direct update not implemented")
 }
+
+// Watch watches for nodes.
+func (n *nodeAdaptor) Watch(label labels.Selector, field fields.Selector, resourceVersion string) (watch.Interface, error) {
+	return nil, errors.New("direct watch not implemented")
+}
diff --git a/pkg/scheduler/predicates.go b/pkg/scheduler/predicates.go
index 43cbd74c9b..b902714c57 100644
--- a/pkg/scheduler/predicates.go
+++ b/pkg/scheduler/predicates.go
@@ -101,8 +101,7 @@ func getResourceRequest(pod *api.Pod) resourceRequest {
 	return result
 }
 
-func GetPodsExceedingCapacity(pods []api.Pod, capacity api.ResourceList) []api.Pod {
-	exceedingPods := []api.Pod{}
+func CheckPodsExceedingCapacity(pods []api.Pod, capacity api.ResourceList) (fitting []api.Pod, notFitting []api.Pod) {
 	totalMilliCPU := capacity.Cpu().MilliValue()
 	totalMemory := capacity.Memory().Value()
 	milliCPURequested := int64(0)
@@ -113,14 +112,15 @@ func GetPodsExceedingCapacity(pods []api.Pod, capacity api.ResourceList) []api.P
 		fitsMemory := totalMemory == 0 || (totalMemory-memoryRequested) >= podRequest.memory
 		if !fitsCPU || !fitsMemory {
 			// the pod doesn't fit
-			exceedingPods = append(exceedingPods, pods[ix])
-		} else {
-			// the pod fits
-			milliCPURequested += podRequest.milliCPU
-			memoryRequested += podRequest.memory
+			notFitting = append(notFitting, pods[ix])
+			continue
 		}
+		// the pod fits
+		milliCPURequested += podRequest.milliCPU
+		memoryRequested += podRequest.memory
+		fitting = append(fitting, pods[ix])
 	}
-	return exceedingPods
+	return
 }
 
 // PodFitsResources calculates fit based on requested, rather than used resources
@@ -137,7 +137,8 @@ func (r *ResourceFit) PodFitsResources(pod api.Pod, existingPods []api.Pod, node
 	pods := []api.Pod{}
 	copy(pods, existingPods)
 	pods = append(existingPods, pod)
-	if len(GetPodsExceedingCapacity(pods, info.Spec.Capacity)) > 0 {
+	_, exceeding := CheckPodsExceedingCapacity(pods, info.Spec.Capacity)
+	if len(exceeding) > 0 {
 		return false, nil
 	}
 	return true, nil
@@ -157,20 +158,24 @@ func NewSelectorMatchPredicate(info NodeInfo) FitPredicate {
 	return selector.PodSelectorMatches
 }
 
+func PodMatchesNodeLabels(pod *api.Pod, node *api.Node) bool {
+	if len(pod.Spec.NodeSelector) == 0 {
+		return true
+	}
+	selector := labels.SelectorFromSet(pod.Spec.NodeSelector)
+	return selector.Matches(labels.Set(node.Labels))
+}
+
 type NodeSelector struct {
 	info NodeInfo
 }
 
 func (n *NodeSelector) PodSelectorMatches(pod api.Pod, existingPods []api.Pod, node string) (bool, error) {
-	if len(pod.Spec.NodeSelector) == 0 {
-		return true, nil
-	}
-	selector := labels.SelectorFromSet(pod.Spec.NodeSelector)
 	minion, err := n.info.GetNodeInfo(node)
 	if err != nil {
 		return false, err
 	}
-	return selector.Matches(labels.Set(minion.Labels)), nil
+	return PodMatchesNodeLabels(&pod, minion), nil
 }
 
 func PodFitsHost(pod api.Pod, existingPods []api.Pod, node string) (bool, error) {
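PodMatchesNodeLabels is the logic the kubelet now shares with the scheduler predicate: an empty NodeSelector matches any node, otherwise every key/value pair in the selector must appear in the node's labels. A small usage sketch using the same structs as the tests above (exampleNodeSelectorMatch is a hypothetical helper):

package example

import (
	"fmt"

	"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
	"github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler"
)

func exampleNodeSelectorMatch() {
	node := api.Node{ObjectMeta: api.ObjectMeta{
		Name:   "testnode",
		Labels: map[string]string{"key": "B"},
	}}
	podA := api.Pod{Spec: api.PodSpec{NodeSelector: map[string]string{"key": "A"}}}
	podB := api.Pod{Spec: api.PodSpec{NodeSelector: map[string]string{"key": "B"}}}
	podAny := api.Pod{} // no selector: matches every node

	fmt.Println(scheduler.PodMatchesNodeLabels(&podA, &node))   // false
	fmt.Println(scheduler.PodMatchesNodeLabels(&podB, &node))   // true
	fmt.Println(scheduler.PodMatchesNodeLabels(&podAny, &node)) // true
}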