Remove RuntimeCache from sync path

This change removes the use of RuntimeCache from the pod workers and the syncPod() function.
Note that it does not deprecate RuntimeCache entirely, as other components
still rely on the cache.
Yu-Ju Hong 2016-01-20 13:26:02 -08:00
parent 32ab64ce5b
commit ff04de4fc0
7 changed files with 121 additions and 163 deletions
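
In short, the sync callback no longer receives a running pod looked up through RuntimeCache; it receives the pod's cached status instead. The signature change, copied from the pod worker diff below, captures the whole refactor:

// Before: the worker listed pods from RuntimeCache and passed the matching running pod.
type syncPodFnType func(*api.Pod, *api.Pod, kubecontainer.Pod, kubetypes.SyncPodType) error

// After: the worker passes the *PodStatus it read from the pod status cache.
type syncPodFnType func(*api.Pod, *api.Pod, *kubecontainer.PodStatus, kubetypes.SyncPodType) error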

View File

@@ -0,0 +1,48 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package container
import (
"time"
"k8s.io/kubernetes/pkg/types"
)
type fakeCache struct {
runtime Runtime
}
func NewFakeCache(runtime Runtime) Cache {
return &fakeCache{runtime: runtime}
}
func (c *fakeCache) Get(id types.UID) (*PodStatus, error) {
return c.runtime.GetPodStatus(id, "", "")
}
func (c *fakeCache) GetNewerThan(id types.UID, minTime time.Time) (*PodStatus, error) {
return c.Get(id)
}
func (c *fakeCache) Set(id types.UID, status *PodStatus, err error, timestamp time.Time) {
}
func (c *fakeCache) Delete(id types.UID) {
}
func (c *fakeCache) UpdateTime(_ time.Time) {
}
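
This fake satisfies the kubecontainer.Cache interface by querying the fake runtime synchronously, so tests can exercise the cache-driven sync path without a real cache. A minimal wiring sketch, using the names that appear in the test diffs below (syncPodFn stands for whichever sync callback the test supplies; error handling omitted):

fakeRuntime := &kubecontainer.FakeRuntime{}
fakeCache := kubecontainer.NewFakeCache(fakeRuntime)
podWorkers := newPodWorkers(syncPodFn, fakeRecorder, queue.NewBasicWorkQueue(), time.Second, time.Second, fakeCache)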

View File

@@ -26,17 +26,17 @@ import (
// fakePodWorkers runs sync pod function in serial, so we can have
// deterministic behaviour in testing.
type fakePodWorkers struct {
syncPodFn syncPodFnType
runtimeCache kubecontainer.RuntimeCache
t TestingInterface
syncPodFn syncPodFnType
cache kubecontainer.Cache
t TestingInterface
}
func (f *fakePodWorkers) UpdatePod(pod *api.Pod, mirrorPod *api.Pod, updateType kubetypes.SyncPodType, updateComplete func()) {
pods, err := f.runtimeCache.GetPods()
status, err := f.cache.Get(pod.UID)
if err != nil {
f.t.Errorf("Unexpected error: %v", err)
}
if err := f.syncPodFn(pod, mirrorPod, kubecontainer.Pods(pods).FindPodByID(pod.UID), kubetypes.SyncPodUpdate); err != nil {
if err := f.syncPodFn(pod, mirrorPod, status, kubetypes.SyncPodUpdate); err != nil {
f.t.Errorf("Unexpected error: %v", err)
}
}
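
Because UpdatePod invokes the sync function inline, a test can assert on the outcome as soon as the call returns; a rough usage sketch (kl, podCache, and pod stand for whatever fixtures the test builds):

fakeWorkers := &fakePodWorkers{syncPodFn: kl.syncPod, cache: podCache, t: t}
// Runs kl.syncPod synchronously against the status returned by podCache.Get(pod.UID).
fakeWorkers.UpdatePod(pod, nil, kubetypes.SyncPodUpdate, func() {})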

View File

@@ -447,7 +447,7 @@ func NewMainKubelet(
klet.runtimeCache = runtimeCache
klet.reasonCache = NewReasonCache()
klet.workQueue = queue.NewBasicWorkQueue()
klet.podWorkers = newPodWorkers(runtimeCache, klet.syncPod, recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)
klet.podWorkers = newPodWorkers(klet.syncPod, recorder, klet.workQueue, klet.resyncInterval, backOffPeriod, klet.podCache)
klet.backOff = util.NewBackOff(backOffPeriod, MaxContainerBackOff)
klet.podKillingCh = make(chan *kubecontainer.Pod, podKillingChannelCapacity)
@@ -1571,9 +1571,16 @@ func parseResolvConf(reader io.Reader, dnsScrubber dnsScrubber) (nameservers []s
return nameservers, searches, nil
}
// Kill all running containers in a pod (includes the pod infra container).
func (kl *Kubelet) killPod(pod *api.Pod, runningPod kubecontainer.Pod) error {
return kl.containerRuntime.KillPod(pod, runningPod)
// One of the following arguments must be non-nil: runningPod, status.
// TODO: Modify containerRuntime.KillPod() to accept the right arguments.
func (kl *Kubelet) killPod(pod *api.Pod, runningPod *kubecontainer.Pod, status *kubecontainer.PodStatus) error {
var p kubecontainer.Pod
if runningPod != nil {
p = *runningPod
} else if status != nil {
p = kubecontainer.ConvertPodStatusToRunningPod(status)
}
return kl.containerRuntime.KillPod(pod, p)
}
type empty struct{}
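
Call sites pass exactly one of the two sources of pod state, as the updated callers later in this diff show:

// From syncPod(): only the cached status is at hand, so the running pod is derived from it.
kl.killPod(pod, nil, podStatus)

// From podKiller(): only a running pod is at hand, so it is passed directly.
kl.killPod(nil, pod, nil)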
@@ -1593,8 +1600,7 @@ func (kl *Kubelet) makePodDataDirs(pod *api.Pod) error {
return nil
}
// TODO: Remove runningPod from the arguments.
func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod, updateType kubetypes.SyncPodType) error {
func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, podStatus *kubecontainer.PodStatus, updateType kubetypes.SyncPodType) error {
var firstSeenTime time.Time
if firstSeenTimeStr, ok := pod.Annotations[kubetypes.ConfigFirstSeenAnnotationKey]; ok {
firstSeenTime = kubetypes.ConvertToTimestamp(firstSeenTimeStr).Get()
@@ -1610,29 +1616,21 @@ func (kl *Kubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecont
}
}
// Query the container runtime (or cache) to retrieve the pod status, and
// update it in the status manager.
podStatus, statusErr := kl.getRuntimePodStatus(pod)
apiPodStatus, err := kl.generatePodStatus(pod, podStatus, statusErr)
apiPodStatus, err := kl.generatePodStatus(pod, podStatus)
if err != nil {
return err
}
// Record the time it takes for the pod to become running.
existingStatus, ok := kl.statusManager.GetPodStatus(pod.UID)
// TODO: The logic seems wrong since the pod phase can become pending when
// the container runtime is temporarily not available.
if statusErr == nil && !ok || existingStatus.Phase == api.PodPending && apiPodStatus.Phase == api.PodRunning &&
if !ok || existingStatus.Phase == api.PodPending && apiPodStatus.Phase == api.PodRunning &&
!firstSeenTime.IsZero() {
metrics.PodStartLatency.Observe(metrics.SinceInMicroseconds(firstSeenTime))
}
kl.statusManager.SetPodStatus(pod, apiPodStatus)
if statusErr != nil {
return statusErr
}
// Kill pods we can't run.
if err := canRunPod(pod); err != nil || pod.DeletionTimestamp != nil {
if err := kl.killPod(pod, runningPod); err != nil {
if err := kl.killPod(pod, nil, podStatus); err != nil {
utilruntime.HandleError(err)
}
return err
@@ -2100,7 +2098,7 @@ func (kl *Kubelet) podKiller() {
ch <- pod.ID
}()
glog.V(2).Infof("Killing unwanted pod %q", pod.Name)
err := kl.killPod(nil, *pod)
err := kl.killPod(nil, pod, nil)
if err != nil {
glog.Errorf("Failed killing the pod %q: %v", pod.Name, err)
}
@@ -3092,7 +3090,7 @@ func (kl *Kubelet) getRuntimePodStatus(pod *api.Pod) (*kubecontainer.PodStatus,
return kl.containerRuntime.GetPodStatus(pod.UID, pod.Name, pod.Namespace)
}
func (kl *Kubelet) generatePodStatus(pod *api.Pod, podStatus *kubecontainer.PodStatus, statusErr error) (api.PodStatus, error) {
func (kl *Kubelet) generatePodStatus(pod *api.Pod, podStatus *kubecontainer.PodStatus) (api.PodStatus, error) {
glog.V(3).Infof("Generating status for %q", format.Pod(pod))
// TODO: Consider include the container information.
if kl.pastActiveDeadline(pod) {
@@ -3104,16 +3102,6 @@ func (kl *Kubelet) generatePodStatus(pod *api.Pod, podStatus *kubecontainer.PodS
Message: "Pod was active on the node longer than specified deadline"}, nil
}
if statusErr != nil {
// TODO: Re-evaluate whether we should set the status to "Pending".
glog.Infof("Query container info for pod %q failed with error (%v)", format.Pod(pod), statusErr)
return api.PodStatus{
Phase: api.PodPending,
Reason: "GeneralError",
Message: fmt.Sprintf("Query container info failed with error (%v)", statusErr),
}, nil
}
// Convert the internal PodStatus to api.PodStatus.
s := kl.convertStatusToAPIStatus(pod, podStatus)
// Assume info is ready to process

View File

@@ -150,10 +150,11 @@ func newTestKubelet(t *testing.T) *TestKubelet {
kubelet.containerRuntime = fakeRuntime
kubelet.runtimeCache = kubecontainer.NewFakeRuntimeCache(kubelet.containerRuntime)
kubelet.reasonCache = NewReasonCache()
kubelet.podCache = kubecontainer.NewFakeCache(kubelet.containerRuntime)
kubelet.podWorkers = &fakePodWorkers{
syncPodFn: kubelet.syncPod,
runtimeCache: kubelet.runtimeCache,
t: t,
syncPodFn: kubelet.syncPod,
cache: kubelet.podCache,
t: t,
}
kubelet.probeManager = prober.FakeManager{}
@@ -3390,7 +3391,7 @@ func TestCreateMirrorPod(t *testing.T) {
}
pods := []*api.Pod{pod}
kl.podManager.SetPods(pods)
err := kl.syncPod(pod, nil, container.Pod{}, updateType)
err := kl.syncPod(pod, nil, &container.PodStatus{}, updateType)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@@ -3448,7 +3449,7 @@ func TestDeleteOutdatedMirrorPod(t *testing.T) {
pods := []*api.Pod{pod, mirrorPod}
kl.podManager.SetPods(pods)
err := kl.syncPod(pod, mirrorPod, container.Pod{}, kubetypes.SyncPodUpdate)
err := kl.syncPod(pod, mirrorPod, &container.PodStatus{}, kubetypes.SyncPodUpdate)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
@@ -3614,7 +3615,7 @@ func TestHostNetworkAllowed(t *testing.T) {
},
}
kubelet.podManager.SetPods([]*api.Pod{pod})
err := kubelet.syncPod(pod, nil, container.Pod{}, kubetypes.SyncPodUpdate)
err := kubelet.syncPod(pod, nil, &container.PodStatus{}, kubetypes.SyncPodUpdate)
if err != nil {
t.Errorf("expected pod infra creation to succeed: %v", err)
}
@@ -3647,7 +3648,7 @@ func TestHostNetworkDisallowed(t *testing.T) {
},
},
}
err := kubelet.syncPod(pod, nil, container.Pod{}, kubetypes.SyncPodUpdate)
err := kubelet.syncPod(pod, nil, &container.PodStatus{}, kubetypes.SyncPodUpdate)
if err == nil {
t.Errorf("expected pod infra creation to fail")
}
@@ -3674,7 +3675,7 @@ func TestPrivilegeContainerAllowed(t *testing.T) {
},
}
kubelet.podManager.SetPods([]*api.Pod{pod})
err := kubelet.syncPod(pod, nil, container.Pod{}, kubetypes.SyncPodUpdate)
err := kubelet.syncPod(pod, nil, &container.PodStatus{}, kubetypes.SyncPodUpdate)
if err != nil {
t.Errorf("expected pod infra creation to succeed: %v", err)
}
@@ -3700,7 +3701,7 @@ func TestPrivilegeContainerDisallowed(t *testing.T) {
},
},
}
err := kubelet.syncPod(pod, nil, container.Pod{}, kubetypes.SyncPodUpdate)
err := kubelet.syncPod(pod, nil, &container.PodStatus{}, kubetypes.SyncPodUpdate)
if err == nil {
t.Errorf("expected pod infra creation to fail")
}

View File

@@ -37,7 +37,7 @@ type PodWorkers interface {
ForgetWorker(uid types.UID)
}
type syncPodFnType func(*api.Pod, *api.Pod, kubecontainer.Pod, kubetypes.SyncPodType) error
type syncPodFnType func(*api.Pod, *api.Pod, *kubecontainer.PodStatus, kubetypes.SyncPodType) error
type podWorkers struct {
// Protects all per worker fields.
@@ -53,8 +53,6 @@ type podWorkers struct {
// Tracks the last undelivered work item for this pod - a work item is
// undelivered if it comes in while the worker is working.
lastUndeliveredWorkUpdate map[types.UID]workUpdate
// runtimeCache is used for listing running containers.
runtimeCache kubecontainer.RuntimeCache
workQueue queue.WorkQueue
@@ -90,13 +88,12 @@ type workUpdate struct {
updateType kubetypes.SyncPodType
}
func newPodWorkers(runtimeCache kubecontainer.RuntimeCache, syncPodFn syncPodFnType,
recorder record.EventRecorder, workQueue queue.WorkQueue, resyncInterval, backOffPeriod time.Duration, podCache kubecontainer.Cache) *podWorkers {
func newPodWorkers(syncPodFn syncPodFnType, recorder record.EventRecorder, workQueue queue.WorkQueue,
resyncInterval, backOffPeriod time.Duration, podCache kubecontainer.Cache) *podWorkers {
return &podWorkers{
podUpdates: map[types.UID]chan workUpdate{},
isWorking: map[types.UID]bool{},
lastUndeliveredWorkUpdate: map[types.UID]workUpdate{},
runtimeCache: runtimeCache,
syncPodFn: syncPodFn,
recorder: recorder,
workQueue: workQueue,
@@ -107,45 +104,31 @@ func newPodWorkers(runtimeCache kubecontainer.RuntimeCache, syncPodFn syncPodFnT
}
func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) {
var minRuntimeCacheTime time.Time
var lastSyncTime time.Time
for newWork := range podUpdates {
err := func() (err error) {
err := func() error {
podID := newWork.pod.UID
if p.podCache != nil {
// This is a blocking call that would return only if the cache
// has an entry for the pod that is newer than minRuntimeCache
// Time. This ensures the worker doesn't start syncing until
// after the cache is at least newer than the finished time of
// the previous sync.
// TODO: We don't consume the return PodStatus yet, but we
// should pass it to syncPod() eventually.
p.podCache.GetNewerThan(podID, minRuntimeCacheTime)
}
// TODO: Deprecate the runtime cache.
// We would like to have the state of the containers from at least
// the moment when we finished the previous processing of that pod.
if err := p.runtimeCache.ForceUpdateIfOlder(minRuntimeCacheTime); err != nil {
glog.Errorf("Error updating the container runtime cache: %v", err)
// This is a blocking call that would return only if the cache
// has an entry for the pod that is newer than lastSyncTime. This
// ensures the worker doesn't start syncing until after the cache
// is at least newer than the finished time of the previous sync.
status, err := p.podCache.GetNewerThan(podID, lastSyncTime)
if err != nil {
return err
}
pods, err := p.runtimeCache.GetPods()
err = p.syncPodFn(newWork.pod, newWork.mirrorPod, status, newWork.updateType)
lastSyncTime = time.Now()
if err != nil {
glog.Errorf("Error getting pods while syncing pod: %v", err)
return err
}
err = p.syncPodFn(newWork.pod, newWork.mirrorPod,
kubecontainer.Pods(pods).FindPodByID(newWork.pod.UID), newWork.updateType)
minRuntimeCacheTime = time.Now()
if err != nil {
glog.Errorf("Error syncing pod %s, skipping: %v", newWork.pod.UID, err)
p.recorder.Eventf(newWork.pod, api.EventTypeWarning, kubecontainer.FailedSync, "Error syncing pod, skipping: %v", err)
return err
}
newWork.updateCompleteFn()
return nil
}()
if err != nil {
glog.Errorf("Error syncing pod %s, skipping: %v", newWork.pod.UID, err)
p.recorder.Eventf(newWork.pod, api.EventTypeWarning, kubecontainer.FailedSync, "Error syncing pod, skipping: %v", err)
}
p.wrapUp(newWork.pod.UID, err)
}
}
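
The net change in the worker loop, with error handling and logging elided, is that a forced refresh of the whole runtime cache plus a pod-list lookup collapses into a single blocking per-pod cache read:

// Before: refresh the runtime cache, list all pods, then find this pod.
p.runtimeCache.ForceUpdateIfOlder(minRuntimeCacheTime)
pods, _ := p.runtimeCache.GetPods()
runningPod := kubecontainer.Pods(pods).FindPodByID(podID)

// After: block until the cache holds a status newer than the last completed sync.
status, _ := p.podCache.GetNewerThan(podID, lastSyncTime)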

View File

@@ -18,7 +18,6 @@ package kubelet
import (
"reflect"
"sort"
"sync"
"testing"
"time"
@@ -45,10 +44,9 @@ func createPodWorkers() (*podWorkers, map[types.UID][]string) {
processed := make(map[types.UID][]string)
fakeRecorder := &record.FakeRecorder{}
fakeRuntime := &kubecontainer.FakeRuntime{}
fakeRuntimeCache := kubecontainer.NewFakeRuntimeCache(fakeRuntime)
fakeCache := kubecontainer.NewFakeCache(fakeRuntime)
podWorkers := newPodWorkers(
fakeRuntimeCache,
func(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod, updateType kubetypes.SyncPodType) error {
func(pod *api.Pod, mirrorPod *api.Pod, status *kubecontainer.PodStatus, updateType kubetypes.SyncPodType) error {
func() {
lock.Lock()
defer lock.Unlock()
@@ -60,7 +58,7 @@ func createPodWorkers() (*podWorkers, map[types.UID][]string) {
queue.NewBasicWorkQueue(),
time.Second,
time.Second,
nil,
fakeCache,
)
return podWorkers, processed
}
@@ -151,19 +149,19 @@ func TestForgetNonExistingPodWorkers(t *testing.T) {
}
type simpleFakeKubelet struct {
pod *api.Pod
mirrorPod *api.Pod
runningPod kubecontainer.Pod
wg sync.WaitGroup
pod *api.Pod
mirrorPod *api.Pod
podStatus *kubecontainer.PodStatus
wg sync.WaitGroup
}
func (kl *simpleFakeKubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod, updateType kubetypes.SyncPodType) error {
kl.pod, kl.mirrorPod, kl.runningPod = pod, mirrorPod, runningPod
func (kl *simpleFakeKubelet) syncPod(pod *api.Pod, mirrorPod *api.Pod, status *kubecontainer.PodStatus, updateType kubetypes.SyncPodType) error {
kl.pod, kl.mirrorPod, kl.podStatus = pod, mirrorPod, status
return nil
}
func (kl *simpleFakeKubelet) syncPodWithWaitGroup(pod *api.Pod, mirrorPod *api.Pod, runningPod kubecontainer.Pod, updateType kubetypes.SyncPodType) error {
kl.pod, kl.mirrorPod, kl.runningPod = pod, mirrorPod, runningPod
func (kl *simpleFakeKubelet) syncPodWithWaitGroup(pod *api.Pod, mirrorPod *api.Pod, status *kubecontainer.PodStatus, updateType kubetypes.SyncPodType) error {
kl.pod, kl.mirrorPod, kl.podStatus = pod, mirrorPod, status
kl.wg.Done()
return nil
}
@@ -186,25 +184,21 @@ func (b byContainerName) Less(i, j int) bool {
func TestFakePodWorkers(t *testing.T) {
fakeRecorder := &record.FakeRecorder{}
fakeRuntime := &kubecontainer.FakeRuntime{}
fakeRuntimeCache := kubecontainer.NewFakeRuntimeCache(fakeRuntime)
fakeCache := kubecontainer.NewFakeCache(fakeRuntime)
kubeletForRealWorkers := &simpleFakeKubelet{}
kubeletForFakeWorkers := &simpleFakeKubelet{}
realPodWorkers := newPodWorkers(fakeRuntimeCache, kubeletForRealWorkers.syncPodWithWaitGroup, fakeRecorder, queue.NewBasicWorkQueue(), time.Second, time.Second, nil)
fakePodWorkers := &fakePodWorkers{kubeletForFakeWorkers.syncPod, fakeRuntimeCache, t}
realPodWorkers := newPodWorkers(kubeletForRealWorkers.syncPodWithWaitGroup, fakeRecorder, queue.NewBasicWorkQueue(), time.Second, time.Second, fakeCache)
fakePodWorkers := &fakePodWorkers{kubeletForFakeWorkers.syncPod, fakeCache, t}
tests := []struct {
pod *api.Pod
mirrorPod *api.Pod
podList []*kubecontainer.Pod
containersInRunningPod int
pod *api.Pod
mirrorPod *api.Pod
}{
{
&api.Pod{},
&api.Pod{},
[]*kubecontainer.Pod{},
0,
},
{
&api.Pod{
@@ -221,29 +215,6 @@ func TestFakePodWorkers(t *testing.T) {
Namespace: "new",
},
},
[]*kubecontainer.Pod{
{
ID: "12345678",
Name: "foo",
Namespace: "new",
Containers: []*kubecontainer.Container{
{
Name: "fooContainer",
},
},
},
{
ID: "12345678",
Name: "fooMirror",
Namespace: "new",
Containers: []*kubecontainer.Container{
{
Name: "fooContainerMirror",
},
},
},
},
1,
},
{
&api.Pod{
@@ -260,42 +231,11 @@ func TestFakePodWorkers(t *testing.T) {
Namespace: "new",
},
},
[]*kubecontainer.Pod{
{
ID: "98765",
Name: "bar",
Namespace: "new",
Containers: []*kubecontainer.Container{
{
Name: "barContainer0",
},
{
Name: "barContainer1",
},
},
},
{
ID: "98765",
Name: "barMirror",
Namespace: "new",
Containers: []*kubecontainer.Container{
{
Name: "barContainerMirror0",
},
{
Name: "barContainerMirror1",
},
},
},
},
2,
},
}
for i, tt := range tests {
kubeletForRealWorkers.wg.Add(1)
fakeRuntime.PodList = tt.podList
realPodWorkers.UpdatePod(tt.pod, tt.mirrorPod, kubetypes.SyncPodUpdate, func() {})
fakePodWorkers.UpdatePod(tt.pod, tt.mirrorPod, kubetypes.SyncPodUpdate, func() {})
@@ -309,14 +249,8 @@ func TestFakePodWorkers(t *testing.T) {
t.Errorf("%d: Expected: %#v, Actual: %#v", i, kubeletForRealWorkers.mirrorPod, kubeletForFakeWorkers.mirrorPod)
}
if tt.containersInRunningPod != len(kubeletForFakeWorkers.runningPod.Containers) {
t.Errorf("%d: Expected: %#v, Actual: %#v", i, tt.containersInRunningPod, len(kubeletForFakeWorkers.runningPod.Containers))
}
sort.Sort(byContainerName(kubeletForRealWorkers.runningPod))
sort.Sort(byContainerName(kubeletForFakeWorkers.runningPod))
if !reflect.DeepEqual(kubeletForRealWorkers.runningPod, kubeletForFakeWorkers.runningPod) {
t.Errorf("%d: Expected: %#v, Actual: %#v", i, kubeletForRealWorkers.runningPod, kubeletForFakeWorkers.runningPod)
if !reflect.DeepEqual(kubeletForRealWorkers.podStatus, kubeletForFakeWorkers.podStatus) {
t.Errorf("%d: Expected: %#v, Actual: %#v", i, kubeletForRealWorkers.podStatus, kubeletForFakeWorkers.podStatus)
}
}
}

View File

@@ -122,13 +122,17 @@ func (kl *Kubelet) runPod(pod *api.Pod, retryDelay time.Duration) error {
}
glog.Infof("pod %q containers not running: syncing", pod.Name)
status, err := kl.containerRuntime.GetPodStatus(pod.UID, pod.Name, pod.Namespace)
if err != nil {
glog.Errorf("Unable to get status for pod %q: %v", pod.Name, err)
}
glog.Infof("Creating a mirror pod for static pod %q", format.Pod(pod))
if err := kl.podManager.CreateMirrorPod(pod); err != nil {
glog.Errorf("Failed creating a mirror pod %q: %v", format.Pod(pod), err)
}
mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
if err = kl.syncPod(pod, mirrorPod, p, kubetypes.SyncPodUpdate); err != nil {
if err = kl.syncPod(pod, mirrorPod, status, kubetypes.SyncPodUpdate); err != nil {
return fmt.Errorf("error syncing pod: %v", err)
}
if retry >= runOnceMaxRetries {