Merge pull request #53592 from frodenas/bootstrap-controller

Automatic merge from submit-queue (batch tested with PRs 53592, 52562, 55175, 55213). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

Check RegisterMetricAndTrackRateLimiterUsage error when starting BootstrapSigner & TokenCleaner controllers

**What this PR does / why we need it**:
Prevent `BootstrapSigner` and `TokenCleaner` controllers to start if `metrics.RegisterMetricAndTrackRateLimiterUsage` returns an error.

**Which issue this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close that issue when PR gets merged)*: complements #53571 

**Special notes for your reviewer**:

**Release note**:

```release-note
NONE
```
pull/6/head
Kubernetes Submit Queue 2017-11-07 11:21:15 -08:00 committed by GitHub
commit 576c9118a6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 634 additions and 186 deletions

View File

@ -21,6 +21,8 @@ limitations under the License.
package app
import (
"fmt"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/kubernetes/pkg/controller/cronjob"
"k8s.io/kubernetes/pkg/controller/job"
@ -42,8 +44,12 @@ func startCronJobController(ctx ControllerContext) (bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "batch", Version: "v1beta1", Resource: "cronjobs"}] {
return false, nil
}
go cronjob.NewCronJobController(
cjc, err := cronjob.NewCronJobController(
ctx.ClientBuilder.ClientOrDie("cronjob-controller"),
).Run(ctx.Stop)
)
if err != nil {
return true, fmt.Errorf("error creating CronJob controller: %v", err)
}
go cjc.Run(ctx.Stop)
return true, nil
}

View File

@ -16,20 +16,32 @@ limitations under the License.
package app
import "k8s.io/kubernetes/pkg/controller/bootstrap"
import (
"fmt"
"k8s.io/kubernetes/pkg/controller/bootstrap"
)
func startBootstrapSignerController(ctx ControllerContext) (bool, error) {
go bootstrap.NewBootstrapSigner(
bsc, err := bootstrap.NewBootstrapSigner(
ctx.ClientBuilder.ClientGoClientOrDie("bootstrap-signer"),
bootstrap.DefaultBootstrapSignerOptions(),
).Run(ctx.Stop)
)
if err != nil {
return true, fmt.Errorf("error creating BootstrapSigner controller: %v", err)
}
go bsc.Run(ctx.Stop)
return true, nil
}
func startTokenCleanerController(ctx ControllerContext) (bool, error) {
go bootstrap.NewTokenCleaner(
tcc, err := bootstrap.NewTokenCleaner(
ctx.ClientBuilder.ClientGoClientOrDie("token-cleaner"),
bootstrap.DefaultTokenCleanerOptions(),
).Run(ctx.Stop)
)
if err != nil {
return true, fmt.Errorf("error creating TokenCleaner controller: %v", err)
}
go tcc.Run(ctx.Stop)
return true, nil
}

View File

@ -525,7 +525,7 @@ func (c serviceAccountTokenControllerStarter) startServiceAccountTokenController
rootCA = c.rootClientBuilder.ConfigOrDie("tokens-controller").CAData
}
controller := serviceaccountcontroller.NewTokensController(
controller, err := serviceaccountcontroller.NewTokensController(
ctx.InformerFactory.Core().V1().ServiceAccounts(),
ctx.InformerFactory.Core().V1().Secrets(),
c.rootClientBuilder.ClientOrDie("tokens-controller"),
@ -534,6 +534,9 @@ func (c serviceAccountTokenControllerStarter) startServiceAccountTokenController
RootCA: rootCA,
},
)
if err != nil {
return true, fmt.Errorf("error creating Tokens controller: %v", err)
}
go controller.Run(int(ctx.Options.ConcurrentSATokenSyncs), ctx.Stop)
// start the first set of informers now so that other controllers can start

View File

@ -68,6 +68,7 @@ func startServiceController(ctx ControllerContext) (bool, error) {
ctx.Options.ClusterName,
)
if err != nil {
// This error shouldn't fail. It lives like this as a legacy.
glog.Errorf("Failed to start service controller: %v", err)
return false, nil
}
@ -258,7 +259,9 @@ func startResourceQuotaController(ctx ControllerContext) (bool, error) {
Registry: generic.NewRegistry(quotaConfiguration.Evaluators()),
}
if resourceQuotaControllerClient.CoreV1().RESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("resource_quota_controller", resourceQuotaControllerClient.CoreV1().RESTClient().GetRateLimiter())
if err := metrics.RegisterMetricAndTrackRateLimiterUsage("resource_quota_controller", resourceQuotaControllerClient.CoreV1().RESTClient().GetRateLimiter()); err != nil {
return true, err
}
}
resourceQuotaController, err := resourcequotacontroller.NewResourceQuotaController(resourceQuotaControllerOptions)
@ -302,12 +305,16 @@ func startNamespaceController(ctx ControllerContext) (bool, error) {
}
func startServiceAccountController(ctx ControllerContext) (bool, error) {
go serviceaccountcontroller.NewServiceAccountsController(
sac, err := serviceaccountcontroller.NewServiceAccountsController(
ctx.InformerFactory.Core().V1().ServiceAccounts(),
ctx.InformerFactory.Core().V1().Namespaces(),
ctx.ClientBuilder.ClientOrDie("service-account-controller"),
serviceaccountcontroller.DefaultServiceAccountsControllerOptions(),
).Run(1, ctx.Stop)
)
if err != nil {
return true, fmt.Errorf("error creating ServiceAccount controller: %v", err)
}
go sac.Run(1, ctx.Stop)
return true, nil
}

View File

@ -21,6 +21,8 @@ limitations under the License.
package app
import (
"fmt"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/kubernetes/pkg/controller/daemon"
"k8s.io/kubernetes/pkg/controller/deployment"
@ -31,13 +33,17 @@ func startDaemonSetController(ctx ControllerContext) (bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "daemonsets"}] {
return false, nil
}
go daemon.NewDaemonSetsController(
dsc, err := daemon.NewDaemonSetsController(
ctx.InformerFactory.Extensions().V1beta1().DaemonSets(),
ctx.InformerFactory.Apps().V1beta1().ControllerRevisions(),
ctx.InformerFactory.Core().V1().Pods(),
ctx.InformerFactory.Core().V1().Nodes(),
ctx.ClientBuilder.ClientOrDie("daemon-set-controller"),
).Run(int(ctx.Options.ConcurrentDaemonSetSyncs), ctx.Stop)
)
if err != nil {
return true, fmt.Errorf("error creating DaemonSets controller: %v", err)
}
go dsc.Run(int(ctx.Options.ConcurrentDaemonSetSyncs), ctx.Stop)
return true, nil
}
@ -45,12 +51,16 @@ func startDeploymentController(ctx ControllerContext) (bool, error) {
if !ctx.AvailableResources[schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "deployments"}] {
return false, nil
}
go deployment.NewDeploymentController(
dc, err := deployment.NewDeploymentController(
ctx.InformerFactory.Extensions().V1beta1().Deployments(),
ctx.InformerFactory.Extensions().V1beta1().ReplicaSets(),
ctx.InformerFactory.Core().V1().Pods(),
ctx.ClientBuilder.ClientOrDie("deployment-controller"),
).Run(int(ctx.Options.ConcurrentDeploymentSyncs), ctx.Stop)
)
if err != nil {
return true, fmt.Errorf("error creating Deployment controller: %v", err)
}
go dc.Run(int(ctx.Options.ConcurrentDeploymentSyncs), ctx.Stop)
return true, nil
}

View File

@ -92,7 +92,7 @@ type BootstrapSigner struct {
// NewBootstrapSigner returns a new *BootstrapSigner.
//
// TODO: Switch to shared informers
func NewBootstrapSigner(cl clientset.Interface, options BootstrapSignerOptions) *BootstrapSigner {
func NewBootstrapSigner(cl clientset.Interface, options BootstrapSignerOptions) (*BootstrapSigner, error) {
e := &BootstrapSigner{
client: cl,
configMapKey: options.ConfigMapNamespace + "/" + options.ConfigMapName,
@ -100,7 +100,9 @@ func NewBootstrapSigner(cl clientset.Interface, options BootstrapSignerOptions)
syncQueue: workqueue.NewNamed("bootstrap_signer_queue"),
}
if cl.CoreV1().RESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("bootstrap_signer", cl.CoreV1().RESTClient().GetRateLimiter())
if err := metrics.RegisterMetricAndTrackRateLimiterUsage("bootstrap_signer", cl.CoreV1().RESTClient().GetRateLimiter()); err != nil {
return nil, err
}
}
configMapSelector := fields.SelectorFromSet(map[string]string{api.ObjectNameField: options.ConfigMapName})
e.configMaps, e.configMapsController = cache.NewInformer(
@ -142,7 +144,7 @@ func NewBootstrapSigner(cl clientset.Interface, options BootstrapSignerOptions)
DeleteFunc: func(_ interface{}) { e.pokeConfigMapSync() },
},
)
return e
return e, nil
}
// Run runs controller loops and returns when they are done

View File

