diff --git a/cluster/gce/config-test.sh b/cluster/gce/config-test.sh index 7c7f12eba7..d4c8485819 100755 --- a/cluster/gce/config-test.sh +++ b/cluster/gce/config-test.sh @@ -61,10 +61,11 @@ SERVICE_CLUSTER_IP_RANGE="10.0.0.0/16" # formerly PORTAL_NET ENABLE_CLUSTER_MONITORING="${KUBE_ENABLE_CLUSTER_MONITORING:-influxdb}" TEST_CLUSTER_LOG_LEVEL="${TEST_CLUSTER_LOG_LEVEL:---v=4}" +TEST_CLUSTER_RESYNC_PERIOD="${TEST_CLUSTER_RESYNC_PERIOD:---min-resync-period=3m}" KUBELET_TEST_ARGS="--max-pods=100 $TEST_CLUSTER_LOG_LEVEL" APISERVER_TEST_ARGS="--runtime-config=experimental/v1alpha1 ${TEST_CLUSTER_LOG_LEVEL}" -CONTROLLER_MANAGER_TEST_ARGS="${TEST_CLUSTER_LOG_LEVEL}" +CONTROLLER_MANAGER_TEST_ARGS="${TEST_CLUSTER_LOG_LEVEL} ${TEST_CLUSTER_RESYNC_PERIOD}" SCHEDULER_TEST_ARGS="${TEST_CLUSTER_LOG_LEVEL}" KUBEPROXY_TEST_ARGS="${TEST_CLUSTER_LOG_LEVEL}" diff --git a/cmd/integration/integration.go b/cmd/integration/integration.go index 2d3a556069..00073904ac 100644 --- a/cmd/integration/integration.go +++ b/cmd/integration/integration.go @@ -43,6 +43,7 @@ import ( "k8s.io/kubernetes/pkg/apiserver" "k8s.io/kubernetes/pkg/client/record" client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/endpoint" "k8s.io/kubernetes/pkg/controller/node" replicationControllerPkg "k8s.io/kubernetes/pkg/controller/replication" @@ -196,14 +197,13 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string eventBroadcaster.StartRecordingToSink(cl.Events("")) scheduler.New(schedulerConfig).Run() - endpoints := endpointcontroller.NewEndpointController(cl) // ensure the service endpoints are sync'd several times within the window that the integration tests wait - go endpoints.Run(3, util.NeverStop) - - controllerManager := replicationControllerPkg.NewReplicationManager(cl, replicationControllerPkg.BurstReplicas) + go endpointcontroller.NewEndpointController(cl, controller.NoResyncPeriodFunc). + Run(3, util.NeverStop) // TODO: Write an integration test for the replication controllers watch. - go controllerManager.Run(3, util.NeverStop) + go replicationControllerPkg.NewReplicationManager(cl, controller.NoResyncPeriodFunc, replicationControllerPkg.BurstReplicas). + Run(3, util.NeverStop) nodeController := nodecontroller.NewNodeController(nil, cl, 5*time.Minute, util.NewFakeRateLimiter(), util.NewFakeRateLimiter(), 40*time.Second, 60*time.Second, 5*time.Second, nil, false) diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 5691fa2952..6b9b828e80 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -25,6 +25,7 @@ package app import ( "fmt" "io/ioutil" + "math/rand" "net" "net/http" "net/http/pprof" @@ -78,6 +79,7 @@ type CMServer struct { TerminatedPodGCThreshold int HorizontalPodAutoscalerSyncPeriod time.Duration DeploymentControllerSyncPeriod time.Duration + MinResyncPeriod time.Duration RegisterRetryCount int NodeMonitorGracePeriod time.Duration NodeStartupGracePeriod time.Duration @@ -115,6 +117,7 @@ func NewCMServer() *CMServer { PVClaimBinderSyncPeriod: 10 * time.Second, HorizontalPodAutoscalerSyncPeriod: 30 * time.Second, DeploymentControllerSyncPeriod: 30 * time.Second, + MinResyncPeriod: 12 * time.Hour, RegisterRetryCount: 10, PodEvictionTimeout: 5 * time.Minute, ClusterName: "kubernetes", @@ -157,6 +160,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) { fs.DurationVar(&s.ResourceQuotaSyncPeriod, "resource-quota-sync-period", s.ResourceQuotaSyncPeriod, "The period for syncing quota usage status in the system") fs.DurationVar(&s.NamespaceSyncPeriod, "namespace-sync-period", s.NamespaceSyncPeriod, "The period for syncing namespace life-cycle updates") fs.DurationVar(&s.PVClaimBinderSyncPeriod, "pvclaimbinder-sync-period", s.PVClaimBinderSyncPeriod, "The period for syncing persistent volumes and persistent volume claims") + fs.DurationVar(&s.MinResyncPeriod, "min-resync-period", s.MinResyncPeriod, "The resync period in reflectors will be random between MinResyncPeriod and 2*MinResyncPeriod") fs.StringVar(&s.VolumeConfigFlags.PersistentVolumeRecyclerPodTemplateFilePathNFS, "pv-recycler-pod-template-filepath-nfs", s.VolumeConfigFlags.PersistentVolumeRecyclerPodTemplateFilePathNFS, "The file path to a pod definition used as a template for NFS persistent volume recycling") fs.IntVar(&s.VolumeConfigFlags.PersistentVolumeRecyclerMinimumTimeoutNFS, "pv-recycler-minimum-timeout-nfs", s.VolumeConfigFlags.PersistentVolumeRecyclerMinimumTimeoutNFS, "The minimum ActiveDeadlineSeconds to use for an NFS Recycler pod") fs.IntVar(&s.VolumeConfigFlags.PersistentVolumeRecyclerIncrementTimeoutNFS, "pv-recycler-increment-timeout-nfs", s.VolumeConfigFlags.PersistentVolumeRecyclerIncrementTimeoutNFS, "the increment of time added per Gi to ActiveDeadlineSeconds for an NFS scrubber pod") @@ -191,6 +195,11 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) { fs.BoolVar(&s.EnableExperimental, "enable-experimental", s.EnableExperimental, "Enables experimental controllers (requires enabling experimental API on apiserver).") } +func (s *CMServer) resyncPeriod() time.Duration { + factor := rand.Float64() + 1 + return time.Duration(float64(s.MinResyncPeriod.Nanoseconds()) * factor) +} + // Run runs the CMServer. This should never exit. func (s *CMServer) Run(_ []string) error { if s.Kubeconfig == "" && s.Master == "" { @@ -231,14 +240,14 @@ func (s *CMServer) Run(_ []string) error { glog.Fatal(server.ListenAndServe()) }() - endpoints := endpointcontroller.NewEndpointController(kubeClient) - go endpoints.Run(s.ConcurrentEndpointSyncs, util.NeverStop) + go endpointcontroller.NewEndpointController(kubeClient, s.resyncPeriod). + Run(s.ConcurrentEndpointSyncs, util.NeverStop) - controllerManager := replicationControllerPkg.NewReplicationManager(kubeClient, replicationControllerPkg.BurstReplicas) - go controllerManager.Run(s.ConcurrentRCSyncs, util.NeverStop) + go replicationControllerPkg.NewReplicationManager(kubeClient, s.resyncPeriod, replicationControllerPkg.BurstReplicas). + Run(s.ConcurrentRCSyncs, util.NeverStop) if s.TerminatedPodGCThreshold > 0 { - go gc.New(kubeClient, s.TerminatedPodGCThreshold). + go gc.New(kubeClient, s.resyncPeriod, s.TerminatedPodGCThreshold). Run(util.NeverStop) } @@ -269,16 +278,15 @@ func (s *CMServer) Run(_ []string) error { } } - resourceQuotaController := resourcequotacontroller.NewResourceQuotaController(kubeClient) - resourceQuotaController.Run(s.ResourceQuotaSyncPeriod) + resourcequotacontroller.NewResourceQuotaController(kubeClient).Run(s.ResourceQuotaSyncPeriod) namespacecontroller.NewNamespaceController(kubeClient, s.EnableExperimental, s.NamespaceSyncPeriod).Run() if s.EnableExperimental { - go daemon.NewDaemonSetsController(kubeClient). + go daemon.NewDaemonSetsController(kubeClient, s.resyncPeriod). Run(s.ConcurrentDSCSyncs, util.NeverStop) - go job.NewJobController(kubeClient). + go job.NewJobController(kubeClient, s.resyncPeriod). Run(s.ConcurrentJobSyncs, util.NeverStop) podautoscaler.NewHorizontalController(kubeClient, metrics.NewHeapsterMetricsClient(kubeClient)). diff --git a/contrib/mesos/pkg/controllermanager/controllermanager.go b/contrib/mesos/pkg/controllermanager/controllermanager.go index 88cb246cfa..3c0a69403e 100644 --- a/contrib/mesos/pkg/controllermanager/controllermanager.go +++ b/contrib/mesos/pkg/controllermanager/controllermanager.go @@ -19,9 +19,11 @@ package controllermanager import ( "fmt" "io/ioutil" + "math/rand" "net" "net/http" "strconv" + "time" "k8s.io/kubernetes/cmd/kube-controller-manager/app" client "k8s.io/kubernetes/pkg/client/unversioned" @@ -72,6 +74,11 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) { fs.BoolVar(&s.UseHostPortEndpoints, "host_port_endpoints", s.UseHostPortEndpoints, "Map service endpoints to hostIP:hostPort instead of podIP:containerPort. Default true.") } +func (s *CMServer) resyncPeriod() time.Duration { + factor := rand.Float64() + 1 + return time.Duration(float64(time.Hour) * 12.0 * factor) +} + func (s *CMServer) Run(_ []string) error { if s.Kubeconfig == "" && s.Master == "" { glog.Warningf("Neither --kubeconfig nor --master was specified. Using default API client. This might not work.") @@ -111,10 +118,10 @@ func (s *CMServer) Run(_ []string) error { endpoints := s.createEndpointController(kubeClient) go endpoints.Run(s.ConcurrentEndpointSyncs, util.NeverStop) - controllerManager := replicationcontroller.NewReplicationManager(kubeClient, replicationcontroller.BurstReplicas) - go controllerManager.Run(s.ConcurrentRCSyncs, util.NeverStop) + go replicationcontroller.NewReplicationManager(kubeClient, s.resyncPeriod, replicationcontroller.BurstReplicas). + Run(s.ConcurrentRCSyncs, util.NeverStop) - go daemon.NewDaemonSetsController(kubeClient). + go daemon.NewDaemonSetsController(kubeClient, s.resyncPeriod). Run(s.ConcurrentDSCSyncs, util.NeverStop) //TODO(jdef) should eventually support more cloud providers here @@ -203,6 +210,6 @@ func (s *CMServer) createEndpointController(client *client.Client) kmendpoint.En return kmendpoint.NewEndpointController(client) } glog.V(2).Infof("Creating podIP:containerPort endpoint controller") - stockEndpointController := kendpoint.NewEndpointController(client) + stockEndpointController := kendpoint.NewEndpointController(client, s.resyncPeriod) return stockEndpointController } diff --git a/contrib/mesos/pkg/service/endpoints_controller.go b/contrib/mesos/pkg/service/endpoints_controller.go index 95ebf3f108..63cb7be933 100644 --- a/contrib/mesos/pkg/service/endpoints_controller.go +++ b/contrib/mesos/pkg/service/endpoints_controller.go @@ -85,7 +85,7 @@ func NewEndpointController(client *client.Client) *endpointController { }, }, &api.Pod{}, - kservice.PodRelistPeriod, + 5*time.Minute, framework.ResourceEventHandlerFuncs{ AddFunc: e.addPod, UpdateFunc: e.updatePod, diff --git a/hack/jenkins/e2e.sh b/hack/jenkins/e2e.sh index 45035764b4..e9169e5908 100755 --- a/hack/jenkins/e2e.sh +++ b/hack/jenkins/e2e.sh @@ -353,6 +353,8 @@ case ${JOB_NAME} in NUM_MINIONS="100" # Reduce logs verbosity TEST_CLUSTER_LOG_LEVEL="--v=1" + # Increase resync period to simulate production + TEST_CLUSTER_RESYNC_PERIOD="--min-resync-period=12h" ;; # Runs tests on GCE soak cluster. diff --git a/hack/verify-flags/known-flags.txt b/hack/verify-flags/known-flags.txt index 92cba7ffa4..9b07feb220 100644 --- a/hack/verify-flags/known-flags.txt +++ b/hack/verify-flags/known-flags.txt @@ -185,6 +185,7 @@ minion-max-log-size minion-path-override min-pr-number min-request-timeout +min-resync-period namespace-sync-period network-plugin network-plugin-dir diff --git a/pkg/controller/controller_utils.go b/pkg/controller/controller_utils.go index aa8a3cb22f..2601b5b9c8 100644 --- a/pkg/controller/controller_utils.go +++ b/pkg/controller/controller_utils.go @@ -54,6 +54,13 @@ var ( KeyFunc = framework.DeletionHandlingMetaNamespaceKeyFunc ) +type ResyncPeriodFunc func() time.Duration + +// Returns 0 for resyncPeriod in case resyncing is not needed. +func NoResyncPeriodFunc() time.Duration { + return 0 +} + // Expectations are a way for controllers to tell the controller manager what they expect. eg: // ControllerExpectations: { // controller1: expects 2 adds in 2 minutes diff --git a/pkg/controller/daemon/controller.go b/pkg/controller/daemon/controller.go index 575856becf..24690f2bc6 100644 --- a/pkg/controller/daemon/controller.go +++ b/pkg/controller/daemon/controller.go @@ -41,12 +41,6 @@ const ( // Daemon sets will periodically check that their daemon pods are running as expected. FullDaemonSetResyncPeriod = 30 * time.Second // TODO: Figure out if this time seems reasonable. - // Nodes don't need relisting. - FullNodeResyncPeriod = 0 - - // Daemon pods don't need relisting. - FullDaemonPodResyncPeriod = 0 - // We must avoid counting pods until the pod store has synced. If it hasn't synced, to // avoid a hot loop, we'll wait this long between checks. PodStoreSyncedPollPeriod = 100 * time.Millisecond @@ -85,7 +79,7 @@ type DaemonSetsController struct { queue *workqueue.Type } -func NewDaemonSetsController(kubeClient client.Interface) *DaemonSetsController { +func NewDaemonSetsController(kubeClient client.Interface, resyncPeriod controller.ResyncPeriodFunc) *DaemonSetsController { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) eventBroadcaster.StartRecordingToSink(kubeClient.Events("")) @@ -110,6 +104,7 @@ func NewDaemonSetsController(kubeClient client.Interface) *DaemonSetsController }, }, &experimental.DaemonSet{}, + // TODO: Can we have much longer period here? FullDaemonSetResyncPeriod, framework.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { @@ -141,7 +136,7 @@ func NewDaemonSetsController(kubeClient client.Interface) *DaemonSetsController }, }, &api.Pod{}, - FullDaemonPodResyncPeriod, + resyncPeriod(), framework.ResourceEventHandlerFuncs{ AddFunc: dsc.addPod, UpdateFunc: dsc.updatePod, @@ -159,7 +154,7 @@ func NewDaemonSetsController(kubeClient client.Interface) *DaemonSetsController }, }, &api.Node{}, - FullNodeResyncPeriod, + resyncPeriod(), framework.ResourceEventHandlerFuncs{ AddFunc: dsc.addNode, UpdateFunc: dsc.updateNode, diff --git a/pkg/controller/daemon/controller_test.go b/pkg/controller/daemon/controller_test.go index 8298bee3eb..bbbc3c7946 100644 --- a/pkg/controller/daemon/controller_test.go +++ b/pkg/controller/daemon/controller_test.go @@ -130,7 +130,7 @@ func addPods(podStore cache.Store, nodeName string, label map[string]string, num func newTestController() (*DaemonSetsController, *controller.FakePodControl) { client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Default.GroupAndVersion()}) - manager := NewDaemonSetsController(client) + manager := NewDaemonSetsController(client, controller.NoResyncPeriodFunc) manager.podStoreSynced = alwaysReady podControl := &controller.FakePodControl{} manager.podControl = podControl diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go index f2332e885e..60ceb6a8fe 100644 --- a/pkg/controller/endpoint/endpoints_controller.go +++ b/pkg/controller/endpoint/endpoints_controller.go @@ -28,6 +28,7 @@ import ( "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/client/cache" client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/labels" @@ -45,11 +46,6 @@ const ( // often. Higher numbers = lower CPU/network load; lower numbers = // shorter amount of time before a mistaken endpoint is corrected. FullServiceResyncPeriod = 30 * time.Second - - // We'll keep pod watches open up to this long. In the unlikely case - // that a watch misdelivers info about a pod, it'll take this long for - // that mistake to be rectified. - PodRelistPeriod = 5 * time.Minute ) var ( @@ -57,7 +53,7 @@ var ( ) // NewEndpointController returns a new *EndpointController. -func NewEndpointController(client *client.Client) *EndpointController { +func NewEndpointController(client *client.Client, resyncPeriod controller.ResyncPeriodFunc) *EndpointController { e := &EndpointController{ client: client, queue: workqueue.New(), @@ -73,6 +69,7 @@ func NewEndpointController(client *client.Client) *EndpointController { }, }, &api.Service{}, + // TODO: Can we have much longer period here? FullServiceResyncPeriod, framework.ResourceEventHandlerFuncs{ AddFunc: e.enqueueService, @@ -93,7 +90,7 @@ func NewEndpointController(client *client.Client) *EndpointController { }, }, &api.Pod{}, - PodRelistPeriod, + resyncPeriod(), framework.ResourceEventHandlerFuncs{ AddFunc: e.addPod, UpdateFunc: e.updatePod, diff --git a/pkg/controller/endpoint/endpoints_controller_test.go b/pkg/controller/endpoint/endpoints_controller_test.go index 8ec61c4d04..2d81629f51 100644 --- a/pkg/controller/endpoint/endpoints_controller_test.go +++ b/pkg/controller/endpoint/endpoints_controller_test.go @@ -29,6 +29,7 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/client/cache" client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util" ) @@ -188,7 +189,7 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) { }}) defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()}) - endpoints := NewEndpointController(client) + endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc) endpoints.serviceStore.Store.Add(&api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns}, Spec: api.ServiceSpec{Ports: []api.ServicePort{{Port: 80}}}, @@ -220,7 +221,7 @@ func TestCheckLeftoverEndpoints(t *testing.T) { }}) defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()}) - endpoints := NewEndpointController(client) + endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc) endpoints.checkLeftoverEndpoints() if e, a := 1, endpoints.queue.Len(); e != a { @@ -248,7 +249,7 @@ func TestSyncEndpointsProtocolTCP(t *testing.T) { }}) defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()}) - endpoints := NewEndpointController(client) + endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc) endpoints.serviceStore.Store.Add(&api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns}, Spec: api.ServiceSpec{ @@ -276,7 +277,7 @@ func TestSyncEndpointsProtocolUDP(t *testing.T) { }}) defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()}) - endpoints := NewEndpointController(client) + endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc) endpoints.serviceStore.Store.Add(&api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns}, Spec: api.ServiceSpec{ @@ -301,7 +302,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) { }}) defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()}) - endpoints := NewEndpointController(client) + endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc) addPods(endpoints.podStore.Store, ns, 1, 1, 0) endpoints.serviceStore.Store.Add(&api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns}, @@ -338,7 +339,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) { }}) defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()}) - endpoints := NewEndpointController(client) + endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc) addPods(endpoints.podStore.Store, ns, 0, 1, 1) endpoints.serviceStore.Store.Add(&api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns}, @@ -375,7 +376,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) { }}) defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()}) - endpoints := NewEndpointController(client) + endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc) addPods(endpoints.podStore.Store, ns, 1, 1, 1) endpoints.serviceStore.Store.Add(&api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns}, @@ -416,7 +417,7 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) { }}) defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()}) - endpoints := NewEndpointController(client) + endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc) addPods(endpoints.podStore.Store, ns, 1, 1, 0) endpoints.serviceStore.Store.Add(&api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns}, @@ -456,7 +457,7 @@ func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) { }}) defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()}) - endpoints := NewEndpointController(client) + endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc) addPods(endpoints.podStore.Store, api.NamespaceDefault, 1, 1, 0) endpoints.serviceStore.Store.Add(&api.Service{ ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault}, @@ -475,7 +476,7 @@ func TestSyncEndpointsItems(t *testing.T) { serverResponse{http.StatusOK, &api.Endpoints{}}) defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()}) - endpoints := NewEndpointController(client) + endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc) addPods(endpoints.podStore.Store, ns, 3, 2, 0) addPods(endpoints.podStore.Store, "blah", 5, 2, 0) // make sure these aren't found! endpoints.serviceStore.Store.Add(&api.Service{ @@ -517,7 +518,7 @@ func TestSyncEndpointsItemsWithLabels(t *testing.T) { serverResponse{http.StatusOK, &api.Endpoints{}}) defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()}) - endpoints := NewEndpointController(client) + endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc) addPods(endpoints.podStore.Store, ns, 3, 2, 0) serviceLabels := map[string]string{"foo": "bar"} endpoints.serviceStore.Store.Add(&api.Service{ @@ -577,7 +578,7 @@ func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) { }}) defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()}) - endpoints := NewEndpointController(client) + endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc) addPods(endpoints.podStore.Store, ns, 1, 1, 0) serviceLabels := map[string]string{"baz": "blah"} endpoints.serviceStore.Store.Add(&api.Service{ diff --git a/pkg/controller/gc/gc_controller.go b/pkg/controller/gc/gc_controller.go index bec6958d23..0ca5c4c442 100644 --- a/pkg/controller/gc/gc_controller.go +++ b/pkg/controller/gc/gc_controller.go @@ -37,8 +37,7 @@ import ( ) const ( - fullResyncPeriod = 0 - gcCheckPeriod = 20 * time.Second + gcCheckPeriod = 20 * time.Second ) type GCController struct { @@ -49,7 +48,7 @@ type GCController struct { threshold int } -func New(kubeClient client.Interface, threshold int) *GCController { +func New(kubeClient client.Interface, resyncPeriod controller.ResyncPeriodFunc, threshold int) *GCController { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) eventBroadcaster.StartRecordingToSink(kubeClient.Events("")) @@ -75,7 +74,7 @@ func New(kubeClient client.Interface, threshold int) *GCController { }, }, &api.Pod{}, - fullResyncPeriod, + resyncPeriod(), framework.ResourceEventHandlerFuncs{}, ) return gcc diff --git a/pkg/controller/gc/gc_controller_test.go b/pkg/controller/gc/gc_controller_test.go index e8ee1ffd42..631ab16aa2 100644 --- a/pkg/controller/gc/gc_controller_test.go +++ b/pkg/controller/gc/gc_controller_test.go @@ -24,6 +24,7 @@ import ( "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/client/unversioned/testclient" + "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util/sets" ) @@ -98,7 +99,7 @@ func TestGC(t *testing.T) { for i, test := range testCases { client := testclient.NewSimpleFake() - gcc := New(client, test.threshold) + gcc := New(client, controller.NoResyncPeriodFunc, test.threshold) fake := &FakePodControl{} gcc.podControl = fake diff --git a/pkg/controller/job/controller.go b/pkg/controller/job/controller.go index 87ec0c4339..9ace279bff 100644 --- a/pkg/controller/job/controller.go +++ b/pkg/controller/job/controller.go @@ -68,7 +68,7 @@ type JobController struct { queue *workqueue.Type } -func NewJobController(kubeClient client.Interface) *JobController { +func NewJobController(kubeClient client.Interface, resyncPeriod controller.ResyncPeriodFunc) *JobController { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) eventBroadcaster.StartRecordingToSink(kubeClient.Events("")) @@ -93,6 +93,7 @@ func NewJobController(kubeClient client.Interface) *JobController { }, }, &experimental.Job{}, + // TODO: Can we have much longer period here? replicationcontroller.FullControllerResyncPeriod, framework.ResourceEventHandlerFuncs{ AddFunc: jm.enqueueController, @@ -115,7 +116,7 @@ func NewJobController(kubeClient client.Interface) *JobController { }, }, &api.Pod{}, - replicationcontroller.PodRelistPeriod, + resyncPeriod(), framework.ResourceEventHandlerFuncs{ AddFunc: jm.addPod, UpdateFunc: jm.updatePod, diff --git a/pkg/controller/job/controller_test.go b/pkg/controller/job/controller_test.go index 4789e20e70..93c3f09627 100644 --- a/pkg/controller/job/controller_test.go +++ b/pkg/controller/job/controller_test.go @@ -161,7 +161,7 @@ func TestControllerSyncJob(t *testing.T) { for name, tc := range testCases { // job manager setup client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Default.GroupAndVersion()}) - manager := NewJobController(client) + manager := NewJobController(client, controller.NoResyncPeriodFunc) fakePodControl := controller.FakePodControl{Err: tc.podControllerError} manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady @@ -225,7 +225,7 @@ func TestControllerSyncJob(t *testing.T) { func TestSyncJobDeleted(t *testing.T) { client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Default.GroupAndVersion()}) - manager := NewJobController(client) + manager := NewJobController(client, controller.NoResyncPeriodFunc) fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady @@ -245,7 +245,7 @@ func TestSyncJobDeleted(t *testing.T) { func TestSyncJobUpdateRequeue(t *testing.T) { client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Default.GroupAndVersion()}) - manager := NewJobController(client) + manager := NewJobController(client, controller.NoResyncPeriodFunc) fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady @@ -266,7 +266,7 @@ func TestSyncJobUpdateRequeue(t *testing.T) { func TestJobPodLookup(t *testing.T) { client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Default.GroupAndVersion()}) - manager := NewJobController(client) + manager := NewJobController(client, controller.NoResyncPeriodFunc) manager.podStoreSynced = alwaysReady testCases := []struct { job *experimental.Job @@ -346,7 +346,7 @@ func (fe FakeJobExpectations) SatisfiedExpectations(controllerKey string) bool { // and checking expectations. func TestSyncJobExpectations(t *testing.T) { client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Default.GroupAndVersion()}) - manager := NewJobController(client) + manager := NewJobController(client, controller.NoResyncPeriodFunc) fakePodControl := controller.FakePodControl{} manager.podControl = &fakePodControl manager.podStoreSynced = alwaysReady @@ -383,7 +383,7 @@ func TestWatchJobs(t *testing.T) { client := testclient.NewSimpleFake() fakeWatch := watch.NewFake() client.PrependWatchReactor("*", testclient.DefaultWatchReactor(fakeWatch, nil)) - manager := NewJobController(client) + manager := NewJobController(client, controller.NoResyncPeriodFunc) manager.podStoreSynced = alwaysReady var testJob experimental.Job @@ -447,7 +447,7 @@ func TestWatchPods(t *testing.T) { client := testclient.NewSimpleFake() fakeWatch := watch.NewFake() client.PrependWatchReactor("*", testclient.DefaultWatchReactor(fakeWatch, nil)) - manager := NewJobController(client) + manager := NewJobController(client, controller.NoResyncPeriodFunc) manager.podStoreSynced = alwaysReady // Put one job and one pod into the store diff --git a/pkg/controller/namespace/namespace_controller.go b/pkg/controller/namespace/namespace_controller.go index 1cec063f77..5207f93301 100644 --- a/pkg/controller/namespace/namespace_controller.go +++ b/pkg/controller/namespace/namespace_controller.go @@ -55,6 +55,7 @@ func NewNamespaceController(kubeClient client.Interface, experimentalMode bool, }, }, &api.Namespace{}, + // TODO: Can we have much longer period here? resyncPeriod, framework.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { diff --git a/pkg/controller/persistentvolume/persistentvolume_claim_binder_controller.go b/pkg/controller/persistentvolume/persistentvolume_claim_binder_controller.go index d106baa64d..230bff06c9 100644 --- a/pkg/controller/persistentvolume/persistentvolume_claim_binder_controller.go +++ b/pkg/controller/persistentvolume/persistentvolume_claim_binder_controller.go @@ -63,6 +63,7 @@ func NewPersistentVolumeClaimBinder(kubeClient client.Interface, syncPeriod time }, }, &api.PersistentVolume{}, + // TODO: Can we have much longer period here? syncPeriod, framework.ResourceEventHandlerFuncs{ AddFunc: binder.addVolume, @@ -80,6 +81,7 @@ func NewPersistentVolumeClaimBinder(kubeClient client.Interface, syncPeriod time }, }, &api.PersistentVolumeClaim{}, + // TODO: Can we have much longer period here? syncPeriod, framework.ResourceEventHandlerFuncs{ AddFunc: binder.addClaim, diff --git a/pkg/controller/persistentvolume/persistentvolume_recycler_controller.go b/pkg/controller/persistentvolume/persistentvolume_recycler_controller.go index af1f6243d9..65de821ade 100644 --- a/pkg/controller/persistentvolume/persistentvolume_recycler_controller.go +++ b/pkg/controller/persistentvolume/persistentvolume_recycler_controller.go @@ -71,6 +71,7 @@ func NewPersistentVolumeRecycler(kubeClient client.Interface, syncPeriod time.Du }, }, &api.PersistentVolume{}, + // TODO: Can we have much longer period here? syncPeriod, framework.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { diff --git a/pkg/controller/replication/replication_controller.go b/pkg/controller/replication/replication_controller.go index c7d9ea8a70..945629b9db 100644 --- a/pkg/controller/replication/replication_controller.go +++ b/pkg/controller/replication/replication_controller.go @@ -43,12 +43,6 @@ const ( // happens based on contents in local pod storage. FullControllerResyncPeriod = 30 * time.Second - // If a watch misdelivers info about a pod, it'll take at least this long - // to rectify the number of replicas. Note that dropped deletes are only - // rectified after the expectation times out because we don't know the - // final resting state of the pod. - PodRelistPeriod = 5 * time.Minute - // Realistic value of the burstReplica field for the replication manager based off // performance requirements for kubernetes 1.0. BurstReplicas = 500 @@ -95,7 +89,7 @@ type ReplicationManager struct { } // NewReplicationManager creates a new ReplicationManager. -func NewReplicationManager(kubeClient client.Interface, burstReplicas int) *ReplicationManager { +func NewReplicationManager(kubeClient client.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int) *ReplicationManager { eventBroadcaster := record.NewBroadcaster() eventBroadcaster.StartLogging(glog.Infof) eventBroadcaster.StartRecordingToSink(kubeClient.Events("")) @@ -121,6 +115,7 @@ func NewReplicationManager(kubeClient client.Interface, burstReplicas int) *Repl }, }, &api.ReplicationController{}, + // TODO: Can we have much longer period here? FullControllerResyncPeriod, framework.ResourceEventHandlerFuncs{ AddFunc: rm.enqueueController, @@ -161,7 +156,7 @@ func NewReplicationManager(kubeClient client.Interface, burstReplicas int) *Repl }, }, &api.Pod{}, - PodRelistPeriod, + resyncPeriod(), framework.ResourceEventHandlerFuncs{ AddFunc: rm.addPod, // This invokes the rc for every pod change, eg: host assignment. Though this might seem like overkill diff --git a/pkg/controller/replication/replication_controller_test.go b/pkg/controller/replication/replication_controller_test.go index 96a91d8638..85ba7724c5 100644 --- a/pkg/controller/replication/replication_controller_test.go +++ b/pkg/controller/replication/replication_controller_test.go @@ -135,7 +135,7 @@ type serverResponse struct { func TestSyncReplicationControllerDoesNothing(t *testing.T) { client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Default.Version()}) fakePodControl := controller.FakePodControl{} - manager := NewReplicationManager(client, BurstReplicas) + manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas) manager.podStoreSynced = alwaysReady // 2 running pods, a controller with 2 replicas, sync is a no-op @@ -151,7 +151,7 @@ func TestSyncReplicationControllerDoesNothing(t *testing.T) { func TestSyncReplicationControllerDeletes(t *testing.T) { client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Default.Version()}) fakePodControl := controller.FakePodControl{} - manager := NewReplicationManager(client, BurstReplicas) + manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas) manager.podStoreSynced = alwaysReady manager.podControl = &fakePodControl @@ -167,7 +167,7 @@ func TestSyncReplicationControllerDeletes(t *testing.T) { func TestDeleteFinalStateUnknown(t *testing.T) { client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Default.Version()}) fakePodControl := controller.FakePodControl{} - manager := NewReplicationManager(client, BurstReplicas) + manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas) manager.podStoreSynced = alwaysReady manager.podControl = &fakePodControl @@ -199,7 +199,7 @@ func TestDeleteFinalStateUnknown(t *testing.T) { func TestSyncReplicationControllerCreates(t *testing.T) { client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Default.Version()}) - manager := NewReplicationManager(client, BurstReplicas) + manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas) manager.podStoreSynced = alwaysReady // A controller with 2 replicas and no pods in the store, 2 creates expected @@ -221,7 +221,7 @@ func TestStatusUpdatesWithoutReplicasChange(t *testing.T) { testServer := httptest.NewServer(&fakeHandler) defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()}) - manager := NewReplicationManager(client, BurstReplicas) + manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas) manager.podStoreSynced = alwaysReady // Steady state for the replication controller, no Status.Replicas updates expected @@ -263,7 +263,7 @@ func TestControllerUpdateReplicas(t *testing.T) { defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()}) - manager := NewReplicationManager(client, BurstReplicas) + manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas) manager.podStoreSynced = alwaysReady // Insufficient number of pods in the system, and Status.Replicas is wrong; @@ -303,7 +303,7 @@ func TestSyncReplicationControllerDormancy(t *testing.T) { client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()}) fakePodControl := controller.FakePodControl{} - manager := NewReplicationManager(client, BurstReplicas) + manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas) manager.podStoreSynced = alwaysReady manager.podControl = &fakePodControl @@ -349,7 +349,7 @@ func TestSyncReplicationControllerDormancy(t *testing.T) { } func TestPodControllerLookup(t *testing.T) { - manager := NewReplicationManager(client.NewOrDie(&client.Config{Host: "", Version: testapi.Default.Version()}), BurstReplicas) + manager := NewReplicationManager(client.NewOrDie(&client.Config{Host: "", Version: testapi.Default.Version()}), controller.NoResyncPeriodFunc, BurstReplicas) manager.podStoreSynced = alwaysReady testCases := []struct { inRCs []*api.ReplicationController @@ -417,7 +417,7 @@ func TestWatchControllers(t *testing.T) { fakeWatch := watch.NewFake() client := &testclient.Fake{} client.AddWatchReactor("*", testclient.DefaultWatchReactor(fakeWatch, nil)) - manager := NewReplicationManager(client, BurstReplicas) + manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas) manager.podStoreSynced = alwaysReady var testControllerSpec api.ReplicationController @@ -460,7 +460,7 @@ func TestWatchPods(t *testing.T) { fakeWatch := watch.NewFake() client := &testclient.Fake{} client.AddWatchReactor("*", testclient.DefaultWatchReactor(fakeWatch, nil)) - manager := NewReplicationManager(client, BurstReplicas) + manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas) manager.podStoreSynced = alwaysReady // Put one rc and one pod into the controller's stores @@ -502,7 +502,7 @@ func TestWatchPods(t *testing.T) { } func TestUpdatePods(t *testing.T) { - manager := NewReplicationManager(testclient.NewSimpleFake(), BurstReplicas) + manager := NewReplicationManager(testclient.NewSimpleFake(), controller.NoResyncPeriodFunc, BurstReplicas) manager.podStoreSynced = alwaysReady received := make(chan string) @@ -561,7 +561,7 @@ func TestControllerUpdateRequeue(t *testing.T) { defer testServer.Close() client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()}) - manager := NewReplicationManager(client, BurstReplicas) + manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas) manager.podStoreSynced = alwaysReady rc := newReplicationController(1) @@ -642,7 +642,7 @@ func TestControllerUpdateStatusWithFailure(t *testing.T) { func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) { client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Default.Version()}) fakePodControl := controller.FakePodControl{} - manager := NewReplicationManager(client, burstReplicas) + manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, burstReplicas) manager.podStoreSynced = alwaysReady manager.podControl = &fakePodControl @@ -762,7 +762,7 @@ func (fe FakeRCExpectations) SatisfiedExpectations(controllerKey string) bool { func TestRCSyncExpectations(t *testing.T) { client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Default.Version()}) fakePodControl := controller.FakePodControl{} - manager := NewReplicationManager(client, 2) + manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, 2) manager.podStoreSynced = alwaysReady manager.podControl = &fakePodControl @@ -786,7 +786,7 @@ func TestRCSyncExpectations(t *testing.T) { func TestDeleteControllerAndExpectations(t *testing.T) { client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Default.Version()}) - manager := NewReplicationManager(client, 10) + manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, 10) manager.podStoreSynced = alwaysReady rc := newReplicationController(1) @@ -829,7 +829,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) { func TestRCManagerNotReady(t *testing.T) { client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Default.Version()}) fakePodControl := controller.FakePodControl{} - manager := NewReplicationManager(client, 2) + manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, 2) manager.podControl = &fakePodControl manager.podStoreSynced = func() bool { return false } @@ -867,7 +867,7 @@ func TestOverlappingRCs(t *testing.T) { client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Default.Version()}) for i := 0; i < 5; i++ { - manager := NewReplicationManager(client, 10) + manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, 10) manager.podStoreSynced = alwaysReady // Create 10 rcs, shuffled them randomly and insert them into the rc manager's store diff --git a/test/integration/framework/master_utils.go b/test/integration/framework/master_utils.go index 5ce56a7eb1..e6fd83c6c7 100644 --- a/test/integration/framework/master_utils.go +++ b/test/integration/framework/master_utils.go @@ -32,6 +32,7 @@ import ( "k8s.io/kubernetes/pkg/apiserver" "k8s.io/kubernetes/pkg/client/record" client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/replication" "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/kubectl" @@ -97,7 +98,7 @@ func NewMasterComponents(c *Config) *MasterComponents { } restClient := client.NewOrDie(&client.Config{Host: s.URL, Version: testapi.Default.Version(), QPS: c.QPS, Burst: c.Burst}) rcStopCh := make(chan struct{}) - controllerManager := replicationcontroller.NewReplicationManager(restClient, c.Burst) + controllerManager := replicationcontroller.NewReplicationManager(restClient, controller.NoResyncPeriodFunc, c.Burst) // TODO: Support events once we can cleanly shutdown an event recorder. controllerManager.SetEventRecorder(&record.FakeRecorder{})