Enable kubecontainer.Cache in kubelet

Yu-Ju Hong 2016-01-08 13:20:30 -08:00
parent bd4c0427db
commit 07cf5cff48
3 changed files with 86 additions and 115 deletions

View File

@@ -116,8 +116,14 @@ const (
plegChannelCapacity = 1000
// Generic PLEG relies on relisting for discovering container events.
// The period directly affects the response time of kubelet.
plegRelistPeriod = time.Second * 3
// A longer period means that kubelet will take longer to detect container
// changes and to update pod status. On the other hand, a shorter period
// will cause more frequent relisting (e.g., container runtime operations),
// leading to higher cpu usage.
// Note that even though we set the period to 1s, the relisting itself can
// take more than 1s to finish if the container runtime responds slowly
// and/or when there are many container changes in one cycle.
plegRelistPeriod = time.Second * 1
// backOffPeriod is the period to back off when pod syncing results in an
// error. It is also used as the base period for the exponential backoff
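
The tradeoff described in the comment above is easiest to see with a toy poller. The following is a self-contained sketch, not kubelet code: relistPeriod, currentState, and the timings are made up for illustration. It shows that a relisting loop only notices a change on the next tick, so the worst-case reaction time is roughly one relist period (plus however long the listing itself takes).

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	// Analogous to plegRelistPeriod: change this to 3*time.Second to see
	// the worst-case detection latency grow with the period.
	const relistPeriod = 1 * time.Second

	start := time.Now()
	changeAt := 1500 * time.Millisecond // pretend a container starts at t=1.5s

	// Stand-in for "list all pods from the container runtime".
	currentState := func() string {
		if time.Since(start) >= changeAt {
			return "running"
		}
		return "created"
	}

	last := currentState()
	ticker := time.NewTicker(relistPeriod)
	defer ticker.Stop()
	for i := 0; i < 3; i++ {
		<-ticker.C // each tick is one "relist"
		if cur := currentState(); cur != last {
			fmt.Printf("detected %s -> %s after ~%v\n",
				last, cur, time.Since(start).Round(time.Second))
			last = cur
		}
	}
}
```

With a 1s period the change made at t=1.5s is reported at the t=2s tick; with a 3s period it would not surface until t=3s, which is the latency argument the comment makes.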
@@ -341,6 +347,8 @@ func NewMainKubelet(
klet.livenessManager = proberesults.NewManager()
klet.podCache = kubecontainer.NewCache()
// Initialize the runtime.
switch containerRuntime {
case "docker":
@@ -366,8 +374,6 @@ func NewMainKubelet(
imageBackOff,
serializeImagePulls,
)
klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, nil)
case "rkt":
conf := &rkt.Config{
Path: rktPath,
@@ -388,14 +394,13 @@ func NewMainKubelet(
return nil, err
}
klet.containerRuntime = rktRuntime
klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, nil)
// No Docker daemon to put in a container.
dockerDaemonContainer = ""
default:
return nil, fmt.Errorf("unsupported container runtime %q specified", containerRuntime)
}
klet.pleg = pleg.NewGenericPLEG(klet.containerRuntime, plegChannelCapacity, plegRelistPeriod, klet.podCache)
klet.runtimeState = newRuntimeState(maxWaitForContainerRuntime, configureCBR0, podCIDR, klet.isContainerRuntimeVersionCompatible)
// setup containerGC
@@ -442,7 +447,7 @@ func NewMainKubelet(
}
klet.runtimeCache = runtimeCache
klet.workQueue = queue.NewBasicWorkQueue()
klet.podWorkers = newPodWorkers(runtimeCache, klet.syncPod, recorder, klet.workQueue, klet.resyncInterval, backOffPeriod)
klet.podWorkers = newPodWorkers(runtimeCache, klet.syncPod, recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)
klet.backOff = util.NewBackOff(backOffPeriod, MaxContainerBackOff)
klet.podKillingCh = make(chan *kubecontainer.Pod, podKillingChannelCapacity)
@@ -579,6 +584,9 @@ type Kubelet struct {
// Generates pod events.
pleg pleg.PodLifecycleEventGenerator
// Store kubecontainer.PodStatus for all pods.
podCache kubecontainer.Cache
// The name of the resource-only container to run the Kubelet in (empty for no container).
// Name must be absolute.
resourceContainer string
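
The new podCache field is the kubecontainer.Cache created in NewMainKubelet above and handed to both the PLEG and the pod workers. The diff only exercises NewCache, Get and GetNewerThan; below is a rough, self-contained sketch of the shape of such a cache. The UID/PodStatus stand-in types, the Set signature and the map-plus-mutex internals are assumptions for illustration, not the actual kubecontainer package.

```go
package podcache

import (
	"sync"
	"time"
)

// Stand-ins for types.UID and kubecontainer.PodStatus; the real types
// carry far more information (per-container statuses, pod IP, etc.).
type UID string

type PodStatus struct {
	IP string
}

// Cache keeps the most recently observed status per pod, stamped with
// the time it was observed (e.g. by a PLEG relist).
type Cache struct {
	mu      sync.RWMutex
	entries map[UID]entry
}

type entry struct {
	status  *PodStatus
	err     error
	modTime time.Time
}

func NewCache() *Cache {
	return &Cache{entries: map[UID]entry{}}
}

// Set records the latest status observed for a pod. In the kubelet this
// would be called by whatever relists the runtime (the generic PLEG).
func (c *Cache) Set(id UID, status *PodStatus, err error, observed time.Time) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.entries[id] = entry{status: status, err: err, modTime: observed}
}

// Get returns whatever is cached for the pod without blocking;
// getRuntimePodStatus in the diff reads from the cache this way when
// the cache is non-nil.
func (c *Cache) Get(id UID) (*PodStatus, error) {
	c.mu.RLock()
	defer c.mu.RUnlock()
	e := c.entries[id]
	return e.status, e.err
}
```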
@@ -1570,31 +1578,32 @@ func (kl *Kubelet) makePodDataDirs(pod *api.Pod) error {
return nil
}
func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod, updateType kubetypes.SyncPodType) (syncErr error) {
start := kl.clock.Now()
var firstSeenTime time.Time
if firstSeenTimeStr, ok := pod.Annotations[kubetypes.ConfigFirstSeenAnnotationKey]; !ok {
glog.V(3).Infof("First seen time not recorded for pod %q", pod.UID)
} else {
firstSeenTime = kubetypes.ConvertToTimestamp(firstSeenTimeStr).Get()
// TODO: Remove runningPod from the arguments.
func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod, updateType kubetypes.SyncPodType) error {
if updateType == kubetypes.SyncPodCreate {
// This is the first time we are syncing the pod. Record the latency
// since kubelet first saw the pod if firstSeenTime is set.
if firstSeenTimeStr, ok := pod.Annotations[kubetypes.ConfigFirstSeenAnnotationKey]; ok {
firstSeenTime := kubetypes.ConvertToTimestamp(firstSeenTimeStr).Get()
metrics.PodWorkerStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
} else {
glog.V(3).Infof("First seen time not recorded for pod %q", pod.UID)
}
}
// Before returning, regenerate status and store it in the cache.
defer func() {
status, err := kl.generatePodStatus(pod)
if err != nil {
glog.Errorf("Unable to generate status for pod %q with error(%v)", format.Pod(pod), err)
// Propagate the error upstream.
syncErr = err
} else {
existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
if !ok || existingStatus.Phase == api.PodPending && status.Phase == api.PodRunning &&
!firstSeenTime.IsZero() {
metrics.PodStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
}
kl.statusManager.SetPodStatus(pod, status)
}
}()
// Query the container runtime (or cache) to retrieve the pod status, and
// update it in the status manager.
podStatus, err := kl.getRuntimePodStatus(pod)
if err != nil {
glog.Infof("Query container info for pod %q failed with error (%v)", format.Pod(pod), err)
return err
}
var apiPodStatus api.PodStatus
apiPodStatus, err = kl.generatePodStatus(pod, podStatus)
if err != nil {
return err
}
kl.statusManager.SetPodStatus(pod, apiPodStatus)
// Kill pods we can't run.
if err := canRunPod(pod); err != nil || pod.DeletionTimestamp != nil {
@@ -1645,51 +1654,6 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
}
kl.volumeManager.SetVolumes(pod.UID, podVolumes)
// The kubelet is the source of truth for pod status. It ignores the status sent from
// the apiserver and regenerates status for every pod update, incrementally updating
// the status it received at pod creation time.
//
// The container runtime needs 2 pieces of information from the status to sync a pod:
// The terminated state of containers (to restart them) and the podIp (for liveness probes).
// New pods don't have either, so we skip the expensive status generation step.
//
// If we end up here with a create event for an already running pod, it could result in a
// restart of its containers. This cannot happen unless the kubelet restarts, because the
// delete before the second create would cancel this pod worker.
//
// If the kubelet restarts, we have a bunch of running containers for which we get create
// events. This is ok, because the pod status for these will include the podIp and terminated
// status. Any race condition here effectively boils down to -- the pod worker didn't sync
// state of a newly started container with the apiserver before the kubelet restarted, so
// it's OK to pretend like the kubelet started them after it restarted.
var apiPodStatus api.PodStatus
var podStatus *kubecontainer.PodStatus
// Always generate the kubecontainer.PodStatus to know whether there are
// running containers associated with the pod.
podStatusPtr, apiPodStatusPtr, err := kl.containerRuntime.GetPodStatusAndAPIPodStatus(pod)
if err != nil {
glog.Errorf("Unable to get status for pod %q: %v", format.Pod(pod), err)
return err
}
apiPodStatus = *apiPodStatusPtr
podStatus = podStatusPtr
if updateType == kubetypes.SyncPodCreate {
// This is the first time we are syncing the pod. Record the latency
// since kubelet first saw the pod if firstSeenTime is set.
if !firstSeenTime.IsZero() {
metrics.PodWorkerStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
}
// kubelet may have just been restarted. Re-use the last known
// apiPodStatus.
apiPodStatus = pod.Status
apiPodStatus.StartTime = &unversioned.Time{Time: start}
kl.statusManager.SetPodStatus(pod, apiPodStatus)
glog.V(3).Infof("Reusing api pod status for new pod %q", format.Pod(pod))
}
pullSecrets, err := kl.getPullSecretsForPod(pod)
if err != nil {
glog.Errorf("Unable to get pull secrets for pod %q: %v", format.Pod(pod), err)
@@ -2291,10 +2255,6 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan kubetypes.PodUpdate, handler
glog.Errorf("Kubelet does not support snapshot update")
}
case e := <-plegCh:
// Filter out started events since we don't use them now.
if e.Type == pleg.ContainerStarted {
break
}
pod, ok := kl.podManager.GetPodByUID(e.ID)
if !ok {
// If the pod no longer exists, ignore the event.
@@ -3088,19 +3048,21 @@ func GetPhase(spec *api.PodSpec, info []api.ContainerStatus) api.PodPhase {
}
}
// By passing the pod directly, this method avoids pod lookup, which requires
// grabbing a lock.
// TODO(random-liu): api.PodStatus is named as podStatus, which may be confusing; this may happen in other functions
// after refactoring, modify them later.
func (kl *Kubelet) generatePodStatus(pod *api.Pod) (api.PodStatus, error) {
// Get the internal PodStatus from the cache if the cache exists;
// otherwise, query the runtime directly.
func (kl *Kubelet) getRuntimePodStatus(pod *api.Pod) (*kubecontainer.PodStatus, error) {
start := kl.clock.Now()
defer func() {
metrics.PodStatusLatency.Observe(metrics.SinceInMicroseconds(start))
}()
if kl.podCache != nil {
return kl.podCache.Get(pod.UID)
}
return kl.containerRuntime.GetPodStatus(pod.UID, pod.Name, pod.Namespace)
}
func (kl *Kubelet) generatePodStatus(pod *api.Pod, podStatus *kubecontainer.PodStatus) (api.PodStatus, error) {
glog.V(3).Infof("Generating status for %q", format.Pod(pod))
// TODO: Consider include the container information.
if kl.pastActiveDeadline(pod) {
reason := "DeadlineExceeded"
@@ -3111,44 +3073,32 @@ func (kl *Kubelet) generatePodStatus(pod *api.Pod) (api.PodStatus, error) {
Message: "Pod was active on the node longer than specified deadline"}, nil
}
spec := &pod.Spec
podStatus, err := kl.containerRuntime.GetAPIPodStatus(pod)
// Ask the runtime to convert the internal PodStatus to api.PodStatus.
s, err := kl.containerRuntime.ConvertPodStatusToAPIPodStatus(pod, podStatus)
if err != nil {
// Error handling
glog.Infof("Query container info for pod %q failed with error (%v)", format.Pod(pod), err)
if strings.Contains(err.Error(), "resource temporarily unavailable") {
// Leave upstream layer to decide what to do
return api.PodStatus{}, err
}
pendingStatus := api.PodStatus{
Phase: api.PodPending,
Reason: "GeneralError",
Message: fmt.Sprintf("Query container info failed with error (%v)", err),
}
return pendingStatus, nil
glog.Infof("Failed to convert PodStatus to api.PodStatus for %q: %v", format.Pod(pod), err)
return api.PodStatus{}, err
}
// Assume info is ready to process
podStatus.Phase = GetPhase(spec, podStatus.ContainerStatuses)
kl.probeManager.UpdatePodStatus(pod.UID, podStatus)
podStatus.Conditions = append(podStatus.Conditions, status.GeneratePodReadyCondition(spec, podStatus.ContainerStatuses, podStatus.Phase))
spec := &pod.Spec
s.Phase = GetPhase(spec, s.ContainerStatuses)
kl.probeManager.UpdatePodStatus(pod.UID, s)
s.Conditions = append(s.Conditions, status.GeneratePodReadyCondition(spec, s.ContainerStatuses, s.Phase))
if !kl.standaloneMode {
hostIP, err := kl.GetHostIP()
if err != nil {
glog.V(4).Infof("Cannot get host IP: %v", err)
} else {
podStatus.HostIP = hostIP.String()
if podUsesHostNetwork(pod) && podStatus.PodIP == "" {
podStatus.PodIP = hostIP.String()
s.HostIP = hostIP.String()
if podUsesHostNetwork(pod) && s.PodIP == "" {
s.PodIP = hostIP.String()
}
}
}
return *podStatus, nil
return *s, nil
}
// Returns logs of current machine.

View File

@@ -71,6 +71,9 @@ type podWorkers struct {
// resyncInterval is the duration to wait until the next sync.
resyncInterval time.Duration
// podCache stores kubecontainer.PodStatus for all pods.
podCache kubecontainer.Cache
}
type workUpdate struct {
@@ -88,7 +91,7 @@ type workUpdate struct {
}
func newPodWorkers(runtimeCache kubecontainer.RuntimeCache, syncPodFn syncPodFnType,
recorder record.EventRecorder, workQueue queue.WorkQueue, resyncInterval, backOffPeriod time.Duration) *podWorkers {
recorder record.EventRecorder, workQueue queue.WorkQueue, resyncInterval, backOffPeriod time.Duration, podCache kubecontainer.Cache) *podWorkers {
return &podWorkers{
podUpdates: map[types.UID]chan workUpdate{},
isWorking: map[types.UID]bool{},
@@ -99,13 +102,27 @@ func newPodWorkers(runtimeCache kubecontainer.RuntimeCache, syncPodFn syncPodFnT
workQueue: workQueue,
resyncInterval: resyncInterval,
backOffPeriod: backOffPeriod,
podCache: podCache,
}
}
func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) {
var minRuntimeCacheTime time.Time
for newWork := range podUpdates {
err := func() (err error) {
podID := newWork.pod.UID
if p.podCache != nil {
// This is a blocking call that would return only if the cache
// has an entry for the pod that is newer than
// minRuntimeCacheTime. This ensures the worker doesn't start syncing until
// after the cache is at least newer than the finished time of
// the previous sync.
// TODO: We don't consume the returned PodStatus yet, but we
// should pass it to syncPod() eventually.
p.podCache.GetNewerThan(podID, minRuntimeCacheTime)
}
// TODO: Deprecate the runtime cache.
// We would like to have the state of the containers from at least
// the moment when we finished the previous processing of that pod.
if err := p.runtimeCache.ForceUpdateIfOlder(minRuntimeCacheTime); err != nil {
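
The GetNewerThan call above is what enforces the ordering described in the comment: a worker that finished its previous sync at time T refuses to start the next one until the cache holds a status observed after T. Continuing the toy cache sketched earlier, a polling version of that contract could look like the following; the real cache wakes blocked callers when it is updated rather than polling, so this is only to illustrate the semantics.

```go
// GetNewerThan blocks until the cached entry for the pod is newer than
// minTime, then returns it. Polling keeps the sketch short; assume the
// caller is a pod worker goroutine that can afford to wait here.
func (c *Cache) GetNewerThan(id UID, minTime time.Time) (*PodStatus, error) {
	for {
		c.mu.RLock()
		e, ok := c.entries[id]
		c.mu.RUnlock()
		if ok && e.modTime.After(minTime) {
			return e.status, e.err
		}
		// Nothing fresh enough yet; wait for the next relist to publish.
		time.Sleep(100 * time.Millisecond)
	}
}
```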
@@ -206,10 +223,13 @@ func (p *podWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty
func (p *podWorkers) wrapUp(uid types.UID, syncErr error) {
// Requeue the last update if the last sync returned an error.
if syncErr != nil {
p.workQueue.Enqueue(uid, p.backOffPeriod)
} else {
switch {
case syncErr == nil:
// No error; requeue at the regular resync interval.
p.workQueue.Enqueue(uid, p.resyncInterval)
default:
// Error occurred during the sync; back off and then retry.
p.workQueue.Enqueue(uid, p.backOffPeriod)
}
p.checkForUpdates(uid)
}
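
On the worker side, the loop body therefore amounts to: block until the cache has a status newer than the last finished sync, run the sync, remember when it finished, and requeue at the resync interval on success or after the backoff period on error (the wrapUp switch above). Below is a compact condensation using the toy cache from the earlier sketches; syncPod, enqueueAfter and the updates channel are placeholders, not the real podWorkers plumbing.

```go
// workerLoop is an illustrative condensation of managePodLoop plus wrapUp,
// built on the toy Cache sketched above.
func workerLoop(c *Cache, id UID, updates <-chan struct{},
	syncPod func(*PodStatus) error,
	enqueueAfter func(delay time.Duration),
	resyncInterval, backOffPeriod time.Duration) {

	var lastSyncFinished time.Time // plays the role of minRuntimeCacheTime
	for range updates {
		// Block until the cache reflects the world after our last sync.
		status, err := c.GetNewerThan(id, lastSyncFinished)
		if err == nil {
			err = syncPod(status)
		}
		lastSyncFinished = time.Now()

		if err != nil {
			enqueueAfter(backOffPeriod) // retry sooner, with backoff
		} else {
			enqueueAfter(resyncInterval) // normal periodic resync
		}
	}
}
```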

View File

@@ -60,6 +60,7 @@ func createPodWorkers() (*podWorkers, map[types.UID][]string) {
queue.NewBasicWorkQueue(),
time.Second,
time.Second,
nil,
)
return podWorkers, processed
}
@@ -190,7 +191,7 @@ func TestFakePodWorkers(t *testing.T) {
kubeletForRealWorkers := &simpleFakeKubelet{}
kubeletForFakeWorkers := &simpleFakeKubelet{}
realPodWorkers := newPodWorkers(fakeRuntimeCache, kubeletForRealWorkers.syncPodWithWaitGroup, fakeRecorder, queue.NewBasicWorkQueue(), time.Second, time.Second)
realPodWorkers := newPodWorkers(fakeRuntimeCache, kubeletForRealWorkers.syncPodWithWaitGroup, fakeRecorder, queue.NewBasicWorkQueue(), time.Second, time.Second, nil)
fakePodWorkers := &fakePodWorkers{kubeletForFakeWorkers.syncPod, fakeRuntimeCache, t}
tests := []struct {