/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cpumanager

import (
	"fmt"
	"math"
	"sync"
	"time"

	"github.com/golang/glog"
	cadvisorapi "github.com/google/cadvisor/info/v1"

	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	runtimeapi "k8s.io/kubernetes/pkg/kubelet/apis/cri/v1alpha1/runtime"
	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/state"
	"k8s.io/kubernetes/pkg/kubelet/cm/cpumanager/topology"
	"k8s.io/kubernetes/pkg/kubelet/cm/cpuset"
	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
	"k8s.io/kubernetes/pkg/kubelet/status"
)
// ActivePodsFunc is a function that returns a list of pods to reconcile.
type ActivePodsFunc func() []*v1.Pod

type runtimeService interface {
	UpdateContainerResources(id string, resources *runtimeapi.LinuxContainerResources) error
}

type policyName string

// Manager interface provides methods for Kubelet to manage pod cpus.
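//
// An illustrative call sequence (pod, container, and containerID here are
// hypothetical, caller-supplied values; error handling elided):
//
//	cpuManager.Start(activePods, podStatusProvider, containerRuntime)
//	_ = cpuManager.AddContainer(pod, container, containerID) // after create, before start
//	_ = cpuManager.RemoveContainer(containerID)              // after kill or delete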
type Manager interface {
	// Start is called during Kubelet initialization.
	Start(activePods ActivePodsFunc, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService)

	// AddContainer is called between container create and container start
	// so that initial CPU affinity settings can be written through to the
	// container runtime before the first process begins to execute.
	AddContainer(p *v1.Pod, c *v1.Container, containerID string) error

	// RemoveContainer is called after Kubelet decides to kill or delete a
	// container. After this call, the CPU manager stops trying to reconcile
	// that container and any CPUs dedicated to the container are freed.
	RemoveContainer(containerID string) error

	// State returns a read-only interface to the internal CPU manager state.
	State() state.Reader
}
type manager struct {
	sync.Mutex
	policy Policy

	// reconcilePeriod is the duration between calls to reconcileState.
	reconcilePeriod time.Duration

	// state allows pluggable CPU assignment policies while sharing a common
	// representation of state for the system to inspect and reconcile.
	state state.State

	// containerRuntime is the container runtime service interface needed
	// to make UpdateContainerResources() calls against the containers.
	containerRuntime runtimeService

	// activePods is a method for listing active pods on the node
	// so all the containers can be updated in the reconciliation loop.
	activePods ActivePodsFunc

	// podStatusProvider provides a method for obtaining pod statuses
	// and the containerID of their containers.
	podStatusProvider status.PodStatusProvider

	machineInfo *cadvisorapi.MachineInfo

	nodeAllocatableReservation v1.ResourceList
}
var _ Manager = &manager{}

// NewManager creates a new CPU manager based on the provided policy.
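//
// For illustration only (assumed example inputs, not taken from real kubelet
// configuration): with the static policy selected and a node allocatable CPU
// reservation of 1500m, the reservation is rounded up via math.Ceil to
// numReservedCPUs = 2, since fractional CPUs cannot be allocated exclusively.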
func NewManager(
	cpuPolicyName string,
	reconcilePeriod time.Duration,
	machineInfo *cadvisorapi.MachineInfo,
	nodeAllocatableReservation v1.ResourceList,
) (Manager, error) {
	var policy Policy

	switch policyName(cpuPolicyName) {

	case PolicyNone:
		policy = NewNonePolicy()

	case PolicyStatic:
		topo, err := topology.Discover(machineInfo)
		if err != nil {
			return nil, err
		}
		glog.Infof("[cpumanager] detected CPU topology: %v", topo)
		reservedCPUs, ok := nodeAllocatableReservation[v1.ResourceCPU]
		if !ok {
			// The static policy cannot initialize without this information. Panic!
			panic("[cpumanager] unable to determine reserved CPU resources for static policy")
		}
		if reservedCPUs.IsZero() {
			// Panic!
			//
			// The static policy requires this to be nonzero. Zero CPU reservation
			// would allow the shared pool to be completely exhausted. At that point
			// either we would violate our guarantee of exclusivity or need to evict
			// any pod that has at least one container that requires zero CPUs.
			// See the comments in policy_static.go for more details.
			panic("[cpumanager] the static policy requires systemreserved.cpu + kubereserved.cpu to be greater than zero")
		}

		// Take the ceiling of the reservation, since fractional CPUs cannot be
		// exclusively allocated.
		reservedCPUsFloat := float64(reservedCPUs.MilliValue()) / 1000
		numReservedCPUs := int(math.Ceil(reservedCPUsFloat))
		policy = NewStaticPolicy(topo, numReservedCPUs)

	default:
		glog.Errorf("[cpumanager] Unknown policy \"%s\", falling back to default policy \"%s\"", cpuPolicyName, PolicyNone)
		policy = NewNonePolicy()
	}

	manager := &manager{
		policy:                     policy,
		reconcilePeriod:            reconcilePeriod,
		state:                      state.NewMemoryState(),
		machineInfo:                machineInfo,
		nodeAllocatableReservation: nodeAllocatableReservation,
	}
	return manager, nil
}
func (m *manager) Start(activePods ActivePodsFunc, podStatusProvider status.PodStatusProvider, containerRuntime runtimeService) {
	glog.Infof("[cpumanager] starting with %s policy", m.policy.Name())
	glog.Infof("[cpumanager] reconciling every %v", m.reconcilePeriod)

	m.activePods = activePods
	m.podStatusProvider = podStatusProvider
	m.containerRuntime = containerRuntime

	m.policy.Start(m.state)
	if m.policy.Name() == string(PolicyNone) {
		return
	}
	go wait.Until(func() { m.reconcileState() }, m.reconcilePeriod, wait.NeverStop)
}
func (m *manager) AddContainer(p *v1.Pod, c *v1.Container, containerID string) error {
	m.Lock()
	err := m.policy.AddContainer(m.state, p, c, containerID)
	if err != nil {
		glog.Errorf("[cpumanager] AddContainer error: %v", err)
		m.Unlock()
		return err
	}
	cpus := m.state.GetCPUSetOrDefault(containerID)
	m.Unlock()

	if !cpus.IsEmpty() {
		err = m.updateContainerCPUSet(containerID, cpus)
		if err != nil {
			glog.Errorf("[cpumanager] AddContainer error: %v", err)
			return err
		}
	} else {
		glog.V(5).Infof("[cpumanager] skipping container resource update; assigned cpuset is empty")
	}

	return nil
}
func (m *manager) RemoveContainer(containerID string) error {
	m.Lock()
	defer m.Unlock()

	err := m.policy.RemoveContainer(m.state, containerID)
	if err != nil {
		glog.Errorf("[cpumanager] RemoveContainer error: %v", err)
		return err
	}
	return nil
}

func (m *manager) State() state.Reader {
	return m.state
}
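// reconciledContainer identifies a single container visited during
// reconcileState. The success and failure slices returned by reconcileState
// are not consumed by the reconcile loop started in Start, so they presumably
// exist to let tests observe what was (and was not) reconciled.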
type reconciledContainer struct {
	podName       string
	containerName string
	containerID   string
}

func (m *manager) reconcileState() (success []reconciledContainer, failure []reconciledContainer) {
	success = []reconciledContainer{}
	failure = []reconciledContainer{}

	for _, pod := range m.activePods() {
		allContainers := pod.Spec.InitContainers
		allContainers = append(allContainers, pod.Spec.Containers...)
		for _, container := range allContainers {
			status, ok := m.podStatusProvider.GetPodStatus(pod.UID)
			if !ok {
				glog.Warningf("[cpumanager] reconcileState: skipping pod; status not found (pod: %s, container: %s)", pod.Name, container.Name)
				failure = append(failure, reconciledContainer{pod.Name, container.Name, ""})
				break
			}

			containerID, err := findContainerIDByName(&status, container.Name)
			if err != nil {
				glog.Warningf("[cpumanager] reconcileState: skipping container; ID not found in status (pod: %s, container: %s, error: %v)", pod.Name, container.Name, err)
				failure = append(failure, reconciledContainer{pod.Name, container.Name, ""})
				continue
			}

			cset := m.state.GetCPUSetOrDefault(containerID)
			if cset.IsEmpty() {
				// NOTE: This should not happen outside of tests.
				glog.Infof("[cpumanager] reconcileState: skipping container; assigned cpuset is empty (pod: %s, container: %s)", pod.Name, container.Name)
				failure = append(failure, reconciledContainer{pod.Name, container.Name, containerID})
				continue
			}

			glog.V(4).Infof("[cpumanager] reconcileState: updating container (pod: %s, container: %s, container id: %s, cpuset: \"%v\")", pod.Name, container.Name, containerID, cset)
			err = m.updateContainerCPUSet(containerID, cset)
			if err != nil {
				glog.Errorf("[cpumanager] reconcileState: failed to update container (pod: %s, container: %s, container id: %s, cpuset: \"%v\", error: %v)", pod.Name, container.Name, containerID, cset, err)
				failure = append(failure, reconciledContainer{pod.Name, container.Name, containerID})
				continue
			}
			success = append(success, reconciledContainer{pod.Name, container.Name, containerID})
		}
	}
	return success, failure
}
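// findContainerIDByName returns the runtime container ID recorded in the pod
// status for the container with the given name. Illustrative (assumed) value:
// a status ContainerID of the form "docker://4be0e27a..." would yield
// "4be0e27a..." after parsing.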
func findContainerIDByName(status *v1.PodStatus, name string) (string, error) {
	for _, container := range status.ContainerStatuses {
		if container.Name == name && container.ContainerID != "" {
			cid := &kubecontainer.ContainerID{}
			err := cid.ParseString(container.ContainerID)
			if err != nil {
				return "", err
			}
			return cid.ID, nil
		}
	}
	return "", fmt.Errorf("unable to find ID for container with name %v in pod status (it may not be running)", name)
}
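// updateContainerCPUSet writes the assigned cpuset through to the container
// runtime via UpdateContainerResources. Illustrative (assumed) example: a
// CPUSet holding CPUs 0-3 and 8 would be rendered by cpus.String() as the
// Linux cpuset-list style string "0-3,8".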
func (m *manager) updateContainerCPUSet(containerID string, cpus cpuset.CPUSet) error {
	// TODO: Consider adding a `ResourceConfigForContainer` helper in
	// helpers_linux.go similar to what exists for pods.
	// It would be better to pass the full container resources here instead of
	// this patch-like partial resources.
	return m.containerRuntime.UpdateContainerResources(
		containerID,
		&runtimeapi.LinuxContainerResources{
			CpusetCpus: cpus.String(),
		})
}