kubelet: fix checkpoint manager logic bug on restore

pull/8/head
Ryan Phillips 2018-05-08 14:12:20 -05:00
parent 5686fcfcf8
commit 6469c8e333
3 changed files with 57 additions and 10 deletions

View File

@ -108,6 +108,8 @@ go_test(
"//pkg/apis/core:go_default_library",
"//pkg/apis/core/v1:go_default_library",
"//pkg/apis/core/validation:go_default_library",
"//pkg/kubelet/checkpoint:go_default_library",
"//pkg/kubelet/checkpointmanager:go_default_library",
"//pkg/kubelet/types:go_default_library",
"//pkg/securitycontext:go_default_library",
"//vendor/github.com/stretchr/testify/assert:go_default_library",

View File

@ -113,17 +113,20 @@ func (c *PodConfig) Sync() {
// Restore restores pods from the checkpoint path, *once*
func (c *PodConfig) Restore(path string, updates chan<- interface{}) error {
var err error
if c.checkpointManager == nil {
c.checkpointManager, err = checkpointmanager.NewCheckpointManager(path)
if err != nil {
pods, err := checkpoint.LoadPods(c.checkpointManager)
if err == nil {
updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.RESTORE, Source: kubetypes.ApiserverSource}
}
}
if c.checkpointManager != nil {
return nil
}
return err
var err error
c.checkpointManager, err = checkpointmanager.NewCheckpointManager(path)
if err != nil {
return err
}
pods, err := checkpoint.LoadPods(c.checkpointManager)
if err != nil {
return err
}
updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.RESTORE, Source: kubetypes.ApiserverSource}
return nil
}
// podStorage manages the current pod state at any point in time and ensures updates
@ -311,6 +314,9 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de
}
case kubetypes.RESTORE:
glog.V(4).Infof("Restoring pods for source %s", source)
for _, value := range update.Pods {
restorePods = append(restorePods, value)
}
default:
glog.Warningf("Received invalid update type: %v", update)

View File

@ -17,7 +17,9 @@ limitations under the License.
package config
import (
"io/ioutil"
"math/rand"
"os"
"reflect"
"sort"
"strconv"
@ -30,6 +32,9 @@ import (
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/tools/record"
"k8s.io/kubernetes/pkg/apis/core"
"k8s.io/kubernetes/pkg/kubelet/checkpoint"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/securitycontext"
)
@ -85,6 +90,14 @@ func CreatePodUpdate(op kubetypes.PodOperation, source string, pods ...*v1.Pod)
return kubetypes.PodUpdate{Pods: pods, Op: op, Source: source}
}
func createPodConfigTesterByChannel(mode PodConfigNotificationMode, channelName string) (chan<- interface{}, <-chan kubetypes.PodUpdate, *PodConfig) {
eventBroadcaster := record.NewBroadcaster()
config := NewPodConfig(mode, eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "kubelet"}))
channel := config.Channel(channelName)
ch := config.Updates()
return channel, ch, config
}
func createPodConfigTester(mode PodConfigNotificationMode) (chan<- interface{}, <-chan kubetypes.PodUpdate, *PodConfig) {
eventBroadcaster := record.NewBroadcaster()
config := NewPodConfig(mode, eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "kubelet"}))
@ -413,3 +426,29 @@ func TestPodUpdateLabels(t *testing.T) {
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.UPDATE, TestSource, pod))
}
func TestPodRestore(t *testing.T) {
tmpDir, _ := ioutil.TempDir("", "")
defer os.RemoveAll(tmpDir)
pod := CreateValidPod("api-server", "kube-default")
pod.Annotations = make(map[string]string, 0)
pod.Annotations["kubernetes.io/config.source"] = kubetypes.ApiserverSource
pod.Annotations[core.BootstrapCheckpointAnnotationKey] = "true"
// Create Checkpointer
checkpointManager, err := checkpointmanager.NewCheckpointManager(tmpDir)
if err != nil {
t.Fatalf("failed to initialize checkpoint manager: %v", err)
}
if err := checkpoint.WritePod(checkpointManager, pod); err != nil {
t.Fatalf("Error writing checkpoint for pod: %v", pod.GetName())
}
// Restore checkpoint
channel, ch, config := createPodConfigTesterByChannel(PodConfigNotificationIncremental, kubetypes.ApiserverSource)
if err := config.Restore(tmpDir, channel); err != nil {
t.Fatalf("Restore returned error: %v", err)
}
expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.RESTORE, kubetypes.ApiserverSource, pod))
}