mirror of https://github.com/k3s-io/k3s
parent e912d5204c
commit 8bc9f5fef7
@@ -223,7 +223,8 @@ func startComponents(firstManifestURL, secondManifestURL, apiVersion string) (st
 	api.ResourceName(api.ResourceMemory): resource.MustParse("10G"),
 }}

-	nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, fakeKubeletClient{}, 10, 5*time.Minute)
+	nodeController := nodeControllerPkg.NewNodeController(
+		nil, "", machineList, nodeResources, cl, fakeKubeletClient{}, 10, 5*time.Minute, util.NewFakeRateLimiter())
 	nodeController.Run(5*time.Second, true, false)
 	cadvisorInterface := new(cadvisor.Fake)

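Note: the util.RateLimiter interface itself never appears in this diff, only its call sites. Judging from the three methods the fake implements at the bottom of this commit (CanAccept, Accept, Stop), the interface presumably reads roughly as follows; treat this as an inference, not a quotation from pkg/util:

// Inferred from usage in this commit, not copied from the source tree.
type RateLimiter interface {
	CanAccept() bool // non-blocking: reports whether a token is available, consuming it if so
	Accept()         // blocking: waits until a token becomes available
	Stop()           // releases any background resources held by the limiter
}
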
@@ -58,6 +58,8 @@ type CMServer struct {
 	SyncNodeList       bool
 	SyncNodeStatus     bool
 	PodEvictionTimeout time.Duration
+	DeletingPodsQps    float32
+	DeletingPodsBurst  int

 	// TODO: Discover these by pinging the host machines, and rip out these params.
 	NodeMilliCPU int64

@@ -104,6 +106,8 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
 	fs.DurationVar(&s.ResourceQuotaSyncPeriod, "resource_quota_sync_period", s.ResourceQuotaSyncPeriod, "The period for syncing quota usage status in the system")
 	fs.DurationVar(&s.NamespaceSyncPeriod, "namespace_sync_period", s.NamespaceSyncPeriod, "The period for syncing namespace life-cycle updates")
 	fs.DurationVar(&s.PodEvictionTimeout, "pod_eviction_timeout", s.PodEvictionTimeout, "The grace period for deleting pods on failed nodes.")
+	fs.Float32Var(&s.DeletingPodsQps, "deleting_pods_qps", 0.1, "Number of nodes per second on which pods are deleted in case of node failure.")
+	fs.IntVar(&s.DeletingPodsBurst, "deleting_pods_burst", 10, "Number of nodes on which pods are deleted in a burst in case of node failure. For more details look into RateLimiter.")
 	fs.IntVar(&s.RegisterRetryCount, "register_retry_count", s.RegisterRetryCount, ""+
 		"The number of retries for initial node registration. Retry interval equals node_sync_period.")
 	fs.Var(&s.MachineList, "machines", "List of machines to schedule onto, comma separated.")

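Note: a hypothetical invocation showing the new knobs next to the existing eviction timeout. The underscore-style flag spellings come straight from this diff, but the binary name and the command line as a whole are an illustration, not taken from the commit:

kube-controller-manager \
    --pod_eviction_timeout=5m \
    --deleting_pods_qps=0.1 \
    --deleting_pods_burst=10
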
@@ -177,7 +181,7 @@ func (s *CMServer) Run(_ []string) error {
 	}

 	nodeController := nodeControllerPkg.NewNodeController(cloud, s.MinionRegexp, s.MachineList, nodeResources,
-		kubeClient, kubeletClient, s.RegisterRetryCount, s.PodEvictionTimeout)
+		kubeClient, kubeletClient, s.RegisterRetryCount, s.PodEvictionTimeout, util.NewTokenBucketRateLimiter(s.DeletingPodsQps, s.DeletingPodsBurst))
 	nodeController.Run(s.NodeSyncPeriod, s.SyncNodeList, s.SyncNodeStatus)

 	resourceQuotaManager := resourcequota.NewResourceQuotaManager(kubeClient)

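Note: with the defaults wired here, the arithmetic works out as follows: a token bucket with qps = 0.1 and burst = 10 refills one token every 1/0.1 = 10 seconds and caps the bucket at 10 tokens, so after a mass failure the controller can evict pods from up to 10 nodes in quick succession, then from at most one further node every 10 seconds. A minimal sketch under that reading (failedNodes and evictPodsFrom are placeholders, not names from the commit):

limiter := util.NewTokenBucketRateLimiter(0.1, 10) // one token per 10s, at most 10 banked
defer limiter.Stop()
for _, node := range failedNodes { // placeholder slice of failed node names
	if limiter.CanAccept() { // non-blocking; consumes a token when one is available
		evictPodsFrom(node) // placeholder for nc.deletePods(node.Name)
	}
}
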
@@ -61,6 +61,8 @@ var (
 	nodeMemory             = flag.Int64("node_memory", 3*1024*1024*1024, "The amount of memory (in bytes) provisioned on each node")
 	masterServiceNamespace = flag.String("master_service_namespace", api.NamespaceDefault, "The namespace from which the kubernetes master services should be injected into pods")
 	enableProfiling        = flag.Bool("profiling", false, "Enable profiling via web interface host:port/debug/pprof/")
+	deletingPodsQps        = flag.Float32("deleting_pods_qps", 0.1, "")
+	deletingPodsBurst      = flag.Int("deleting_pods_burst", 10, "")
 )

 type delegateHandler struct {

@@ -128,7 +130,8 @@ func runControllerManager(machineList []string, cl *client.Client, nodeMilliCPU,
 	}
 	kubeClient := &client.HTTPKubeletClient{Client: http.DefaultClient, Port: ports.KubeletPort}

-	nodeController := nodeControllerPkg.NewNodeController(nil, "", machineList, nodeResources, cl, kubeClient, 10, 5*time.Minute)
+	nodeController := nodeControllerPkg.NewNodeController(
+		nil, "", machineList, nodeResources, cl, kubeClient, 10, 5*time.Minute, util.NewTokenBucketRateLimiter(*deletingPodsQps, *deletingPodsBurst))
 	nodeController.Run(10*time.Second, true, true)

 	endpoints := service.NewEndpointController(cl)

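Note: flag.Float32 does not exist in Go's standard library flag package, so this file is presumably importing github.com/spf13/pflag under the alias flag, a convention Kubernetes used elsewhere at the time (the controller-manager hunk above takes a *pflag.FlagSet, and pflag does provide Float32). Under that assumption the declarations compile:

import (
	flag "github.com/spf13/pflag" // assumed alias; stdlib flag has no Float32
)

var (
	deletingPodsQps   = flag.Float32("deleting_pods_qps", 0.1, "")
	deletingPodsBurst = flag.Int("deleting_pods_burst", 10, "")
)
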
@@ -71,15 +71,17 @@ var (
 )

 type NodeController struct {
-	cloud              cloudprovider.Interface
-	matchRE            string
-	staticResources    *api.NodeResources
-	nodes              []string
-	kubeClient         client.Interface
-	kubeletClient      client.KubeletClient
-	recorder           record.EventRecorder
-	registerRetryCount int
-	podEvictionTimeout time.Duration
+	cloud                   cloudprovider.Interface
+	matchRE                 string
+	staticResources         *api.NodeResources
+	nodes                   []string
+	kubeClient              client.Interface
+	kubeletClient           client.KubeletClient
+	recorder                record.EventRecorder
+	registerRetryCount      int
+	podEvictionTimeout      time.Duration
+	deletingPodsRateLimiter util.RateLimiter

 	// Method for easy mocking in unittest.
 	lookupIP func(host string) ([]net.IP, error)
 	now      func() util.Time

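Note: typing the new field as the util.RateLimiter interface rather than a concrete limiter is what lets the constructor accept either implementation: the controller manager injects a token bucket, the tests inject a fake. The relationship can be stated as a compile-time assertion (illustrative only; the qps and burst values are arbitrary):

var _ util.RateLimiter = util.NewTokenBucketRateLimiter(0.1, 10) // returns RateLimiter by signature
var _ util.RateLimiter = util.NewFakeRateLimiter()
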
@@ -94,7 +96,8 @@ func NewNodeController(
 	kubeClient client.Interface,
 	kubeletClient client.KubeletClient,
 	registerRetryCount int,
-	podEvictionTimeout time.Duration) *NodeController {
+	podEvictionTimeout time.Duration,
+	deletingPodsRateLimiter util.RateLimiter) *NodeController {
 	eventBroadcaster := record.NewBroadcaster()
 	recorder := eventBroadcaster.NewRecorder(api.EventSource{Component: "controllermanager"})
 	if kubeClient != nil {

@@ -104,17 +107,18 @@ func NewNodeController(
 		glog.Infof("No api server defined - no events will be sent to API server.")
 	}
 	return &NodeController{
-		cloud:              cloud,
-		matchRE:            matchRE,
-		nodes:              nodes,
-		staticResources:    staticResources,
-		kubeClient:         kubeClient,
-		kubeletClient:      kubeletClient,
-		recorder:           recorder,
-		registerRetryCount: registerRetryCount,
-		podEvictionTimeout: podEvictionTimeout,
-		lookupIP:           net.LookupIP,
-		now:                util.Now,
+		cloud:                   cloud,
+		matchRE:                 matchRE,
+		nodes:                   nodes,
+		staticResources:         staticResources,
+		kubeClient:              kubeClient,
+		kubeletClient:           kubeletClient,
+		recorder:                recorder,
+		registerRetryCount:      registerRetryCount,
+		podEvictionTimeout:      podEvictionTimeout,
+		deletingPodsRateLimiter: deletingPodsRateLimiter,
+		lookupIP:                net.LookupIP,
+		now:                     util.Now,
 	}
 }

@@ -558,13 +562,18 @@ func (nc *NodeController) MonitorNodeStatus() error {
 			if lastReadyCondition.Status == api.ConditionFalse &&
 				nc.now().After(lastReadyCondition.LastTransitionTime.Add(nc.podEvictionTimeout)) {
 				// Node stays not ready for at least 'podEvictionTimeout' - evict all pods on the unhealthy node.
-				nc.deletePods(node.Name)
+				// Make sure we are not removing pods from too many nodes at the same time.
+				if nc.deletingPodsRateLimiter.CanAccept() {
+					nc.deletePods(node.Name)
+				}
 			}
 			if lastReadyCondition.Status == api.ConditionUnknown &&
 				nc.now().After(lastReadyCondition.LastProbeTime.Add(nc.podEvictionTimeout-gracePeriod)) {
 				// Same as above. Note however, since condition unknown is posted by the node controller, we
 				// need to subtract the monitoring grace period in order to get the real 'podEvictionTimeout'.
-				nc.deletePods(node.Name)
+				if nc.deletingPodsRateLimiter.CanAccept() {
+					nc.deletePods(node.Name)
+				}
 			}
 		}
 	}

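Note: CanAccept is the non-blocking check, so when the bucket is empty the eviction is skipped for this pass rather than queued. Because MonitorNodeStatus runs periodically and the node remains not ready, a skipped node is reconsidered on the next pass and evicted once a token is available. A sketch of that retry-by-rescan behavior; none of these identifiers besides CanAccept come from the commit itself:

// Hypothetical shape of the monitoring loop around the gating above.
for range time.Tick(nodeMonitorPeriod) { // placeholder period
	for _, node := range notReadyNodes() { // placeholder lookup
		if limiter.CanAccept() {
			deletePods(node) // placeholder for nc.deletePods(node.Name)
		}
		// else: no token right now; the node stays not ready and is
		// reconsidered on the next monitoring pass.
	}
}
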
@@ -247,7 +247,7 @@ func TestRegisterNodes(t *testing.T) {
 		for _, machine := range item.machines {
 			nodes.Items = append(nodes.Items, *newNode(machine))
 		}
-		nodeController := NewNodeController(nil, "", item.machines, &api.NodeResources{}, item.fakeNodeHandler, nil, 10, time.Minute)
+		nodeController := NewNodeController(nil, "", item.machines, &api.NodeResources{}, item.fakeNodeHandler, nil, 10, time.Minute, util.NewFakeRateLimiter())
 		err := nodeController.RegisterNodes(&nodes, item.retryCount, time.Millisecond)
 		if !item.expectedFail && err != nil {
 			t.Errorf("unexpected error: %v", err)

@@ -332,7 +332,7 @@ func TestCreateGetStaticNodesWithSpec(t *testing.T) {
 		},
 	}
 	for _, item := range table {
-		nodeController := NewNodeController(nil, "", item.machines, &resources, nil, nil, 10, time.Minute)
+		nodeController := NewNodeController(nil, "", item.machines, &resources, nil, nil, 10, time.Minute, util.NewFakeRateLimiter())
 		nodes, err := nodeController.GetStaticNodesWithSpec()
 		if err != nil {
 			t.Errorf("unexpected error: %v", err)

@@ -393,7 +393,7 @@ func TestCreateGetCloudNodesWithSpec(t *testing.T) {
 	}

 	for _, item := range table {
-		nodeController := NewNodeController(item.fakeCloud, ".*", nil, &api.NodeResources{}, nil, nil, 10, time.Minute)
+		nodeController := NewNodeController(item.fakeCloud, ".*", nil, &api.NodeResources{}, nil, nil, 10, time.Minute, util.NewFakeRateLimiter())
 		nodes, err := nodeController.GetCloudNodesWithSpec()
 		if err != nil {
 			t.Errorf("unexpected error: %v", err)

@@ -499,7 +499,7 @@ func TestSyncCloudNodes(t *testing.T) {
 	}

 	for _, item := range table {
-		nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, nil, 10, time.Minute)
+		nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, nil, 10, time.Minute, util.NewFakeRateLimiter())
 		if err := nodeController.SyncCloudNodes(); err != nil {
 			t.Errorf("unexpected error: %v", err)
 		}

@@ -585,7 +585,7 @@ func TestSyncCloudNodesEvictPods(t *testing.T) {
 	}

 	for _, item := range table {
-		nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, nil, 10, time.Minute)
+		nodeController := NewNodeController(item.fakeCloud, item.matchRE, nil, &api.NodeResources{}, item.fakeNodeHandler, nil, 10, time.Minute, util.NewFakeRateLimiter())
 		if err := nodeController.SyncCloudNodes(); err != nil {
 			t.Errorf("unexpected error: %v", err)
 		}

@@ -687,7 +687,7 @@ func TestNodeConditionsCheck(t *testing.T) {
 	}

 	for _, item := range table {
-		nodeController := NewNodeController(nil, "", nil, nil, nil, item.fakeKubeletClient, 10, time.Minute)
+		nodeController := NewNodeController(nil, "", nil, nil, nil, item.fakeKubeletClient, 10, time.Minute, util.NewFakeRateLimiter())
 		nodeController.now = func() util.Time { return fakeNow }
 		conditions := nodeController.DoCheck(item.node)
 		if !reflect.DeepEqual(item.expectedConditions, conditions) {

@@ -718,7 +718,7 @@ func TestPopulateNodeAddresses(t *testing.T) {
 	}

 	for _, item := range table {
-		nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, nil, nil, 10, time.Minute)
+		nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, nil, nil, 10, time.Minute, util.NewFakeRateLimiter())
 		result, err := nodeController.PopulateAddresses(item.nodes)
 		// In case of IP querying error, we should continue.
 		if err != nil {

@@ -821,7 +821,7 @@ func TestSyncProbedNodeStatus(t *testing.T) {
 	}

 	for _, item := range table {
-		nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, time.Minute)
+		nodeController := NewNodeController(item.fakeCloud, ".*", nil, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, time.Minute, util.NewFakeRateLimiter())
 		nodeController.now = func() util.Time { return fakeNow }
 		if err := nodeController.SyncProbedNodeStatus(); err != nil {
 			t.Errorf("unexpected error: %v", err)

@@ -924,7 +924,7 @@ func TestSyncProbedNodeStatusTransitionTime(t *testing.T) {
 	}

 	for _, item := range table {
-		nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, time.Minute)
+		nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, time.Minute, util.NewFakeRateLimiter())
 		nodeController.lookupIP = func(host string) ([]net.IP, error) { return nil, fmt.Errorf("lookup %v: no such host", host) }
 		nodeController.now = func() util.Time { return fakeNow }
 		if err := nodeController.SyncProbedNodeStatus(); err != nil {

@@ -1077,7 +1077,7 @@ func TestSyncProbedNodeStatusEvictPods(t *testing.T) {
 	}

 	for _, item := range table {
-		nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, 5*time.Minute)
+		nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, item.fakeKubeletClient, 10, 5*time.Minute, util.NewFakeRateLimiter())
 		nodeController.lookupIP = func(host string) ([]net.IP, error) { return nil, fmt.Errorf("lookup %v: no such host", host) }
 		if err := nodeController.SyncProbedNodeStatus(); err != nil {
 			t.Errorf("unexpected error: %v", err)

@@ -1235,7 +1235,7 @@ func TestMonitorNodeStatusEvictPods(t *testing.T) {
 	}

 	for _, item := range table {
-		nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, nil, 10, item.evictionTimeout)
+		nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, nil, 10, item.evictionTimeout, util.NewFakeRateLimiter())
 		nodeController.now = func() util.Time { return fakeNow }
 		if err := nodeController.MonitorNodeStatus(); err != nil {
 			t.Errorf("unexpected error: %v", err)

@@ -1417,7 +1417,7 @@ func TestMonitorNodeStatusUpdateStatus(t *testing.T) {
 	}

 	for _, item := range table {
-		nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, nil, 10, 5*time.Minute)
+		nodeController := NewNodeController(nil, "", []string{"node0"}, nil, item.fakeNodeHandler, nil, 10, 5*time.Minute, util.NewFakeRateLimiter())
 		nodeController.now = func() util.Time { return fakeNow }
 		if err := nodeController.MonitorNodeStatus(); err != nil {
 			t.Errorf("unexpected error: %v", err)

@@ -51,6 +51,12 @@ func NewTokenBucketRateLimiter(qps float32, burst int) RateLimiter {
 	return rate
 }

+type fakeRateLimiter struct{}
+
+func NewFakeRateLimiter() RateLimiter {
+	return &fakeRateLimiter{}
+}
+
 func newTokenBucketRateLimiterFromTicker(ticker <-chan time.Time, burst int) *tickRateLimiter {
 	if burst < 1 {
 		panic("burst must be a positive integer")

@@ -109,3 +115,11 @@ func (t *tickRateLimiter) increment() {
 	default:
 	}
 }
+
+func (t *fakeRateLimiter) CanAccept() bool {
+	return true
+}
+
+func (t *fakeRateLimiter) Stop() {}
+
+func (t *fakeRateLimiter) Accept() {}

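Note: in tests, the fake limiter makes the rate-limiting branch a no-op, so existing expectations about which pods get deleted still hold. A minimal sketch of how a test might exercise it; the test function is new, only the constructor and method names come from the diff:

func TestFakeRateLimiter(t *testing.T) {
	limiter := util.NewFakeRateLimiter()
	defer limiter.Stop() // no-op on the fake
	for i := 0; i < 100; i++ {
		if !limiter.CanAccept() {
			t.Fatalf("fake limiter should always accept")
		}
	}
	limiter.Accept() // also a no-op: returns immediately, never blocks
}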