From 376faea1cf5b5e0e79cb552520d10de9543f6612 Mon Sep 17 00:00:00 2001 From: Mike Danese Date: Mon, 21 Sep 2015 15:51:27 -0700 Subject: [PATCH] add pod garbage collection --- .../app/controllermanager.go | 8 + hack/verify-flags/known-flags.txt | 1 + pkg/controller/gc/doc.go | 24 +++ pkg/controller/gc/gc_controller.go | 137 ++++++++++++++++++ pkg/controller/gc/gc_controller_test.go | 129 +++++++++++++++++ 5 files changed, 299 insertions(+) create mode 100644 pkg/controller/gc/doc.go create mode 100644 pkg/controller/gc/gc_controller.go create mode 100644 pkg/controller/gc/gc_controller_test.go diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 228647b8ac..ef541da9a3 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -38,6 +38,7 @@ import ( "k8s.io/kubernetes/pkg/controller/daemon" "k8s.io/kubernetes/pkg/controller/deployment" "k8s.io/kubernetes/pkg/controller/endpoint" + "k8s.io/kubernetes/pkg/controller/gc" "k8s.io/kubernetes/pkg/controller/job" "k8s.io/kubernetes/pkg/controller/namespace" "k8s.io/kubernetes/pkg/controller/node" @@ -74,6 +75,7 @@ type CMServer struct { NamespaceSyncPeriod time.Duration PVClaimBinderSyncPeriod time.Duration VolumeConfigFlags VolumeConfigFlags + TerminatedPodGCThreshold int HorizontalPodAutoscalerSyncPeriod time.Duration DeploymentControllerSyncPeriod time.Duration RegisterRetryCount int @@ -164,6 +166,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&s.VolumeConfigFlags.PersistentVolumeRecyclerPodTemplateFilePathHostPath, "pv-recycler-pod-template-filepath-hostpath", s.VolumeConfigFlags.PersistentVolumeRecyclerPodTemplateFilePathHostPath, "The file path to a pod definition used as a template for HostPath persistent volume recycling. This is for development and testing only and will not work in a multi-node cluster.") fs.IntVar(&s.VolumeConfigFlags.PersistentVolumeRecyclerMinimumTimeoutHostPath, "pv-recycler-minimum-timeout-hostpath", s.VolumeConfigFlags.PersistentVolumeRecyclerMinimumTimeoutHostPath, "The minimum ActiveDeadlineSeconds to use for a HostPath Recycler pod. This is for development and testing only and will not work in a multi-node cluster.") fs.IntVar(&s.VolumeConfigFlags.PersistentVolumeRecyclerIncrementTimeoutHostPath, "pv-recycler-timeout-increment-hostpath", s.VolumeConfigFlags.PersistentVolumeRecyclerIncrementTimeoutHostPath, "the increment of time added per Gi to ActiveDeadlineSeconds for a HostPath scrubber pod. This is for development and testing only and will not work in a multi-node cluster.") + fs.IntVar(&s.TerminatedPodGCThreshold, "terminated-pod-gc-threshold", s.TerminatedPodGCThreshold, "Number of terminated pods that can exist before the terminated pod garbage collector starts deleting terminated pods. If <= 0, the terminated pod garbage collector is disabled.") fs.DurationVar(&s.HorizontalPodAutoscalerSyncPeriod, "horizontal-pod-autoscaler-sync-period", s.HorizontalPodAutoscalerSyncPeriod, "The period for syncing the number of pods in horizontal pod autoscaler.") fs.DurationVar(&s.DeploymentControllerSyncPeriod, "deployment-controller-sync-period", s.DeploymentControllerSyncPeriod, "Period for syncing the deployments.") fs.DurationVar(&s.PodEvictionTimeout, "pod-eviction-timeout", s.PodEvictionTimeout, "The grace period for deleting pods on failed nodes.") @@ -244,6 +247,11 @@ func (s *CMServer) Run(_ []string) error { go job.NewJobController(kubeClient). Run(s.ConcurrentJobSyncs, util.NeverStop) + if s.TerminatedPodGCThreshold > 0 { + go gc.New(kubeClient, s.TerminatedPodGCThreshold). + Run(util.NeverStop) + } + cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile) if err != nil { glog.Fatalf("Cloud provider could not be initialized: %v", err) diff --git a/hack/verify-flags/known-flags.txt b/hack/verify-flags/known-flags.txt index a2f83ae1d6..1e51ac045a 100644 --- a/hack/verify-flags/known-flags.txt +++ b/hack/verify-flags/known-flags.txt @@ -292,4 +292,5 @@ www-prefix retry_time file_content_in_loop cpu-cfs-quota +terminated-pod-gc-threshold diff --git a/pkg/controller/gc/doc.go b/pkg/controller/gc/doc.go new file mode 100644 index 0000000000..db08e7a36d --- /dev/null +++ b/pkg/controller/gc/doc.go @@ -0,0 +1,24 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package gc contains a very simple pod "garbage collector" implementation, +// GCController, that runs in the controller manager. If the number of pods +// in terminated phases (right now either Failed or Succeeded) surpasses a +// configurable threshold, the controller will delete pods in terminated state +// until the system reaches the allowed threshold again. The GCController +// prioritizes pods to delete by sorting by creation timestamp and deleting the +// oldest objects first. The GCController will not delete non-terminated pods. +package gc diff --git a/pkg/controller/gc/gc_controller.go b/pkg/controller/gc/gc_controller.go new file mode 100644 index 0000000000..bec6958d23 --- /dev/null +++ b/pkg/controller/gc/gc_controller.go @@ -0,0 +1,137 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package gc + +import ( + "sort" + "sync" + "time" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/client/cache" + "k8s.io/kubernetes/pkg/client/record" + client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/controller" + "k8s.io/kubernetes/pkg/controller/framework" + "k8s.io/kubernetes/pkg/fields" + "k8s.io/kubernetes/pkg/labels" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util" + "k8s.io/kubernetes/pkg/watch" + + "github.com/golang/glog" +) + +const ( + fullResyncPeriod = 0 + gcCheckPeriod = 20 * time.Second +) + +type GCController struct { + kubeClient client.Interface + podControl controller.PodControlInterface + podStore cache.StoreToPodLister + podStoreSyncer *framework.Controller + threshold int +} + +func New(kubeClient client.Interface, threshold int) *GCController { + eventBroadcaster := record.NewBroadcaster() + eventBroadcaster.StartLogging(glog.Infof) + eventBroadcaster.StartRecordingToSink(kubeClient.Events("")) + + gcc := &GCController{ + kubeClient: kubeClient, + podControl: controller.RealPodControl{ + Recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "pod-garbage-collector"}), + KubeClient: kubeClient, + }, + threshold: threshold, + } + + terminatedSelector := compileTerminatedPodSelector() + + gcc.podStore.Store, gcc.podStoreSyncer = framework.NewInformer( + &cache.ListWatch{ + ListFunc: func() (runtime.Object, error) { + return gcc.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(), terminatedSelector) + }, + WatchFunc: func(rv string) (watch.Interface, error) { + return gcc.kubeClient.Pods(api.NamespaceAll).Watch(labels.Everything(), terminatedSelector, rv) + }, + }, + &api.Pod{}, + fullResyncPeriod, + framework.ResourceEventHandlerFuncs{}, + ) + return gcc +} + +func (gcc *GCController) Run(stop <-chan struct{}) { + go gcc.podStoreSyncer.Run(stop) + go util.Until(gcc.gc, gcCheckPeriod, stop) + <-stop +} + +func (gcc *GCController) gc() { + terminatedPods, _ := gcc.podStore.List(labels.Everything()) + terminatedPodCount := len(terminatedPods) + sort.Sort(byCreationTimestamp(terminatedPods)) + + deleteCount := terminatedPodCount - gcc.threshold + + if deleteCount > terminatedPodCount { + deleteCount = terminatedPodCount + } + if deleteCount > 0 { + glog.Infof("garbage collecting %v pods", deleteCount) + } + + var wait sync.WaitGroup + for i := 0; i < deleteCount; i++ { + wait.Add(1) + go func(namespace string, name string) { + defer wait.Done() + if err := gcc.podControl.DeletePod(namespace, name); err != nil { + // ignore not founds + defer util.HandleError(err) + } + }(terminatedPods[i].Namespace, terminatedPods[i].Name) + } + wait.Wait() +} + +func compileTerminatedPodSelector() fields.Selector { + selector, err := fields.ParseSelector("status.phase!=" + string(api.PodPending) + ",status.phase!=" + string(api.PodRunning) + ",status.phase!=" + string(api.PodUnknown)) + if err != nil { + panic("terminatedSelector must compile: " + err.Error()) + } + return selector +} + +// byCreationTimestamp sorts a list by creation timestamp, using their names as a tie breaker. +type byCreationTimestamp []*api.Pod + +func (o byCreationTimestamp) Len() int { return len(o) } +func (o byCreationTimestamp) Swap(i, j int) { o[i], o[j] = o[j], o[i] } + +func (o byCreationTimestamp) Less(i, j int) bool { + if o[i].CreationTimestamp.Equal(o[j].CreationTimestamp) { + return o[i].Name < o[j].Name + } + return o[i].CreationTimestamp.Before(o[j].CreationTimestamp) +} diff --git a/pkg/controller/gc/gc_controller_test.go b/pkg/controller/gc/gc_controller_test.go new file mode 100644 index 0000000000..1bf4bebfcf --- /dev/null +++ b/pkg/controller/gc/gc_controller_test.go @@ -0,0 +1,129 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package gc + +import ( + "sync" + "testing" + "time" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/client/unversioned/testclient" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/util/sets" +) + +type FakePodControl struct { + podSpec []api.PodTemplateSpec + deletePodName []string + lock sync.Mutex + err error +} + +func (f *FakePodControl) CreatePods(namespace string, spec *api.PodTemplateSpec, object runtime.Object) error { + panic("unimplemented") +} + +func (f *FakePodControl) CreatePodsOnNode(nodeName, namespace string, spec *api.PodTemplateSpec, object runtime.Object) error { + panic("unimplemented") +} + +func (f *FakePodControl) DeletePod(namespace string, podName string) error { + f.lock.Lock() + defer f.lock.Unlock() + if f.err != nil { + return f.err + } + f.deletePodName = append(f.deletePodName, podName) + return nil +} +func (f *FakePodControl) clear() { + f.lock.Lock() + defer f.lock.Unlock() + f.deletePodName = []string{} + f.podSpec = []api.PodTemplateSpec{} +} + +func TestGC(t *testing.T) { + + testCases := []struct { + pods map[string]api.PodPhase + threshold int + deletedPodNames sets.String + }{ + { + pods: map[string]api.PodPhase{ + "a": api.PodFailed, + "b": api.PodSucceeded, + }, + threshold: 0, + deletedPodNames: sets.NewString("a", "b"), + }, + { + pods: map[string]api.PodPhase{ + "a": api.PodFailed, + "b": api.PodSucceeded, + }, + threshold: 1, + deletedPodNames: sets.NewString("a"), + }, + { + pods: map[string]api.PodPhase{ + "a": api.PodFailed, + "b": api.PodSucceeded, + }, + threshold: 5, + deletedPodNames: sets.NewString(), + }, + } + + for i, test := range testCases { + client := testclient.NewSimpleFake() + gcc := New(client, test.threshold) + fake := &FakePodControl{} + gcc.podControl = fake + + creationTime := time.Unix(0, 0) + for name, phase := range test.pods { + creationTime = creationTime.Add(1 * time.Hour) + gcc.podStore.Store.Add(&api.Pod{ + ObjectMeta: api.ObjectMeta{Name: name, CreationTimestamp: unversioned.Time{creationTime}}, + Status: api.PodStatus{Phase: phase}, + }) + } + + gcc.gc() + + pass := true + for _, pod := range fake.deletePodName { + if !test.deletedPodNames.Has(pod) { + pass = false + } + } + if len(fake.deletePodName) != len(test.deletedPodNames) { + pass = false + } + if !pass { + t.Errorf("[%v]pod's deleted expected and actual did not match.\n\texpected: %v\n\tactual: %v", i, test.deletedPodNames.List(), fake.deletePodName) + } + } +} + +func TestTerminatedPodSelectorCompiles(t *testing.T) { + compileTerminatedPodSelector() +}