mirror of https://github.com/k3s-io/k3s
add lookup cache for daemonset

parent a40f8fb4d8
commit 7e1ab26c06
@@ -290,7 +290,7 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
 	if containsResource(resources, "daemonsets") {
 		glog.Infof("Starting daemon set controller")
-		go daemon.NewDaemonSetsController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "daemon-set-controller")), ResyncPeriod(s)).
+		go daemon.NewDaemonSetsController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "daemon-set-controller")), ResyncPeriod(s), s.LookupCacheSizeForDaemonSet).
 			Run(s.ConcurrentDaemonSetSyncs, wait.NeverStop)
 	}
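
The new third argument sizes the controller.MatchingCache that the later hunks wire into DaemonSetsController: a bounded memo of the last pod → DaemonSet match, so most pod events skip a full label-selector scan over all daemon sets. A minimal sketch of the idea, assuming nothing beyond the GetMatchingObject / Update / InvalidateAll surface visible in this diff (the types, key scheme, and eviction policy below are illustrative, not the real pkg/controller implementation):

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// objectWithMeta is a stand-in for a pod: just a namespace and labels.
type objectWithMeta struct {
	Namespace string
	Labels    map[string]string
}

// matchingCache memoizes which controller object matched a given
// (namespace, labels) pair last time.
type matchingCache struct {
	mu    sync.Mutex
	items map[string]interface{}
	max   int
}

func newMatchingCache(maxSize int) *matchingCache {
	return &matchingCache{items: make(map[string]interface{}), max: maxSize}
}

// key builds a deterministic key over namespace plus sorted labels.
func key(o objectWithMeta) string {
	ks := make([]string, 0, len(o.Labels))
	for k := range o.Labels {
		ks = append(ks, k)
	}
	sort.Strings(ks)
	s := o.Namespace
	for _, k := range ks {
		s += "," + k + "=" + o.Labels[k]
	}
	return s
}

func (c *matchingCache) Update(o objectWithMeta, match interface{}) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if len(c.items) >= c.max {
		// The real implementation evicts LRU entries; dropping
		// everything keeps this sketch short.
		c.items = make(map[string]interface{})
	}
	c.items[key(o)] = match
}

func (c *matchingCache) GetMatchingObject(o objectWithMeta) (interface{}, bool) {
	c.mu.Lock()
	defer c.mu.Unlock()
	m, ok := c.items[key(o)]
	return m, ok
}

func (c *matchingCache) InvalidateAll() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.items = make(map[string]interface{})
}

func main() {
	c := newMatchingCache(1024) // same default size this commit picks for daemonsets
	pod := objectWithMeta{Namespace: "kube-system", Labels: map[string]string{"app": "fluentd"}}
	c.Update(pod, "ds/fluentd")
	if ds, ok := c.GetMatchingObject(pod); ok {
		fmt.Println("cache hit:", ds)
	}
}
```
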
@@ -55,6 +55,7 @@ func NewCMServer() *CMServer {
 		ConcurrentNamespaceSyncs:    2,
 		LookupCacheSizeForRC:        4096,
 		LookupCacheSizeForRS:        4096,
+		LookupCacheSizeForDaemonSet: 1024,
 		ServiceSyncPeriod:           unversioned.Duration{5 * time.Minute},
 		NodeSyncPeriod:              unversioned.Duration{10 * time.Second},
 		ResourceQuotaSyncPeriod:     unversioned.Duration{5 * time.Minute},
@@ -102,6 +103,7 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) {
 	fs.IntVar(&s.ConcurrentNamespaceSyncs, "concurrent-namespace-syncs", s.ConcurrentNamespaceSyncs, "The number of namespace objects that are allowed to sync concurrently. Larger number = more responsive namespace termination, but more CPU (and network) load")
 	fs.IntVar(&s.LookupCacheSizeForRC, "replication-controller-lookup-cache-size", s.LookupCacheSizeForRC, "The size of lookup cache for replication controllers. Larger number = more responsive replica management, but more MEM load.")
 	fs.IntVar(&s.LookupCacheSizeForRS, "replicaset-lookup-cache-size", s.LookupCacheSizeForRS, "The size of lookup cache for replicasets. Larger number = more responsive replica management, but more MEM load.")
+	fs.IntVar(&s.LookupCacheSizeForDaemonSet, "daemonset-lookup-cache-size", s.LookupCacheSizeForDaemonSet, "The size of lookup cache for daemonsets. Larger number = more responsive daemonsets, but more MEM load.")
 	fs.DurationVar(&s.ServiceSyncPeriod.Duration, "service-sync-period", s.ServiceSyncPeriod.Duration, "The period for syncing services with their external load balancers")
 	fs.DurationVar(&s.NodeSyncPeriod.Duration, "node-sync-period", s.NodeSyncPeriod.Duration, ""+
 		"The period for syncing nodes from cloudprovider. Longer periods will result in "+
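
For reference, the pflag pattern above binds a struct field, a flag name, and a default (the field's current value) together, so NewCMServer's defaults become the flag defaults. A runnable sketch of that mechanism; the options struct here is a stand-in, not the real CMServer:

```go
package main

import (
	"fmt"

	"github.com/spf13/pflag"
)

// options mirrors the shape of the CMServer fields touched in this diff
// (the struct name and scope are illustrative).
type options struct {
	LookupCacheSizeForDaemonSet int
}

func main() {
	o := &options{LookupCacheSizeForDaemonSet: 1024} // default, as in NewCMServer
	fs := pflag.NewFlagSet("kube-controller-manager", pflag.ExitOnError)
	// The field's current value is passed as the flag default, and the flag
	// writes back into the same field when parsed.
	fs.IntVar(&o.LookupCacheSizeForDaemonSet, "daemonset-lookup-cache-size",
		o.LookupCacheSizeForDaemonSet,
		"The size of lookup cache for daemonsets. Larger number = more responsive daemonsets, but more MEM load.")
	fs.Parse([]string{"--daemonset-lookup-cache-size=2048"})
	fmt.Println(o.LookupCacheSizeForDaemonSet) // 2048
}
```
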
@@ -238,7 +238,7 @@ func (s *CMServer) Run(_ []string) error {
 	if containsResource(resources, "daemonsets") {
 		glog.Infof("Starting daemon set controller")
-		go daemon.NewDaemonSetsController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "daemon-set-controller")), s.resyncPeriod).
+		go daemon.NewDaemonSetsController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "daemon-set-controller")), s.resyncPeriod, s.LookupCacheSizeForDaemonSet).
 			Run(s.ConcurrentDaemonSetSyncs, wait.NeverStop)
 	}
@@ -67,6 +67,7 @@ kube-controller-manager
       --concurrent-replicaset-syncs=5: The number of replica sets that are allowed to sync concurrently. Larger number = more responsive replica management, but more CPU (and network) load
       --concurrent-resource-quota-syncs=5: The number of resource quotas that are allowed to sync concurrently. Larger number = more responsive quota management, but more CPU (and network) load
       --concurrent_rc_syncs=5: The number of replication controllers that are allowed to sync concurrently. Larger number = more responsive replica management, but more CPU (and network) load
+      --daemonset-lookup-cache-size=1024: The size of lookup cache for daemonsets. Larger number = more responsive daemonsets, but more MEM load.
       --deleting-pods-burst=10: Number of nodes on which pods are deleted in bursts in case of node failure. For more details look into RateLimiter.
       --deleting-pods-qps=0.1: Number of nodes per second on which pods are deleted in case of node failure.
       --deployment-controller-sync-period=30s: Period for syncing the deployments.
@@ -107,7 +108,7 @@ kube-controller-manager
       --terminated-pod-gc-threshold=12500: Number of terminated pods that can exist before the terminated pod garbage collector starts deleting terminated pods. If <= 0, the terminated pod garbage collector is disabled.
 ```

-###### Auto generated by spf13/cobra on 25-Feb-2016
+###### Auto generated by spf13/cobra on 29-Feb-2016


 <!-- BEGIN MUNGE: GENERATED_ANALYTICS -->
@@ -67,6 +67,7 @@ cpu-percent
 create-external-load-balancer
 current-release-pr
 current-replicas
+daemonset-lookup-cache-size
 default-container-cpu-limit
 default-container-mem-limit
 delay-shutdown
File diff suppressed because it is too large
@@ -415,7 +415,7 @@ type KubeControllerManagerConfiguration struct {
 	// but more CPU (and network) load.
 	ConcurrentDeploymentSyncs int `json:"concurrentDeploymentSyncs"`
 	// concurrentDaemonSetSyncs is the number of daemonset objects that are
-	// allowed to sync concurrently. Larger number = more responsive DaemonSet,
+	// allowed to sync concurrently. Larger number = more responsive daemonset,
 	// but more CPU (and network) load.
 	ConcurrentDaemonSetSyncs int `json:"concurrentDaemonSetSyncs"`
 	// concurrentJobSyncs is the number of job objects that are
@@ -425,12 +425,15 @@ type KubeControllerManagerConfiguration struct {
 	// concurrentNamespaceSyncs is the number of namespace objects that are
 	// allowed to sync concurrently.
 	ConcurrentNamespaceSyncs int `json:"concurrentNamespaceSyncs"`
-	// LookupCacheSizeForRC is the size of lookup cache for replication controllers.
+	// lookupCacheSizeForRC is the size of lookup cache for replication controllers.
 	// Larger number = more responsive replica management, but more MEM load.
 	LookupCacheSizeForRC int `json:"lookupCacheSizeForRC"`
-	// LookupCacheSizeForRS is the size of lookup cache for replicasets.
+	// lookupCacheSizeForRS is the size of lookup cache for replicasets.
 	// Larger number = more responsive replica management, but more MEM load.
 	LookupCacheSizeForRS int `json:"lookupCacheSizeForRS"`
+	// lookupCacheSizeForDaemonSet is the size of lookup cache for daemonsets.
+	// Larger number = more responsive daemonset, but more MEM load.
+	LookupCacheSizeForDaemonSet int `json:"lookupCacheSizeForDaemonSet"`
 	// serviceSyncPeriod is the period for syncing services with their external
 	// load balancers.
 	ServiceSyncPeriod unversioned.Duration `json:"serviceSyncPeriod"`
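
The comment-case fixes above align the doc comments with the serialized field names carried in the json tags. A quick sketch of how the three cache-size fields marshal (fields copied from the diff; the enclosing struct name and program are illustrative):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// config carries just the three cache-size fields from the diff, to show how
// the json tags surface as lower-camel names in serialized component config.
type config struct {
	LookupCacheSizeForRC        int `json:"lookupCacheSizeForRC"`
	LookupCacheSizeForRS        int `json:"lookupCacheSizeForRS"`
	LookupCacheSizeForDaemonSet int `json:"lookupCacheSizeForDaemonSet"`
}

func main() {
	b, _ := json.MarshalIndent(config{4096, 4096, 1024}, "", "  ")
	fmt.Println(string(b))
	// {
	//   "lookupCacheSizeForRC": 4096,
	//   "lookupCacheSizeForRS": 4096,
	//   "lookupCacheSizeForDaemonSet": 1024
	// }
}
```
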
@@ -22,6 +22,7 @@ import (
 	"sync"
 	"time"

+	"fmt"
 	"github.com/golang/glog"
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/unversioned"
@@ -91,11 +92,13 @@ type DaemonSetsController struct {
 	// Added as a member to the struct to allow injection for testing.
 	podStoreSynced func() bool

+	lookupCache *controller.MatchingCache
+
 	// Daemon sets that need to be synced.
 	queue *workqueue.Type
 }

-func NewDaemonSetsController(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc) *DaemonSetsController {
+func NewDaemonSetsController(kubeClient clientset.Interface, resyncPeriod controller.ResyncPeriodFunc, lookupCacheSize int) *DaemonSetsController {
 	eventBroadcaster := record.NewBroadcaster()
 	eventBroadcaster.StartLogging(glog.Infof)
 	// TODO: remove the wrapper when all clients have moved to use the clientset.
@@ -132,6 +135,22 @@ func NewDaemonSetsController(kubeClient clientset.Interface, resyncPeriod contro
 		},
 		UpdateFunc: func(old, cur interface{}) {
 			oldDS := old.(*extensions.DaemonSet)
+			curDS := cur.(*extensions.DaemonSet)
+			// We should invalidate the whole lookup cache if a DS's selector has been updated.
+			//
+			// Imagine that you have two DSs:
+			// * old DS1
+			// * new DS2
+			// You also have a pod that is attached to DS2 (because it doesn't match DS1's selector).
+			// Now imagine that you are changing DS1's selector so that it now matches that pod;
+			// in such a case we must invalidate the whole cache so that the pod can be adopted by DS1.
+			//
+			// This makes the lookup cache less helpful, but a selector update does not happen often,
+			// so it's not a big problem.
+			if !reflect.DeepEqual(oldDS.Spec.Selector, curDS.Spec.Selector) {
+				dsc.lookupCache.InvalidateAll()
+			}
+
 			glog.V(4).Infof("Updating daemon set %s", oldDS.Name)
 			dsc.enqueueDaemonSet(cur)
 		},
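
The guard added above invalidates the whole cache only when the selector actually changed between the old and current DaemonSet. A standalone illustration of that reflect.DeepEqual test, with a stand-in selector type in place of unversioned.LabelSelector:

```go
package main

import (
	"fmt"
	"reflect"
)

// labelSelector stands in for unversioned.LabelSelector from the diff.
type labelSelector struct {
	MatchLabels map[string]string
}

func main() {
	oldSel := &labelSelector{MatchLabels: map[string]string{"app": "fluentd"}}
	curSame := &labelSelector{MatchLabels: map[string]string{"app": "fluentd"}}
	curChanged := &labelSelector{MatchLabels: map[string]string{"app": "fluentd-v2"}}

	// DeepEqual compares the selectors structurally, so a no-op update
	// (selector unchanged) does not trigger a costly full-cache invalidation.
	fmt.Println(!reflect.DeepEqual(oldSel, curSame))    // false: keep the cache
	fmt.Println(!reflect.DeepEqual(oldSel, curChanged)) // true: InvalidateAll
}
```
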
@@ -180,6 +199,7 @@ func NewDaemonSetsController(kubeClient clientset.Interface, resyncPeriod contro
 	)
 	dsc.syncHandler = dsc.syncDaemonSet
 	dsc.podStoreSynced = dsc.podController.HasSynced
+	dsc.lookupCache = controller.NewMatchingCache(lookupCacheSize)
 	return dsc
 }
@@ -238,6 +258,18 @@ func (dsc *DaemonSetsController) enqueueDaemonSet(obj interface{}) {
 }

 func (dsc *DaemonSetsController) getPodDaemonSet(pod *api.Pod) *extensions.DaemonSet {
+	// Look up in the cache; if cached and the cache is valid, just return the cached value.
+	if obj, cached := dsc.lookupCache.GetMatchingObject(pod); cached {
+		ds, ok := obj.(*extensions.DaemonSet)
+		if !ok {
+			// This should not happen.
+			glog.Errorf("lookup cache does not return a DaemonSet object")
+			return nil
+		}
+		if dsc.isCacheValid(pod, ds) {
+			return ds
+		}
+	}
 	sets, err := dsc.dsStore.GetPodDaemonSets(pod)
 	if err != nil {
 		glog.V(4).Infof("No daemon sets found for pod %v, daemon set controller will avoid syncing", pod.Name)
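
getPodDaemonSet now follows a read-through cache shape: probe the cache, revalidate a hit against the store, and fall back to the full GetPodDaemonSets scan on a miss, which (as the next hunk shows) repopulates the cache. A compact sketch of that control flow; the types and helper closures are hypothetical stand-ins:

```go
package main

import "fmt"

type pod struct{ name string }
type daemonSet struct{ name string }

// cachedOwner returns the owner for p, preferring a validated cache hit and
// falling back to an authoritative scan that repopulates the cache. This
// mirrors the shape of getPodDaemonSet above.
func cachedOwner(
	p pod,
	lookup func(pod) (*daemonSet, bool), // cache probe
	valid func(pod, *daemonSet) bool,    // hit revalidation against the store
	scan func(pod) *daemonSet,           // authoritative (slow) path
	update func(pod, *daemonSet),        // cache write-back
) *daemonSet {
	if ds, hit := lookup(p); hit && valid(p, ds) {
		return ds // fast path: validated cache hit
	}
	ds := scan(p)
	if ds != nil {
		update(p, ds) // repopulate so the next lookup is cheap
	}
	return ds
}

func main() {
	cache := map[string]*daemonSet{}
	owner := cachedOwner(
		pod{name: "fluentd-abc12"},
		func(p pod) (*daemonSet, bool) { ds, ok := cache[p.name]; return ds, ok },
		func(p pod, ds *daemonSet) bool { return true },
		func(p pod) *daemonSet { return &daemonSet{name: "fluentd"} },
		func(p pod, ds *daemonSet) { cache[p.name] = ds },
	)
	fmt.Println(owner.name)
}
```
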
@@ -250,9 +282,42 @@ func (dsc *DaemonSetsController) getPodDaemonSet(pod *api.Pod) *extensions.Daemo
 		glog.Errorf("user error! more than one daemon is selecting pods with labels: %+v", pod.Labels)
 		sort.Sort(byCreationTimestamp(sets))
 	}
+
+	// Update the lookup cache.
+	dsc.lookupCache.Update(pod, &sets[0])
+
 	return &sets[0]
 }

+// isCacheValid checks whether the cached DaemonSet is still valid for the pod.
+func (dsc *DaemonSetsController) isCacheValid(pod *api.Pod, cachedDS *extensions.DaemonSet) bool {
+	_, exists, err := dsc.dsStore.Get(cachedDS)
+	// The ds has been deleted or updated; the cache is invalid.
+	if err != nil || !exists || !isDaemonSetMatch(pod, cachedDS) {
+		return false
+	}
+	return true
+}
+
+// isDaemonSetMatch takes a Pod and a DaemonSet and returns whether they match.
+// TODO(mqliang): This logic is a copy of GetPodDaemonSets(); remove the duplication.
+func isDaemonSetMatch(pod *api.Pod, ds *extensions.DaemonSet) bool {
+	if ds.Namespace != pod.Namespace {
+		return false
+	}
+	selector, err := unversioned.LabelSelectorAsSelector(ds.Spec.Selector)
+	if err != nil {
+		err = fmt.Errorf("invalid selector: %v", err)
+		return false
+	}
+
+	// If a DaemonSet with a nil or empty selector creeps in, it should match nothing, not everything.
+	if selector.Empty() || !selector.Matches(labels.Set(pod.Labels)) {
+		return false
+	}
+	return true
+}
+
 func (dsc *DaemonSetsController) addPod(obj interface{}) {
 	pod := obj.(*api.Pod)
 	glog.V(4).Infof("Pod %s added.", pod.Name)
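
One detail of isDaemonSetMatch worth calling out: an empty selector matches nothing, not everything, so a DaemonSet with a nil or empty selector can never adopt every pod via the cache path. A simplified, MatchLabels-only illustration of that rule (the real code goes through unversioned.LabelSelectorAsSelector and labels.Set):

```go
package main

import "fmt"

// matches applies the same rule as the diff: an empty selector matches
// nothing, otherwise every selector key/value must be present in the
// pod's labels. (Real selectors also support match expressions.)
func matches(selector, podLabels map[string]string) bool {
	if len(selector) == 0 {
		return false // empty selector must match nothing
	}
	for k, v := range selector {
		if podLabels[k] != v {
			return false
		}
	}
	return true
}

func main() {
	podLabels := map[string]string{"app": "fluentd", "tier": "logging"}
	fmt.Println(matches(map[string]string{"app": "fluentd"}, podLabels)) // true
	fmt.Println(matches(map[string]string{}, podLabels))                 // false: empty selector
	fmt.Println(matches(map[string]string{"app": "nginx"}, podLabels))   // false
}
```
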
@@ -133,7 +133,7 @@ func addPods(podStore cache.Store, nodeName string, label map[string]string, num

 func newTestController() (*DaemonSetsController, *controller.FakePodControl) {
 	clientset := clientset.NewForConfigOrDie(&restclient.Config{Host: "", ContentConfig: restclient.ContentConfig{GroupVersion: testapi.Default.GroupVersion()}})
-	manager := NewDaemonSetsController(clientset, controller.NoResyncPeriodFunc)
+	manager := NewDaemonSetsController(clientset, controller.NoResyncPeriodFunc, 0)
 	manager.podStoreSynced = alwaysReady
 	podControl := &controller.FakePodControl{}
 	manager.podControl = podControl