mirror of https://github.com/k3s-io/k3s
356 lines
10 KiB
Go
356 lines
10 KiB
Go
/*
|
|
Copyright 2014 The Kubernetes Authors.
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
http://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package kubelet
|
|
|
|
import (
|
|
"reflect"
|
|
"sync"
|
|
"testing"
|
|
"time"
|
|
|
|
"k8s.io/api/core/v1"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/types"
|
|
"k8s.io/apimachinery/pkg/util/clock"
|
|
"k8s.io/client-go/tools/record"
|
|
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
|
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
|
|
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
|
|
"k8s.io/kubernetes/pkg/kubelet/util/queue"
|
|
)
|
|
|
|
// fakePodWorkers runs sync pod function in serial, so we can have
|
|
// deterministic behaviour in testing.
|
|
type fakePodWorkers struct {
|
|
syncPodFn syncPodFnType
|
|
cache kubecontainer.Cache
|
|
t TestingInterface
|
|
}
|
|
|
|
func (f *fakePodWorkers) UpdatePod(options *UpdatePodOptions) {
|
|
status, err := f.cache.Get(options.Pod.UID)
|
|
if err != nil {
|
|
f.t.Errorf("Unexpected error: %v", err)
|
|
}
|
|
if err := f.syncPodFn(syncPodOptions{
|
|
mirrorPod: options.MirrorPod,
|
|
pod: options.Pod,
|
|
podStatus: status,
|
|
updateType: options.UpdateType,
|
|
killPodOptions: options.KillPodOptions,
|
|
}); err != nil {
|
|
f.t.Errorf("Unexpected error: %v", err)
|
|
}
|
|
}
|
|
|
|
func (f *fakePodWorkers) ForgetNonExistingPodWorkers(desiredPods map[types.UID]empty) {}
|
|
|
|
func (f *fakePodWorkers) ForgetWorker(uid types.UID) {}
|
|
|
|
type TestingInterface interface {
|
|
Errorf(format string, args ...interface{})
|
|
}
|
|
|
|
func newPod(uid, name string) *v1.Pod {
|
|
return &v1.Pod{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
UID: types.UID(uid),
|
|
Name: name,
|
|
},
|
|
}
|
|
}
|
|
|
|
// syncPodRecord is a record of a sync pod call
|
|
type syncPodRecord struct {
|
|
name string
|
|
updateType kubetypes.SyncPodType
|
|
}
|
|
|
|
func createPodWorkers() (*podWorkers, map[types.UID][]syncPodRecord) {
|
|
lock := sync.Mutex{}
|
|
processed := make(map[types.UID][]syncPodRecord)
|
|
fakeRecorder := &record.FakeRecorder{}
|
|
fakeRuntime := &containertest.FakeRuntime{}
|
|
fakeCache := containertest.NewFakeCache(fakeRuntime)
|
|
podWorkers := newPodWorkers(
|
|
func(options syncPodOptions) error {
|
|
func() {
|
|
lock.Lock()
|
|
defer lock.Unlock()
|
|
pod := options.pod
|
|
processed[pod.UID] = append(processed[pod.UID], syncPodRecord{
|
|
name: pod.Name,
|
|
updateType: options.updateType,
|
|
})
|
|
}()
|
|
return nil
|
|
},
|
|
fakeRecorder,
|
|
queue.NewBasicWorkQueue(&clock.RealClock{}),
|
|
time.Second,
|
|
time.Second,
|
|
fakeCache,
|
|
)
|
|
return podWorkers, processed
|
|
}
|
|
|
|
func drainWorkers(podWorkers *podWorkers, numPods int) {
|
|
for {
|
|
stillWorking := false
|
|
podWorkers.podLock.Lock()
|
|
for i := 0; i < numPods; i++ {
|
|
if podWorkers.isWorking[types.UID(string(i))] {
|
|
stillWorking = true
|
|
}
|
|
}
|
|
podWorkers.podLock.Unlock()
|
|
if !stillWorking {
|
|
break
|
|
}
|
|
time.Sleep(50 * time.Millisecond)
|
|
}
|
|
}
|
|
|
|
func TestUpdatePod(t *testing.T) {
|
|
podWorkers, processed := createPodWorkers()
|
|
|
|
numPods := 20
|
|
for i := 0; i < numPods; i++ {
|
|
for j := i; j < numPods; j++ {
|
|
podWorkers.UpdatePod(&UpdatePodOptions{
|
|
Pod: newPod(string(j), string(i)),
|
|
UpdateType: kubetypes.SyncPodCreate,
|
|
})
|
|
}
|
|
}
|
|
drainWorkers(podWorkers, numPods)
|
|
|
|
if len(processed) != numPods {
|
|
t.Errorf("Not all pods processed: %v", len(processed))
|
|
return
|
|
}
|
|
for i := 0; i < numPods; i++ {
|
|
uid := types.UID(i)
|
|
if len(processed[uid]) < 1 || len(processed[uid]) > i+1 {
|
|
t.Errorf("Pod %v processed %v times", i, len(processed[uid]))
|
|
continue
|
|
}
|
|
|
|
// PodWorker guarantees the first and the last event will be processed
|
|
first := 0
|
|
last := len(processed[uid]) - 1
|
|
if processed[uid][first].name != string(0) {
|
|
t.Errorf("Pod %v: incorrect order %v, %v", i, first, processed[uid][first])
|
|
|
|
}
|
|
if processed[uid][last].name != string(i) {
|
|
t.Errorf("Pod %v: incorrect order %v, %v", i, last, processed[uid][last])
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestUpdatePodDoesNotForgetSyncPodKill(t *testing.T) {
|
|
podWorkers, processed := createPodWorkers()
|
|
numPods := 20
|
|
for i := 0; i < numPods; i++ {
|
|
pod := newPod(string(i), string(i))
|
|
podWorkers.UpdatePod(&UpdatePodOptions{
|
|
Pod: pod,
|
|
UpdateType: kubetypes.SyncPodCreate,
|
|
})
|
|
podWorkers.UpdatePod(&UpdatePodOptions{
|
|
Pod: pod,
|
|
UpdateType: kubetypes.SyncPodKill,
|
|
})
|
|
podWorkers.UpdatePod(&UpdatePodOptions{
|
|
Pod: pod,
|
|
UpdateType: kubetypes.SyncPodUpdate,
|
|
})
|
|
}
|
|
drainWorkers(podWorkers, numPods)
|
|
if len(processed) != numPods {
|
|
t.Errorf("Not all pods processed: %v", len(processed))
|
|
return
|
|
}
|
|
for i := 0; i < numPods; i++ {
|
|
uid := types.UID(i)
|
|
// each pod should be processed two times (create, kill, but not update)
|
|
syncPodRecords := processed[uid]
|
|
if len(syncPodRecords) < 2 {
|
|
t.Errorf("Pod %v processed %v times, but expected at least 2", i, len(syncPodRecords))
|
|
continue
|
|
}
|
|
if syncPodRecords[0].updateType != kubetypes.SyncPodCreate {
|
|
t.Errorf("Pod %v event was %v, but expected %v", i, syncPodRecords[0].updateType, kubetypes.SyncPodCreate)
|
|
}
|
|
if syncPodRecords[1].updateType != kubetypes.SyncPodKill {
|
|
t.Errorf("Pod %v event was %v, but expected %v", i, syncPodRecords[1].updateType, kubetypes.SyncPodKill)
|
|
}
|
|
}
|
|
}
|
|
|
|
func TestForgetNonExistingPodWorkers(t *testing.T) {
|
|
podWorkers, _ := createPodWorkers()
|
|
|
|
numPods := 20
|
|
for i := 0; i < numPods; i++ {
|
|
podWorkers.UpdatePod(&UpdatePodOptions{
|
|
Pod: newPod(string(i), "name"),
|
|
UpdateType: kubetypes.SyncPodUpdate,
|
|
})
|
|
}
|
|
drainWorkers(podWorkers, numPods)
|
|
|
|
if len(podWorkers.podUpdates) != numPods {
|
|
t.Errorf("Incorrect number of open channels %v", len(podWorkers.podUpdates))
|
|
}
|
|
|
|
desiredPods := map[types.UID]empty{}
|
|
desiredPods[types.UID(2)] = empty{}
|
|
desiredPods[types.UID(14)] = empty{}
|
|
podWorkers.ForgetNonExistingPodWorkers(desiredPods)
|
|
if len(podWorkers.podUpdates) != 2 {
|
|
t.Errorf("Incorrect number of open channels %v", len(podWorkers.podUpdates))
|
|
}
|
|
if _, exists := podWorkers.podUpdates[types.UID(2)]; !exists {
|
|
t.Errorf("No updates channel for pod 2")
|
|
}
|
|
if _, exists := podWorkers.podUpdates[types.UID(14)]; !exists {
|
|
t.Errorf("No updates channel for pod 14")
|
|
}
|
|
|
|
podWorkers.ForgetNonExistingPodWorkers(map[types.UID]empty{})
|
|
if len(podWorkers.podUpdates) != 0 {
|
|
t.Errorf("Incorrect number of open channels %v", len(podWorkers.podUpdates))
|
|
}
|
|
}
|
|
|
|
type simpleFakeKubelet struct {
|
|
pod *v1.Pod
|
|
mirrorPod *v1.Pod
|
|
podStatus *kubecontainer.PodStatus
|
|
wg sync.WaitGroup
|
|
}
|
|
|
|
func (kl *simpleFakeKubelet) syncPod(options syncPodOptions) error {
|
|
kl.pod, kl.mirrorPod, kl.podStatus = options.pod, options.mirrorPod, options.podStatus
|
|
return nil
|
|
}
|
|
|
|
func (kl *simpleFakeKubelet) syncPodWithWaitGroup(options syncPodOptions) error {
|
|
kl.pod, kl.mirrorPod, kl.podStatus = options.pod, options.mirrorPod, options.podStatus
|
|
kl.wg.Done()
|
|
return nil
|
|
}
|
|
|
|
// byContainerName sort the containers in a running pod by their names.
|
|
type byContainerName kubecontainer.Pod
|
|
|
|
func (b byContainerName) Len() int { return len(b.Containers) }
|
|
|
|
func (b byContainerName) Swap(i, j int) {
|
|
b.Containers[i], b.Containers[j] = b.Containers[j], b.Containers[i]
|
|
}
|
|
|
|
func (b byContainerName) Less(i, j int) bool {
|
|
return b.Containers[i].Name < b.Containers[j].Name
|
|
}
|
|
|
|
// TestFakePodWorkers verifies that the fakePodWorkers behaves the same way as the real podWorkers
|
|
// for their invocation of the syncPodFn.
|
|
func TestFakePodWorkers(t *testing.T) {
|
|
fakeRecorder := &record.FakeRecorder{}
|
|
fakeRuntime := &containertest.FakeRuntime{}
|
|
fakeCache := containertest.NewFakeCache(fakeRuntime)
|
|
|
|
kubeletForRealWorkers := &simpleFakeKubelet{}
|
|
kubeletForFakeWorkers := &simpleFakeKubelet{}
|
|
|
|
realPodWorkers := newPodWorkers(kubeletForRealWorkers.syncPodWithWaitGroup, fakeRecorder, queue.NewBasicWorkQueue(&clock.RealClock{}), time.Second, time.Second, fakeCache)
|
|
fakePodWorkers := &fakePodWorkers{kubeletForFakeWorkers.syncPod, fakeCache, t}
|
|
|
|
tests := []struct {
|
|
pod *v1.Pod
|
|
mirrorPod *v1.Pod
|
|
}{
|
|
{
|
|
&v1.Pod{},
|
|
&v1.Pod{},
|
|
},
|
|
{
|
|
podWithUIDNameNs("12345678", "foo", "new"),
|
|
podWithUIDNameNs("12345678", "fooMirror", "new"),
|
|
},
|
|
{
|
|
podWithUIDNameNs("98765", "bar", "new"),
|
|
podWithUIDNameNs("98765", "barMirror", "new"),
|
|
},
|
|
}
|
|
|
|
for i, tt := range tests {
|
|
kubeletForRealWorkers.wg.Add(1)
|
|
realPodWorkers.UpdatePod(&UpdatePodOptions{
|
|
Pod: tt.pod,
|
|
MirrorPod: tt.mirrorPod,
|
|
UpdateType: kubetypes.SyncPodUpdate,
|
|
})
|
|
fakePodWorkers.UpdatePod(&UpdatePodOptions{
|
|
Pod: tt.pod,
|
|
MirrorPod: tt.mirrorPod,
|
|
UpdateType: kubetypes.SyncPodUpdate,
|
|
})
|
|
|
|
kubeletForRealWorkers.wg.Wait()
|
|
|
|
if !reflect.DeepEqual(kubeletForRealWorkers.pod, kubeletForFakeWorkers.pod) {
|
|
t.Errorf("%d: Expected: %#v, Actual: %#v", i, kubeletForRealWorkers.pod, kubeletForFakeWorkers.pod)
|
|
}
|
|
|
|
if !reflect.DeepEqual(kubeletForRealWorkers.mirrorPod, kubeletForFakeWorkers.mirrorPod) {
|
|
t.Errorf("%d: Expected: %#v, Actual: %#v", i, kubeletForRealWorkers.mirrorPod, kubeletForFakeWorkers.mirrorPod)
|
|
}
|
|
|
|
if !reflect.DeepEqual(kubeletForRealWorkers.podStatus, kubeletForFakeWorkers.podStatus) {
|
|
t.Errorf("%d: Expected: %#v, Actual: %#v", i, kubeletForRealWorkers.podStatus, kubeletForFakeWorkers.podStatus)
|
|
}
|
|
}
|
|
}
|
|
|
|
// TestKillPodNowFunc tests the blocking kill pod function works with pod workers as expected.
|
|
func TestKillPodNowFunc(t *testing.T) {
|
|
fakeRecorder := &record.FakeRecorder{}
|
|
podWorkers, processed := createPodWorkers()
|
|
killPodFunc := killPodNow(podWorkers, fakeRecorder)
|
|
pod := newPod("test", "test")
|
|
gracePeriodOverride := int64(0)
|
|
err := killPodFunc(pod, v1.PodStatus{Phase: v1.PodFailed, Reason: "reason", Message: "message"}, &gracePeriodOverride)
|
|
if err != nil {
|
|
t.Errorf("Unexpected error: %v", err)
|
|
}
|
|
if len(processed) != 1 {
|
|
t.Errorf("len(processed) expected: %v, actual: %v", 1, len(processed))
|
|
return
|
|
}
|
|
syncPodRecords := processed[pod.UID]
|
|
if len(syncPodRecords) != 1 {
|
|
t.Errorf("Pod processed %v times, but expected %v", len(syncPodRecords), 1)
|
|
}
|
|
if syncPodRecords[0].updateType != kubetypes.SyncPodKill {
|
|
t.Errorf("Pod update type was %v, but expected %v", syncPodRecords[0].updateType, kubetypes.SyncPodKill)
|
|
}
|
|
}
|