Extend resyncPeriods in controllers in production.

pull/6/head
Wojciech Tyczynski 2015-10-06 11:12:00 +02:00
parent b0d748efb0
commit df79026b79
22 changed files with 109 additions and 89 deletions

View File

@ -61,10 +61,11 @@ SERVICE_CLUSTER_IP_RANGE="10.0.0.0/16" # formerly PORTAL_NET
ENABLE_CLUSTER_MONITORING="${KUBE_ENABLE_CLUSTER_MONITORING:-influxdb}"
TEST_CLUSTER_LOG_LEVEL="${TEST_CLUSTER_LOG_LEVEL:---v=4}"
TEST_CLUSTER_RESYNC_PERIOD="${TEST_CLUSTER_RESYNC_PERIOD:---min-resync-period=3m}"
KUBELET_TEST_ARGS="--max-pods=100 $TEST_CLUSTER_LOG_LEVEL"
APISERVER_TEST_ARGS="--runtime-config=experimental/v1alpha1 ${TEST_CLUSTER_LOG_LEVEL}"
CONTROLLER_MANAGER_TEST_ARGS="${TEST_CLUSTER_LOG_LEVEL}"
CONTROLLER_MANAGER_TEST_ARGS="${TEST_CLUSTER_LOG_LEVEL} ${TEST_CLUSTER_RESYNC_PERIOD}"
SCHEDULER_TEST_ARGS="${TEST_CLUSTER_LOG_LEVEL}"
KUBEPROXY_TEST_ARGS="${TEST_CLUSTER_LOG_LEVEL}"

View File

