Implement ttl controller

pull/6/head
Wojciech Tyczynski 2017-01-30 11:48:15 +01:00
parent 46becf2c81
commit 3aebc4c003
9 changed files with 614 additions and 0 deletions

View File

@ -64,6 +64,7 @@ go_library(
"//pkg/controller/service:go_default_library",
"//pkg/controller/serviceaccount:go_default_library",
"//pkg/controller/statefulset:go_default_library",
"//pkg/controller/ttl:go_default_library",
"//pkg/controller/volume/attachdetach:go_default_library",
"//pkg/controller/volume/persistentvolume:go_default_library",
"//pkg/features:go_default_library",

View File

@ -287,6 +287,7 @@ func newControllerInitializers() map[string]InitFunc {
controllers["statefuleset"] = startStatefulSetController
controllers["cronjob"] = startCronJobController
controllers["certificatesigningrequests"] = startCSRController
controllers["ttl"] = startTTLController
return controllers
}

View File

@ -40,6 +40,7 @@ import (
replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
resourcequotacontroller "k8s.io/kubernetes/pkg/controller/resourcequota"
serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount"
ttlcontroller "k8s.io/kubernetes/pkg/controller/ttl"
quotainstall "k8s.io/kubernetes/pkg/quota/install"
)
@ -141,6 +142,14 @@ func startServiceAccountController(ctx ControllerContext) (bool, error) {
return true, nil
}
func startTTLController(ctx ControllerContext) (bool, error) {
go ttlcontroller.NewTTLController(
ctx.InformerFactory.Nodes(),
ctx.ClientBuilder.ClientOrDie("ttl-controller"),
).Run(5, ctx.Stop)
return true, nil
}
func startGarbageCollectorController(ctx ControllerContext) (bool, error) {
if !ctx.Options.EnableGarbageCollector {
return false, nil

View File

@ -466,6 +466,11 @@ const (
// is at-your-own-risk. Pods that attempt to set an unsafe sysctl that is not enabled for a kubelet
// will fail to launch.
UnsafeSysctlsPodAnnotationKey string = "security.alpha.kubernetes.io/unsafe-sysctls"
// ObjectTTLAnnotations represents a suggestion for kubelet for how long it can cache
// an object (e.g. secret, config map) before fetching it again from apiserver.
// This annotation can be attached to node.
ObjectTTLAnnotationKey string = "node.alpha.kubernetes.io/ttl"
)
// TolerationToleratesTaint checks if the toleration tolerates the taint.

View File

@ -271,6 +271,11 @@ const (
// is at-your-own-risk. Pods that attempt to set an unsafe sysctl that is not enabled for a kubelet
// will fail to launch.
UnsafeSysctlsPodAnnotationKey string = "security.alpha.kubernetes.io/unsafe-sysctls"
// ObjectTTLAnnotations represents a suggestion for kubelet for how long it can cache
// an object (e.g. secret, config map) before fetching it again from apiserver.
// This annotation can be attached to node.
ObjectTTLAnnotationKey string = "node.alpha.kubernetes.io/ttl"
)
// GetTolerationsFromPodAnnotations gets the json serialized tolerations data from Pod.Annotations

View File

@ -105,6 +105,7 @@ filegroup(
"//pkg/controller/service:all-srcs",
"//pkg/controller/serviceaccount:all-srcs",
"//pkg/controller/statefulset:all-srcs",
"//pkg/controller/ttl:all-srcs",
"//pkg/controller/volume/attachdetach:all-srcs",
"//pkg/controller/volume/persistentvolume:all-srcs",
],

62
pkg/controller/ttl/BUILD Normal file
View File

@ -0,0 +1,62 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_library(
name = "go_default_library",
srcs = ["ttlcontroller.go"],
tags = ["automanaged"],
deps = [
"//pkg/api:go_default_library",
"//pkg/api/v1:go_default_library",
"//pkg/client/clientset_generated/clientset:go_default_library",
"//pkg/client/listers/core/v1:go_default_library",
"//pkg/controller:go_default_library",
"//pkg/controller/informers:go_default_library",
"//vendor:github.com/golang/glog",
"//vendor:k8s.io/apimachinery/pkg/api/errors",
"//vendor:k8s.io/apimachinery/pkg/types",
"//vendor:k8s.io/apimachinery/pkg/util/json",
"//vendor:k8s.io/apimachinery/pkg/util/runtime",
"//vendor:k8s.io/apimachinery/pkg/util/strategicpatch",
"//vendor:k8s.io/apimachinery/pkg/util/wait",
"//vendor:k8s.io/client-go/tools/cache",
"//vendor:k8s.io/client-go/util/workqueue",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)
go_test(
name = "go_default_test",
srcs = ["ttlcontroller_test.go"],
library = ":go_default_library",
tags = ["automanaged"],
deps = [
"//pkg/api/v1:go_default_library",
"//pkg/client/clientset_generated/clientset/fake:go_default_library",
"//pkg/client/listers/core/v1:go_default_library",
"//vendor:github.com/stretchr/testify/assert",
"//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
"//vendor:k8s.io/client-go/testing",
"//vendor:k8s.io/client-go/tools/cache",
"//vendor:k8s.io/client-go/util/workqueue",
],
)

View File

@ -0,0 +1,299 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// The TTLController sets ttl annotations on nodes, based on cluster size.
// The annotations are consumed by Kubelets as suggestions for how long
// it can cache objects (e.g. secrets or config maps) before refetching
// from apiserver again.
//
// TODO: This is a temporary workaround for the Kubelet not being able to
// send "watch secrets attached to pods from my node" request. Once
// sending such request will be possible, we will modify Kubelet to
// use it and get rid of this controller completely.
package ttl
import (
"fmt"
"math"
"strconv"
"sync"
"time"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/json"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
listers "k8s.io/kubernetes/pkg/client/listers/core/v1"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/informers"
"github.com/golang/glog"
)
type TTLController struct {
kubeClient clientset.Interface
// nodeStore is a local cache of nodes.
nodeStore listers.NodeLister
// Nodes that need to be synced.
queue workqueue.RateLimitingInterface
// Returns true if all underlying informers are synced.
hasSynced func() bool
lock sync.RWMutex
// Number of nodes in the cluster.
nodeCount int
// Desired TTL for all nodes in the cluster.
desiredTTLSeconds int
// In which interval of cluster size we currently are.
boundaryStep int
}
func NewTTLController(nodeInformer informers.NodeInformer, kubeClient clientset.Interface) *TTLController {
ttlc := &TTLController{
kubeClient: kubeClient,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ttlcontroller"),
}
nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: ttlc.addNode,
UpdateFunc: ttlc.updateNode,
DeleteFunc: ttlc.deleteNode,
})
ttlc.nodeStore = listers.NewNodeLister(nodeInformer.Informer().GetIndexer())
ttlc.hasSynced = nodeInformer.Informer().HasSynced
return ttlc
}
type ttlBoundary struct {
sizeMin int
sizeMax int
ttlSeconds int
}
var (
ttlBoundaries = []ttlBoundary{
{sizeMin: 0, sizeMax: 100, ttlSeconds: 0},
{sizeMin: 90, sizeMax: 500, ttlSeconds: 15},
{sizeMin: 450, sizeMax: 1000, ttlSeconds: 30},
{sizeMin: 900, sizeMax: 2000, ttlSeconds: 60},
{sizeMin: 1800, sizeMax: 10000, ttlSeconds: 300},
{sizeMin: 9000, sizeMax: math.MaxInt32, ttlSeconds: 600},
}
)
func (ttlc *TTLController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
defer ttlc.queue.ShutDown()
glog.Infof("Starting TTL controller")
defer glog.Infof("Shutting down TTL controller")
if !cache.WaitForCacheSync(stopCh, ttlc.hasSynced) {
return
}
for i := 0; i < workers; i++ {
go wait.Until(ttlc.worker, time.Second, stopCh)
}
<-stopCh
}
func (ttlc *TTLController) addNode(obj interface{}) {
node, ok := obj.(*v1.Node)
if !ok {
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
return
}
func() {
ttlc.lock.Lock()
defer ttlc.lock.Unlock()
ttlc.nodeCount++
if ttlc.nodeCount > ttlBoundaries[ttlc.boundaryStep].sizeMax {
ttlc.boundaryStep++
ttlc.desiredTTLSeconds = ttlBoundaries[ttlc.boundaryStep].ttlSeconds
}
}()
ttlc.enqueueNode(node)
}
func (ttlc *TTLController) updateNode(_, newObj interface{}) {
node, ok := newObj.(*v1.Node)
if !ok {
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", newObj))
return
}
// Processing all updates of nodes guarantees that we will update
// the ttl annotation, when cluster size changes.
// We are relying on the fact that Kubelet is updating node status
// every 10s (or generally every X seconds), which means that whenever
// required, its ttl annotation should be updated within that period.
ttlc.enqueueNode(node)
}
func (ttlc *TTLController) deleteNode(obj interface{}) {
_, ok := obj.(*v1.Node)
if !ok {
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
if !ok {
utilruntime.HandleError(fmt.Errorf("unexpected object type: %v", obj))
return
}
_, ok = tombstone.Obj.(*v1.Node)
if !ok {
utilruntime.HandleError(fmt.Errorf("unexpected object types: %v", obj))
return
}
}
func() {
ttlc.lock.Lock()
defer ttlc.lock.Unlock()
ttlc.nodeCount--
if ttlc.nodeCount < ttlBoundaries[ttlc.boundaryStep].sizeMin {
ttlc.boundaryStep--
ttlc.desiredTTLSeconds = ttlBoundaries[ttlc.boundaryStep].ttlSeconds
}
}()
// We are not processing the node, as it no longer exists.
}
func (ttlc *TTLController) enqueueNode(node *v1.Node) {
key, err := controller.KeyFunc(node)
if err != nil {
glog.Errorf("Couldn't get key for object %+v", node)
return
}
ttlc.queue.Add(key)
}
func (ttlc *TTLController) worker() {
for ttlc.processItem() {
if quit := ttlc.processItem(); quit {
glog.Infof("TTL controller worker shutting down")
return
}
}
}
func (ttlc *TTLController) processItem() bool {
key, quit := ttlc.queue.Get()
if quit {
return false
}
defer ttlc.queue.Done(key)
err := ttlc.updateNodeIfNeeded(key.(string))
if err == nil {
ttlc.queue.Forget(key)
return true
}
ttlc.queue.AddRateLimited(key)
utilruntime.HandleError(err)
return true
}
func (ttlc *TTLController) getDesiredTTLSeconds() int {
ttlc.lock.RLock()
defer ttlc.lock.RUnlock()
return ttlc.desiredTTLSeconds
}
func getIntFromAnnotation(node *v1.Node, annotationKey string) (int, bool) {
if node.Annotations == nil {
return 0, false
}
annotationValue, ok := node.Annotations[annotationKey]
if !ok {
return 0, false
}
intValue, err := strconv.Atoi(annotationValue)
if err != nil {
glog.Warningf("Cannot convert the value %q with annotation key %q for the node %q",
annotationValue, annotationKey, node.Name)
return 0, false
}
return intValue, true
}
func setIntAnnotation(node *v1.Node, annotationKey string, value int) {
if node.Annotations == nil {
node.Annotations = make(map[string]string)
}
node.Annotations[annotationKey] = strconv.Itoa(value)
}
func (ttlc *TTLController) patchNodeWithAnnotation(node *v1.Node, annotationKey string, value int) error {
oldData, err := json.Marshal(node)
if err != nil {
return err
}
setIntAnnotation(node, annotationKey, value)
newData, err := json.Marshal(node)
if err != nil {
return err
}
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, &v1.Node{})
if err != nil {
return err
}
_, err = ttlc.kubeClient.Core().Nodes().Patch(node.Name, types.StrategicMergePatchType, patchBytes)
if err != nil {
glog.V(2).Infof("Failed to change ttl annotation for node %s: %v", node.Name, err)
return err
}
glog.V(2).Infof("Changed ttl annotation for node %s to %d seconds", node.Name, value)
return nil
}
func (ttlc *TTLController) updateNodeIfNeeded(key string) error {
node, err := ttlc.nodeStore.Get(key)
if err != nil {
if apierrors.IsNotFound(err) {
return nil
}
return err
}
desiredTTL := ttlc.getDesiredTTLSeconds()
desiredTTL := ttlc.getDesiredTTLSeconds()
currentTTL, ok := getIntFromAnnotation(node, v1.ObjectTTLAnnotationKey)
if ok && currentTTL == desiredTTL {
return nil
}
objCopy, err := api.Scheme.DeepCopy(node)
if err != nil {
return err
}
return ttlc.patchNodeWithAnnotation(objCopy.(*v1.Node), v1.ObjectTTLAnnotationKey, desiredTTL)
}

View File

@ -0,0 +1,231 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package ttl
import (
"testing"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
core "k8s.io/client-go/testing"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
listers "k8s.io/kubernetes/pkg/client/listers/core/v1"
"github.com/stretchr/testify/assert"
)
func TestPatchNode(t *testing.T) {
testCases := []struct {
node *v1.Node
ttlSeconds int
patch string
}{
{
node: &v1.Node{},
ttlSeconds: 0,
patch: "{\"metadata\":{\"annotations\":{\"node.alpha.kubernetes.io/ttl\":\"0\"}}}",
},
{
node: &v1.Node{},
ttlSeconds: 10,
patch: "{\"metadata\":{\"annotations\":{\"node.alpha.kubernetes.io/ttl\":\"10\"}}}",
},
{
node: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "name"}},
ttlSeconds: 10,
patch: "{\"metadata\":{\"annotations\":{\"node.alpha.kubernetes.io/ttl\":\"10\"}}}",
},
{
node: &v1.Node{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{}}},
ttlSeconds: 10,
patch: "{\"metadata\":{\"annotations\":{\"node.alpha.kubernetes.io/ttl\":\"10\"}}}",
},
{
node: &v1.Node{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{"node.alpha.kubernetes.io/ttl": "0"}}},
ttlSeconds: 10,
patch: "{\"metadata\":{\"annotations\":{\"node.alpha.kubernetes.io/ttl\":\"10\"}}}",
},
{
node: &v1.Node{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{"node.alpha.kubernetes.io/ttl": "0", "a": "b"}}},
ttlSeconds: 10,
patch: "{\"metadata\":{\"annotations\":{\"node.alpha.kubernetes.io/ttl\":\"10\"}}}",
},
{
node: &v1.Node{ObjectMeta: metav1.ObjectMeta{Annotations: map[string]string{"node.alpha.kubernetes.io/ttl": "10", "a": "b"}}},
ttlSeconds: 10,
patch: "{}",
},
}
for i, testCase := range testCases {
fakeClient := &fake.Clientset{}
ttlController := &TTLController{
kubeClient: fakeClient,
}
err := ttlController.patchNodeWithAnnotation(testCase.node, v1.ObjectTTLAnnotationKey, testCase.ttlSeconds)
if err != nil {
t.Errorf("%d: unexpected error: %v", i, err)
continue
}
actions := fakeClient.Actions()
assert.Equal(t, 1, len(actions), "unexpected actions: %#v", actions)
patchAction := actions[0].(core.PatchActionImpl)
assert.Equal(t, testCase.patch, string(patchAction.Patch), "%d: unexpected patch: %s", i, string(patchAction.Patch))
}
}
func TestUpdateNodeIfNeeded(t *testing.T) {
testCases := []struct {
node *v1.Node
desiredTTL int
patch string
}{
{
node: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "name"}},
desiredTTL: 0,
patch: "{\"metadata\":{\"annotations\":{\"node.alpha.kubernetes.io/ttl\":\"0\"}}}",
},
{
node: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "name"}},
desiredTTL: 15,
patch: "{\"metadata\":{\"annotations\":{\"node.alpha.kubernetes.io/ttl\":\"15\"}}}",
},
{
node: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "name"}},
desiredTTL: 30,
patch: "{\"metadata\":{\"annotations\":{\"node.alpha.kubernetes.io/ttl\":\"30\"}}}",
},
{
node: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "name", Annotations: map[string]string{"node.alpha.kubernetes.io/ttl": "0"}}},
desiredTTL: 60,
patch: "{\"metadata\":{\"annotations\":{\"node.alpha.kubernetes.io/ttl\":\"60\"}}}",
},
{
node: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "name", Annotations: map[string]string{"node.alpha.kubernetes.io/ttl": "60"}}},
desiredTTL: 60,
patch: "",
},
{
node: &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "name", Annotations: map[string]string{"node.alpha.kubernetes.io/ttl": "60"}}},
desiredTTL: 30,
patch: "{\"metadata\":{\"annotations\":{\"node.alpha.kubernetes.io/ttl\":\"30\"}}}",
},
}
for i, testCase := range testCases {
fakeClient := &fake.Clientset{}
nodeStore := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{})
nodeStore.Add(testCase.node)
ttlController := &TTLController{
kubeClient: fakeClient,
nodeStore: listers.NewNodeLister(nodeStore),
desiredTTLSeconds: testCase.desiredTTL,
}
if err := ttlController.updateNodeIfNeeded(testCase.node.Name); err != nil {
t.Errorf("%d: unexpected error: %v", i, err)
continue
}
actions := fakeClient.Actions()
if testCase.patch == "" {
assert.Equal(t, 0, len(actions), "unexpected actions: %#v", actions)
} else {
assert.Equal(t, 1, len(actions), "unexpected actions: %#v", actions)
patchAction := actions[0].(core.PatchActionImpl)
assert.Equal(t, testCase.patch, string(patchAction.Patch), "%d: unexpected patch: %s", i, string(patchAction.Patch))
}
}
}
func TestDesiredTTL(t *testing.T) {
testCases := []struct {
addNode bool
deleteNode bool
nodeCount int
desiredTTL int
boundaryStep int
expectedTTL int
}{
{
addNode: true,
nodeCount: 0,
desiredTTL: 0,
boundaryStep: 0,
expectedTTL: 0,
},
{
addNode: true,
nodeCount: 99,
desiredTTL: 0,
boundaryStep: 0,
expectedTTL: 0,
},
{
addNode: true,
nodeCount: 100,
desiredTTL: 0,
boundaryStep: 0,
expectedTTL: 15,
},
{
deleteNode: true,
nodeCount: 101,
desiredTTL: 15,
boundaryStep: 1,
expectedTTL: 15,
},
{
deleteNode: true,
nodeCount: 91,
desiredTTL: 15,
boundaryStep: 1,
expectedTTL: 15,
},
{
addNode: true,
nodeCount: 91,
desiredTTL: 15,
boundaryStep: 1,
expectedTTL: 15,
},
{
deleteNode: true,
nodeCount: 90,
desiredTTL: 15,
boundaryStep: 1,
expectedTTL: 0,
},
}
for i, testCase := range testCases {
ttlController := &TTLController{
queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()),
nodeCount: testCase.nodeCount,
desiredTTLSeconds: testCase.desiredTTL,
boundaryStep: testCase.boundaryStep,
}
if testCase.addNode {
ttlController.addNode(&v1.Node{})
}
if testCase.deleteNode {
ttlController.deleteNode(&v1.Node{})
}
assert.Equal(t, testCase.expectedTTL, ttlController.getDesiredTTLSeconds(),
"%d: unexpected ttl: %d", i, ttlController.getDesiredTTLSeconds())
}
}