diff --git a/pkg/kubelet/config/BUILD b/pkg/kubelet/config/BUILD index 37e72d75e2..6985adfdd1 100644 --- a/pkg/kubelet/config/BUILD +++ b/pkg/kubelet/config/BUILD @@ -108,6 +108,8 @@ go_test( "//pkg/apis/core:go_default_library", "//pkg/apis/core/v1:go_default_library", "//pkg/apis/core/validation:go_default_library", + "//pkg/kubelet/checkpoint:go_default_library", + "//pkg/kubelet/checkpointmanager:go_default_library", "//pkg/kubelet/types:go_default_library", "//pkg/securitycontext:go_default_library", "//vendor/github.com/stretchr/testify/assert:go_default_library", diff --git a/pkg/kubelet/config/config.go b/pkg/kubelet/config/config.go index 6ffb448254..51c2e29c54 100644 --- a/pkg/kubelet/config/config.go +++ b/pkg/kubelet/config/config.go @@ -113,17 +113,20 @@ func (c *PodConfig) Sync() { // Restore restores pods from the checkpoint path, *once* func (c *PodConfig) Restore(path string, updates chan<- interface{}) error { - var err error - if c.checkpointManager == nil { - c.checkpointManager, err = checkpointmanager.NewCheckpointManager(path) - if err != nil { - pods, err := checkpoint.LoadPods(c.checkpointManager) - if err == nil { - updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.RESTORE, Source: kubetypes.ApiserverSource} - } - } + if c.checkpointManager != nil { + return nil } - return err + var err error + c.checkpointManager, err = checkpointmanager.NewCheckpointManager(path) + if err != nil { + return err + } + pods, err := checkpoint.LoadPods(c.checkpointManager) + if err != nil { + return err + } + updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.RESTORE, Source: kubetypes.ApiserverSource} + return nil } // podStorage manages the current pod state at any point in time and ensures updates @@ -311,6 +314,9 @@ func (s *podStorage) merge(source string, change interface{}) (adds, updates, de } case kubetypes.RESTORE: glog.V(4).Infof("Restoring pods for source %s", source) + for _, value := range update.Pods { + restorePods = append(restorePods, value) + } default: glog.Warningf("Received invalid update type: %v", update) diff --git a/pkg/kubelet/config/config_test.go b/pkg/kubelet/config/config_test.go index deb1f4dcaf..f41542a9a6 100644 --- a/pkg/kubelet/config/config_test.go +++ b/pkg/kubelet/config/config_test.go @@ -17,7 +17,9 @@ limitations under the License. package config import ( + "io/ioutil" "math/rand" + "os" "reflect" "sort" "strconv" @@ -30,6 +32,9 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/record" + "k8s.io/kubernetes/pkg/apis/core" + "k8s.io/kubernetes/pkg/kubelet/checkpoint" + "k8s.io/kubernetes/pkg/kubelet/checkpointmanager" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/securitycontext" ) @@ -85,6 +90,14 @@ func CreatePodUpdate(op kubetypes.PodOperation, source string, pods ...*v1.Pod) return kubetypes.PodUpdate{Pods: pods, Op: op, Source: source} } +func createPodConfigTesterByChannel(mode PodConfigNotificationMode, channelName string) (chan<- interface{}, <-chan kubetypes.PodUpdate, *PodConfig) { + eventBroadcaster := record.NewBroadcaster() + config := NewPodConfig(mode, eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "kubelet"})) + channel := config.Channel(channelName) + ch := config.Updates() + return channel, ch, config +} + func createPodConfigTester(mode PodConfigNotificationMode) (chan<- interface{}, <-chan kubetypes.PodUpdate, *PodConfig) { eventBroadcaster := record.NewBroadcaster() config := NewPodConfig(mode, eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "kubelet"})) @@ -413,3 +426,29 @@ func TestPodUpdateLabels(t *testing.T) { expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.UPDATE, TestSource, pod)) } + +func TestPodRestore(t *testing.T) { + tmpDir, _ := ioutil.TempDir("", "") + defer os.RemoveAll(tmpDir) + + pod := CreateValidPod("api-server", "kube-default") + pod.Annotations = make(map[string]string, 0) + pod.Annotations["kubernetes.io/config.source"] = kubetypes.ApiserverSource + pod.Annotations[core.BootstrapCheckpointAnnotationKey] = "true" + + // Create Checkpointer + checkpointManager, err := checkpointmanager.NewCheckpointManager(tmpDir) + if err != nil { + t.Fatalf("failed to initialize checkpoint manager: %v", err) + } + if err := checkpoint.WritePod(checkpointManager, pod); err != nil { + t.Fatalf("Error writing checkpoint for pod: %v", pod.GetName()) + } + + // Restore checkpoint + channel, ch, config := createPodConfigTesterByChannel(PodConfigNotificationIncremental, kubetypes.ApiserverSource) + if err := config.Restore(tmpDir, channel); err != nil { + t.Fatalf("Restore returned error: %v", err) + } + expectPodUpdate(t, ch, CreatePodUpdate(kubetypes.RESTORE, kubetypes.ApiserverSource, pod)) +}