@ -36,10 +36,14 @@ func init() {
const testTokenID = "abc123"
func newBootstrapSigner() (*BootstrapSigner, *fake.Clientset) {
func newBootstrapSigner() (*BootstrapSigner, *fake.Clientset, error) {
options := DefaultBootstrapSignerOptions()
cl := fake.NewSimpleClientset()
return NewBootstrapSigner(cl, options), cl
bsc, err := NewBootstrapSigner(cl, options)
if err != nil {
return nil, nil, err
}
return bsc, cl, nil
}
func newConfigMap(tokenID, signature string) *v1.ConfigMap {
@ -60,13 +64,19 @@ func newConfigMap(tokenID, signature string) *v1.ConfigMap {
}
func TestNoConfigMap(t *testing.T) {
signer, cl := newBootstrapSigner()
signer, cl, err := newBootstrapSigner()
if err != nil {
t.Fatalf("error creating BootstrapSigner: %v", err)
}
signer.signConfigMap()
verifyActions(t, []core.Action{}, cl.Actions())
}
func TestSimpleSign(t *testing.T) {
signer, cl := newBootstrapSigner()
signer, cl, err := newBootstrapSigner()
if err != nil {
t.Fatalf("error creating BootstrapSigner: %v", err)
}
cm := newConfigMap("", "")
signer.configMaps.Add(cm)
@ -87,7 +97,10 @@ func TestSimpleSign(t *testing.T) {
}
func TestNoSignNeeded(t *testing.T) {
signer, cl := newBootstrapSigner()
signer, cl, err := newBootstrapSigner()
if err != nil {
t.Fatalf("error creating BootstrapSigner: %v", err)
}
cm := newConfigMap(testTokenID, "eyJhbGciOiJIUzI1NiIsImtpZCI6ImFiYzEyMyJ9..QSxpUG7Q542CirTI2ECPSZjvBOJURUW5a7XqFpNI958")
signer.configMaps.Add(cm)
@ -102,7 +115,10 @@ func TestNoSignNeeded(t *testing.T) {
}
func TestUpdateSignature(t *testing.T) {
signer, cl := newBootstrapSigner()
signer, cl, err := newBootstrapSigner()
if err != nil {
t.Fatalf("error creating BootstrapSigner: %v", err)
}
cm := newConfigMap(testTokenID, "old signature")
signer.configMaps.Add(cm)
@ -123,7 +139,10 @@ func TestUpdateSignature(t *testing.T) {
}
func TestRemoveSignature(t *testing.T) {
signer, cl := newBootstrapSigner()
signer, cl, err := newBootstrapSigner()
if err != nil {
t.Fatalf("error creating BootstrapSigner: %v", err)
}
cm := newConfigMap(testTokenID, "old signature")
signer.configMaps.Add(cm)

View File

@ -66,13 +66,15 @@ type TokenCleaner struct {
// NewTokenCleaner returns a new *NewTokenCleaner.
//
// TODO: Switch to shared informers
func NewTokenCleaner(cl clientset.Interface, options TokenCleanerOptions) *TokenCleaner {
func NewTokenCleaner(cl clientset.Interface, options TokenCleanerOptions) (*TokenCleaner, error) {
e := &TokenCleaner{
client: cl,
tokenSecretNamespace: options.TokenSecretNamespace,
}
if cl.CoreV1().RESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("token_cleaner", cl.CoreV1().RESTClient().GetRateLimiter())
if err := metrics.RegisterMetricAndTrackRateLimiterUsage("token_cleaner", cl.CoreV1().RESTClient().GetRateLimiter()); err != nil {
return nil, err
}
}
secretSelector := fields.SelectorFromSet(map[string]string{api.SecretTypeField: string(bootstrapapi.SecretTypeBootstrapToken)})
@ -94,7 +96,7 @@ func NewTokenCleaner(cl clientset.Interface, options TokenCleanerOptions) *Token
UpdateFunc: func(oldSecret, newSecret interface{}) { e.evalSecret(newSecret) },
},
)
return e
return e, nil
}
// Run runs controller loops and returns when they are done

View File

@ -32,14 +32,21 @@ func init() {
spew.Config.DisableMethods = true
}
func newTokenCleaner() (*TokenCleaner, *fake.Clientset) {
func newTokenCleaner() (*TokenCleaner, *fake.Clientset, error) {
options := DefaultTokenCleanerOptions()
cl := fake.NewSimpleClientset()
return NewTokenCleaner(cl, options), cl
tcc, err := NewTokenCleaner(cl, options)
if err != nil {
return nil, nil, err
}
return tcc, cl, nil
}
func TestCleanerNoExpiration(t *testing.T) {
cleaner, cl := newTokenCleaner()
cleaner, cl, err := newTokenCleaner()
if err != nil {
t.Fatalf("error creating TokenCleaner: %v", err)
}
secret := newTokenSecret("tokenID", "tokenSecret")
cleaner.secrets.Add(secret)
@ -52,7 +59,10 @@ func TestCleanerNoExpiration(t *testing.T) {
}
func TestCleanerExpired(t *testing.T) {
cleaner, cl := newTokenCleaner()
cleaner, cl, err := newTokenCleaner()
if err != nil {
t.Fatalf("error creating TokenCleaner: %v", err)
}
secret := newTokenSecret("tokenID", "tokenSecret")
addSecretExpiration(secret, timeString(-time.Hour))
@ -71,7 +81,10 @@ func TestCleanerExpired(t *testing.T) {
}
func TestCleanerNotExpired(t *testing.T) {
cleaner, cl := newTokenCleaner()
cleaner, cl, err := newTokenCleaner()
if err != nil {
t.Fatalf("error creating TokenCleaner: %v", err)
}
secret := newTokenSecret("tokenID", "tokenSecret")
addSecretExpiration(secret, timeString(time.Hour))

View File

@ -66,14 +66,16 @@ type CronJobController struct {
recorder record.EventRecorder
}
func NewCronJobController(kubeClient clientset.Interface) *CronJobController {
func NewCronJobController(kubeClient clientset.Interface) (*CronJobController, error) {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
// TODO: remove the wrapper when every clients have moved to use the clientset.
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.CoreV1().RESTClient()).Events("")})
if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("cronjob_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter())
if err := metrics.RegisterMetricAndTrackRateLimiterUsage("cronjob_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter()); err != nil {
return nil, err
}
}
jm := &CronJobController{
@ -84,12 +86,15 @@ func NewCronJobController(kubeClient clientset.Interface) *CronJobController {
recorder: eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cronjob-controller"}),
}
return jm
return jm, nil
}
func NewCronJobControllerFromClient(kubeClient clientset.Interface) *CronJobController {
jm := NewCronJobController(kubeClient)
return jm
func NewCronJobControllerFromClient(kubeClient clientset.Interface) (*CronJobController, error) {
jm, err := NewCronJobController(kubeClient)
if err != nil {
return nil, err
}
return jm, nil
}
// Run the main goroutine responsible for watching and syncing jobs.

View File

@ -130,14 +130,16 @@ type DaemonSetsController struct {
suspendedDaemonPods map[string]sets.String
}
func NewDaemonSetsController(daemonSetInformer extensionsinformers.DaemonSetInformer, historyInformer appsinformers.ControllerRevisionInformer, podInformer coreinformers.PodInformer, nodeInformer coreinformers.NodeInformer, kubeClient clientset.Interface) *DaemonSetsController {
func NewDaemonSetsController(daemonSetInformer extensionsinformers.DaemonSetInformer, historyInformer appsinformers.ControllerRevisionInformer, podInformer coreinformers.PodInformer, nodeInformer coreinformers.NodeInformer, kubeClient clientset.Interface) (*DaemonSetsController, error) {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
// TODO: remove the wrapper when every clients have moved to use the clientset.
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(kubeClient.CoreV1().RESTClient()).Events("")})
if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("daemon_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter())
if err := metrics.RegisterMetricAndTrackRateLimiterUsage("daemon_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter()); err != nil {
return nil, err
}
}
dsc := &DaemonSetsController{
kubeClient: kubeClient,
@ -201,7 +203,7 @@ func NewDaemonSetsController(daemonSetInformer extensionsinformers.DaemonSetInfo
dsc.syncHandler = dsc.syncDaemonSet
dsc.enqueueDaemonSet = dsc.enqueue
dsc.enqueueDaemonSetRateLimited = dsc.enqueueRateLimited
return dsc
return dsc, nil
}
func (dsc *DaemonSetsController) deleteDaemonset(obj interface{}) {

View File

@ -294,17 +294,20 @@ type daemonSetsController struct {
fakeRecorder *record.FakeRecorder
}
func newTestController(initialObjects ...runtime.Object) (*daemonSetsController, *fakePodControl, *fake.Clientset) {
func newTestController(initialObjects ...runtime.Object) (*daemonSetsController, *fakePodControl, *fake.Clientset, error) {
clientset := fake.NewSimpleClientset(initialObjects...)
informerFactory := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc())
dsc := NewDaemonSetsController(
dsc, err := NewDaemonSetsController(
informerFactory.Extensions().V1beta1().DaemonSets(),
informerFactory.Apps().V1beta1().ControllerRevisions(),
informerFactory.Core().V1().Pods(),
informerFactory.Core().V1().Nodes(),
clientset,
)
if err != nil {
return nil, nil, nil, err
}
fakeRecorder := record.NewFakeRecorder(100)
dsc.eventRecorder = fakeRecorder
@ -324,7 +327,7 @@ func newTestController(initialObjects ...runtime.Object) (*daemonSetsController,
informerFactory.Core().V1().Pods().Informer().GetStore(),
informerFactory.Core().V1().Nodes().Informer().GetStore(),
fakeRecorder,
}, podControl, clientset
}, podControl, clientset, nil
}
func validateSyncDaemonSets(t *testing.T, manager *daemonSetsController, fakePodControl *fakePodControl, expectedCreates, expectedDeletes int, expectedEvents int) {
@ -378,7 +381,10 @@ func clearExpectations(t *testing.T, manager *daemonSetsController, ds *extensio
func TestDeleteFinalStateUnknown(t *testing.T) {
for _, strategy := range updateStrategies() {
manager, _, _ := newTestController()
manager, _, _, err := newTestController()
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
addNodes(manager.nodeStore, 0, 1, nil)
ds := newDaemonSet("foo")
ds.Spec.UpdateStrategy = *strategy
@ -409,7 +415,10 @@ func TestSimpleDaemonSetLaunchesPods(t *testing.T) {
for _, strategy := range updateStrategies() {
ds := newDaemonSet("foo")
ds.Spec.UpdateStrategy = *strategy
manager, podControl, _ := newTestController(ds)
manager, podControl, _, err := newTestController(ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
addNodes(manager.nodeStore, 0, 5, nil)
manager.dsStore.Add(ds)
syncAndValidateDaemonSets(t, manager, ds, podControl, 5, 0, 0)
@ -422,7 +431,10 @@ func TestSimpleDaemonSetPodCreateErrors(t *testing.T) {
for _, strategy := range updateStrategies() {
ds := newDaemonSet("foo")
ds.Spec.UpdateStrategy = *strategy
manager, podControl, _ := newTestController(ds)
manager, podControl, _, err := newTestController(ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
podControl.FakePodControl.CreateLimit = 10
addNodes(manager.nodeStore, 0, podControl.FakePodControl.CreateLimit*10, nil)
manager.dsStore.Add(ds)
@ -442,7 +454,10 @@ func TestSimpleDaemonSetUpdatesStatusAfterLaunchingPods(t *testing.T) {
for _, strategy := range updateStrategies() {
ds := newDaemonSet("foo")
ds.Spec.UpdateStrategy = *strategy
manager, podControl, clientset := newTestController(ds)
manager, podControl, clientset, err := newTestController(ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
var updated *extensions.DaemonSet
clientset.PrependReactor("update", "daemonsets", func(action core.Action) (handled bool, ret runtime.Object, err error) {
@ -470,7 +485,10 @@ func TestSimpleDaemonSetUpdatesStatusAfterLaunchingPods(t *testing.T) {
// DaemonSets should do nothing if there aren't any nodes
func TestNoNodesDoesNothing(t *testing.T) {
for _, strategy := range updateStrategies() {
manager, podControl, _ := newTestController()
manager, podControl, _, err := newTestController()
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
ds := newDaemonSet("foo")
ds.Spec.UpdateStrategy = *strategy
manager.dsStore.Add(ds)
@ -484,7 +502,10 @@ func TestOneNodeDaemonLaunchesPod(t *testing.T) {
for _, strategy := range updateStrategies() {
ds := newDaemonSet("foo")
ds.Spec.UpdateStrategy = *strategy
manager, podControl, _ := newTestController(ds)
manager, podControl, _, err := newTestController(ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
manager.nodeStore.Add(newNode("only-node", nil))
manager.dsStore.Add(ds)
syncAndValidateDaemonSets(t, manager, ds, podControl, 1, 0, 0)
@ -496,7 +517,10 @@ func TestNotReadyNodeDaemonDoesLaunchPod(t *testing.T) {
for _, strategy := range updateStrategies() {
ds := newDaemonSet("foo")
ds.Spec.UpdateStrategy = *strategy
manager, podControl, _ := newTestController(ds)
manager, podControl, _, err := newTestController(ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
node := newNode("not-ready", nil)
node.Status.Conditions = []v1.NodeCondition{
{Type: v1.NodeReady, Status: v1.ConditionFalse},
@ -543,7 +567,10 @@ func TestInsufficientCapacityNodeDaemonDoesNotLaunchPod(t *testing.T) {
ds := newDaemonSet("foo")
ds.Spec.UpdateStrategy = *strategy
ds.Spec.Template.Spec = podSpec
manager, podControl, _ := newTestController(ds)
manager, podControl, _, err := newTestController(ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
node := newNode("too-much-mem", nil)
node.Status.Allocatable = allocatableResources("100M", "200m")
manager.nodeStore.Add(node)
@ -570,7 +597,10 @@ func TestInsufficientCapacityNodeDaemonDoesNotUnscheduleRunningPod(t *testing.T)
ds := newDaemonSet("foo")
ds.Spec.UpdateStrategy = *strategy
ds.Spec.Template.Spec = podSpec
manager, podControl, _ := newTestController(ds)
manager, podControl, _, err := newTestController(ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
node := newNode("too-much-mem", nil)
node.Status.Allocatable = allocatableResources("100M", "200m")
manager.nodeStore.Add(node)
@ -595,7 +625,10 @@ func TestInsufficientCapacityNodeSufficientCapacityWithNodeLabelDaemonLaunchPod(
ds := newDaemonSet("foo")
ds.Spec.Template.Spec = podSpec
ds.Spec.Template.Spec.NodeSelector = simpleNodeLabel
manager, podControl, _ := newTestController(ds)
manager, podControl, _, err := newTestController(ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
node1 := newNode("not-enough-resource", nil)
node1.Status.Allocatable = allocatableResources("10M", "20m")
node2 := newNode("enough-resource", simpleNodeLabel)
@ -616,7 +649,10 @@ func TestSufficientCapacityWithTerminatedPodsDaemonLaunchesPod(t *testing.T) {
ds := newDaemonSet("foo")
ds.Spec.UpdateStrategy = *strategy
ds.Spec.Template.Spec = podSpec
manager, podControl, _ := newTestController(ds)
manager, podControl, _, err := newTestController(ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
node := newNode("too-much-mem", nil)
node.Status.Allocatable = allocatableResources("100M", "200m")
manager.nodeStore.Add(node)
@ -636,7 +672,10 @@ func TestSufficientCapacityNodeDaemonLaunchesPod(t *testing.T) {
ds := newDaemonSet("foo")
ds.Spec.UpdateStrategy = *strategy
ds.Spec.Template.Spec = podSpec
manager, podControl, _ := newTestController(ds)
manager, podControl, _, err := newTestController(ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
node := newNode("not-too-much-mem", nil)
node.Status.Allocatable = allocatableResources("200M", "200m")
manager.nodeStore.Add(node)
@ -653,7 +692,10 @@ func TestNetworkUnavailableNodeDaemonLaunchesPod(t *testing.T) {
for _, strategy := range updateStrategies() {
ds := newDaemonSet("simple")
ds.Spec.UpdateStrategy = *strategy
manager, podControl, _ := newTestController(ds)
manager, podControl, _, err := newTestController(ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
node := newNode("network-unavailable", nil)
node.Status.Conditions = []v1.NodeCondition{
@ -675,7 +717,10 @@ func TestDontDoAnythingIfBeingDeleted(t *testing.T) {
ds.Spec.Template.Spec = podSpec
now := metav1.Now()
ds.DeletionTimestamp = &now
manager, podControl, _ := newTestController(ds)
manager, podControl, _, err := newTestController(ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
node := newNode("not-too-much-mem", nil)
node.Status.Allocatable = allocatableResources("200M", "200m")
manager.nodeStore.Add(node)
@ -694,7 +739,10 @@ func TestDontDoAnythingIfBeingDeletedRace(t *testing.T) {
ds.Spec.UpdateStrategy = *strategy
now := metav1.Now()
ds.DeletionTimestamp = &now
manager, podControl, _ := newTestController(ds)
manager, podControl, _, err := newTestController(ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
addNodes(manager.nodeStore, 0, 5, nil)
// Lister (cache) says it's NOT deleted.
@ -721,7 +769,10 @@ func TestPortConflictNodeDaemonDoesNotLaunchPod(t *testing.T) {
}},
}},
}
manager, podControl, _ := newTestController()
manager, podControl, _, err := newTestController()
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
node := newNode("port-conflict", nil)
manager.nodeStore.Add(node)
manager.podStore.Add(&v1.Pod{
@ -750,7 +801,10 @@ func TestPortConflictWithSameDaemonPodDoesNotDeletePod(t *testing.T) {
}},
}},
}
manager, podControl, _ := newTestController()
manager, podControl, _, err := newTestController()
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
node := newNode("port-conflict", nil)
manager.nodeStore.Add(node)
ds := newDaemonSet("foo")
@ -785,7 +839,10 @@ func TestNoPortConflictNodeDaemonLaunchesPod(t *testing.T) {
ds := newDaemonSet("foo")
ds.Spec.UpdateStrategy = *strategy
ds.Spec.Template.Spec = podSpec2
manager, podControl, _ := newTestController(ds)
manager, podControl, _, err := newTestController(ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
node := newNode("no-port-conflict", nil)
manager.nodeStore.Add(node)
manager.podStore.Add(&v1.Pod{
@ -816,7 +873,10 @@ func TestPodIsNotDeletedByDaemonsetWithEmptyLabelSelector(t *testing.T) {
ds.Spec.Selector = &ls
ds.Spec.Template.Spec.NodeSelector = map[string]string{"foo": "bar"}
manager, podControl, _ := newTestController(ds)
manager, podControl, _, err := newTestController(ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
manager.nodeStore.Add(newNode("node1", nil))
// Create pod not controlled by a daemonset.
manager.podStore.Add(&v1.Pod{
@ -839,7 +899,10 @@ func TestDealsWithExistingPods(t *testing.T) {
for _, strategy := range updateStrategies() {
ds := newDaemonSet("foo")
ds.Spec.UpdateStrategy = *strategy
manager, podControl, _ := newTestController(ds)
manager, podControl, _, err := newTestController(ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
manager.dsStore.Add(ds)
addNodes(manager.nodeStore, 0, 5, nil)
addPods(manager.podStore, "node-1", simpleDaemonSetLabel, ds, 1)
@ -856,7 +919,10 @@ func TestSelectorDaemonLaunchesPods(t *testing.T) {
daemon := newDaemonSet("foo")
daemon.Spec.UpdateStrategy = *strategy
daemon.Spec.Template.Spec.NodeSelector = simpleNodeLabel
manager, podControl, _ := newTestController(daemon)
manager, podControl, _, err := newTestController(daemon)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
addNodes(manager.nodeStore, 0, 4, nil)
addNodes(manager.nodeStore, 4, 3, simpleNodeLabel)
manager.dsStore.Add(daemon)
@ -870,7 +936,10 @@ func TestSelectorDaemonDeletesUnselectedPods(t *testing.T) {
ds := newDaemonSet("foo")
ds.Spec.UpdateStrategy = *strategy
ds.Spec.Template.Spec.NodeSelector = simpleNodeLabel
manager, podControl, _ := newTestController(ds)
manager, podControl, _, err := newTestController(ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
manager.dsStore.Add(ds)
addNodes(manager.nodeStore, 0, 5, nil)
addNodes(manager.nodeStore, 5, 5, simpleNodeLabel)
@ -888,7 +957,10 @@ func TestSelectorDaemonDealsWithExistingPods(t *testing.T) {
ds := newDaemonSet("foo")
ds.Spec.UpdateStrategy = *strategy
ds.Spec.Template.Spec.NodeSelector = simpleNodeLabel
manager, podControl, _ := newTestController(ds)
manager, podControl, _, err := newTestController(ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
manager.dsStore.Add(ds)
addNodes(manager.nodeStore, 0, 5, nil)
addNodes(manager.nodeStore, 5, 5, simpleNodeLabel)
@ -907,7 +979,10 @@ func TestSelectorDaemonDealsWithExistingPods(t *testing.T) {
// DaemonSet with node selector which does not match any node labels should not launch pods.
func TestBadSelectorDaemonDoesNothing(t *testing.T) {
for _, strategy := range updateStrategies() {
manager, podControl, _ := newTestController()
manager, podControl, _, err := newTestController()
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
addNodes(manager.nodeStore, 0, 4, nil)
addNodes(manager.nodeStore, 4, 3, simpleNodeLabel)
ds := newDaemonSet("foo")
@ -924,7 +999,10 @@ func TestNameDaemonSetLaunchesPods(t *testing.T) {
ds := newDaemonSet("foo")
ds.Spec.UpdateStrategy = *strategy
ds.Spec.Template.Spec.NodeName = "node-0"
manager, podControl, _ := newTestController(ds)
manager, podControl, _, err := newTestController(ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
addNodes(manager.nodeStore, 0, 5, nil)
manager.dsStore.Add(ds)
syncAndValidateDaemonSets(t, manager, ds, podControl, 1, 0, 0)
@ -937,7 +1015,10 @@ func TestBadNameDaemonSetDoesNothing(t *testing.T) {
ds := newDaemonSet("foo")
ds.Spec.UpdateStrategy = *strategy
ds.Spec.Template.Spec.NodeName = "node-10"
manager, podControl, _ := newTestController(ds)
manager, podControl, _, err := newTestController(ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
addNodes(manager.nodeStore, 0, 5, nil)
manager.dsStore.Add(ds)
syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0, 0)
@ -951,7 +1032,10 @@ func TestNameAndSelectorDaemonSetLaunchesPods(t *testing.T) {
ds.Spec.UpdateStrategy = *strategy
ds.Spec.Template.Spec.NodeSelector = simpleNodeLabel
ds.Spec.Template.Spec.NodeName = "node-6"
manager, podControl, _ := newTestController(ds)
manager, podControl, _, err := newTestController(ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
addNodes(manager.nodeStore, 0, 4, nil)
addNodes(manager.nodeStore, 4, 3, simpleNodeLabel)
manager.dsStore.Add(ds)
@ -966,7 +1050,10 @@ func TestInconsistentNameSelectorDaemonSetDoesNothing(t *testing.T) {
ds.Spec.UpdateStrategy = *strategy
ds.Spec.Template.Spec.NodeSelector = simpleNodeLabel
ds.Spec.Template.Spec.NodeName = "node-0"
manager, podControl, _ := newTestController(ds)
manager, podControl, _, err := newTestController(ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
addNodes(manager.nodeStore, 0, 4, nil)
addNodes(manager.nodeStore, 4, 3, simpleNodeLabel)
manager.dsStore.Add(ds)
@ -978,7 +1065,10 @@ func TestInconsistentNameSelectorDaemonSetDoesNothing(t *testing.T) {
func TestSelectorDaemonSetLaunchesPods(t *testing.T) {
ds := newDaemonSet("foo")
ds.Spec.Template.Spec.NodeSelector = simpleNodeLabel
manager, podControl, _ := newTestController(ds)
manager, podControl, _, err := newTestController(ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
addNodes(manager.nodeStore, 0, 4, nil)
addNodes(manager.nodeStore, 4, 3, simpleNodeLabel)
manager.dsStore.Add(ds)
@ -1008,7 +1098,10 @@ func TestNodeAffinityDaemonLaunchesPods(t *testing.T) {
},
}
manager, podControl, _ := newTestController(daemon)
manager, podControl, _, err := newTestController(daemon)
if err != nil {
t.Fatalf("rrror creating DaemonSetsController: %v", err)
}
addNodes(manager.nodeStore, 0, 4, nil)
addNodes(manager.nodeStore, 4, 3, simpleNodeLabel)
manager.dsStore.Add(daemon)
@ -1020,7 +1113,10 @@ func TestNumberReadyStatus(t *testing.T) {
for _, strategy := range updateStrategies() {
ds := newDaemonSet("foo")
ds.Spec.UpdateStrategy = *strategy
manager, podControl, clientset := newTestController(ds)
manager, podControl, clientset, err := newTestController(ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
var updated *extensions.DaemonSet
clientset.PrependReactor("update", "daemonsets", func(action core.Action) (handled bool, ret runtime.Object, err error) {
if action.GetSubresource() != "status" {
@ -1060,7 +1156,10 @@ func TestObservedGeneration(t *testing.T) {
ds := newDaemonSet("foo")
ds.Spec.UpdateStrategy = *strategy
ds.Generation = 1
manager, podControl, clientset := newTestController(ds)
manager, podControl, clientset, err := newTestController(ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
var updated *extensions.DaemonSet
clientset.PrependReactor("update", "daemonsets", func(action core.Action) (handled bool, ret runtime.Object, err error) {
if action.GetSubresource() != "status" {
@ -1101,7 +1200,10 @@ func TestDaemonKillFailedPods(t *testing.T) {
for _, strategy := range updateStrategies() {
ds := newDaemonSet("foo")
ds.Spec.UpdateStrategy = *strategy
manager, podControl, _ := newTestController(ds)
manager, podControl, _, err := newTestController(ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
manager.dsStore.Add(ds)
addNodes(manager.nodeStore, 0, 1, nil)
addFailedPods(manager.podStore, "node-0", simpleDaemonSetLabel, ds, test.numFailedPods)
@ -1117,7 +1219,10 @@ func TestNoScheduleTaintedDoesntEvicitRunningIntolerantPod(t *testing.T) {
for _, strategy := range updateStrategies() {
ds := newDaemonSet("intolerant")
ds.Spec.UpdateStrategy = *strategy
manager, podControl, _ := newTestController(ds)
manager, podControl, _, err := newTestController(ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
node := newNode("tainted", nil)
manager.nodeStore.Add(node)
@ -1135,7 +1240,10 @@ func TestNoExecuteTaintedDoesEvicitRunningIntolerantPod(t *testing.T) {
for _, strategy := range updateStrategies() {
ds := newDaemonSet("intolerant")
ds.Spec.UpdateStrategy = *strategy
manager, podControl, _ := newTestController(ds)
manager, podControl, _, err := newTestController(ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
node := newNode("tainted", nil)
manager.nodeStore.Add(node)
@ -1152,7 +1260,10 @@ func TestTaintedNodeDaemonDoesNotLaunchIntolerantPod(t *testing.T) {
for _, strategy := range updateStrategies() {
ds := newDaemonSet("intolerant")
ds.Spec.UpdateStrategy = *strategy
manager, podControl, _ := newTestController(ds)
manager, podControl, _, err := newTestController(ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
node := newNode("tainted", nil)
setNodeTaint(node, noScheduleTaints)
@ -1169,7 +1280,10 @@ func TestTaintedNodeDaemonLaunchesToleratePod(t *testing.T) {
ds := newDaemonSet("tolerate")
ds.Spec.UpdateStrategy = *strategy
setDaemonSetToleration(ds, noScheduleTolerations)
manager, podControl, _ := newTestController(ds)
manager, podControl, _, err := newTestController(ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
node := newNode("tainted", nil)
setNodeTaint(node, noScheduleTaints)
@ -1185,7 +1299,10 @@ func TestNotReadyNodeDaemonLaunchesPod(t *testing.T) {
for _, strategy := range updateStrategies() {
ds := newDaemonSet("simple")
ds.Spec.UpdateStrategy = *strategy
manager, podControl, _ := newTestController(ds)
manager, podControl, _, err := newTestController(ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
node := newNode("tainted", nil)
setNodeTaint(node, nodeNotReady)
@ -1204,7 +1321,10 @@ func TestUnreachableNodeDaemonLaunchesPod(t *testing.T) {
for _, strategy := range updateStrategies() {
ds := newDaemonSet("simple")
ds.Spec.UpdateStrategy = *strategy
manager, podControl, _ := newTestController(ds)
manager, podControl, _, err := newTestController(ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
node := newNode("tainted", nil)
setNodeTaint(node, nodeUnreachable)
@ -1224,7 +1344,10 @@ func TestNodeDaemonLaunchesToleratePod(t *testing.T) {
ds := newDaemonSet("tolerate")
ds.Spec.UpdateStrategy = *strategy
setDaemonSetToleration(ds, noScheduleTolerations)
manager, podControl, _ := newTestController(ds)
manager, podControl, _, err := newTestController(ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
addNodes(manager.nodeStore, 0, 1, nil)
manager.dsStore.Add(ds)
@ -1237,7 +1360,10 @@ func TestDaemonSetRespectsTermination(t *testing.T) {
for _, strategy := range updateStrategies() {
ds := newDaemonSet("foo")
ds.Spec.UpdateStrategy = *strategy
manager, podControl, _ := newTestController(ds)
manager, podControl, _, err := newTestController(ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
addNodes(manager.nodeStore, 0, 1, simpleNodeLabel)
pod := newPod(fmt.Sprintf("%s-", "node-0"), "node-0", simpleDaemonSetLabel, ds)
@ -1264,7 +1390,10 @@ func TestTaintOutOfDiskNodeDaemonLaunchesCriticalPod(t *testing.T) {
ds := newDaemonSet("critical")
ds.Spec.UpdateStrategy = *strategy
setDaemonSetCritical(ds)
manager, podControl, _ := newTestController(ds)
manager, podControl, _, err := newTestController(ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
node := newNode("not-enough-disk", nil)
node.Status.Conditions = []v1.NodeCondition{{Type: v1.NodeOutOfDisk, Status: v1.ConditionTrue}}
@ -1297,7 +1426,10 @@ func TestTaintPressureNodeDaemonLaunchesPod(t *testing.T) {
ds := newDaemonSet("critical")
ds.Spec.UpdateStrategy = *strategy
setDaemonSetCritical(ds)
manager, podControl, _ := newTestController(ds)
manager, podControl, _, err := newTestController(ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
node := newNode("resources-pressure", nil)
node.Status.Conditions = []v1.NodeCondition{
@ -1329,7 +1461,10 @@ func TestInsufficientCapacityNodeDaemonLaunchesCriticalPod(t *testing.T) {
ds.Spec.Template.Spec = podSpec
setDaemonSetCritical(ds)
manager, podControl, _ := newTestController(ds)
manager, podControl, _, err := newTestController(ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
node := newNode("too-much-mem", nil)
node.Status.Allocatable = allocatableResources("100M", "200m")
manager.nodeStore.Add(node)
@ -1373,7 +1508,10 @@ func TestPortConflictNodeDaemonDoesNotLaunchCriticalPod(t *testing.T) {
}},
}},
}
manager, podControl, _ := newTestController()
manager, podControl, _, err := newTestController()
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
node := newNode("port-conflict", nil)
manager.nodeStore.Add(node)
manager.podStore.Add(&v1.Pod{
@ -1494,7 +1632,10 @@ func TestNodeShouldRunDaemonPod(t *testing.T) {
node := newNode("test-node", nil)
node.Status.Conditions = append(node.Status.Conditions, c.nodeCondition...)
node.Status.Allocatable = allocatableResources("100M", "1")
manager, _, _ := newTestController()
manager, _, _, err := newTestController()
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
manager.nodeStore.Add(node)
for _, p := range c.podsOnNode {
manager.podStore.Add(p)
@ -1566,7 +1707,10 @@ func TestUpdateNode(t *testing.T) {
}
for _, c := range cases {
for _, strategy := range updateStrategies() {
manager, podControl, _ := newTestController()
manager, podControl, _, err := newTestController()
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
manager.nodeStore.Add(c.oldNode)
c.ds.Spec.UpdateStrategy = *strategy
manager.dsStore.Add(c.ds)
@ -1729,7 +1873,10 @@ func TestDeleteNoDaemonPod(t *testing.T) {
for _, c := range cases {
for _, strategy := range updateStrategies() {
manager, podControl, _ := newTestController()
manager, podControl, _, err := newTestController()
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
manager.nodeStore.Add(c.node)
c.ds.Spec.UpdateStrategy = *strategy
manager.dsStore.Add(c.ds)
@ -1766,7 +1913,10 @@ func TestGetNodesToDaemonPods(t *testing.T) {
ds.Spec.UpdateStrategy = *strategy
ds2 := newDaemonSet("foo2")
ds2.Spec.UpdateStrategy = *strategy
manager, _, _ := newTestController(ds, ds2)
manager, _, _, err := newTestController(ds, ds2)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
manager.dsStore.Add(ds)
manager.dsStore.Add(ds2)
addNodes(manager.nodeStore, 0, 2, nil)
@ -1821,7 +1971,10 @@ func TestGetNodesToDaemonPods(t *testing.T) {
}
func TestAddNode(t *testing.T) {
manager, _, _ := newTestController()
manager, _, _, err := newTestController()
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
node1 := newNode("node1", nil)
ds := newDaemonSet("ds")
ds.Spec.Template.Spec.NodeSelector = simpleNodeLabel
@ -1845,7 +1998,10 @@ func TestAddNode(t *testing.T) {
func TestAddPod(t *testing.T) {
for _, strategy := range updateStrategies() {
manager, _, _ := newTestController()
manager, _, _, err := newTestController()
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
ds1 := newDaemonSet("foo1")
ds1.Spec.UpdateStrategy = *strategy
ds2 := newDaemonSet("foo2")
@ -1885,7 +2041,10 @@ func TestAddPod(t *testing.T) {
func TestAddPodOrphan(t *testing.T) {
for _, strategy := range updateStrategies() {
manager, _, _ := newTestController()
manager, _, _, err := newTestController()
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
ds1 := newDaemonSet("foo1")
ds1.Spec.UpdateStrategy = *strategy
ds2 := newDaemonSet("foo2")
@ -1911,7 +2070,10 @@ func TestAddPodOrphan(t *testing.T) {
func TestUpdatePod(t *testing.T) {
for _, strategy := range updateStrategies() {
manager, _, _ := newTestController()
manager, _, _, err := newTestController()
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
ds1 := newDaemonSet("foo1")
ds1.Spec.UpdateStrategy = *strategy
ds2 := newDaemonSet("foo2")
@ -1955,7 +2117,10 @@ func TestUpdatePod(t *testing.T) {
func TestUpdatePodOrphanSameLabels(t *testing.T) {
for _, strategy := range updateStrategies() {
manager, _, _ := newTestController()
manager, _, _, err := newTestController()
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
ds1 := newDaemonSet("foo1")
ds1.Spec.UpdateStrategy = *strategy
ds2 := newDaemonSet("foo2")
@ -1975,7 +2140,10 @@ func TestUpdatePodOrphanSameLabels(t *testing.T) {
func TestUpdatePodOrphanWithNewLabels(t *testing.T) {
for _, strategy := range updateStrategies() {
manager, _, _ := newTestController()
manager, _, _, err := newTestController()
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
ds1 := newDaemonSet("foo1")
ds1.Spec.UpdateStrategy = *strategy
ds2 := newDaemonSet("foo2")
@ -2001,7 +2169,10 @@ func TestUpdatePodChangeControllerRef(t *testing.T) {
for _, strategy := range updateStrategies() {
ds := newDaemonSet("foo")
ds.Spec.UpdateStrategy = *strategy
manager, _, _ := newTestController()
manager, _, _, err := newTestController()
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
ds1 := newDaemonSet("foo1")
ds2 := newDaemonSet("foo2")
manager.dsStore.Add(ds1)
@ -2020,7 +2191,10 @@ func TestUpdatePodChangeControllerRef(t *testing.T) {
func TestUpdatePodControllerRefRemoved(t *testing.T) {
for _, strategy := range updateStrategies() {
manager, _, _ := newTestController()
manager, _, _, err := newTestController()
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
ds1 := newDaemonSet("foo1")
ds1.Spec.UpdateStrategy = *strategy
ds2 := newDaemonSet("foo2")
@ -2041,7 +2215,10 @@ func TestUpdatePodControllerRefRemoved(t *testing.T) {
func TestDeletePod(t *testing.T) {
for _, strategy := range updateStrategies() {
manager, _, _ := newTestController()
manager, _, _, err := newTestController()
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
ds1 := newDaemonSet("foo1")
ds1.Spec.UpdateStrategy = *strategy
ds2 := newDaemonSet("foo2")
@ -2081,7 +2258,10 @@ func TestDeletePod(t *testing.T) {
func TestDeletePodOrphan(t *testing.T) {
for _, strategy := range updateStrategies() {
manager, _, _ := newTestController()
manager, _, _, err := newTestController()
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
ds1 := newDaemonSet("foo1")
ds1.Spec.UpdateStrategy = *strategy
ds2 := newDaemonSet("foo2")

View File

@ -27,7 +27,10 @@ import (
func TestDaemonSetUpdatesPods(t *testing.T) {
ds := newDaemonSet("foo")
manager, podControl, _ := newTestController(ds)
manager, podControl, _, err := newTestController(ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
maxUnavailable := 2
addNodes(manager.nodeStore, 0, 5, nil)
manager.dsStore.Add(ds)
@ -66,7 +69,10 @@ func TestDaemonSetUpdatesPods(t *testing.T) {
func TestDaemonSetUpdatesWhenNewPosIsNotReady(t *testing.T) {
ds := newDaemonSet("foo")
manager, podControl, _ := newTestController(ds)
manager, podControl, _, err := newTestController(ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
maxUnavailable := 3
addNodes(manager.nodeStore, 0, 5, nil)
manager.dsStore.Add(ds)
@ -93,7 +99,10 @@ func TestDaemonSetUpdatesWhenNewPosIsNotReady(t *testing.T) {
func TestDaemonSetUpdatesAllOldPodsNotReady(t *testing.T) {
ds := newDaemonSet("foo")
manager, podControl, _ := newTestController(ds)
manager, podControl, _, err := newTestController(ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
maxUnavailable := 3
addNodes(manager.nodeStore, 0, 5, nil)
manager.dsStore.Add(ds)
@ -119,7 +128,10 @@ func TestDaemonSetUpdatesAllOldPodsNotReady(t *testing.T) {
func TestDaemonSetUpdatesNoTemplateChanged(t *testing.T) {
ds := newDaemonSet("foo")
manager, podControl, _ := newTestController(ds)
manager, podControl, _, err := newTestController(ds)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
maxUnavailable := 3
addNodes(manager.nodeStore, 0, 5, nil)
manager.dsStore.Add(ds)
@ -149,7 +161,10 @@ func TestGetUnavailableNumbers(t *testing.T) {
{
name: "No nodes",
Manager: func() *daemonSetsController {
manager, _, _ := newTestController()
manager, _, _, err := newTestController()
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
return manager
}(),
ds: func() *extensions.DaemonSet {
@ -165,7 +180,10 @@ func TestGetUnavailableNumbers(t *testing.T) {
{
name: "Two nodes with ready pods",
Manager: func() *daemonSetsController {
manager, _, _ := newTestController()
manager, _, _, err := newTestController()
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
addNodes(manager.nodeStore, 0, 2, nil)
return manager
}(),
@ -191,7 +209,10 @@ func TestGetUnavailableNumbers(t *testing.T) {
{
name: "Two nodes, one node without pods",
Manager: func() *daemonSetsController {
manager, _, _ := newTestController()
manager, _, _, err := newTestController()
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
addNodes(manager.nodeStore, 0, 2, nil)
return manager
}(),
@ -214,7 +235,10 @@ func TestGetUnavailableNumbers(t *testing.T) {
{
name: "Two nodes with pods, MaxUnavailable in percents",
Manager: func() *daemonSetsController {
manager, _, _ := newTestController()
manager, _, _, err := newTestController()
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
addNodes(manager.nodeStore, 0, 2, nil)
return manager
}(),
@ -240,7 +264,10 @@ func TestGetUnavailableNumbers(t *testing.T) {
{
name: "Two nodes with pods, MaxUnavailable in percents, pod terminating",
Manager: func() *daemonSetsController {
manager, _, _ := newTestController()
manager, _, _, err := newTestController()
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
addNodes(manager.nodeStore, 0, 2, nil)
return manager
}(),

View File

@ -97,14 +97,16 @@ type DeploymentController struct {
}
// NewDeploymentController creates a new DeploymentController.
func NewDeploymentController(dInformer extensionsinformers.DeploymentInformer, rsInformer extensionsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) *DeploymentController {
func NewDeploymentController(dInformer extensionsinformers.DeploymentInformer, rsInformer extensionsinformers.ReplicaSetInformer, podInformer coreinformers.PodInformer, client clientset.Interface) (*DeploymentController, error) {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
// TODO: remove the wrapper when every clients have moved to use the clientset.
eventBroadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: v1core.New(client.CoreV1().RESTClient()).Events("")})
if client != nil && client.CoreV1().RESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("deployment_controller", client.CoreV1().RESTClient().GetRateLimiter())
if err := metrics.RegisterMetricAndTrackRateLimiterUsage("deployment_controller", client.CoreV1().RESTClient().GetRateLimiter()); err != nil {
return nil, err
}
}
dc := &DeploymentController{
client: client,
@ -140,7 +142,7 @@ func NewDeploymentController(dInformer extensionsinformers.DeploymentInformer, r
dc.dListerSynced = dInformer.Informer().HasSynced
dc.rsListerSynced = rsInformer.Informer().HasSynced
dc.podListerSynced = podInformer.Informer().HasSynced
return dc
return dc, nil
}
// Run begins watching and syncing.

View File

@ -187,10 +187,13 @@ func newFixture(t *testing.T) *fixture {
return f
}
func (f *fixture) newController() (*DeploymentController, informers.SharedInformerFactory) {
func (f *fixture) newController() (*DeploymentController, informers.SharedInformerFactory, error) {
f.client = fake.NewSimpleClientset(f.objects...)
informers := informers.NewSharedInformerFactory(f.client, controller.NoResyncPeriodFunc())
c := NewDeploymentController(informers.Extensions().V1beta1().Deployments(), informers.Extensions().V1beta1().ReplicaSets(), informers.Core().V1().Pods(), f.client)
c, err := NewDeploymentController(informers.Extensions().V1beta1().Deployments(), informers.Extensions().V1beta1().ReplicaSets(), informers.Core().V1().Pods(), f.client)
if err != nil {
return nil, nil, err
}
c.eventRecorder = &record.FakeRecorder{}
c.dListerSynced = alwaysReady
c.rsListerSynced = alwaysReady
@ -204,7 +207,7 @@ func (f *fixture) newController() (*DeploymentController, informers.SharedInform
for _, pod := range f.podLister {
informers.Core().V1().Pods().Informer().GetIndexer().Add(pod)
}
return c, informers
return c, informers, nil
}
func (f *fixture) runExpectError(deploymentName string, startInformers bool) {
@ -216,14 +219,17 @@ func (f *fixture) run(deploymentName string) {
}
func (f *fixture) run_(deploymentName string, startInformers bool, expectError bool) {
c, informers := f.newController()
c, informers, err := f.newController()
if err != nil {
f.t.Fatalf("error creating Deployment controller: %v", err)
}
if startInformers {
stopCh := make(chan struct{})
defer close(stopCh)
informers.Start(stopCh)
}
err := c.syncDeployment(deploymentName)
err = c.syncDeployment(deploymentName)
if !expectError && err != nil {
f.t.Errorf("error syncing deployment: %v", err)
} else if expectError && err == nil {
@ -378,7 +384,10 @@ func TestPodDeletionEnqueuesRecreateDeployment(t *testing.T) {
f.rsLister = append(f.rsLister, rs)
f.objects = append(f.objects, foo, rs)
c, _ := f.newController()
c, _, err := f.newController()
if err != nil {
t.Fatalf("error creating Deployment controller: %v", err)
}
enqueued := false
c.enqueueDeployment = func(d *extensions.Deployment) {
if d.Name == "foo" {
@ -411,7 +420,10 @@ func TestPodDeletionDoesntEnqueueRecreateDeployment(t *testing.T) {
// return a non-empty list.
f.podLister = append(f.podLister, pod1, pod2)
c, _ := f.newController()
c, _, err := f.newController()
if err != nil {
t.Fatalf("error creating Deployment controller: %v", err)
}
enqueued := false
c.enqueueDeployment = func(d *extensions.Deployment) {
if d.Name == "foo" {
@ -444,7 +456,10 @@ func TestPodDeletionPartialReplicaSetOwnershipEnqueueRecreateDeployment(t *testi
f.rsLister = append(f.rsLister, rs1, rs2)
f.objects = append(f.objects, foo, rs1, rs2)
c, _ := f.newController()
c, _, err := f.newController()
if err != nil {
t.Fatalf("error creating Deployment controller: %v", err)
}
enqueued := false
c.enqueueDeployment = func(d *extensions.Deployment) {
if d.Name == "foo" {
@ -480,7 +495,10 @@ func TestPodDeletionPartialReplicaSetOwnershipDoesntEnqueueRecreateDeployment(t
// return a non-empty list.
f.podLister = append(f.podLister, pod)
c, _ := f.newController()
c, _, err := f.newController()
if err != nil {
t.Fatalf("error creating Deployment controller: %v", err)
}
enqueued := false
c.enqueueDeployment = func(d *extensions.Deployment) {
if d.Name == "foo" {
@ -512,7 +530,10 @@ func TestGetReplicaSetsForDeployment(t *testing.T) {
f.objects = append(f.objects, d1, d2, rs1, rs2)
// Start the fixture.
c, informers := f.newController()
c, informers, err := f.newController()
if err != nil {
t.Fatalf("error creating Deployment controller: %v", err)
}
stopCh := make(chan struct{})
defer close(stopCh)
informers.Start(stopCh)
@ -559,7 +580,10 @@ func TestGetReplicaSetsForDeploymentAdoptRelease(t *testing.T) {
f.objects = append(f.objects, d, rsAdopt, rsRelease)
// Start the fixture.
c, informers := f.newController()
c, informers, err := f.newController()
if err != nil {
t.Fatalf("error creating Deployment controller: %v", err)
}
stopCh := make(chan struct{})
defer close(stopCh)
informers.Start(stopCh)
@ -603,7 +627,10 @@ func TestGetPodMapForReplicaSets(t *testing.T) {
f.objects = append(f.objects, d, rs1, rs2, pod1, pod2, pod3, pod4)
// Start the fixture.
c, informers := f.newController()
c, informers, err := f.newController()
if err != nil {
t.Fatalf("error creating Deployment controller: %v", err)
}
stopCh := make(chan struct{})
defer close(stopCh)
informers.Start(stopCh)
@ -656,7 +683,10 @@ func TestAddReplicaSet(t *testing.T) {
// Create the fixture but don't start it,
// so nothing happens in the background.
dc, _ := f.newController()
dc, _, err := f.newController()
if err != nil {
t.Fatalf("error creating Deployment controller: %v", err)
}
dc.addReplicaSet(rs1)
if got, want := dc.queue.Len(), 1; got != want {
@ -703,7 +733,10 @@ func TestAddReplicaSetOrphan(t *testing.T) {
// Create the fixture but don't start it,
// so nothing happens in the background.
dc, _ := f.newController()
dc, _, err := f.newController()
if err != nil {
t.Fatalf("error creating Deployment controller: %v", err)
}
dc.addReplicaSet(rs)
if got, want := dc.queue.Len(), 2; got != want {
@ -728,7 +761,10 @@ func TestUpdateReplicaSet(t *testing.T) {
// Create the fixture but don't start it,
// so nothing happens in the background.
dc, _ := f.newController()
dc, _, err := f.newController()
if err != nil {
t.Fatalf("error creating Deployment controller: %v", err)
}
prev := *rs1
next := *rs1
@ -779,7 +815,10 @@ func TestUpdateReplicaSetOrphanWithNewLabels(t *testing.T) {
// Create the fixture but don't start it,
// so nothing happens in the background.
dc, _ := f.newController()
dc, _, err := f.newController()
if err != nil {
t.Fatalf("error creating Deployment controller: %v", err)
}
// Change labels and expect all matching controllers to queue.
prev := *rs
@ -806,7 +845,10 @@ func TestUpdateReplicaSetChangeControllerRef(t *testing.T) {
// Create the fixture but don't start it,
// so nothing happens in the background.
dc, _ := f.newController()
dc, _, err := f.newController()
if err != nil {
t.Fatalf("error creating Deployment controller: %v", err)
}
// Change ControllerRef and expect both old and new to queue.
prev := *rs
@ -833,7 +875,10 @@ func TestUpdateReplicaSetRelease(t *testing.T) {
// Create the fixture but don't start it,
// so nothing happens in the background.
dc, _ := f.newController()
dc, _, err := f.newController()
if err != nil {
t.Fatalf("error creating Deployment controller: %v", err)
}
// Remove ControllerRef and expect all matching controller to sync orphan.
prev := *rs
@ -863,7 +908,10 @@ func TestDeleteReplicaSet(t *testing.T) {
// Create the fixture but don't start it,
// so nothing happens in the background.
dc, _ := f.newController()
dc, _, err := f.newController()
if err != nil {
t.Fatalf("error creating Deployment controller: %v", err)
}
dc.deleteReplicaSet(rs1)
if got, want := dc.queue.Len(), 1; got != want {
@ -908,7 +956,10 @@ func TestDeleteReplicaSetOrphan(t *testing.T) {
// Create the fixture but don't start it,
// so nothing happens in the background.
dc, _ := f.newController()
dc, _, err := f.newController()
if err != nil {
t.Fatalf("error creating Deployment controller: %v", err)
}
dc.deleteReplicaSet(rs)
if got, want := dc.queue.Len(), 0; got != want {

View File

@ -65,7 +65,10 @@ func TestScaleDownOldReplicaSets(t *testing.T) {
kc := fake.NewSimpleClientset(expected...)
informers := informers.NewSharedInformerFactory(kc, controller.NoResyncPeriodFunc())
c := NewDeploymentController(informers.Extensions().V1beta1().Deployments(), informers.Extensions().V1beta1().ReplicaSets(), informers.Core().V1().Pods(), kc)
c, err := NewDeploymentController(informers.Extensions().V1beta1().Deployments(), informers.Extensions().V1beta1().ReplicaSets(), informers.Core().V1().Pods(), kc)
if err != nil {
t.Fatalf("error creating Deployment controller: %v", err)
}
c.eventRecorder = &record.FakeRecorder{}
c.scaleDownOldReplicaSetsForRecreate(oldRSs, test.d)

View File

@ -399,7 +399,10 @@ func TestDeploymentController_cleanupDeployment(t *testing.T) {
fake := &fake.Clientset{}
informers := informers.NewSharedInformerFactory(fake, controller.NoResyncPeriodFunc())
controller := NewDeploymentController(informers.Extensions().V1beta1().Deployments(), informers.Extensions().V1beta1().ReplicaSets(), informers.Core().V1().Pods(), fake)
controller, err := NewDeploymentController(informers.Extensions().V1beta1().Deployments(), informers.Extensions().V1beta1().ReplicaSets(), informers.Core().V1().Pods(), fake)
if err != nil {
t.Fatalf("error creating Deployment controller: %v", err)
}
controller.eventRecorder = &record.FakeRecorder{}
controller.dListerSynced = alwaysReady

View File

@ -119,7 +119,9 @@ func New(
recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "service-controller"})
if kubeClient != nil && kubeClient.CoreV1().RESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("service_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter())
if err := metrics.RegisterMetricAndTrackRateLimiterUsage("service_controller", kubeClient.CoreV1().RESTClient().GetRateLimiter()); err != nil {
return nil, err
}
}
s := &ServiceController{

View File

@ -61,14 +61,16 @@ func DefaultServiceAccountsControllerOptions() ServiceAccountsControllerOptions
}
// NewServiceAccountsController returns a new *ServiceAccountsController.
func NewServiceAccountsController(saInformer coreinformers.ServiceAccountInformer, nsInformer coreinformers.NamespaceInformer, cl clientset.Interface, options ServiceAccountsControllerOptions) *ServiceAccountsController {
func NewServiceAccountsController(saInformer coreinformers.ServiceAccountInformer, nsInformer coreinformers.NamespaceInformer, cl clientset.Interface, options ServiceAccountsControllerOptions) (*ServiceAccountsController, error) {
e := &ServiceAccountsController{
client: cl,
serviceAccountsToEnsure: options.ServiceAccounts,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "serviceaccount"),
}
if cl != nil && cl.CoreV1().RESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("serviceaccount_controller", cl.CoreV1().RESTClient().GetRateLimiter())
if err := metrics.RegisterMetricAndTrackRateLimiterUsage("serviceaccount_controller", cl.CoreV1().RESTClient().GetRateLimiter()); err != nil {
return nil, err
}
}
saInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
@ -86,7 +88,7 @@ func NewServiceAccountsController(saInformer coreinformers.ServiceAccountInforme
e.syncHandler = e.syncNamespace
return e
return e, nil
}
// ServiceAccountsController manages ServiceAccount objects inside Namespaces

View File

@ -165,12 +165,15 @@ func TestServiceAccountCreation(t *testing.T) {
}
saInformer := informers.Core().V1().ServiceAccounts()
nsInformer := informers.Core().V1().Namespaces()
controller := NewServiceAccountsController(
controller, err := NewServiceAccountsController(
saInformer,
nsInformer,
client,
options,
)
if err != nil {
t.Fatalf("error creating ServiceAccounts controller: %v", err)
}
controller.saListerSynced = alwaysReady
controller.nsListerSynced = alwaysReady

View File

@ -70,7 +70,7 @@ type TokensControllerOptions struct {
}
// NewTokensController returns a new *TokensController.
func NewTokensController(serviceAccounts informers.ServiceAccountInformer, secrets informers.SecretInformer, cl clientset.Interface, options TokensControllerOptions) *TokensController {
func NewTokensController(serviceAccounts informers.ServiceAccountInformer, secrets informers.SecretInformer, cl clientset.Interface, options TokensControllerOptions) (*TokensController, error) {
maxRetries := options.MaxRetries
if maxRetries == 0 {
maxRetries = 10
@ -87,7 +87,9 @@ func NewTokensController(serviceAccounts informers.ServiceAccountInformer, secre
maxRetries: maxRetries,
}
if cl != nil && cl.CoreV1().RESTClient().GetRateLimiter() != nil {
metrics.RegisterMetricAndTrackRateLimiterUsage("serviceaccount_tokens_controller", cl.CoreV1().RESTClient().GetRateLimiter())
if err := metrics.RegisterMetricAndTrackRateLimiterUsage("serviceaccount_tokens_controller", cl.CoreV1().RESTClient().GetRateLimiter()); err != nil {
return nil, err
}
}
e.serviceAccounts = serviceAccounts.Lister()
@ -124,7 +126,7 @@ func NewTokensController(serviceAccounts informers.ServiceAccountInformer, secre
options.SecretResync,
)
return e
return e, nil
}
// TokensController manages ServiceAccountToken secrets for ServiceAccount objects

View File

@ -586,7 +586,10 @@ func TestTokenCreation(t *testing.T) {
secretInformer := informers.Core().V1().Secrets().Informer()
secrets := secretInformer.GetStore()
serviceAccounts := informers.Core().V1().ServiceAccounts().Informer().GetStore()
controller := NewTokensController(informers.Core().V1().ServiceAccounts(), informers.Core().V1().Secrets(), client, TokensControllerOptions{TokenGenerator: generator, RootCA: []byte("CA Data"), MaxRetries: tc.MaxRetries})
controller, err := NewTokensController(informers.Core().V1().ServiceAccounts(), informers.Core().V1().Secrets(), client, TokensControllerOptions{TokenGenerator: generator, RootCA: []byte("CA Data"), MaxRetries: tc.MaxRetries})
if err != nil {
t.Fatalf("error creating Tokens controller: %v", err)
}
if tc.ExistingServiceAccount != nil {
serviceAccounts.Add(tc.ExistingServiceAccount)

View File

@ -34,40 +34,66 @@ const (
var (
metricsLock sync.Mutex
rateLimiterMetrics = make(map[string]prometheus.Gauge)
rateLimiterMetrics = make(map[string]rateLimiterMetric)
)
type rateLimiterMetric struct {
metric prometheus.Gauge
stopCh chan struct{}
}
func registerRateLimiterMetric(ownerName string) error {
metricsLock.Lock()
defer metricsLock.Unlock()
if _, ok := rateLimiterMetrics[ownerName]; ok {
glog.Errorf("Metric for %v already registered", ownerName)
return fmt.Errorf("Metric for %v already registered", ownerName)
return fmt.Errorf("Rate Limiter Metric for %v already registered", ownerName)
}
metric := prometheus.NewGauge(prometheus.GaugeOpts{
Name: "rate_limiter_use",
Subsystem: ownerName,
Help: fmt.Sprintf("A metric measuring the saturation of the rate limiter for %v", ownerName),
})
rateLimiterMetrics[ownerName] = metric
if err := prometheus.Register(metric); err != nil {
return fmt.Errorf("error registering rate limiter usage metric: %v", err)
}
stopCh := make(chan struct{})
rateLimiterMetrics[ownerName] = rateLimiterMetric{
metric: metric,
stopCh: stopCh,
}
return nil
}
// RegisterMetricAndTrackRateLimiterUsage registers a metric ownerName_rate_limiter_use in prometheus to track
// how much used rateLimiter is and starts a goroutine that updates this metric every updatePeriod
func RegisterMetricAndTrackRateLimiterUsage(ownerName string, rateLimiter flowcontrol.RateLimiter) error {
err := registerRateLimiterMetric(ownerName)
if err != nil {
if err := registerRateLimiterMetric(ownerName); err != nil {
return err
}
go wait.Forever(func() {
go wait.Until(func() {
metricsLock.Lock()
defer metricsLock.Unlock()
rateLimiterMetrics[ownerName].Set(rateLimiter.Saturation())
}, updatePeriod)
rateLimiterMetrics[ownerName].metric.Set(rateLimiter.Saturation())
}, updatePeriod, rateLimiterMetrics[ownerName].stopCh)
return nil
}
// UnregisterMetricAndUntrackRateLimiterUsage unregisters a metric ownerName_rate_limiter_use from prometheus and
// stops the goroutine that updates this metric
func UnregisterMetricAndUntrackRateLimiterUsage(ownerName string) bool {
metricsLock.Lock()
defer metricsLock.Unlock()
rlm, ok := rateLimiterMetrics[ownerName]
if !ok {
glog.Warningf("Rate Limiter Metric for %v not registered", ownerName)
return false
}
close(rlm.stopCh)
prometheus.Unregister(rlm.metric)
delete(rateLimiterMetrics, ownerName)
return true
}

View File

@ -34,6 +34,11 @@ func TestRegisterMetricAndTrackRateLimiterUsage(t *testing.T) {
rateLimiter: flowcontrol.NewTokenBucketRateLimiter(1, 1),
err: "",
},
{
ownerName: "owner_name",
rateLimiter: flowcontrol.NewTokenBucketRateLimiter(1, 1),
err: "already registered",
},
{
ownerName: "invalid-owner-name",
rateLimiter: flowcontrol.NewTokenBucketRateLimiter(1, 1),
@ -52,3 +57,27 @@ func TestRegisterMetricAndTrackRateLimiterUsage(t *testing.T) {
}
}
}
func TestUnregisterMetricAndUntrackRateLimiterUsage(t *testing.T) {
RegisterMetricAndTrackRateLimiterUsage("owner_name", flowcontrol.NewTokenBucketRateLimiter(1, 1))
testCases := []struct {
ownerName string
ok bool
}{
{
ownerName: "owner_name",
ok: true,
},
{
ownerName: "owner_name",
ok: false,
},
}
for i, tc := range testCases {
ok := UnregisterMetricAndUntrackRateLimiterUsage(tc.ownerName)
if tc.ok != ok {
t.Errorf("Case[%d] Expected %v, got %v", i, tc.ok, ok)
}
}
}

View File

@ -17,6 +17,7 @@ go_test(
deps = [
"//pkg/api/v1/pod:go_default_library",
"//pkg/controller/daemon:go_default_library",
"//pkg/util/metrics:go_default_library",
"//test/integration/framework:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",
"//vendor/k8s.io/api/extensions/v1beta1:go_default_library",

View File

@ -36,6 +36,7 @@ import (
"k8s.io/client-go/tools/cache"
podutil "k8s.io/kubernetes/pkg/api/v1/pod"
"k8s.io/kubernetes/pkg/controller/daemon"
"k8s.io/kubernetes/pkg/util/metrics"
"k8s.io/kubernetes/test/integration/framework"
)
@ -50,13 +51,17 @@ func setup(t *testing.T) (*httptest.Server, framework.CloseFunc, *daemon.DaemonS
}
resyncPeriod := 12 * time.Hour
informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "daemonset-informers")), resyncPeriod)
dc := daemon.NewDaemonSetsController(
metrics.UnregisterMetricAndUntrackRateLimiterUsage("daemon_controller")
dc, err := daemon.NewDaemonSetsController(
informers.Extensions().V1beta1().DaemonSets(),
informers.Apps().V1beta1().ControllerRevisions(),
informers.Core().V1().Pods(),
informers.Core().V1().Nodes(),
clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "daemonset-controller")),
)
if err != nil {
t.Fatalf("error creating DaemonSets controller: %v", err)
}
return server, closeFn, dc, informers, clientSet
}

View File

@ -35,6 +35,7 @@ go_library(
"//pkg/controller/deployment:go_default_library",
"//pkg/controller/deployment/util:go_default_library",
"//pkg/controller/replicaset:go_default_library",
"//pkg/util/metrics:go_default_library",
"//test/integration/framework:go_default_library",
"//test/utils:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",

View File

@ -33,6 +33,7 @@ import (
"k8s.io/kubernetes/pkg/controller/deployment"
deploymentutil "k8s.io/kubernetes/pkg/controller/deployment/util"
"k8s.io/kubernetes/pkg/controller/replicaset"
"k8s.io/kubernetes/pkg/util/metrics"
"k8s.io/kubernetes/test/integration/framework"
testutil "k8s.io/kubernetes/test/utils"
)
@ -150,12 +151,16 @@ func dcSetup(t *testing.T) (*httptest.Server, framework.CloseFunc, *replicaset.R
resyncPeriod := 12 * time.Hour
informers := informers.NewSharedInformerFactory(clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "deployment-informers")), resyncPeriod)
dc := deployment.NewDeploymentController(
metrics.UnregisterMetricAndUntrackRateLimiterUsage("deployment_controller")
dc, err := deployment.NewDeploymentController(
informers.Extensions().V1beta1().Deployments(),
informers.Extensions().V1beta1().ReplicaSets(),
informers.Core().V1().Pods(),
clientset.NewForConfigOrDie(restclient.AddUserAgent(&config, "deployment-controller")),
)
if err != nil {
t.Fatalf("error creating Deployment controller: %v", err)
}
rm := replicaset.NewReplicaSetController(
informers.Extensions().V1beta1().ReplicaSets(),
informers.Core().V1().Pods(),

View File

@ -21,6 +21,7 @@ go_test(
"//pkg/controller:go_default_library",
"//pkg/controller/serviceaccount:go_default_library",
"//pkg/serviceaccount:go_default_library",
"//pkg/util/metrics:go_default_library",
"//plugin/pkg/admission/serviceaccount:go_default_library",
"//test/integration/framework:go_default_library",
"//vendor/k8s.io/api/core/v1:go_default_library",

View File

@ -50,6 +50,7 @@ import (
"k8s.io/kubernetes/pkg/controller"
serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount"
"k8s.io/kubernetes/pkg/serviceaccount"
"k8s.io/kubernetes/pkg/util/metrics"
serviceaccountadmission "k8s.io/kubernetes/plugin/pkg/admission/serviceaccount"
"k8s.io/kubernetes/test/integration/framework"
)
@ -63,13 +64,16 @@ const (
)
func TestServiceAccountAutoCreate(t *testing.T) {
c, _, stopFunc := startServiceAccountTestServer(t)
c, _, stopFunc, err := startServiceAccountTestServer(t)
defer stopFunc()
if err != nil {
t.Fatalf("failed to setup ServiceAccounts server: %v", err)
}
ns := "test-service-account-creation"
// Create namespace
_, err := c.Core().Namespaces().Create(&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: ns}})
_, err = c.Core().Namespaces().Create(&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: ns}})
if err != nil {
t.Fatalf("could not create namespace: %v", err)
}
@ -97,14 +101,17 @@ func TestServiceAccountAutoCreate(t *testing.T) {
}
func TestServiceAccountTokenAutoCreate(t *testing.T) {
c, _, stopFunc := startServiceAccountTestServer(t)
c, _, stopFunc, err := startServiceAccountTestServer(t)
defer stopFunc()
if err != nil {
t.Fatalf("failed to setup ServiceAccounts server: %v", err)
}
ns := "test-service-account-token-creation"
name := "my-service-account"
// Create namespace
_, err := c.Core().Namespaces().Create(&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: ns}})
_, err = c.Core().Namespaces().Create(&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: ns}})
if err != nil {
t.Fatalf("could not create namespace: %v", err)
}
@ -192,13 +199,16 @@ func TestServiceAccountTokenAutoCreate(t *testing.T) {
}
func TestServiceAccountTokenAutoMount(t *testing.T) {
c, _, stopFunc := startServiceAccountTestServer(t)
c, _, stopFunc, err := startServiceAccountTestServer(t)
defer stopFunc()
if err != nil {
t.Fatalf("failed to setup ServiceAccounts server: %v", err)
}
ns := "auto-mount-ns"
// Create "my" namespace
_, err := c.Core().Namespaces().Create(&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: ns}})
_, err = c.Core().Namespaces().Create(&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: ns}})
if err != nil && !errors.IsAlreadyExists(err) {
t.Fatalf("could not create namespace: %v", err)
}
@ -271,14 +281,17 @@ func TestServiceAccountTokenAutoMount(t *testing.T) {
}
func TestServiceAccountTokenAuthentication(t *testing.T) {
c, config, stopFunc := startServiceAccountTestServer(t)
c, config, stopFunc, err := startServiceAccountTestServer(t)
defer stopFunc()
if err != nil {
t.Fatalf("failed to setup ServiceAccounts server: %v", err)
}
myns := "auth-ns"
otherns := "other-ns"
// Create "my" namespace
_, err := c.Core().Namespaces().Create(&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: myns}})
_, err = c.Core().Namespaces().Create(&v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: myns}})
if err != nil && !errors.IsAlreadyExists(err) {
t.Fatalf("could not create namespace: %v", err)
}
@ -337,7 +350,7 @@ func TestServiceAccountTokenAuthentication(t *testing.T) {
// startServiceAccountTestServer returns a started server
// It is the responsibility of the caller to ensure the returned stopFunc is called
func startServiceAccountTestServer(t *testing.T) (*clientset.Clientset, restclient.Config, func()) {
func startServiceAccountTestServer(t *testing.T) (*clientset.Clientset, restclient.Config, func(), error) {
// Listener
h := &framework.MasterHolder{Initialized: make(chan struct{})}
apiServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
@ -419,30 +432,38 @@ func startServiceAccountTestServer(t *testing.T) (*clientset.Clientset, restclie
// Start the service account and service account token controllers
stopCh := make(chan struct{})
tokenController := serviceaccountcontroller.NewTokensController(
informers.Core().V1().ServiceAccounts(),
informers.Core().V1().Secrets(),
rootClientset,
serviceaccountcontroller.TokensControllerOptions{TokenGenerator: serviceaccount.JWTTokenGenerator(serviceAccountKey)},
)
go tokenController.Run(1, stopCh)
serviceAccountController := serviceaccountcontroller.NewServiceAccountsController(
informers.Core().V1().ServiceAccounts(),
informers.Core().V1().Namespaces(),
rootClientset,
serviceaccountcontroller.DefaultServiceAccountsControllerOptions(),
)
informers.Start(stopCh)
internalInformers.Start(stopCh)
go serviceAccountController.Run(5, stopCh)
stop := func() {
close(stopCh)
apiServer.Close()
}
return rootClientset, clientConfig, stop
metrics.UnregisterMetricAndUntrackRateLimiterUsage("serviceaccount_tokens_controller")
tokenController, err := serviceaccountcontroller.NewTokensController(
informers.Core().V1().ServiceAccounts(),
informers.Core().V1().Secrets(),
rootClientset,
serviceaccountcontroller.TokensControllerOptions{TokenGenerator: serviceaccount.JWTTokenGenerator(serviceAccountKey)},
)
if err != nil {
return rootClientset, clientConfig, stop, err
}
go tokenController.Run(1, stopCh)
metrics.UnregisterMetricAndUntrackRateLimiterUsage("serviceaccount_controller")
serviceAccountController, err := serviceaccountcontroller.NewServiceAccountsController(
informers.Core().V1().ServiceAccounts(),
informers.Core().V1().Namespaces(),
rootClientset,
serviceaccountcontroller.DefaultServiceAccountsControllerOptions(),
)
if err != nil {
return rootClientset, clientConfig, stop, err
}
informers.Start(stopCh)
internalInformers.Start(stopCh)
go serviceAccountController.Run(5, stopCh)
return rootClientset, clientConfig, stop, nil
}
func getServiceAccount(c *clientset.Clientset, ns string, name string, shouldWait bool) (*v1.ServiceAccount, error) {