/*
Copyright 2015 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cache

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"

	"k8s.io/klog"

	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	v1helper "k8s.io/kubernetes/pkg/apis/core/v1/helper"
	priorityutil "k8s.io/kubernetes/pkg/scheduler/algorithm/priorities/util"
)

var (
	emptyResource = Resource{}
	generation    int64
)

// ImageStateSummary provides summarized information about the state of an image.
type ImageStateSummary struct {
	// Size of the image
	Size int64
	// Used to track how many nodes have this image
	NumNodes int
}

// NodeInfo is node level aggregated information.
type NodeInfo struct {
	// Overall node information.
	node *v1.Node

	pods             []*v1.Pod
	podsWithAffinity []*v1.Pod
	usedPorts        HostPortInfo

	// Total requested resource of all pods on this node.
	// It includes assumed pods, which the scheduler has sent to the apiserver
	// for binding but has not yet observed as scheduled.
	requestedResource *Resource
	nonzeroRequest    *Resource
	// We store allocatableResource (which is Node.Status.Allocatable.*) explicitly
	// as int64, to avoid conversions and map accesses.
	allocatableResource *Resource

	// Cached taints of the node for faster lookup.
	taints    []v1.Taint
	taintsErr error

	// imageStates holds the entry of an image if and only if this image is on the node. The entry can be used for
	// checking an image's existence and advanced usage (e.g., image locality scheduling policy) based on the image
	// state information.
	imageStates map[string]*ImageStateSummary

	// TransientInfo holds the information pertaining to a scheduling cycle. This will be destructed at the end of
	// scheduling cycle.
	// TODO: @ravig. Remove this once we have a clear approach for message passing across predicates and priorities.
	TransientInfo *TransientSchedulerInfo

	// Cached conditions of node for faster lookup.
	memoryPressureCondition v1.ConditionStatus
	diskPressureCondition   v1.ConditionStatus
	pidPressureCondition    v1.ConditionStatus

	// Whenever NodeInfo changes, generation is bumped.
	// This is used to avoid cloning it if the object didn't change.
	generation int64
}

// initializeNodeTransientInfo initializes transient information pertaining to node.
func initializeNodeTransientInfo() nodeTransientInfo {
	return nodeTransientInfo{AllocatableVolumesCount: 0, RequestedVolumes: 0}
}

// nextGeneration: Let's make sure history never forgets the name...
// Increments the generation number monotonically ensuring that generation numbers never collide.
// Collision of the generation numbers would be particularly problematic if a node was deleted and
// added back with the same name. See issue#63262.
func nextGeneration() int64 {
	return atomic.AddInt64(&generation, 1)
}

// nodeTransientInfo contains transient node information while scheduling.
type nodeTransientInfo struct {
	// AllocatableVolumesCount contains number of volumes that could be attached to node.
	AllocatableVolumesCount int
	// RequestedVolumes is the requested number of volumes on a particular node.
	RequestedVolumes int
}
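// The sketch below is illustrative only and not part of the scheduler API: it
// shows how the generation counter is intended to be used, i.e. callers compare
// generations to cheaply detect whether a NodeInfo changed between two
// observations instead of deep-comparing or re-cloning the object.
func exampleGenerationChanged(old, current *NodeInfo) bool {
	// Any mutation (pod added/removed, node updated) bumps the generation via
	// nextGeneration(), so a differing value means the cached object changed.
	return old.GetGeneration() != current.GetGeneration()
}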
// TransientSchedulerInfo is a transient structure which is destructed at the end of each scheduling cycle.
// It consists of items that are valid for a scheduling cycle and is used for message passing across predicates and
// priorities. Some examples which could be used as fields are number of volumes being used on node, current utilization
// on node etc.
// IMPORTANT NOTE: Make sure that each field in this structure is documented along with usage. Expand this structure
// only when absolutely needed as this data structure will be created and destroyed during every scheduling cycle.
type TransientSchedulerInfo struct {
	TransientLock sync.Mutex
	// TransNodeInfo holds the transient node information.
	TransNodeInfo nodeTransientInfo
}

// NewTransientSchedulerInfo returns a new scheduler transient structure with initialized values.
func NewTransientSchedulerInfo() *TransientSchedulerInfo {
	tsi := &TransientSchedulerInfo{
		TransNodeInfo: initializeNodeTransientInfo(),
	}
	return tsi
}

// Resource is a collection of compute resources.
type Resource struct {
	MilliCPU         int64
	Memory           int64
	EphemeralStorage int64
	// We store allowedPodNumber (which is Node.Status.Allocatable.Pods().Value())
	// explicitly as int, to avoid conversions and improve performance.
	AllowedPodNumber int
	// ScalarResources
	ScalarResources map[v1.ResourceName]int64
}

// NewResource creates a Resource from ResourceList.
func NewResource(rl v1.ResourceList) *Resource {
	r := &Resource{}
	r.Add(rl)
	return r
}

// Add adds ResourceList into Resource.
func (r *Resource) Add(rl v1.ResourceList) {
	if r == nil {
		return
	}

	for rName, rQuant := range rl {
		switch rName {
		case v1.ResourceCPU:
			r.MilliCPU += rQuant.MilliValue()
		case v1.ResourceMemory:
			r.Memory += rQuant.Value()
		case v1.ResourcePods:
			r.AllowedPodNumber += int(rQuant.Value())
		case v1.ResourceEphemeralStorage:
			r.EphemeralStorage += rQuant.Value()
		default:
			if v1helper.IsScalarResourceName(rName) {
				r.AddScalar(rName, rQuant.Value())
			}
		}
	}
}

// ResourceList returns a resource list of this resource.
func (r *Resource) ResourceList() v1.ResourceList {
	result := v1.ResourceList{
		v1.ResourceCPU:              *resource.NewMilliQuantity(r.MilliCPU, resource.DecimalSI),
		v1.ResourceMemory:           *resource.NewQuantity(r.Memory, resource.BinarySI),
		v1.ResourcePods:             *resource.NewQuantity(int64(r.AllowedPodNumber), resource.BinarySI),
		v1.ResourceEphemeralStorage: *resource.NewQuantity(r.EphemeralStorage, resource.BinarySI),
	}
	for rName, rQuant := range r.ScalarResources {
		if v1helper.IsHugePageResourceName(rName) {
			result[rName] = *resource.NewQuantity(rQuant, resource.BinarySI)
		} else {
			result[rName] = *resource.NewQuantity(rQuant, resource.DecimalSI)
		}
	}
	return result
}

// Clone returns a copy of this resource.
func (r *Resource) Clone() *Resource {
	res := &Resource{
		MilliCPU:         r.MilliCPU,
		Memory:           r.Memory,
		AllowedPodNumber: r.AllowedPodNumber,
		EphemeralStorage: r.EphemeralStorage,
	}
	if r.ScalarResources != nil {
		res.ScalarResources = make(map[v1.ResourceName]int64)
		for k, v := range r.ScalarResources {
			res.ScalarResources[k] = v
		}
	}
	return res
}

// AddScalar adds a scalar resource quantity to this resource.
func (r *Resource) AddScalar(name v1.ResourceName, quantity int64) {
	r.SetScalar(name, r.ScalarResources[name]+quantity)
}

// SetScalar sets a scalar resource quantity on this resource.
func (r *Resource) SetScalar(name v1.ResourceName, quantity int64) {
	// Lazily allocate scalar resource map.
	if r.ScalarResources == nil {
		r.ScalarResources = map[v1.ResourceName]int64{}
	}
	r.ScalarResources[name] = quantity
}
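// Illustrative sketch (not part of the scheduler API): building a Resource from
// a v1.ResourceList and reading it back via ResourceList. The sample quantities
// are arbitrary and only demonstrate the conversion.
func exampleResourceRoundTrip() v1.ResourceList {
	rl := v1.ResourceList{
		v1.ResourceCPU:    *resource.NewMilliQuantity(500, resource.DecimalSI),     // 500m CPU
		v1.ResourceMemory: *resource.NewQuantity(256*1024*1024, resource.BinarySI), // 256Mi
	}
	r := NewResource(rl)
	// At this point r.MilliCPU == 500 and r.Memory == 268435456. ScalarResources
	// stays nil until AddScalar/SetScalar lazily allocates the map.
	return r.ResourceList()
}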
// SetMaxResource compares with ResourceList and takes the max value for each resource dimension.
func (r *Resource) SetMaxResource(rl v1.ResourceList) {
	if r == nil {
		return
	}

	for rName, rQuantity := range rl {
		switch rName {
		case v1.ResourceMemory:
			if mem := rQuantity.Value(); mem > r.Memory {
				r.Memory = mem
			}
		case v1.ResourceCPU:
			if cpu := rQuantity.MilliValue(); cpu > r.MilliCPU {
				r.MilliCPU = cpu
			}
		case v1.ResourceEphemeralStorage:
			if ephemeralStorage := rQuantity.Value(); ephemeralStorage > r.EphemeralStorage {
				r.EphemeralStorage = ephemeralStorage
			}
		default:
			if v1helper.IsScalarResourceName(rName) {
				value := rQuantity.Value()
				if value > r.ScalarResources[rName] {
					r.SetScalar(rName, value)
				}
			}
		}
	}
}

// NewNodeInfo returns a ready to use empty NodeInfo object.
// If any pods are given in arguments, their information will be aggregated in
// the returned object.
func NewNodeInfo(pods ...*v1.Pod) *NodeInfo {
	ni := &NodeInfo{
		requestedResource:   &Resource{},
		nonzeroRequest:      &Resource{},
		allocatableResource: &Resource{},
		TransientInfo:       NewTransientSchedulerInfo(),
		generation:          nextGeneration(),
		usedPorts:           make(HostPortInfo),
		imageStates:         make(map[string]*ImageStateSummary),
	}
	for _, pod := range pods {
		ni.AddPod(pod)
	}
	return ni
}

// Node returns overall information about this node.
func (n *NodeInfo) Node() *v1.Node {
	if n == nil {
		return nil
	}
	return n.node
}

// Pods returns all pods scheduled (including assumed to be) on this node.
func (n *NodeInfo) Pods() []*v1.Pod {
	if n == nil {
		return nil
	}
	return n.pods
}

// SetPods sets all pods scheduled (including assumed to be) on this node.
func (n *NodeInfo) SetPods(pods []*v1.Pod) {
	n.pods = pods
}

// UsedPorts returns used ports on this node.
func (n *NodeInfo) UsedPorts() HostPortInfo {
	if n == nil {
		return nil
	}
	return n.usedPorts
}

// SetUsedPorts sets the used ports on this node.
func (n *NodeInfo) SetUsedPorts(newUsedPorts HostPortInfo) {
	n.usedPorts = newUsedPorts
}

// ImageStates returns the state information of all images.
func (n *NodeInfo) ImageStates() map[string]*ImageStateSummary {
	if n == nil {
		return nil
	}
	return n.imageStates
}

// SetImageStates sets the state information of all images.
func (n *NodeInfo) SetImageStates(newImageStates map[string]*ImageStateSummary) {
	n.imageStates = newImageStates
}

// PodsWithAffinity returns all pods with (anti)affinity constraints on this node.
func (n *NodeInfo) PodsWithAffinity() []*v1.Pod {
	if n == nil {
		return nil
	}
	return n.podsWithAffinity
}

// AllowedPodNumber returns the number of the allowed pods on this node.
func (n *NodeInfo) AllowedPodNumber() int {
	if n == nil || n.allocatableResource == nil {
		return 0
	}
	return n.allocatableResource.AllowedPodNumber
}

// Taints returns the taints list on this node.
func (n *NodeInfo) Taints() ([]v1.Taint, error) {
	if n == nil {
		return nil, nil
	}
	return n.taints, n.taintsErr
}

// SetTaints sets the taints list on this node.
func (n *NodeInfo) SetTaints(newTaints []v1.Taint) {
	n.taints = newTaints
}

// MemoryPressureCondition returns the memory pressure condition status on this node.
func (n *NodeInfo) MemoryPressureCondition() v1.ConditionStatus {
	if n == nil {
		return v1.ConditionUnknown
	}
	return n.memoryPressureCondition
}

// DiskPressureCondition returns the disk pressure condition status on this node.
func (n *NodeInfo) DiskPressureCondition() v1.ConditionStatus {
	if n == nil {
		return v1.ConditionUnknown
	}
	return n.diskPressureCondition
}
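// Illustrative sketch (not part of the scheduler API): folding several
// ResourceLists into a per-dimension maximum with SetMaxResource, useful when a
// caller needs the componentwise max of multiple resource requests.
func exampleMaxOfResourceLists(lists ...v1.ResourceList) *Resource {
	result := &Resource{}
	for _, rl := range lists {
		// Each call keeps the larger value for CPU, memory, ephemeral storage
		// and every scalar resource seen so far.
		result.SetMaxResource(rl)
	}
	return result
}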
// PIDPressureCondition returns the pid pressure condition status on this node.
func (n *NodeInfo) PIDPressureCondition() v1.ConditionStatus {
	if n == nil {
		return v1.ConditionUnknown
	}
	return n.pidPressureCondition
}

// RequestedResource returns aggregated resource request of pods on this node.
func (n *NodeInfo) RequestedResource() Resource {
	if n == nil {
		return emptyResource
	}
	return *n.requestedResource
}

// SetRequestedResource sets the aggregated resource request of pods on this node.
func (n *NodeInfo) SetRequestedResource(newResource *Resource) {
	n.requestedResource = newResource
}

// NonZeroRequest returns aggregated nonzero resource request of pods on this node.
func (n *NodeInfo) NonZeroRequest() Resource {
	if n == nil {
		return emptyResource
	}
	return *n.nonzeroRequest
}

// SetNonZeroRequest sets the aggregated nonzero resource request of pods on this node.
func (n *NodeInfo) SetNonZeroRequest(newResource *Resource) {
	n.nonzeroRequest = newResource
}

// AllocatableResource returns allocatable resources on a given node.
func (n *NodeInfo) AllocatableResource() Resource {
	if n == nil {
		return emptyResource
	}
	return *n.allocatableResource
}

// SetAllocatableResource sets the allocatableResource information of given node.
func (n *NodeInfo) SetAllocatableResource(allocatableResource *Resource) {
	n.allocatableResource = allocatableResource
	n.generation = nextGeneration()
}

// GetGeneration returns the generation on this node.
func (n *NodeInfo) GetGeneration() int64 {
	if n == nil {
		return 0
	}
	return n.generation
}

// SetGeneration sets the generation on this node. This is for testing only.
func (n *NodeInfo) SetGeneration(newGeneration int64) {
	n.generation = newGeneration
}

// Clone returns a copy of this node.
func (n *NodeInfo) Clone() *NodeInfo {
	clone := &NodeInfo{
		node:                    n.node,
		requestedResource:       n.requestedResource.Clone(),
		nonzeroRequest:          n.nonzeroRequest.Clone(),
		allocatableResource:     n.allocatableResource.Clone(),
		taintsErr:               n.taintsErr,
		TransientInfo:           n.TransientInfo,
		memoryPressureCondition: n.memoryPressureCondition,
		diskPressureCondition:   n.diskPressureCondition,
		pidPressureCondition:    n.pidPressureCondition,
		usedPorts:               make(HostPortInfo),
		imageStates:             n.imageStates,
		generation:              n.generation,
	}
	if len(n.pods) > 0 {
		clone.pods = append([]*v1.Pod(nil), n.pods...)
	}
	if len(n.usedPorts) > 0 {
		// HostPortInfo is a map-in-map struct,
		// so make sure it's deep copied.
		for ip, portMap := range n.usedPorts {
			clone.usedPorts[ip] = make(map[ProtocolPort]struct{})
			for protocolPort, v := range portMap {
				clone.usedPorts[ip][protocolPort] = v
			}
		}
	}
	if len(n.podsWithAffinity) > 0 {
		clone.podsWithAffinity = append([]*v1.Pod(nil), n.podsWithAffinity...)
	}
	if len(n.taints) > 0 {
		clone.taints = append([]v1.Taint(nil), n.taints...)
	}
	return clone
}

// VolumeLimits returns volume limits associated with the node.
func (n *NodeInfo) VolumeLimits() map[v1.ResourceName]int64 {
	volumeLimits := map[v1.ResourceName]int64{}
	for k, v := range n.AllocatableResource().ScalarResources {
		if v1helper.IsAttachableVolumeResourceName(k) {
			volumeLimits[k] = v
		}
	}
	return volumeLimits
}

// String returns a human readable representation of this NodeInfo.
func (n *NodeInfo) String() string {
	podKeys := make([]string, len(n.pods))
	for i, pod := range n.pods {
		podKeys[i] = pod.Name
	}
	return fmt.Sprintf("&NodeInfo{Pods:%v, RequestedResource:%#v, NonZeroRequest: %#v, UsedPort: %#v, AllocatableResource:%#v}",
		podKeys, n.requestedResource, n.nonzeroRequest, n.usedPorts, n.allocatableResource)
}
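// Illustrative sketch (not part of the scheduler API): preemption-style
// simulations rely on Clone returning an independent copy, so removing a victim
// pod from the copy must leave the cached original untouched. The victim pod is
// assumed to already be tracked by this NodeInfo and to have a UID set.
func exampleSimulatePodRemoval(ni *NodeInfo, victim *v1.Pod) (*NodeInfo, error) {
	simulated := ni.Clone()
	if err := simulated.RemovePod(victim); err != nil {
		return nil, err
	}
	// ni is unchanged; simulated reflects the node without the victim pod.
	return simulated, nil
}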
func hasPodAffinityConstraints(pod *v1.Pod) bool {
	affinity := pod.Spec.Affinity
	return affinity != nil && (affinity.PodAffinity != nil || affinity.PodAntiAffinity != nil)
}

// AddPod adds pod information to this NodeInfo.
func (n *NodeInfo) AddPod(pod *v1.Pod) {
	res, non0CPU, non0Mem := calculateResource(pod)
	n.requestedResource.MilliCPU += res.MilliCPU
	n.requestedResource.Memory += res.Memory
	n.requestedResource.EphemeralStorage += res.EphemeralStorage
	if n.requestedResource.ScalarResources == nil && len(res.ScalarResources) > 0 {
		n.requestedResource.ScalarResources = map[v1.ResourceName]int64{}
	}
	for rName, rQuant := range res.ScalarResources {
		n.requestedResource.ScalarResources[rName] += rQuant
	}
	n.nonzeroRequest.MilliCPU += non0CPU
	n.nonzeroRequest.Memory += non0Mem
	n.pods = append(n.pods, pod)
	if hasPodAffinityConstraints(pod) {
		n.podsWithAffinity = append(n.podsWithAffinity, pod)
	}

	// Consume ports when a pod is added.
	n.UpdateUsedPorts(pod, true)

	n.generation = nextGeneration()
}

// RemovePod subtracts pod information from this NodeInfo.
func (n *NodeInfo) RemovePod(pod *v1.Pod) error {
	k1, err := GetPodKey(pod)
	if err != nil {
		return err
	}

	for i := range n.podsWithAffinity {
		k2, err := GetPodKey(n.podsWithAffinity[i])
		if err != nil {
			klog.Errorf("Cannot get pod key, err: %v", err)
			continue
		}
		if k1 == k2 {
			// delete the element
			n.podsWithAffinity[i] = n.podsWithAffinity[len(n.podsWithAffinity)-1]
			n.podsWithAffinity = n.podsWithAffinity[:len(n.podsWithAffinity)-1]
			break
		}
	}
	for i := range n.pods {
		k2, err := GetPodKey(n.pods[i])
		if err != nil {
			klog.Errorf("Cannot get pod key, err: %v", err)
			continue
		}
		if k1 == k2 {
			// delete the element
			n.pods[i] = n.pods[len(n.pods)-1]
			n.pods = n.pods[:len(n.pods)-1]

			// reduce the resource data
			res, non0CPU, non0Mem := calculateResource(pod)

			n.requestedResource.MilliCPU -= res.MilliCPU
			n.requestedResource.Memory -= res.Memory
			n.requestedResource.EphemeralStorage -= res.EphemeralStorage
			if len(res.ScalarResources) > 0 && n.requestedResource.ScalarResources == nil {
				n.requestedResource.ScalarResources = map[v1.ResourceName]int64{}
			}
			for rName, rQuant := range res.ScalarResources {
				n.requestedResource.ScalarResources[rName] -= rQuant
			}
			n.nonzeroRequest.MilliCPU -= non0CPU
			n.nonzeroRequest.Memory -= non0Mem

			// Release ports when a pod is removed.
			n.UpdateUsedPorts(pod, false)

			n.generation = nextGeneration()

			return nil
		}
	}
	return fmt.Errorf("no corresponding pod %s in pods of node %s", pod.Name, n.node.Name)
}

func calculateResource(pod *v1.Pod) (res Resource, non0CPU int64, non0Mem int64) {
	resPtr := &res
	for _, c := range pod.Spec.Containers {
		resPtr.Add(c.Resources.Requests)

		non0CPUReq, non0MemReq := priorityutil.GetNonzeroRequests(&c.Resources.Requests)
		non0CPU += non0CPUReq
		non0Mem += non0MemReq
		// No non-zero resources for GPUs or opaque resources.
	}

	return
}

// UpdateUsedPorts updates the UsedPorts of NodeInfo.
func (n *NodeInfo) UpdateUsedPorts(pod *v1.Pod, add bool) {
	for j := range pod.Spec.Containers {
		container := &pod.Spec.Containers[j]
		for k := range container.Ports {
			podPort := &container.Ports[k]
			if add {
				n.usedPorts.Add(podPort.HostIP, string(podPort.Protocol), podPort.HostPort)
			} else {
				n.usedPorts.Remove(podPort.HostIP, string(podPort.Protocol), podPort.HostPort)
			}
		}
	}
}
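// Illustrative sketch (not part of the scheduler API): AddPod and RemovePod keep
// requestedResource, nonzeroRequest and usedPorts in sync, so adding and then
// removing the same pod should return the aggregates to their previous values.
// The pod is assumed to have a UID set, since RemovePod keys pods by UID.
func exampleAddThenRemove(ni *NodeInfo, pod *v1.Pod) error {
	before := ni.RequestedResource()
	ni.AddPod(pod)
	if err := ni.RemovePod(pod); err != nil {
		return err
	}
	if after := ni.RequestedResource(); after.MilliCPU != before.MilliCPU || after.Memory != before.Memory {
		return fmt.Errorf("resource accounting mismatch: before=%+v after=%+v", before, after)
	}
	return nil
}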
// SetNode sets the overall node information.
func (n *NodeInfo) SetNode(node *v1.Node) error {
	n.node = node

	n.allocatableResource = NewResource(node.Status.Allocatable)

	n.taints = node.Spec.Taints
	for i := range node.Status.Conditions {
		cond := &node.Status.Conditions[i]
		switch cond.Type {
		case v1.NodeMemoryPressure:
			n.memoryPressureCondition = cond.Status
		case v1.NodeDiskPressure:
			n.diskPressureCondition = cond.Status
		case v1.NodePIDPressure:
			n.pidPressureCondition = cond.Status
		default:
			// We ignore other conditions.
		}
	}
	n.TransientInfo = NewTransientSchedulerInfo()
	n.generation = nextGeneration()
	return nil
}

// RemoveNode removes the overall information about the node.
func (n *NodeInfo) RemoveNode(node *v1.Node) error {
	// We keep the NodeInfo struct itself because there can still be some pods on this node -
	// notifications about pods are delivered in a different watch, and thus can potentially
	// be observed later, even though they happened before node removal. This is handled
	// correctly in cache.go.
	n.node = nil
	n.allocatableResource = &Resource{}
	n.taints, n.taintsErr = nil, nil
	n.memoryPressureCondition = v1.ConditionUnknown
	n.diskPressureCondition = v1.ConditionUnknown
	n.pidPressureCondition = v1.ConditionUnknown
	n.imageStates = make(map[string]*ImageStateSummary)
	n.generation = nextGeneration()
	return nil
}

// FilterOutPods receives a list of pods and filters out those whose node names
// are equal to the node of this NodeInfo, but are not found in the pods of this NodeInfo.
//
// Preemption logic simulates removal of pods on a node by removing them from the
// corresponding NodeInfo. In order for the simulation to work, we call this method
// on the pods returned from SchedulerCache, so that predicate functions see
// only the pods that are not removed from the NodeInfo.
func (n *NodeInfo) FilterOutPods(pods []*v1.Pod) []*v1.Pod {
	node := n.Node()
	if node == nil {
		return pods
	}
	filtered := make([]*v1.Pod, 0, len(pods))
	for _, p := range pods {
		if p.Spec.NodeName != node.Name {
			filtered = append(filtered, p)
			continue
		}
		// If pod is on the given node, add it to 'filtered' only if it is present in nodeInfo.
		podKey, _ := GetPodKey(p)
		for _, np := range n.Pods() {
			npodkey, _ := GetPodKey(np)
			if npodkey == podKey {
				filtered = append(filtered, p)
				break
			}
		}
	}
	return filtered
}

// GetPodKey returns the string key of a pod.
func GetPodKey(pod *v1.Pod) (string, error) {
	uid := string(pod.UID)
	if len(uid) == 0 {
		return "", errors.New("Cannot get cache key for pod with empty UID")
	}
	return uid, nil
}

// Filter implements the PodFilter interface. It returns false only if the pod's node name
// matches NodeInfo.node and the pod is not found in the pods list. Otherwise,
// it returns true.
func (n *NodeInfo) Filter(pod *v1.Pod) bool {
	if pod.Spec.NodeName != n.node.Name {
		return true
	}
	for _, p := range n.pods {
		if p.Name == pod.Name && p.Namespace == pod.Namespace {
			return true
		}
	}
	return false
}
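// Illustrative sketch (not part of the scheduler API): FilterOutPods is meant to
// be called on the pod list obtained from the scheduler cache so that predicates
// only see pods still tracked by this NodeInfo (e.g. after preemption victims
// were removed from a cloned NodeInfo). cachedPods is a hypothetical input.
func exampleFilterForPredicates(ni *NodeInfo, cachedPods []*v1.Pod) []*v1.Pod {
	// Pods assigned to this node but no longer present in ni are dropped;
	// pods on other nodes pass through untouched.
	return ni.FilterOutPods(cachedPods)
}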