Add TTL GC controller

pull/8/head
Janet Kuo 2018-07-25 15:49:04 -07:00
parent 1d6dd86407
commit 5186807587
8 changed files with 563 additions and 0 deletions

View File

@@ -68,6 +68,7 @@ go_library(
        "//pkg/controller/serviceaccount:go_default_library",
        "//pkg/controller/statefulset:go_default_library",
        "//pkg/controller/ttl:go_default_library",
+        "//pkg/controller/ttlafterfinished:go_default_library",
        "//pkg/controller/volume/attachdetach:go_default_library",
        "//pkg/controller/volume/expand:go_default_library",
        "//pkg/controller/volume/persistentvolume:go_default_library",

View File

@@ -378,6 +378,7 @@ func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc
	controllers["clusterrole-aggregation"] = startClusterRoleAggregrationController
	controllers["pvc-protection"] = startPVCProtectionController
	controllers["pv-protection"] = startPVProtectionController
+	controllers["ttl-after-finished-controller"] = startTTLAfterFinishedController
	return controllers
}

View File

@@ -52,6 +52,7 @@ import (
	servicecontroller "k8s.io/kubernetes/pkg/controller/service"
	serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount"
	ttlcontroller "k8s.io/kubernetes/pkg/controller/ttl"
+	"k8s.io/kubernetes/pkg/controller/ttlafterfinished"
	"k8s.io/kubernetes/pkg/controller/volume/attachdetach"
	"k8s.io/kubernetes/pkg/controller/volume/expand"
	persistentvolumecontroller "k8s.io/kubernetes/pkg/controller/volume/persistentvolume"
@@ -417,3 +418,14 @@ func startPVProtectionController(ctx ControllerContext) (http.Handler, bool, err
	).Run(1, ctx.Stop)
	return nil, true, nil
}
+
+func startTTLAfterFinishedController(ctx ControllerContext) (http.Handler, bool, error) {
+	if !utilfeature.DefaultFeatureGate.Enabled(features.TTLAfterFinished) {
+		return nil, false, nil
+	}
+	go ttlafterfinished.New(
+		ctx.InformerFactory.Batch().V1().Jobs(),
+		ctx.ClientBuilder.ClientOrDie("ttl-after-finished-controller"),
+	).Run(5, ctx.Stop)
+	return nil, true, nil
+}
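
For context, the controller started above only acts on Jobs whose spec sets ttlSecondsAfterFinished. A minimal sketch of such a Job built against the batch/v1 API (the name, image, and 100-second TTL are illustrative, not part of this change):

package main

import (
	"fmt"

	batchv1 "k8s.io/api/batch/v1"
	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func int32Ptr(i int32) *int32 { return &i }

// exampleJob returns a Job that the TTL-after-finished controller would delete
// roughly 100 seconds after it completes or fails.
func exampleJob() *batchv1.Job {
	return &batchv1.Job{
		ObjectMeta: metav1.ObjectMeta{Name: "pi", Namespace: "default"},
		Spec: batchv1.JobSpec{
			TTLSecondsAfterFinished: int32Ptr(100),
			Template: corev1.PodTemplateSpec{
				Spec: corev1.PodSpec{
					RestartPolicy: corev1.RestartPolicyNever,
					Containers: []corev1.Container{
						{Name: "pi", Image: "perl", Command: []string{"perl", "-Mbignum=bpi", "-wle", "print bpi(2000)"}},
					},
				},
			},
		},
	}
}

func main() {
	job := exampleJob()
	fmt.Printf("Job %q will be cleaned up %d seconds after it finishes\n", job.Name, *job.Spec.TTLSecondsAfterFinished)
}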

View File

@@ -131,6 +131,7 @@ filegroup(
        "//pkg/controller/statefulset:all-srcs",
        "//pkg/controller/testutil:all-srcs",
        "//pkg/controller/ttl:all-srcs",
+        "//pkg/controller/ttlafterfinished:all-srcs",
        "//pkg/controller/util/node:all-srcs",
        "//pkg/controller/volume/attachdetach:all-srcs",
        "//pkg/controller/volume/events:all-srcs",

View File

@@ -0,0 +1,54 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
    name = "go_default_library",
    srcs = ["ttlafterfinished_controller.go"],
    importpath = "k8s.io/kubernetes/pkg/controller/ttlafterfinished",
    visibility = ["//visibility:public"],
    deps = [
        "//pkg/controller:go_default_library",
        "//pkg/controller/job:go_default_library",
        "//pkg/kubectl/scheme:go_default_library",
        "//pkg/util/metrics:go_default_library",
        "//staging/src/k8s.io/api/batch/v1:go_default_library",
        "//staging/src/k8s.io/api/core/v1:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/util/clock:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library",
        "//staging/src/k8s.io/client-go/informers/batch/v1:go_default_library",
        "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
        "//staging/src/k8s.io/client-go/kubernetes/typed/core/v1:go_default_library",
        "//staging/src/k8s.io/client-go/listers/batch/v1:go_default_library",
        "//staging/src/k8s.io/client-go/tools/cache:go_default_library",
        "//staging/src/k8s.io/client-go/tools/record:go_default_library",
        "//staging/src/k8s.io/client-go/util/workqueue:go_default_library",
        "//vendor/github.com/golang/glog:go_default_library",
    ],
)

go_test(
    name = "go_default_test",
    srcs = ["ttlafterfinished_controller_test.go"],
    embed = [":go_default_library"],
    deps = [
        "//staging/src/k8s.io/api/batch/v1:go_default_library",
        "//staging/src/k8s.io/api/core/v1:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
    ],
)

filegroup(
    name = "package-srcs",
    srcs = glob(["**"]),
    tags = ["automanaged"],
    visibility = ["//visibility:private"],
)

filegroup(
    name = "all-srcs",
    srcs = [":package-srcs"],
    tags = ["automanaged"],
    visibility = ["//visibility:public"],
)

View File

@@ -0,0 +1,308 @@
/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package ttlafterfinished

import (
	"fmt"
	"time"

	"github.com/golang/glog"

	batch "k8s.io/api/batch/v1"
	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/clock"
	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/apimachinery/pkg/util/wait"
	batchinformers "k8s.io/client-go/informers/batch/v1"
	clientset "k8s.io/client-go/kubernetes"
	v1core "k8s.io/client-go/kubernetes/typed/core/v1"
	batchlisters "k8s.io/client-go/listers/batch/v1"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/record"
	"k8s.io/client-go/util/workqueue"
	"k8s.io/kubernetes/pkg/controller"
	jobutil "k8s.io/kubernetes/pkg/controller/job"
	"k8s.io/kubernetes/pkg/kubectl/scheme"
	"k8s.io/kubernetes/pkg/util/metrics"
)
// Controller watches for changes of Jobs API objects. Triggered by Job creation
// and updates, it enqueues Jobs that have non-nil `.spec.ttlSecondsAfterFinished`
// to the `queue`. The Controller has workers that consume `queue` and check
// whether the Job's TTL has expired; if it hasn't, the worker re-adds the Job to
// the queue once the TTL is expected to expire; if it has, the worker sends a
// request to the API server to delete the Job.
// This is implemented outside of the Job controller for separation of concerns,
// and because it will be extended to handle other finishable resource types.
type Controller struct {
	client   clientset.Interface
	recorder record.EventRecorder

	// jLister can list/get Jobs from the shared informer's store
	jLister batchlisters.JobLister

	// jListerSynced returns true if the Job store has been synced at least once.
	// Added as a member to the struct to allow injection for testing.
	jListerSynced cache.InformerSynced

	// Jobs that the controller will check the TTL of and attempt to delete when the TTL expires.
	queue workqueue.RateLimitingInterface

	// The clock for tracking time
	clock clock.Clock
}

// New creates an instance of Controller
func New(jobInformer batchinformers.JobInformer, client clientset.Interface) *Controller {
	eventBroadcaster := record.NewBroadcaster()
	eventBroadcaster.StartLogging(glog.Infof)
	eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: client.CoreV1().Events("")})

	if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil {
		metrics.RegisterMetricAndTrackRateLimiterUsage("ttl_after_finished_controller", client.CoreV1().RESTClient().GetRateLimiter())
	}

	tc := &Controller{
		client:   client,
		recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "ttl-after-finished-controller"}),
		queue:    workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ttl_jobs_to_delete"),
	}

	jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc:    tc.addJob,
		UpdateFunc: tc.updateJob,
	})

	tc.jLister = jobInformer.Lister()
	tc.jListerSynced = jobInformer.Informer().HasSynced

	tc.clock = clock.RealClock{}

	return tc
}
// Run starts the workers to clean up Jobs.
func (tc *Controller) Run(workers int, stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()
	defer tc.queue.ShutDown()

	glog.Infof("Starting TTL after finished controller")
	defer glog.Infof("Shutting down TTL after finished controller")

	if !controller.WaitForCacheSync("TTL after finished", stopCh, tc.jListerSynced) {
		return
	}

	for i := 0; i < workers; i++ {
		go wait.Until(tc.worker, time.Second, stopCh)
	}

	<-stopCh
}

func (tc *Controller) addJob(obj interface{}) {
	job := obj.(*batch.Job)
	glog.V(4).Infof("Adding job %s/%s", job.Namespace, job.Name)

	if job.DeletionTimestamp == nil && needsCleanup(job) {
		tc.enqueue(job)
	}
}

func (tc *Controller) updateJob(old, cur interface{}) {
	job := cur.(*batch.Job)
	glog.V(4).Infof("Updating job %s/%s", job.Namespace, job.Name)

	if job.DeletionTimestamp == nil && needsCleanup(job) {
		tc.enqueue(job)
	}
}

func (tc *Controller) enqueue(job *batch.Job) {
	glog.V(4).Infof("Add job %s/%s to cleanup", job.Namespace, job.Name)
	key, err := controller.KeyFunc(job)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", job, err))
		return
	}

	tc.queue.Add(key)
}

func (tc *Controller) enqueueAfter(job *batch.Job, after time.Duration) {
	key, err := controller.KeyFunc(job)
	if err != nil {
		utilruntime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", job, err))
		return
	}

	tc.queue.AddAfter(key, after)
}

func (tc *Controller) worker() {
	for tc.processNextWorkItem() {
	}
}

func (tc *Controller) processNextWorkItem() bool {
	key, quit := tc.queue.Get()
	if quit {
		return false
	}
	defer tc.queue.Done(key)

	err := tc.processJob(key.(string))
	tc.handleErr(err, key)

	return true
}

func (tc *Controller) handleErr(err error, key interface{}) {
	if err == nil {
		tc.queue.Forget(key)
		return
	}

	utilruntime.HandleError(fmt.Errorf("error cleaning up Job %v, will retry: %v", key, err))
	tc.queue.AddRateLimited(key)
}
// processJob checks the Job's state and TTL, and deletes the Job once it has
// finished and its TTL after finishing has expired. If the Job hasn't finished
// or its TTL hasn't expired, it will be added back to the queue after the TTL
// is expected to expire.
// This function is not meant to be invoked concurrently with the same key.
func (tc *Controller) processJob(key string) error {
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}

	glog.V(4).Infof("Checking if Job %s/%s is ready for cleanup", namespace, name)
	// Ignore Jobs that are already deleted or being deleted, or the ones that don't need clean up.
	job, err := tc.jLister.Jobs(namespace).Get(name)
	if errors.IsNotFound(err) {
		return nil
	}
	if err != nil {
		return err
	}

	if expired, err := tc.processTTL(job); err != nil {
		return err
	} else if !expired {
		return nil
	}

	// The Job's TTL is assumed to have expired, but the cached Job might be stale.
	// Before deleting the Job, do a final sanity check.
	// If the TTL is modified before we do this check, we cannot be sure whether the TTL truly expired.
	// The latest Job may have a different UID, but it's fine because the checks will be run again.
	fresh, err := tc.client.BatchV1().Jobs(namespace).Get(name, metav1.GetOptions{})
	if errors.IsNotFound(err) {
		return nil
	}
	if err != nil {
		return err
	}

	// Use the latest Job TTL to see if the TTL truly expired.
	if expired, err := tc.processTTL(fresh); err != nil {
		return err
	} else if !expired {
		return nil
	}

	// Cascade-delete the Job if its TTL has truly expired.
	policy := metav1.DeletePropagationForeground
	options := &metav1.DeleteOptions{
		PropagationPolicy: &policy,
		Preconditions:     &metav1.Preconditions{UID: &fresh.UID},
	}
	glog.V(4).Infof("Cleaning up Job %s/%s", namespace, name)
	return tc.client.BatchV1().Jobs(fresh.Namespace).Delete(fresh.Name, options)
}

// processTTL checks whether a given Job's TTL has expired, and if it hasn't,
// adds the Job back to the queue at the time the TTL is expected to expire.
func (tc *Controller) processTTL(job *batch.Job) (expired bool, err error) {
	// We don't care about Jobs that are going to be deleted, or the ones that don't need clean up.
	if job.DeletionTimestamp != nil || !needsCleanup(job) {
		return false, nil
	}

	now := tc.clock.Now()
	t, err := timeLeft(job, &now)
	if err != nil {
		return false, err
	}

	// TTL has expired
	if *t <= 0 {
		return true, nil
	}

	tc.enqueueAfter(job, *t)
	return false, nil
}
// needsCleanup checks whether a Job has finished and has a TTL set.
func needsCleanup(j *batch.Job) bool {
	return j.Spec.TTLSecondsAfterFinished != nil && jobutil.IsJobFinished(j)
}

func getFinishAndExpireTime(j *batch.Job) (*time.Time, *time.Time, error) {
	if !needsCleanup(j) {
		return nil, nil, fmt.Errorf("Job %s/%s should not be cleaned up", j.Namespace, j.Name)
	}
	finishAt, err := jobFinishTime(j)
	if err != nil {
		return nil, nil, err
	}
	finishAtUTC := finishAt.UTC()
	expireAtUTC := finishAtUTC.Add(time.Duration(*j.Spec.TTLSecondsAfterFinished) * time.Second)
	return &finishAtUTC, &expireAtUTC, nil
}

func timeLeft(j *batch.Job, since *time.Time) (*time.Duration, error) {
	finishAt, expireAt, err := getFinishAndExpireTime(j)
	if err != nil {
		return nil, err
	}
	if finishAt.UTC().After(since.UTC()) {
		glog.Warningf("Warning: Found Job %s/%s finished in the future. This is likely due to time skew in the cluster. Job cleanup will be deferred.", j.Namespace, j.Name)
	}
	remaining := expireAt.UTC().Sub(since.UTC())
	glog.V(4).Infof("Found Job %s/%s finished at %v, remaining TTL %v since %v, TTL will expire at %v", j.Namespace, j.Name, finishAt.UTC(), remaining, since.UTC(), expireAt.UTC())
	return &remaining, nil
}

// jobFinishTime takes an already finished Job and returns the time it finished.
func jobFinishTime(finishedJob *batch.Job) (metav1.Time, error) {
	for _, c := range finishedJob.Status.Conditions {
		if (c.Type == batch.JobComplete || c.Type == batch.JobFailed) && c.Status == v1.ConditionTrue {
			finishAt := c.LastTransitionTime
			if finishAt.IsZero() {
				return metav1.Time{}, fmt.Errorf("unable to find the time when the Job %s/%s finished", finishedJob.Namespace, finishedJob.Name)
			}
			return c.LastTransitionTime, nil
		}
	}

	// This should never happen if the Job has finished.
	return metav1.Time{}, fmt.Errorf("unable to find the status of the finished Job %s/%s", finishedJob.Namespace, finishedJob.Name)
}
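
As a rough illustration of the requeue arithmetic that processTTL, timeLeft, and getFinishAndExpireTime implement, the following self-contained sketch (not part of the commit) computes the same expiry time for a Job that finished 10 seconds ago with a 15-second TTL:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Suppose a Job finished 10 seconds ago and has ttlSecondsAfterFinished = 15.
	now := time.Now().UTC()
	finishAt := now.Add(-10 * time.Second)
	ttl := 15 * time.Second

	// expireAt mirrors getFinishAndExpireTime; remaining mirrors timeLeft.
	expireAt := finishAt.Add(ttl)
	remaining := expireAt.Sub(now)

	if remaining <= 0 {
		fmt.Println("TTL expired; a worker would delete the Job now")
	} else {
		// The controller re-adds the key with queue.AddAfter(key, remaining)
		// and re-checks once the TTL is expected to have expired.
		fmt.Printf("TTL not expired; requeue after %v\n", remaining)
	}
}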

View File

@@ -0,0 +1,177 @@
/*
Copyright 2018 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

    http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package ttlafterfinished

import (
	"strings"
	"testing"
	"time"

	batch "k8s.io/api/batch/v1"
	"k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)
func newJob(completionTime, failedTime metav1.Time, ttl *int32) *batch.Job {
	j := &batch.Job{
		TypeMeta: metav1.TypeMeta{Kind: "Job"},
		ObjectMeta: metav1.ObjectMeta{
			Name:      "foobar",
			Namespace: metav1.NamespaceDefault,
		},
		Spec: batch.JobSpec{
			Selector: &metav1.LabelSelector{
				MatchLabels: map[string]string{"foo": "bar"},
			},
			Template: v1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{
					Labels: map[string]string{
						"foo": "bar",
					},
				},
				Spec: v1.PodSpec{
					Containers: []v1.Container{
						{Image: "foo/bar"},
					},
				},
			},
		},
	}

	if !completionTime.IsZero() {
		c := batch.JobCondition{Type: batch.JobComplete, Status: v1.ConditionTrue, LastTransitionTime: completionTime}
		j.Status.Conditions = append(j.Status.Conditions, c)
	}

	if !failedTime.IsZero() {
		c := batch.JobCondition{Type: batch.JobFailed, Status: v1.ConditionTrue, LastTransitionTime: failedTime}
		j.Status.Conditions = append(j.Status.Conditions, c)
	}

	if ttl != nil {
		j.Spec.TTLSecondsAfterFinished = ttl
	}

	return j
}

func durationPointer(n int) *time.Duration {
	s := time.Duration(n) * time.Second
	return &s
}

func int32Ptr(n int32) *int32 {
	return &n
}
func TestTimeLeft(t *testing.T) {
	now := metav1.Now()

	testCases := []struct {
		name             string
		completionTime   metav1.Time
		failedTime       metav1.Time
		ttl              *int32
		since            *time.Time
		expectErr        bool
		expectErrStr     string
		expectedTimeLeft *time.Duration
	}{
		{
			name:         "Error case: Job unfinished",
			ttl:          int32Ptr(100),
			since:        &now.Time,
			expectErr:    true,
			expectErrStr: "should not be cleaned up",
		},
		{
			name:           "Error case: Job completed now, no TTL",
			completionTime: now,
			since:          &now.Time,
			expectErr:      true,
			expectErrStr:   "should not be cleaned up",
		},
		{
			name:             "Job completed now, 0s TTL",
			completionTime:   now,
			ttl:              int32Ptr(0),
			since:            &now.Time,
			expectedTimeLeft: durationPointer(0),
		},
		{
			name:             "Job completed now, 10s TTL",
			completionTime:   now,
			ttl:              int32Ptr(10),
			since:            &now.Time,
			expectedTimeLeft: durationPointer(10),
		},
		{
			name:             "Job completed 10s ago, 15s TTL",
			completionTime:   metav1.NewTime(now.Add(-10 * time.Second)),
			ttl:              int32Ptr(15),
			since:            &now.Time,
			expectedTimeLeft: durationPointer(5),
		},
		{
			name:         "Error case: Job failed now, no TTL",
			failedTime:   now,
			since:        &now.Time,
			expectErr:    true,
			expectErrStr: "should not be cleaned up",
		},
		{
			name:             "Job failed now, 0s TTL",
			failedTime:       now,
			ttl:              int32Ptr(0),
			since:            &now.Time,
			expectedTimeLeft: durationPointer(0),
		},
		{
			name:             "Job failed now, 10s TTL",
			failedTime:       now,
			ttl:              int32Ptr(10),
			since:            &now.Time,
			expectedTimeLeft: durationPointer(10),
		},
		{
			name:             "Job failed 10s ago, 15s TTL",
			failedTime:       metav1.NewTime(now.Add(-10 * time.Second)),
			ttl:              int32Ptr(15),
			since:            &now.Time,
			expectedTimeLeft: durationPointer(5),
		},
	}

	for _, tc := range testCases {
		job := newJob(tc.completionTime, tc.failedTime, tc.ttl)
		gotTimeLeft, gotErr := timeLeft(job, tc.since)
		if tc.expectErr != (gotErr != nil) {
			t.Errorf("%s: expected error is %t, got %t, error: %v", tc.name, tc.expectErr, gotErr != nil, gotErr)
		}
		if tc.expectErr && len(tc.expectErrStr) == 0 {
			t.Errorf("%s: invalid test setup; error message must not be empty for error cases", tc.name)
		}
		if tc.expectErr && !strings.Contains(gotErr.Error(), tc.expectErrStr) {
			t.Errorf("%s: expected error message to contain %q, got %v", tc.name, tc.expectErrStr, gotErr)
		}
		if !tc.expectErr {
			if *gotTimeLeft != *tc.expectedTimeLeft {
				t.Errorf("%s: expected time left %v, got %v", tc.name, tc.expectedTimeLeft, gotTimeLeft)
			}
		}
	}
}
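
The clock field on Controller exists so tests can inject a fake clock. A sketch of such a test in the same package, reusing the newJob and int32Ptr helpers above; it assumes clock.NewFakeClock from k8s.io/apimachinery/pkg/util/clock and the same workqueue constructor used in New (both would need to be imported here):

func TestProcessTTLWithFakeClock(t *testing.T) {
	fakeClock := clock.NewFakeClock(time.Now())
	tc := &Controller{
		clock: fakeClock,
		queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ttl_jobs_to_delete_test"),
	}

	// A Job that finished "now" (per the fake clock) with a 10s TTL.
	job := newJob(metav1.NewTime(fakeClock.Now()), metav1.Time{}, int32Ptr(10))

	if expired, err := tc.processTTL(job); err != nil || expired {
		t.Fatalf("expected TTL not to be expired yet, got expired=%v, err=%v", expired, err)
	}

	// Advance the fake clock past the TTL; the Job should now be treated as expired.
	fakeClock.Step(11 * time.Second)
	if expired, err := tc.processTTL(job); err != nil || !expired {
		t.Fatalf("expected TTL to be expired, got expired=%v, err=%v", expired, err)
	}
}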

View File

@@ -340,6 +340,15 @@ func buildControllerRoles() ([]rbacv1.ClusterRole, []rbacv1.ClusterRoleBinding)
			eventsRule(),
		},
	})
+	if utilfeature.DefaultFeatureGate.Enabled(features.TTLAfterFinished) {
+		addControllerRole(&controllerRoles, &controllerRoleBindings, rbacv1.ClusterRole{
+			ObjectMeta: metav1.ObjectMeta{Name: saRolePrefix + "ttl-after-finished-controller"},
+			Rules: []rbacv1.PolicyRule{
+				rbacv1helpers.NewRule("get", "list", "watch", "delete").Groups(batchGroup).Resources("jobs").RuleOrDie(),
+				eventsRule(),
+			},
+		})
+	}
	return controllerRoles, controllerRoleBindings
}
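
For readers unfamiliar with the rbacv1helpers builder, the jobs rule granted above expands to roughly the following PolicyRule; this is a sketch, and it assumes batchGroup resolves to "batch":

package main

import (
	"fmt"

	rbacv1 "k8s.io/api/rbac/v1"
)

func main() {
	// Roughly what rbacv1helpers.NewRule("get", "list", "watch", "delete").
	// Groups(batchGroup).Resources("jobs").RuleOrDie() builds.
	rule := rbacv1.PolicyRule{
		Verbs:     []string{"get", "list", "watch", "delete"},
		APIGroups: []string{"batch"},
		Resources: []string{"jobs"},
	}
	fmt.Printf("%+v\n", rule)
}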