add pod garbage collection

pull/6/head
Mike Danese 2015-09-21 15:51:27 -07:00
parent 1daa365d7a
commit 376faea1cf
5 changed files with 299 additions and 0 deletions

View File

@ -38,6 +38,7 @@ import (
"k8s.io/kubernetes/pkg/controller/daemon"
"k8s.io/kubernetes/pkg/controller/deployment"
"k8s.io/kubernetes/pkg/controller/endpoint"
"k8s.io/kubernetes/pkg/controller/gc"
"k8s.io/kubernetes/pkg/controller/job"
"k8s.io/kubernetes/pkg/controller/namespace"
"k8s.io/kubernetes/pkg/controller/node"
@ -74,6 +75,7 @@ type CMServer struct {
NamespaceSyncPeriod time.Duration
PVClaimBinderSyncPeriod time.Duration
VolumeConfigFlags VolumeConfigFlags
TerminatedPodGCThreshold int
HorizontalPodAutoscalerSyncPeriod time.Duration
DeploymentControllerSyncPeriod time.Duration
RegisterRetryCount int
@ -164,6 +166,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
fs.StringVar(&s.VolumeConfigFlags.PersistentVolumeRecyclerPodTemplateFilePathHostPath, "pv-recycler-pod-template-filepath-hostpath", s.VolumeConfigFlags.PersistentVolumeRecyclerPodTemplateFilePathHostPath, "The file path to a pod definition used as a template for HostPath persistent volume recycling. This is for development and testing only and will not work in a multi-node cluster.")
fs.IntVar(&s.VolumeConfigFlags.PersistentVolumeRecyclerMinimumTimeoutHostPath, "pv-recycler-minimum-timeout-hostpath", s.VolumeConfigFlags.PersistentVolumeRecyclerMinimumTimeoutHostPath, "The minimum ActiveDeadlineSeconds to use for a HostPath Recycler pod. This is for development and testing only and will not work in a multi-node cluster.")
fs.IntVar(&s.VolumeConfigFlags.PersistentVolumeRecyclerIncrementTimeoutHostPath, "pv-recycler-timeout-increment-hostpath", s.VolumeConfigFlags.PersistentVolumeRecyclerIncrementTimeoutHostPath, "the increment of time added per Gi to ActiveDeadlineSeconds for a HostPath scrubber pod. This is for development and testing only and will not work in a multi-node cluster.")
fs.IntVar(&s.TerminatedPodGCThreshold, "terminated-pod-gc-threshold", s.TerminatedPodGCThreshold, "Number of terminated pods that can exist before the terminated pod garbage collector starts deleting terminated pods. If <= 0, the terminated pod garbage collector is disabled.")
fs.DurationVar(&s.HorizontalPodAutoscalerSyncPeriod, "horizontal-pod-autoscaler-sync-period", s.HorizontalPodAutoscalerSyncPeriod, "The period for syncing the number of pods in horizontal pod autoscaler.")
fs.DurationVar(&s.DeploymentControllerSyncPeriod, "deployment-controller-sync-period", s.DeploymentControllerSyncPeriod, "Period for syncing the deployments.")
fs.DurationVar(&s.PodEvictionTimeout, "pod-eviction-timeout", s.PodEvictionTimeout, "The grace period for deleting pods on failed nodes.")
@ -244,6 +247,11 @@ func (s *CMServer) Run(_ []string) error {
go job.NewJobController(kubeClient).
Run(s.ConcurrentJobSyncs, util.NeverStop)
if s.TerminatedPodGCThreshold > 0 {
go gc.New(kubeClient, s.TerminatedPodGCThreshold).
Run(util.NeverStop)
}
cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
if err != nil {
glog.Fatalf("Cloud provider could not be initialized: %v", err)

View File

@ -292,4 +292,5 @@ www-prefix
retry_time
file_content_in_loop
cpu-cfs-quota
terminated-pod-gc-threshold

24
pkg/controller/gc/doc.go Normal file
View File

@ -0,0 +1,24 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package gc contains a very simple pod "garbage collector" implementation,
// GCController, that runs in the controller manager. If the number of pods
// in terminated phases (right now either Failed or Succeeded) surpasses a
// configurable threshold, the controller will delete pods in terminated state
// until the system reaches the allowed threshold again. The GCController
// prioritizes pods to delete by sorting by creation timestamp and deleting the
// oldest objects first. The GCController will not delete non-terminated pods.
package gc

View File

@ -0,0 +1,137 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package gc
import (
"sort"
"sync"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/client/record"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/watch"
"github.com/golang/glog"
)
const (
fullResyncPeriod = 0
gcCheckPeriod = 20 * time.Second
)
type GCController struct {
kubeClient client.Interface
podControl controller.PodControlInterface
podStore cache.StoreToPodLister
podStoreSyncer *framework.Controller
threshold int
}
func New(kubeClient client.Interface, threshold int) *GCController {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(kubeClient.Events(""))
gcc := &GCController{
kubeClient: kubeClient,
podControl: controller.RealPodControl{
Recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "pod-garbage-collector"}),
KubeClient: kubeClient,
},
threshold: threshold,
}
terminatedSelector := compileTerminatedPodSelector()
gcc.podStore.Store, gcc.podStoreSyncer = framework.NewInformer(
&cache.ListWatch{
ListFunc: func() (runtime.Object, error) {
return gcc.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(), terminatedSelector)
},
WatchFunc: func(rv string) (watch.Interface, error) {
return gcc.kubeClient.Pods(api.NamespaceAll).Watch(labels.Everything(), terminatedSelector, rv)
},
},
&api.Pod{},
fullResyncPeriod,
framework.ResourceEventHandlerFuncs{},
)
return gcc
}
func (gcc *GCController) Run(stop <-chan struct{}) {
go gcc.podStoreSyncer.Run(stop)
go util.Until(gcc.gc, gcCheckPeriod, stop)
<-stop
}
func (gcc *GCController) gc() {
terminatedPods, _ := gcc.podStore.List(labels.Everything())
terminatedPodCount := len(terminatedPods)
sort.Sort(byCreationTimestamp(terminatedPods))
deleteCount := terminatedPodCount - gcc.threshold
if deleteCount > terminatedPodCount {
deleteCount = terminatedPodCount
}
if deleteCount > 0 {
glog.Infof("garbage collecting %v pods", deleteCount)
}
var wait sync.WaitGroup
for i := 0; i < deleteCount; i++ {
wait.Add(1)
go func(namespace string, name string) {
defer wait.Done()
if err := gcc.podControl.DeletePod(namespace, name); err != nil {
// ignore not founds
defer util.HandleError(err)
}
}(terminatedPods[i].Namespace, terminatedPods[i].Name)
}
wait.Wait()
}
func compileTerminatedPodSelector() fields.Selector {
selector, err := fields.ParseSelector("status.phase!=" + string(api.PodPending) + ",status.phase!=" + string(api.PodRunning) + ",status.phase!=" + string(api.PodUnknown))
if err != nil {
panic("terminatedSelector must compile: " + err.Error())
}
return selector
}
// byCreationTimestamp sorts a list by creation timestamp, using their names as a tie breaker.
type byCreationTimestamp []*api.Pod
func (o byCreationTimestamp) Len() int { return len(o) }
func (o byCreationTimestamp) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
func (o byCreationTimestamp) Less(i, j int) bool {
if o[i].CreationTimestamp.Equal(o[j].CreationTimestamp) {
return o[i].Name < o[j].Name
}
return o[i].CreationTimestamp.Before(o[j].CreationTimestamp)
}

View File

@ -0,0 +1,129 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package gc
import (
"sync"
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/unversioned/testclient"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/sets"
)
type FakePodControl struct {
podSpec []api.PodTemplateSpec
deletePodName []string
lock sync.Mutex
err error
}
func (f *FakePodControl) CreatePods(namespace string, spec *api.PodTemplateSpec, object runtime.Object) error {
panic("unimplemented")
}
func (f *FakePodControl) CreatePodsOnNode(nodeName, namespace string, spec *api.PodTemplateSpec, object runtime.Object) error {
panic("unimplemented")
}
func (f *FakePodControl) DeletePod(namespace string, podName string) error {
f.lock.Lock()
defer f.lock.Unlock()
if f.err != nil {
return f.err
}
f.deletePodName = append(f.deletePodName, podName)
return nil
}
func (f *FakePodControl) clear() {
f.lock.Lock()
defer f.lock.Unlock()
f.deletePodName = []string{}
f.podSpec = []api.PodTemplateSpec{}
}
func TestGC(t *testing.T) {
testCases := []struct {
pods map[string]api.PodPhase
threshold int
deletedPodNames sets.String
}{
{
pods: map[string]api.PodPhase{
"a": api.PodFailed,
"b": api.PodSucceeded,
},
threshold: 0,
deletedPodNames: sets.NewString("a", "b"),
},
{
pods: map[string]api.PodPhase{
"a": api.PodFailed,
"b": api.PodSucceeded,
},
threshold: 1,
deletedPodNames: sets.NewString("a"),
},
{
pods: map[string]api.PodPhase{
"a": api.PodFailed,
"b": api.PodSucceeded,
},
threshold: 5,
deletedPodNames: sets.NewString(),
},
}
for i, test := range testCases {
client := testclient.NewSimpleFake()
gcc := New(client, test.threshold)
fake := &FakePodControl{}
gcc.podControl = fake
creationTime := time.Unix(0, 0)
for name, phase := range test.pods {
creationTime = creationTime.Add(1 * time.Hour)
gcc.podStore.Store.Add(&api.Pod{
ObjectMeta: api.ObjectMeta{Name: name, CreationTimestamp: unversioned.Time{creationTime}},
Status: api.PodStatus{Phase: phase},
})
}
gcc.gc()
pass := true
for _, pod := range fake.deletePodName {
if !test.deletedPodNames.Has(pod) {
pass = false
}
}
if len(fake.deletePodName) != len(test.deletedPodNames) {
pass = false
}
if !pass {
t.Errorf("[%v]pod's deleted expected and actual did not match.\n\texpected: %v\n\tactual: %v", i, test.deletedPodNames.List(), fake.deletePodName)
}
}
}
func TestTerminatedPodSelectorCompiles(t *testing.T) {
compileTerminatedPodSelector()
}