@ -43,6 +43,7 @@ import (
"k8s.io/kubernetes/pkg/apiserver"
"k8s.io/kubernetes/pkg/client/record"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/endpoint"
"k8s.io/kubernetes/pkg/controller/node"
replicationControllerPkg "k8s.io/kubernetes/pkg/controller/replication"
@ -196,14 +197,13 @@ func startComponents(firstManifestURL, secondManifestURL string) (string, string
eventBroadcaster.StartRecordingToSink(cl.Events(""))
scheduler.New(schedulerConfig).Run()
endpoints := endpointcontroller.NewEndpointController(cl)
// ensure the service endpoints are sync'd several times within the window that the integration tests wait
go endpoints.Run(3, util.NeverStop)
controllerManager := replicationControllerPkg.NewReplicationManager(cl, replicationControllerPkg.BurstReplicas)
go endpointcontroller.NewEndpointController(cl, controller.NoResyncPeriodFunc).
Run(3, util.NeverStop)
// TODO: Write an integration test for the replication controllers watch.
go controllerManager.Run(3, util.NeverStop)
go replicationControllerPkg.NewReplicationManager(cl, controller.NoResyncPeriodFunc, replicationControllerPkg.BurstReplicas).
Run(3, util.NeverStop)
nodeController := nodecontroller.NewNodeController(nil, cl, 5*time.Minute, util.NewFakeRateLimiter(), util.NewFakeRateLimiter(),
40*time.Second, 60*time.Second, 5*time.Second, nil, false)

View File

@ -25,6 +25,7 @@ package app
import (
"fmt"
"io/ioutil"
"math/rand"
"net"
"net/http"
"net/http/pprof"
@ -78,6 +79,7 @@ type CMServer struct {
TerminatedPodGCThreshold int
HorizontalPodAutoscalerSyncPeriod time.Duration
DeploymentControllerSyncPeriod time.Duration
MinResyncPeriod time.Duration
RegisterRetryCount int
NodeMonitorGracePeriod time.Duration
NodeStartupGracePeriod time.Duration
@ -115,6 +117,7 @@ func NewCMServer() *CMServer {
PVClaimBinderSyncPeriod: 10 * time.Second,
HorizontalPodAutoscalerSyncPeriod: 30 * time.Second,
DeploymentControllerSyncPeriod: 30 * time.Second,
MinResyncPeriod: 12 * time.Hour,
RegisterRetryCount: 10,
PodEvictionTimeout: 5 * time.Minute,
ClusterName: "kubernetes",
@ -157,6 +160,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
fs.DurationVar(&s.ResourceQuotaSyncPeriod, "resource-quota-sync-period", s.ResourceQuotaSyncPeriod, "The period for syncing quota usage status in the system")
fs.DurationVar(&s.NamespaceSyncPeriod, "namespace-sync-period", s.NamespaceSyncPeriod, "The period for syncing namespace life-cycle updates")
fs.DurationVar(&s.PVClaimBinderSyncPeriod, "pvclaimbinder-sync-period", s.PVClaimBinderSyncPeriod, "The period for syncing persistent volumes and persistent volume claims")
fs.DurationVar(&s.MinResyncPeriod, "min-resync-period", s.MinResyncPeriod, "The resync period in reflectors will be random between MinResyncPeriod and 2*MinResyncPeriod")
fs.StringVar(&s.VolumeConfigFlags.PersistentVolumeRecyclerPodTemplateFilePathNFS, "pv-recycler-pod-template-filepath-nfs", s.VolumeConfigFlags.PersistentVolumeRecyclerPodTemplateFilePathNFS, "The file path to a pod definition used as a template for NFS persistent volume recycling")
fs.IntVar(&s.VolumeConfigFlags.PersistentVolumeRecyclerMinimumTimeoutNFS, "pv-recycler-minimum-timeout-nfs", s.VolumeConfigFlags.PersistentVolumeRecyclerMinimumTimeoutNFS, "The minimum ActiveDeadlineSeconds to use for an NFS Recycler pod")
fs.IntVar(&s.VolumeConfigFlags.PersistentVolumeRecyclerIncrementTimeoutNFS, "pv-recycler-increment-timeout-nfs", s.VolumeConfigFlags.PersistentVolumeRecyclerIncrementTimeoutNFS, "the increment of time added per Gi to ActiveDeadlineSeconds for an NFS scrubber pod")
@ -191,6 +195,11 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
fs.BoolVar(&s.EnableExperimental, "enable-experimental", s.EnableExperimental, "Enables experimental controllers (requires enabling experimental API on apiserver).")
}
func (s *CMServer) resyncPeriod() time.Duration {
factor := rand.Float64() + 1
return time.Duration(float64(s.MinResyncPeriod.Nanoseconds()) * factor)
}
// Run runs the CMServer. This should never exit.
func (s *CMServer) Run(_ []string) error {
if s.Kubeconfig == "" && s.Master == "" {
@ -231,14 +240,14 @@ func (s *CMServer) Run(_ []string) error {
glog.Fatal(server.ListenAndServe())
}()
endpoints := endpointcontroller.NewEndpointController(kubeClient)
go endpoints.Run(s.ConcurrentEndpointSyncs, util.NeverStop)
go endpointcontroller.NewEndpointController(kubeClient, s.resyncPeriod).
Run(s.ConcurrentEndpointSyncs, util.NeverStop)
controllerManager := replicationControllerPkg.NewReplicationManager(kubeClient, replicationControllerPkg.BurstReplicas)
go controllerManager.Run(s.ConcurrentRCSyncs, util.NeverStop)
go replicationControllerPkg.NewReplicationManager(kubeClient, s.resyncPeriod, replicationControllerPkg.BurstReplicas).
Run(s.ConcurrentRCSyncs, util.NeverStop)
if s.TerminatedPodGCThreshold > 0 {
go gc.New(kubeClient, s.TerminatedPodGCThreshold).
go gc.New(kubeClient, s.resyncPeriod, s.TerminatedPodGCThreshold).
Run(util.NeverStop)
}
@ -269,16 +278,15 @@ func (s *CMServer) Run(_ []string) error {
}
}
resourceQuotaController := resourcequotacontroller.NewResourceQuotaController(kubeClient)
resourceQuotaController.Run(s.ResourceQuotaSyncPeriod)
resourcequotacontroller.NewResourceQuotaController(kubeClient).Run(s.ResourceQuotaSyncPeriod)
namespacecontroller.NewNamespaceController(kubeClient, s.EnableExperimental, s.NamespaceSyncPeriod).Run()
if s.EnableExperimental {
go daemon.NewDaemonSetsController(kubeClient).
go daemon.NewDaemonSetsController(kubeClient, s.resyncPeriod).
Run(s.ConcurrentDSCSyncs, util.NeverStop)
go job.NewJobController(kubeClient).
go job.NewJobController(kubeClient, s.resyncPeriod).
Run(s.ConcurrentJobSyncs, util.NeverStop)
podautoscaler.NewHorizontalController(kubeClient, metrics.NewHeapsterMetricsClient(kubeClient)).

View File

@ -19,9 +19,11 @@ package controllermanager
import (
"fmt"
"io/ioutil"
"math/rand"
"net"
"net/http"
"strconv"
"time"
"k8s.io/kubernetes/cmd/kube-controller-manager/app"
client "k8s.io/kubernetes/pkg/client/unversioned"
@ -72,6 +74,11 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
fs.BoolVar(&s.UseHostPortEndpoints, "host_port_endpoints", s.UseHostPortEndpoints, "Map service endpoints to hostIP:hostPort instead of podIP:containerPort. Default true.")
}
func (s *CMServer) resyncPeriod() time.Duration {
factor := rand.Float64() + 1
return time.Duration(float64(time.Hour) * 12.0 * factor)
}
func (s *CMServer) Run(_ []string) error {
if s.Kubeconfig == "" && s.Master == "" {
glog.Warningf("Neither --kubeconfig nor --master was specified. Using default API client. This might not work.")
@ -111,10 +118,10 @@ func (s *CMServer) Run(_ []string) error {
endpoints := s.createEndpointController(kubeClient)
go endpoints.Run(s.ConcurrentEndpointSyncs, util.NeverStop)
controllerManager := replicationcontroller.NewReplicationManager(kubeClient, replicationcontroller.BurstReplicas)
go controllerManager.Run(s.ConcurrentRCSyncs, util.NeverStop)
go replicationcontroller.NewReplicationManager(kubeClient, s.resyncPeriod, replicationcontroller.BurstReplicas).
Run(s.ConcurrentRCSyncs, util.NeverStop)
go daemon.NewDaemonSetsController(kubeClient).
go daemon.NewDaemonSetsController(kubeClient, s.resyncPeriod).
Run(s.ConcurrentDSCSyncs, util.NeverStop)
//TODO(jdef) should eventually support more cloud providers here
@ -203,6 +210,6 @@ func (s *CMServer) createEndpointController(client *client.Client) kmendpoint.En
return kmendpoint.NewEndpointController(client)
}
glog.V(2).Infof("Creating podIP:containerPort endpoint controller")
stockEndpointController := kendpoint.NewEndpointController(client)
stockEndpointController := kendpoint.NewEndpointController(client, s.resyncPeriod)
return stockEndpointController
}

View File

@ -85,7 +85,7 @@ func NewEndpointController(client *client.Client) *endpointController {
},
},
&api.Pod{},
kservice.PodRelistPeriod,
5*time.Minute,
framework.ResourceEventHandlerFuncs{
AddFunc: e.addPod,
UpdateFunc: e.updatePod,

View File

@ -353,6 +353,8 @@ case ${JOB_NAME} in
NUM_MINIONS="100"
# Reduce logs verbosity
TEST_CLUSTER_LOG_LEVEL="--v=1"
# Increase resync period to simulate production
TEST_CLUSTER_RESYNC_PERIOD="--min-resync-period=12h"
;;
# Runs tests on GCE soak cluster.

View File

@ -185,6 +185,7 @@ minion-max-log-size
minion-path-override
min-pr-number
min-request-timeout
min-resync-period
namespace-sync-period
network-plugin
network-plugin-dir

View File

@ -54,6 +54,13 @@ var (
KeyFunc = framework.DeletionHandlingMetaNamespaceKeyFunc
)
type ResyncPeriodFunc func() time.Duration
// Returns 0 for resyncPeriod in case resyncing is not needed.
func NoResyncPeriodFunc() time.Duration {
return 0
}
// Expectations are a way for controllers to tell the controller manager what they expect. eg:
// ControllerExpectations: {
// controller1: expects 2 adds in 2 minutes

View File

@ -41,12 +41,6 @@ const (
// Daemon sets will periodically check that their daemon pods are running as expected.
FullDaemonSetResyncPeriod = 30 * time.Second // TODO: Figure out if this time seems reasonable.
// Nodes don't need relisting.
FullNodeResyncPeriod = 0
// Daemon pods don't need relisting.
FullDaemonPodResyncPeriod = 0
// We must avoid counting pods until the pod store has synced. If it hasn't synced, to
// avoid a hot loop, we'll wait this long between checks.
PodStoreSyncedPollPeriod = 100 * time.Millisecond
@ -85,7 +79,7 @@ type DaemonSetsController struct {
queue *workqueue.Type
}
func NewDaemonSetsController(kubeClient client.Interface) *DaemonSetsController {
func NewDaemonSetsController(kubeClient client.Interface, resyncPeriod controller.ResyncPeriodFunc) *DaemonSetsController {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(kubeClient.Events(""))
@ -110,6 +104,7 @@ func NewDaemonSetsController(kubeClient client.Interface) *DaemonSetsController
},
},
&experimental.DaemonSet{},
// TODO: Can we have much longer period here?
FullDaemonSetResyncPeriod,
framework.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
@ -141,7 +136,7 @@ func NewDaemonSetsController(kubeClient client.Interface) *DaemonSetsController
},
},
&api.Pod{},
FullDaemonPodResyncPeriod,
resyncPeriod(),
framework.ResourceEventHandlerFuncs{
AddFunc: dsc.addPod,
UpdateFunc: dsc.updatePod,
@ -159,7 +154,7 @@ func NewDaemonSetsController(kubeClient client.Interface) *DaemonSetsController
},
},
&api.Node{},
FullNodeResyncPeriod,
resyncPeriod(),
framework.ResourceEventHandlerFuncs{
AddFunc: dsc.addNode,
UpdateFunc: dsc.updateNode,

View File

@ -130,7 +130,7 @@ func addPods(podStore cache.Store, nodeName string, label map[string]string, num
func newTestController() (*DaemonSetsController, *controller.FakePodControl) {
client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Default.GroupAndVersion()})
manager := NewDaemonSetsController(client)
manager := NewDaemonSetsController(client, controller.NoResyncPeriodFunc)
manager.podStoreSynced = alwaysReady
podControl := &controller.FakePodControl{}
manager.podControl = podControl

View File

@ -28,6 +28,7 @@ import (
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/client/cache"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/labels"
@ -45,11 +46,6 @@ const (
// often. Higher numbers = lower CPU/network load; lower numbers =
// shorter amount of time before a mistaken endpoint is corrected.
FullServiceResyncPeriod = 30 * time.Second
// We'll keep pod watches open up to this long. In the unlikely case
// that a watch misdelivers info about a pod, it'll take this long for
// that mistake to be rectified.
PodRelistPeriod = 5 * time.Minute
)
var (
@ -57,7 +53,7 @@ var (
)
// NewEndpointController returns a new *EndpointController.
func NewEndpointController(client *client.Client) *EndpointController {
func NewEndpointController(client *client.Client, resyncPeriod controller.ResyncPeriodFunc) *EndpointController {
e := &EndpointController{
client: client,
queue: workqueue.New(),
@ -73,6 +69,7 @@ func NewEndpointController(client *client.Client) *EndpointController {
},
},
&api.Service{},
// TODO: Can we have much longer period here?
FullServiceResyncPeriod,
framework.ResourceEventHandlerFuncs{
AddFunc: e.enqueueService,
@ -93,7 +90,7 @@ func NewEndpointController(client *client.Client) *EndpointController {
},
},
&api.Pod{},
PodRelistPeriod,
resyncPeriod(),
framework.ResourceEventHandlerFuncs{
AddFunc: e.addPod,
UpdateFunc: e.updatePod,

View File

@ -29,6 +29,7 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/cache"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util"
)
@ -188,7 +189,7 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) {
}})
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()})
endpoints := NewEndpointController(client)
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
endpoints.serviceStore.Store.Add(&api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
Spec: api.ServiceSpec{Ports: []api.ServicePort{{Port: 80}}},
@ -220,7 +221,7 @@ func TestCheckLeftoverEndpoints(t *testing.T) {
}})
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()})
endpoints := NewEndpointController(client)
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
endpoints.checkLeftoverEndpoints()
if e, a := 1, endpoints.queue.Len(); e != a {
@ -248,7 +249,7 @@ func TestSyncEndpointsProtocolTCP(t *testing.T) {
}})
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()})
endpoints := NewEndpointController(client)
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
endpoints.serviceStore.Store.Add(&api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
Spec: api.ServiceSpec{
@ -276,7 +277,7 @@ func TestSyncEndpointsProtocolUDP(t *testing.T) {
}})
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()})
endpoints := NewEndpointController(client)
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
endpoints.serviceStore.Store.Add(&api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
Spec: api.ServiceSpec{
@ -301,7 +302,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) {
}})
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()})
endpoints := NewEndpointController(client)
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
addPods(endpoints.podStore.Store, ns, 1, 1, 0)
endpoints.serviceStore.Store.Add(&api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
@ -338,7 +339,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) {
}})
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()})
endpoints := NewEndpointController(client)
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
addPods(endpoints.podStore.Store, ns, 0, 1, 1)
endpoints.serviceStore.Store.Add(&api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
@ -375,7 +376,7 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) {
}})
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()})
endpoints := NewEndpointController(client)
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
addPods(endpoints.podStore.Store, ns, 1, 1, 1)
endpoints.serviceStore.Store.Add(&api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
@ -416,7 +417,7 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) {
}})
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()})
endpoints := NewEndpointController(client)
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
addPods(endpoints.podStore.Store, ns, 1, 1, 0)
endpoints.serviceStore.Store.Add(&api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: ns},
@ -456,7 +457,7 @@ func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) {
}})
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()})
endpoints := NewEndpointController(client)
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
addPods(endpoints.podStore.Store, api.NamespaceDefault, 1, 1, 0)
endpoints.serviceStore.Store.Add(&api.Service{
ObjectMeta: api.ObjectMeta{Name: "foo", Namespace: api.NamespaceDefault},
@ -475,7 +476,7 @@ func TestSyncEndpointsItems(t *testing.T) {
serverResponse{http.StatusOK, &api.Endpoints{}})
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()})
endpoints := NewEndpointController(client)
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
addPods(endpoints.podStore.Store, ns, 3, 2, 0)
addPods(endpoints.podStore.Store, "blah", 5, 2, 0) // make sure these aren't found!
endpoints.serviceStore.Store.Add(&api.Service{
@ -517,7 +518,7 @@ func TestSyncEndpointsItemsWithLabels(t *testing.T) {
serverResponse{http.StatusOK, &api.Endpoints{}})
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()})
endpoints := NewEndpointController(client)
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
addPods(endpoints.podStore.Store, ns, 3, 2, 0)
serviceLabels := map[string]string{"foo": "bar"}
endpoints.serviceStore.Store.Add(&api.Service{
@ -577,7 +578,7 @@ func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) {
}})
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()})
endpoints := NewEndpointController(client)
endpoints := NewEndpointController(client, controller.NoResyncPeriodFunc)
addPods(endpoints.podStore.Store, ns, 1, 1, 0)
serviceLabels := map[string]string{"baz": "blah"}
endpoints.serviceStore.Store.Add(&api.Service{

View File

@ -37,8 +37,7 @@ import (
)
const (
fullResyncPeriod = 0
gcCheckPeriod = 20 * time.Second
gcCheckPeriod = 20 * time.Second
)
type GCController struct {
@ -49,7 +48,7 @@ type GCController struct {
threshold int
}
func New(kubeClient client.Interface, threshold int) *GCController {
func New(kubeClient client.Interface, resyncPeriod controller.ResyncPeriodFunc, threshold int) *GCController {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(kubeClient.Events(""))
@ -75,7 +74,7 @@ func New(kubeClient client.Interface, threshold int) *GCController {
},
},
&api.Pod{},
fullResyncPeriod,
resyncPeriod(),
framework.ResourceEventHandlerFuncs{},
)
return gcc

View File

@ -24,6 +24,7 @@ import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/unversioned/testclient"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/sets"
)
@ -98,7 +99,7 @@ func TestGC(t *testing.T) {
for i, test := range testCases {
client := testclient.NewSimpleFake()
gcc := New(client, test.threshold)
gcc := New(client, controller.NoResyncPeriodFunc, test.threshold)
fake := &FakePodControl{}
gcc.podControl = fake

View File

@ -68,7 +68,7 @@ type JobController struct {
queue *workqueue.Type
}
func NewJobController(kubeClient client.Interface) *JobController {
func NewJobController(kubeClient client.Interface, resyncPeriod controller.ResyncPeriodFunc) *JobController {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(kubeClient.Events(""))
@ -93,6 +93,7 @@ func NewJobController(kubeClient client.Interface) *JobController {
},
},
&experimental.Job{},
// TODO: Can we have much longer period here?
replicationcontroller.FullControllerResyncPeriod,
framework.ResourceEventHandlerFuncs{
AddFunc: jm.enqueueController,
@ -115,7 +116,7 @@ func NewJobController(kubeClient client.Interface) *JobController {
},
},
&api.Pod{},
replicationcontroller.PodRelistPeriod,
resyncPeriod(),
framework.ResourceEventHandlerFuncs{
AddFunc: jm.addPod,
UpdateFunc: jm.updatePod,

View File

@ -161,7 +161,7 @@ func TestControllerSyncJob(t *testing.T) {
for name, tc := range testCases {
// job manager setup
client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Default.GroupAndVersion()})
manager := NewJobController(client)
manager := NewJobController(client, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{Err: tc.podControllerError}
manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady
@ -225,7 +225,7 @@ func TestControllerSyncJob(t *testing.T) {
func TestSyncJobDeleted(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Default.GroupAndVersion()})
manager := NewJobController(client)
manager := NewJobController(client, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady
@ -245,7 +245,7 @@ func TestSyncJobDeleted(t *testing.T) {
func TestSyncJobUpdateRequeue(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Default.GroupAndVersion()})
manager := NewJobController(client)
manager := NewJobController(client, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady
@ -266,7 +266,7 @@ func TestSyncJobUpdateRequeue(t *testing.T) {
func TestJobPodLookup(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Default.GroupAndVersion()})
manager := NewJobController(client)
manager := NewJobController(client, controller.NoResyncPeriodFunc)
manager.podStoreSynced = alwaysReady
testCases := []struct {
job *experimental.Job
@ -346,7 +346,7 @@ func (fe FakeJobExpectations) SatisfiedExpectations(controllerKey string) bool {
// and checking expectations.
func TestSyncJobExpectations(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Default.GroupAndVersion()})
manager := NewJobController(client)
manager := NewJobController(client, controller.NoResyncPeriodFunc)
fakePodControl := controller.FakePodControl{}
manager.podControl = &fakePodControl
manager.podStoreSynced = alwaysReady
@ -383,7 +383,7 @@ func TestWatchJobs(t *testing.T) {
client := testclient.NewSimpleFake()
fakeWatch := watch.NewFake()
client.PrependWatchReactor("*", testclient.DefaultWatchReactor(fakeWatch, nil))
manager := NewJobController(client)
manager := NewJobController(client, controller.NoResyncPeriodFunc)
manager.podStoreSynced = alwaysReady
var testJob experimental.Job
@ -447,7 +447,7 @@ func TestWatchPods(t *testing.T) {
client := testclient.NewSimpleFake()
fakeWatch := watch.NewFake()
client.PrependWatchReactor("*", testclient.DefaultWatchReactor(fakeWatch, nil))
manager := NewJobController(client)
manager := NewJobController(client, controller.NoResyncPeriodFunc)
manager.podStoreSynced = alwaysReady
// Put one job and one pod into the store

View File

@ -55,6 +55,7 @@ func NewNamespaceController(kubeClient client.Interface, experimentalMode bool,
},
},
&api.Namespace{},
// TODO: Can we have much longer period here?
resyncPeriod,
framework.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {

View File

@ -63,6 +63,7 @@ func NewPersistentVolumeClaimBinder(kubeClient client.Interface, syncPeriod time
},
},
&api.PersistentVolume{},
// TODO: Can we have much longer period here?
syncPeriod,
framework.ResourceEventHandlerFuncs{
AddFunc: binder.addVolume,
@ -80,6 +81,7 @@ func NewPersistentVolumeClaimBinder(kubeClient client.Interface, syncPeriod time
},
},
&api.PersistentVolumeClaim{},
// TODO: Can we have much longer period here?
syncPeriod,
framework.ResourceEventHandlerFuncs{
AddFunc: binder.addClaim,

View File

@ -71,6 +71,7 @@ func NewPersistentVolumeRecycler(kubeClient client.Interface, syncPeriod time.Du
},
},
&api.PersistentVolume{},
// TODO: Can we have much longer period here?
syncPeriod,
framework.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {

View File

@ -43,12 +43,6 @@ const (
// happens based on contents in local pod storage.
FullControllerResyncPeriod = 30 * time.Second
// If a watch misdelivers info about a pod, it'll take at least this long
// to rectify the number of replicas. Note that dropped deletes are only
// rectified after the expectation times out because we don't know the
// final resting state of the pod.
PodRelistPeriod = 5 * time.Minute
// Realistic value of the burstReplica field for the replication manager based off
// performance requirements for kubernetes 1.0.
BurstReplicas = 500
@ -95,7 +89,7 @@ type ReplicationManager struct {
}
// NewReplicationManager creates a new ReplicationManager.
func NewReplicationManager(kubeClient client.Interface, burstReplicas int) *ReplicationManager {
func NewReplicationManager(kubeClient client.Interface, resyncPeriod controller.ResyncPeriodFunc, burstReplicas int) *ReplicationManager {
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster.StartLogging(glog.Infof)
eventBroadcaster.StartRecordingToSink(kubeClient.Events(""))
@ -121,6 +115,7 @@ func NewReplicationManager(kubeClient client.Interface, burstReplicas int) *Repl
},
},
&api.ReplicationController{},
// TODO: Can we have much longer period here?
FullControllerResyncPeriod,
framework.ResourceEventHandlerFuncs{
AddFunc: rm.enqueueController,
@ -161,7 +156,7 @@ func NewReplicationManager(kubeClient client.Interface, burstReplicas int) *Repl
},
},
&api.Pod{},
PodRelistPeriod,
resyncPeriod(),
framework.ResourceEventHandlerFuncs{
AddFunc: rm.addPod,
// This invokes the rc for every pod change, eg: host assignment. Though this might seem like overkill

View File

@ -135,7 +135,7 @@ type serverResponse struct {
func TestSyncReplicationControllerDoesNothing(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Default.Version()})
fakePodControl := controller.FakePodControl{}
manager := NewReplicationManager(client, BurstReplicas)
manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas)
manager.podStoreSynced = alwaysReady
// 2 running pods, a controller with 2 replicas, sync is a no-op
@ -151,7 +151,7 @@ func TestSyncReplicationControllerDoesNothing(t *testing.T) {
func TestSyncReplicationControllerDeletes(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Default.Version()})
fakePodControl := controller.FakePodControl{}
manager := NewReplicationManager(client, BurstReplicas)
manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas)
manager.podStoreSynced = alwaysReady
manager.podControl = &fakePodControl
@ -167,7 +167,7 @@ func TestSyncReplicationControllerDeletes(t *testing.T) {
func TestDeleteFinalStateUnknown(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Default.Version()})
fakePodControl := controller.FakePodControl{}
manager := NewReplicationManager(client, BurstReplicas)
manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas)
manager.podStoreSynced = alwaysReady
manager.podControl = &fakePodControl
@ -199,7 +199,7 @@ func TestDeleteFinalStateUnknown(t *testing.T) {
func TestSyncReplicationControllerCreates(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Default.Version()})
manager := NewReplicationManager(client, BurstReplicas)
manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas)
manager.podStoreSynced = alwaysReady
// A controller with 2 replicas and no pods in the store, 2 creates expected
@ -221,7 +221,7 @@ func TestStatusUpdatesWithoutReplicasChange(t *testing.T) {
testServer := httptest.NewServer(&fakeHandler)
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()})
manager := NewReplicationManager(client, BurstReplicas)
manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas)
manager.podStoreSynced = alwaysReady
// Steady state for the replication controller, no Status.Replicas updates expected
@ -263,7 +263,7 @@ func TestControllerUpdateReplicas(t *testing.T) {
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()})
manager := NewReplicationManager(client, BurstReplicas)
manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas)
manager.podStoreSynced = alwaysReady
// Insufficient number of pods in the system, and Status.Replicas is wrong;
@ -303,7 +303,7 @@ func TestSyncReplicationControllerDormancy(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()})
fakePodControl := controller.FakePodControl{}
manager := NewReplicationManager(client, BurstReplicas)
manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas)
manager.podStoreSynced = alwaysReady
manager.podControl = &fakePodControl
@ -349,7 +349,7 @@ func TestSyncReplicationControllerDormancy(t *testing.T) {
}
func TestPodControllerLookup(t *testing.T) {
manager := NewReplicationManager(client.NewOrDie(&client.Config{Host: "", Version: testapi.Default.Version()}), BurstReplicas)
manager := NewReplicationManager(client.NewOrDie(&client.Config{Host: "", Version: testapi.Default.Version()}), controller.NoResyncPeriodFunc, BurstReplicas)
manager.podStoreSynced = alwaysReady
testCases := []struct {
inRCs []*api.ReplicationController
@ -417,7 +417,7 @@ func TestWatchControllers(t *testing.T) {
fakeWatch := watch.NewFake()
client := &testclient.Fake{}
client.AddWatchReactor("*", testclient.DefaultWatchReactor(fakeWatch, nil))
manager := NewReplicationManager(client, BurstReplicas)
manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas)
manager.podStoreSynced = alwaysReady
var testControllerSpec api.ReplicationController
@ -460,7 +460,7 @@ func TestWatchPods(t *testing.T) {
fakeWatch := watch.NewFake()
client := &testclient.Fake{}
client.AddWatchReactor("*", testclient.DefaultWatchReactor(fakeWatch, nil))
manager := NewReplicationManager(client, BurstReplicas)
manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas)
manager.podStoreSynced = alwaysReady
// Put one rc and one pod into the controller's stores
@ -502,7 +502,7 @@ func TestWatchPods(t *testing.T) {
}
func TestUpdatePods(t *testing.T) {
manager := NewReplicationManager(testclient.NewSimpleFake(), BurstReplicas)
manager := NewReplicationManager(testclient.NewSimpleFake(), controller.NoResyncPeriodFunc, BurstReplicas)
manager.podStoreSynced = alwaysReady
received := make(chan string)
@ -561,7 +561,7 @@ func TestControllerUpdateRequeue(t *testing.T) {
defer testServer.Close()
client := client.NewOrDie(&client.Config{Host: testServer.URL, Version: testapi.Default.Version()})
manager := NewReplicationManager(client, BurstReplicas)
manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, BurstReplicas)
manager.podStoreSynced = alwaysReady
rc := newReplicationController(1)
@ -642,7 +642,7 @@ func TestControllerUpdateStatusWithFailure(t *testing.T) {
func doTestControllerBurstReplicas(t *testing.T, burstReplicas, numReplicas int) {
client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Default.Version()})
fakePodControl := controller.FakePodControl{}
manager := NewReplicationManager(client, burstReplicas)
manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, burstReplicas)
manager.podStoreSynced = alwaysReady
manager.podControl = &fakePodControl
@ -762,7 +762,7 @@ func (fe FakeRCExpectations) SatisfiedExpectations(controllerKey string) bool {
func TestRCSyncExpectations(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Default.Version()})
fakePodControl := controller.FakePodControl{}
manager := NewReplicationManager(client, 2)
manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, 2)
manager.podStoreSynced = alwaysReady
manager.podControl = &fakePodControl
@ -786,7 +786,7 @@ func TestRCSyncExpectations(t *testing.T) {
func TestDeleteControllerAndExpectations(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Default.Version()})
manager := NewReplicationManager(client, 10)
manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, 10)
manager.podStoreSynced = alwaysReady
rc := newReplicationController(1)
@ -829,7 +829,7 @@ func TestDeleteControllerAndExpectations(t *testing.T) {
func TestRCManagerNotReady(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Default.Version()})
fakePodControl := controller.FakePodControl{}
manager := NewReplicationManager(client, 2)
manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, 2)
manager.podControl = &fakePodControl
manager.podStoreSynced = func() bool { return false }
@ -867,7 +867,7 @@ func TestOverlappingRCs(t *testing.T) {
client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Default.Version()})
for i := 0; i < 5; i++ {
manager := NewReplicationManager(client, 10)
manager := NewReplicationManager(client, controller.NoResyncPeriodFunc, 10)
manager.podStoreSynced = alwaysReady
// Create 10 rcs, shuffled them randomly and insert them into the rc manager's store

View File

@ -32,6 +32,7 @@ import (
"k8s.io/kubernetes/pkg/apiserver"
"k8s.io/kubernetes/pkg/client/record"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/replication"
"k8s.io/kubernetes/pkg/fields"
"k8s.io/kubernetes/pkg/kubectl"
@ -97,7 +98,7 @@ func NewMasterComponents(c *Config) *MasterComponents {
}
restClient := client.NewOrDie(&client.Config{Host: s.URL, Version: testapi.Default.Version(), QPS: c.QPS, Burst: c.Burst})
rcStopCh := make(chan struct{})
controllerManager := replicationcontroller.NewReplicationManager(restClient, c.Burst)
controllerManager := replicationcontroller.NewReplicationManager(restClient, controller.NoResyncPeriodFunc, c.Burst)
// TODO: Support events once we can cleanly shutdown an event recorder.
controllerManager.SetEventRecorder(&record.FakeRecorder{})