mirror of https://github.com/k3s-io/k3s
rebase and resolve a huge amount of conflicts to keep this up to date (does this commit have more LOC changed than the original implementation? that would be funny...)
parent
f8e4f5aa17
commit
1065872d29
|
@ -64,7 +64,7 @@ type CMServer struct {
|
|||
CloudConfigFile string
|
||||
ConcurrentEndpointSyncs int
|
||||
ConcurrentRCSyncs int
|
||||
ConcurrentDCSyncs int
|
||||
ConcurrentDSCSyncs int
|
||||
ServiceSyncPeriod time.Duration
|
||||
NodeSyncPeriod time.Duration
|
||||
ResourceQuotaSyncPeriod time.Duration
|
||||
|
@ -100,7 +100,7 @@ func NewCMServer() *CMServer {
|
|||
Address: net.ParseIP("127.0.0.1"),
|
||||
ConcurrentEndpointSyncs: 5,
|
||||
ConcurrentRCSyncs: 5,
|
||||
ConcurrentDCSyncs: 2,
|
||||
ConcurrentDSCSyncs: 2,
|
||||
ServiceSyncPeriod: 5 * time.Minute,
|
||||
NodeSyncPeriod: 10 * time.Second,
|
||||
ResourceQuotaSyncPeriod: 10 * time.Second,
|
||||
|
@ -216,8 +216,8 @@ func (s *CMServer) Run(_ []string) error {
|
|||
controllerManager := replicationControllerPkg.NewReplicationManager(kubeClient, replicationControllerPkg.BurstReplicas)
|
||||
go controllerManager.Run(s.ConcurrentRCSyncs, util.NeverStop)
|
||||
|
||||
daemonManager := daemon.NewDaemonManager(kubeClient)
|
||||
go daemonManager.Run(s.ConcurrentDCSyncs, util.NeverStop)
|
||||
go daemon.NewDaemonSetsController(kubeClient).
|
||||
Run(s.ConcurrentDSCSyncs, util.NeverStop)
|
||||
|
||||
cloud, err := cloudprovider.InitCloudProvider(s.CloudProvider, s.CloudConfigFile)
|
||||
if err != nil {
|
||||
|
|
|
@ -47,6 +47,7 @@ import (
|
|||
"github.com/golang/glog"
|
||||
"github.com/prometheus/client_golang/prometheus"
|
||||
"github.com/spf13/pflag"
|
||||
"k8s.io/kubernetes/pkg/controller/daemon"
|
||||
)
|
||||
|
||||
// CMServer is the main context object for the controller manager.
|
||||
|
@ -113,6 +114,9 @@ func (s *CMServer) Run(_ []string) error {
|
|||
controllerManager := replicationcontroller.NewReplicationManager(kubeClient, replicationcontroller.BurstReplicas)
|
||||
go controllerManager.Run(s.ConcurrentRCSyncs, util.NeverStop)
|
||||
|
||||
go daemon.NewDaemonSetsController(kubeClient).
|
||||
Run(s.ConcurrentDSCSyncs, util.NeverStop)
|
||||
|
||||
//TODO(jdef) should eventually support more cloud providers here
|
||||
if s.CloudProvider != mesos.ProviderName {
|
||||
glog.Fatalf("Only provider %v is supported, you specified %v", mesos.ProviderName, s.CloudProvider)
|
||||
|
|
|
@ -26,11 +26,11 @@ import (
|
|||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/latest"
|
||||
"k8s.io/kubernetes/pkg/api/validation"
|
||||
"k8s.io/kubernetes/pkg/apis/experimental"
|
||||
"k8s.io/kubernetes/pkg/client/cache"
|
||||
"k8s.io/kubernetes/pkg/client/record"
|
||||
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||
"k8s.io/kubernetes/pkg/controller/framework"
|
||||
"k8s.io/kubernetes/pkg/expapi"
|
||||
"k8s.io/kubernetes/pkg/labels"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
)
|
||||
|
@ -215,8 +215,8 @@ func NewControllerExpectations() *ControllerExpectations {
|
|||
type PodControlInterface interface {
|
||||
// CreateReplica creates new replicated pods according to the spec.
|
||||
CreateReplica(namespace string, controller *api.ReplicationController) error
|
||||
// CreateReplicaOnNodes creates a new pod according to the spec, on a specified list of nodes.
|
||||
CreateReplicaOnNode(namespace string, controller *expapi.DaemonSet, nodeNames string) error
|
||||
// CreateReplicaOnNode creates a new pod according to the spec on the specified node.
|
||||
CreateReplicaOnNode(namespace string, ds *experimental.DaemonSet, nodeName string) error
|
||||
// DeletePod deletes the pod identified by podID.
|
||||
DeletePod(namespace string, podID string) error
|
||||
}
|
||||
|
@ -294,13 +294,13 @@ func (r RealPodControl) CreateReplica(namespace string, controller *api.Replicat
|
|||
return nil
|
||||
}
|
||||
|
||||
func (r RealPodControl) CreateReplicaOnNode(namespace string, controller *expapi.DaemonSet, nodeName string) error {
|
||||
desiredLabels := getReplicaLabelSet(controller.Spec.Template)
|
||||
desiredAnnotations, err := getReplicaAnnotationSet(controller.Spec.Template, controller)
|
||||
func (r RealPodControl) CreateReplicaOnNode(namespace string, ds *experimental.DaemonSet, nodeName string) error {
|
||||
desiredLabels := getReplicaLabelSet(ds.Spec.Template)
|
||||
desiredAnnotations, err := getReplicaAnnotationSet(ds.Spec.Template, ds)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
prefix := getReplicaPrefix(controller.Name)
|
||||
prefix := getReplicaPrefix(ds.Name)
|
||||
|
||||
pod := &api.Pod{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
|
@ -309,19 +309,20 @@ func (r RealPodControl) CreateReplicaOnNode(namespace string, controller *expapi
|
|||
GenerateName: prefix,
|
||||
},
|
||||
}
|
||||
if err := api.Scheme.Convert(&controller.Spec.Template.Spec, &pod.Spec); err != nil {
|
||||
if err := api.Scheme.Convert(&ds.Spec.Template.Spec, &pod.Spec); err != nil {
|
||||
return fmt.Errorf("unable to convert pod template: %v", err)
|
||||
}
|
||||
// if a pod does not have labels then it cannot be controlled by any controller
|
||||
if labels.Set(pod.Labels).AsSelector().Empty() {
|
||||
return fmt.Errorf("unable to create pod replica, no labels")
|
||||
}
|
||||
pod.Spec.NodeName = nodeName
|
||||
if newPod, err := r.KubeClient.Pods(namespace).Create(pod); err != nil {
|
||||
r.Recorder.Eventf(controller, "failedCreate", "Error creating: %v", err)
|
||||
r.Recorder.Eventf(ds, "failedCreate", "Error creating: %v", err)
|
||||
return fmt.Errorf("unable to create pod replica: %v", err)
|
||||
} else {
|
||||
glog.V(4).Infof("Controller %v created pod %v", controller.Name, newPod.Name)
|
||||
r.Recorder.Eventf(controller, "successfulCreate", "Created pod: %v", newPod.Name)
|
||||
glog.V(4).Infof("Controller %v created pod %v", ds.Name, newPod.Name)
|
||||
r.Recorder.Eventf(ds, "successfulCreate", "Created pod: %v", newPod.Name)
|
||||
}
|
||||
|
||||
return nil
|
||||
|
|
|
@ -0,0 +1,497 @@
|
|||
/*
|
||||
Copyright 2015 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package daemon
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/apis/experimental"
|
||||
"k8s.io/kubernetes/pkg/client/cache"
|
||||
"k8s.io/kubernetes/pkg/client/record"
|
||||
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
"k8s.io/kubernetes/pkg/controller/framework"
|
||||
"k8s.io/kubernetes/pkg/fields"
|
||||
"k8s.io/kubernetes/pkg/labels"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/util"
|
||||
"k8s.io/kubernetes/pkg/util/workqueue"
|
||||
"k8s.io/kubernetes/pkg/watch"
|
||||
)
|
||||
|
||||
const (
|
||||
// Daemon sets will periodically check that their daemon pods are running as expected.
|
||||
FullDaemonSetResyncPeriod = 30 * time.Second // TODO: Figure out if this time seems reasonable.
|
||||
// Nodes don't need relisting.
|
||||
FullNodeResyncPeriod = 0
|
||||
// Daemon pods don't need relisting.
|
||||
FullDaemonPodResyncPeriod = 0
|
||||
// If sending a status upate to API server fails, we retry a finite number of times.
|
||||
StatusUpdateRetries = 1
|
||||
)
|
||||
|
||||
// DaemonSetsController is responsible for synchronizing DaemonSet objects stored
|
||||
// in the system with actual running pods.
|
||||
type DaemonSetsController struct {
|
||||
kubeClient client.Interface
|
||||
podControl controller.PodControlInterface
|
||||
|
||||
// To allow injection of syncDaemonSet for testing.
|
||||
syncHandler func(dsKey string) error
|
||||
// A TTLCache of pod creates/deletes each ds expects to see
|
||||
expectations controller.ControllerExpectationsInterface
|
||||
// A store of daemon sets
|
||||
dsStore cache.StoreToDaemonSetLister
|
||||
// A store of pods
|
||||
podStore cache.StoreToPodLister
|
||||
// A store of nodes
|
||||
nodeStore cache.StoreToNodeLister
|
||||
// Watches changes to all daemon sets.
|
||||
dsController *framework.Controller
|
||||
// Watches changes to all pods
|
||||
podController *framework.Controller
|
||||
// Watches changes to all nodes.
|
||||
nodeController *framework.Controller
|
||||
// Daemon sets that need to be synced.
|
||||
queue *workqueue.Type
|
||||
}
|
||||
|
||||
func NewDaemonSetsController(kubeClient client.Interface) *DaemonSetsController {
|
||||
eventBroadcaster := record.NewBroadcaster()
|
||||
eventBroadcaster.StartLogging(glog.Infof)
|
||||
eventBroadcaster.StartRecordingToSink(kubeClient.Events(""))
|
||||
|
||||
dsc := &DaemonSetsController{
|
||||
kubeClient: kubeClient,
|
||||
podControl: controller.RealPodControl{
|
||||
KubeClient: kubeClient,
|
||||
Recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "daemon-set"}),
|
||||
},
|
||||
expectations: controller.NewControllerExpectations(),
|
||||
queue: workqueue.New(),
|
||||
}
|
||||
// Manage addition/update of daemon sets.
|
||||
dsc.dsStore.Store, dsc.dsController = framework.NewInformer(
|
||||
&cache.ListWatch{
|
||||
ListFunc: func() (runtime.Object, error) {
|
||||
return dsc.kubeClient.Experimental().DaemonSets(api.NamespaceAll).List(labels.Everything())
|
||||
},
|
||||
WatchFunc: func(rv string) (watch.Interface, error) {
|
||||
return dsc.kubeClient.Experimental().DaemonSets(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv)
|
||||
},
|
||||
},
|
||||
&experimental.DaemonSet{},
|
||||
FullDaemonSetResyncPeriod,
|
||||
framework.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
ds := obj.(*experimental.DaemonSet)
|
||||
glog.V(4).Infof("Adding daemon set %s", ds.Name)
|
||||
dsc.enqueueDaemonSet(obj)
|
||||
},
|
||||
UpdateFunc: func(old, cur interface{}) {
|
||||
oldDS := old.(*experimental.DaemonSet)
|
||||
glog.V(4).Infof("Updating daemon set %s", oldDS.Name)
|
||||
dsc.enqueueDaemonSet(cur)
|
||||
},
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
ds := obj.(*experimental.DaemonSet)
|
||||
glog.V(4).Infof("Deleting daemon set %s", ds.Name)
|
||||
dsc.enqueueDaemonSet(obj)
|
||||
},
|
||||
},
|
||||
)
|
||||
// Watch for creation/deletion of pods. The reason we watch is that we don't want a daemon set to create/delete
|
||||
// more pods until all the effects (expectations) of a daemon set's create/delete have been observed.
|
||||
dsc.podStore.Store, dsc.podController = framework.NewInformer(
|
||||
&cache.ListWatch{
|
||||
ListFunc: func() (runtime.Object, error) {
|
||||
return dsc.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything())
|
||||
},
|
||||
WatchFunc: func(rv string) (watch.Interface, error) {
|
||||
return dsc.kubeClient.Pods(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv)
|
||||
},
|
||||
},
|
||||
&api.Pod{},
|
||||
FullDaemonPodResyncPeriod,
|
||||
framework.ResourceEventHandlerFuncs{
|
||||
AddFunc: dsc.addPod,
|
||||
UpdateFunc: dsc.updatePod,
|
||||
DeleteFunc: dsc.deletePod,
|
||||
},
|
||||
)
|
||||
// Watch for new nodes or updates to nodes - daemon pods are launched on new nodes, and possibly when labels on nodes change,
|
||||
dsc.nodeStore.Store, dsc.nodeController = framework.NewInformer(
|
||||
&cache.ListWatch{
|
||||
ListFunc: func() (runtime.Object, error) {
|
||||
return dsc.kubeClient.Nodes().List(labels.Everything(), fields.Everything())
|
||||
},
|
||||
WatchFunc: func(rv string) (watch.Interface, error) {
|
||||
return dsc.kubeClient.Nodes().Watch(labels.Everything(), fields.Everything(), rv)
|
||||
},
|
||||
},
|
||||
&api.Node{},
|
||||
FullNodeResyncPeriod,
|
||||
framework.ResourceEventHandlerFuncs{
|
||||
AddFunc: dsc.addNode,
|
||||
UpdateFunc: dsc.updateNode,
|
||||
},
|
||||
)
|
||||
dsc.syncHandler = dsc.syncDaemonSet
|
||||
return dsc
|
||||
}
|
||||
|
||||
// Run begins watching and syncing daemon sets.
|
||||
func (dsc *DaemonSetsController) Run(workers int, stopCh <-chan struct{}) {
|
||||
go dsc.dsController.Run(stopCh)
|
||||
go dsc.podController.Run(stopCh)
|
||||
go dsc.nodeController.Run(stopCh)
|
||||
for i := 0; i < workers; i++ {
|
||||
go util.Until(dsc.worker, time.Second, stopCh)
|
||||
}
|
||||
<-stopCh
|
||||
glog.Infof("Shutting down Daemon Set Controller")
|
||||
dsc.queue.ShutDown()
|
||||
}
|
||||
|
||||
func (dsc *DaemonSetsController) worker() {
|
||||
for {
|
||||
func() {
|
||||
dsKey, quit := dsc.queue.Get()
|
||||
if quit {
|
||||
return
|
||||
}
|
||||
defer dsc.queue.Done(dsKey)
|
||||
err := dsc.syncHandler(dsKey.(string))
|
||||
if err != nil {
|
||||
glog.Errorf("Error syncing daemon set with key %s: %v", dsKey.(string), err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
func (dsc *DaemonSetsController) enqueueAllDaemonSets() {
|
||||
glog.V(4).Infof("Enqueueing all daemon sets")
|
||||
ds, err := dsc.dsStore.List()
|
||||
if err != nil {
|
||||
glog.Errorf("Error enqueueing daemon sets: %v", err)
|
||||
return
|
||||
}
|
||||
for i := range ds {
|
||||
dsc.enqueueDaemonSet(&ds[i])
|
||||
}
|
||||
}
|
||||
|
||||
func (dsc *DaemonSetsController) enqueueDaemonSet(obj interface{}) {
|
||||
key, err := controller.KeyFunc(obj)
|
||||
if err != nil {
|
||||
glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
|
||||
return
|
||||
}
|
||||
dsc.queue.Add(key)
|
||||
}
|
||||
|
||||
func (dsc *DaemonSetsController) getPodDaemonSet(pod *api.Pod) *experimental.DaemonSet {
|
||||
sets, err := dsc.dsStore.GetPodDaemonSets(pod)
|
||||
if err != nil {
|
||||
glog.V(4).Infof("No daemon sets found for pod %v, daemon set controller will avoid syncing", pod.Name)
|
||||
return nil
|
||||
}
|
||||
// More than two items in this list indicates user error. If two daemon
|
||||
// sets overlap, sort by creation timestamp, subsort by name, then pick
|
||||
// the first.
|
||||
glog.Errorf("user error! more than one daemon is selecting pods with labels: %+v", pod.Labels)
|
||||
sort.Sort(byCreationTimestamp(sets))
|
||||
return &sets[0]
|
||||
}
|
||||
|
||||
func (dsc *DaemonSetsController) addPod(obj interface{}) {
|
||||
pod := obj.(*api.Pod)
|
||||
glog.V(4).Infof("Pod %s added.", pod.Name)
|
||||
if ds := dsc.getPodDaemonSet(pod); ds != nil {
|
||||
dsKey, err := controller.KeyFunc(ds)
|
||||
if err != nil {
|
||||
glog.Errorf("Couldn't get key for object %+v: %v", ds, err)
|
||||
return
|
||||
}
|
||||
dsc.expectations.CreationObserved(dsKey)
|
||||
dsc.enqueueDaemonSet(ds)
|
||||
}
|
||||
}
|
||||
|
||||
// When a pod is updated, figure out what sets manage it and wake them
|
||||
// up. If the labels of the pod have changed we need to awaken both the old
|
||||
// and new set. old and cur must be *api.Pod types.
|
||||
func (dsc *DaemonSetsController) updatePod(old, cur interface{}) {
|
||||
if api.Semantic.DeepEqual(old, cur) {
|
||||
// A periodic relist will send update events for all known pods.
|
||||
return
|
||||
}
|
||||
curPod := cur.(*api.Pod)
|
||||
glog.V(4).Infof("Pod %s updated.", curPod.Name)
|
||||
if curDS := dsc.getPodDaemonSet(curPod); curDS != nil {
|
||||
dsc.enqueueDaemonSet(curDS)
|
||||
}
|
||||
oldPod := old.(*api.Pod)
|
||||
// If the labels have not changed, then the daemon set responsible for
|
||||
// the pod is the same as it was before. In that case we have enqueued the daemon
|
||||
// set above, and do not have to enqueue the set again.
|
||||
if !reflect.DeepEqual(curPod.Labels, oldPod.Labels) {
|
||||
// It's ok if both oldDS and curDS are the same, because curDS will set
|
||||
// the expectations on its run so oldDS will have no effect.
|
||||
if oldDS := dsc.getPodDaemonSet(oldPod); oldDS != nil {
|
||||
dsc.enqueueDaemonSet(oldDS)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (dsc *DaemonSetsController) deletePod(obj interface{}) {
|
||||
pod, ok := obj.(*api.Pod)
|
||||
glog.V(4).Infof("Pod %s deleted.", pod.Name)
|
||||
// When a delete is dropped, the relist will notice a pod in the store not
|
||||
// in the list, leading to the insertion of a tombstone object which contains
|
||||
// the deleted key/value. Note that this value might be stale. If the pod
|
||||
// changed labels the new rc will not be woken up till the periodic resync.
|
||||
if !ok {
|
||||
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
||||
if !ok {
|
||||
glog.Errorf("Couldn't get object from tombstone %+v", obj)
|
||||
return
|
||||
}
|
||||
pod, ok = tombstone.Obj.(*api.Pod)
|
||||
if !ok {
|
||||
glog.Errorf("Tombstone contained object that is not a pod %+v", obj)
|
||||
return
|
||||
}
|
||||
}
|
||||
if ds := dsc.getPodDaemonSet(pod); ds != nil {
|
||||
dsKey, err := controller.KeyFunc(ds)
|
||||
if err != nil {
|
||||
glog.Errorf("Couldn't get key for object %+v: %v", ds, err)
|
||||
return
|
||||
}
|
||||
dsc.expectations.DeletionObserved(dsKey)
|
||||
dsc.enqueueDaemonSet(ds)
|
||||
}
|
||||
}
|
||||
|
||||
func (dsc *DaemonSetsController) addNode(obj interface{}) {
|
||||
// TODO: it'd be nice to pass a hint with these enqueues, so that each ds would only examine the added node (unless it has other work to do, too).
|
||||
dsc.enqueueAllDaemonSets()
|
||||
}
|
||||
|
||||
func (dsc *DaemonSetsController) updateNode(old, cur interface{}) {
|
||||
oldNode := old.(*api.Node)
|
||||
curNode := cur.(*api.Node)
|
||||
if api.Semantic.DeepEqual(oldNode.Name, curNode.Name) && api.Semantic.DeepEqual(oldNode.Namespace, curNode.Namespace) && api.Semantic.DeepEqual(oldNode.Labels, curNode.Labels) {
|
||||
// A periodic relist will send update events for all known pods.
|
||||
return
|
||||
}
|
||||
// TODO: it'd be nice to pass a hint with these enqueues, so that each ds would only examine the added node (unless it has other work to do, too).
|
||||
dsc.enqueueAllDaemonSets()
|
||||
}
|
||||
|
||||
// getNodesToDaemonSetPods returns a map from nodes to daemon pods (corresponding to ds) running on the nodes.
|
||||
func (dsc *DaemonSetsController) getNodesToDaemonPods(ds *experimental.DaemonSet) (map[string][]*api.Pod, error) {
|
||||
nodeToDaemonPods := make(map[string][]*api.Pod)
|
||||
daemonPods, err := dsc.podStore.Pods(ds.Namespace).List(labels.Set(ds.Spec.Selector).AsSelector())
|
||||
if err != nil {
|
||||
return nodeToDaemonPods, err
|
||||
}
|
||||
for i := range daemonPods.Items {
|
||||
nodeName := daemonPods.Items[i].Spec.NodeName
|
||||
nodeToDaemonPods[nodeName] = append(nodeToDaemonPods[nodeName], &daemonPods.Items[i])
|
||||
}
|
||||
return nodeToDaemonPods, nil
|
||||
}
|
||||
|
||||
func (dsc *DaemonSetsController) manage(ds *experimental.DaemonSet) {
|
||||
// Find out which nodes are running the daemon pods selected by ds.
|
||||
nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
|
||||
if err != nil {
|
||||
glog.Errorf("Error getting node to daemon pod mapping for daemon set %+v: %v", ds, err)
|
||||
}
|
||||
|
||||
// For each node, if the node is running the daemon pod but isn't supposed to, kill the daemon
|
||||
// pod. If the node is supposed to run the daemon pod, but isn't, create the daemon pod on the node.
|
||||
nodeList, err := dsc.nodeStore.List()
|
||||
if err != nil {
|
||||
glog.Errorf("Couldn't get list of nodes when syncing daemon set %+v: %v", ds, err)
|
||||
}
|
||||
var nodesNeedingDaemonPods, podsToDelete []string
|
||||
for i := range nodeList.Items {
|
||||
// Check if the node satisfies the daemon set's node selector.
|
||||
nodeSelector := labels.Set(ds.Spec.Template.Spec.NodeSelector).AsSelector()
|
||||
shouldRun := nodeSelector.Matches(labels.Set(nodeList.Items[i].Labels))
|
||||
// If the daemon set specifies a node name, check that it matches with nodeName.
|
||||
nodeName := nodeList.Items[i].Name
|
||||
shouldRun = shouldRun && (ds.Spec.Template.Spec.NodeName == "" || ds.Spec.Template.Spec.NodeName == nodeName)
|
||||
daemonPods, isRunning := nodeToDaemonPods[nodeName]
|
||||
if shouldRun && !isRunning {
|
||||
// If daemon pod is supposed to be running on node, but isn't, create daemon pod.
|
||||
nodesNeedingDaemonPods = append(nodesNeedingDaemonPods, nodeName)
|
||||
} else if shouldRun && len(daemonPods) > 1 {
|
||||
// If daemon pod is supposed to be running on node, but more than 1 daemon pod is running, delete the excess daemon pods.
|
||||
// TODO: sort the daemon pods by creation time, so the the oldest is preserved.
|
||||
for i := 1; i < len(daemonPods); i++ {
|
||||
podsToDelete = append(podsToDelete, daemonPods[i].Name)
|
||||
}
|
||||
} else if !shouldRun && isRunning {
|
||||
// If daemon pod isn't supposed to run on node, but it is, delete all daemon pods on node.
|
||||
for i := range daemonPods {
|
||||
podsToDelete = append(podsToDelete, daemonPods[i].Name)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// We need to set expectations before creating/deleting pods to avoid race conditions.
|
||||
dsKey, err := controller.KeyFunc(ds)
|
||||
if err != nil {
|
||||
glog.Errorf("Couldn't get key for object %+v: %v", ds, err)
|
||||
return
|
||||
}
|
||||
dsc.expectations.SetExpectations(dsKey, len(nodesNeedingDaemonPods), len(podsToDelete))
|
||||
|
||||
glog.V(4).Infof("Nodes needing daemon pods for daemon set %s: %+v", ds.Name, nodesNeedingDaemonPods)
|
||||
for i := range nodesNeedingDaemonPods {
|
||||
if err := dsc.podControl.CreateReplicaOnNode(ds.Namespace, ds, nodesNeedingDaemonPods[i]); err != nil {
|
||||
glog.V(2).Infof("Failed creation, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
|
||||
dsc.expectations.CreationObserved(dsKey)
|
||||
util.HandleError(err)
|
||||
}
|
||||
}
|
||||
|
||||
glog.V(4).Infof("Pods to delete for daemon set %s: %+v", ds.Name, podsToDelete)
|
||||
for i := range podsToDelete {
|
||||
if err := dsc.podControl.DeletePod(ds.Namespace, podsToDelete[i]); err != nil {
|
||||
glog.V(2).Infof("Failed deletion, decrementing expectations for set %q/%q", ds.Namespace, ds.Name)
|
||||
dsc.expectations.DeletionObserved(dsKey)
|
||||
util.HandleError(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func storeDaemonSetStatus(dsClient client.DaemonSetInterface, ds *experimental.DaemonSet, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled int) error {
|
||||
if ds.Status.DesiredNumberScheduled == desiredNumberScheduled && ds.Status.CurrentNumberScheduled == currentNumberScheduled && ds.Status.NumberMisscheduled == numberMisscheduled {
|
||||
return nil
|
||||
}
|
||||
|
||||
var updateErr, getErr error
|
||||
for i := 0; i <= StatusUpdateRetries; i++ {
|
||||
ds.Status.DesiredNumberScheduled = desiredNumberScheduled
|
||||
ds.Status.CurrentNumberScheduled = currentNumberScheduled
|
||||
ds.Status.NumberMisscheduled = numberMisscheduled
|
||||
_, updateErr = dsClient.Update(ds)
|
||||
if updateErr == nil {
|
||||
// successful update
|
||||
return nil
|
||||
}
|
||||
// Update the set with the latest resource version for the next poll
|
||||
if ds, getErr = dsClient.Get(ds.Name); getErr != nil {
|
||||
// If the GET fails we can't trust status.Replicas anymore. This error
|
||||
// is bound to be more interesting than the update failure.
|
||||
return getErr
|
||||
}
|
||||
}
|
||||
return updateErr
|
||||
}
|
||||
|
||||
func (dsc *DaemonSetsController) updateDaemonSetStatus(ds *experimental.DaemonSet) {
|
||||
glog.Infof("Updating daemon set status")
|
||||
nodeToDaemonPods, err := dsc.getNodesToDaemonPods(ds)
|
||||
if err != nil {
|
||||
glog.Errorf("Error getting node to daemon pod mapping for daemon set %+v: %v", ds, err)
|
||||
}
|
||||
|
||||
nodeList, err := dsc.nodeStore.List()
|
||||
if err != nil {
|
||||
glog.Errorf("Couldn't get list of nodes when updating daemon set %+v: %v", ds, err)
|
||||
}
|
||||
|
||||
var desiredNumberScheduled, currentNumberScheduled, numberMisscheduled int
|
||||
for _, node := range nodeList.Items {
|
||||
nodeSelector := labels.Set(ds.Spec.Template.Spec.NodeSelector).AsSelector()
|
||||
shouldRun := nodeSelector.Matches(labels.Set(node.Labels))
|
||||
numDaemonPods := len(nodeToDaemonPods[node.Name])
|
||||
|
||||
if numDaemonPods > 0 {
|
||||
currentNumberScheduled++
|
||||
}
|
||||
|
||||
if shouldRun {
|
||||
desiredNumberScheduled++
|
||||
} else if numDaemonPods >= 0 {
|
||||
numberMisscheduled++
|
||||
}
|
||||
}
|
||||
|
||||
err = storeDaemonSetStatus(dsc.kubeClient.Experimental().DaemonSets(ds.Namespace), ds, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled)
|
||||
if err != nil {
|
||||
glog.Errorf("Error storing status for daemon set %+v: %v", ds, err)
|
||||
}
|
||||
}
|
||||
|
||||
func (dsc *DaemonSetsController) syncDaemonSet(key string) error {
|
||||
startTime := time.Now()
|
||||
defer func() {
|
||||
glog.V(4).Infof("Finished syncing daemon set %q (%v)", key, time.Now().Sub(startTime))
|
||||
}()
|
||||
obj, exists, err := dsc.dsStore.Store.GetByKey(key)
|
||||
if err != nil {
|
||||
glog.Infof("Unable to retrieve ds %v from store: %v", key, err)
|
||||
dsc.queue.Add(key)
|
||||
return err
|
||||
}
|
||||
if !exists {
|
||||
glog.V(3).Infof("daemon set has been deleted %v", key)
|
||||
dsc.expectations.DeleteExpectations(key)
|
||||
return nil
|
||||
}
|
||||
ds := obj.(*experimental.DaemonSet)
|
||||
|
||||
// Don't process a daemon set until all its creations and deletions have been processed.
|
||||
// For example if daemon set foo asked for 3 new daemon pods in the previous call to manage,
|
||||
// then we do not want to call manage on foo until the daemon pods have been created.
|
||||
dsKey, err := controller.KeyFunc(ds)
|
||||
if err != nil {
|
||||
glog.Errorf("Couldn't get key for object %+v: %v", ds, err)
|
||||
return err
|
||||
}
|
||||
dsNeedsSync := dsc.expectations.SatisfiedExpectations(dsKey)
|
||||
if dsNeedsSync {
|
||||
dsc.manage(ds)
|
||||
}
|
||||
|
||||
dsc.updateDaemonSetStatus(ds)
|
||||
return nil
|
||||
}
|
||||
|
||||
// byCreationTimestamp sorts a list by creation timestamp, using their names as a tie breaker.
|
||||
type byCreationTimestamp []experimental.DaemonSet
|
||||
|
||||
func (o byCreationTimestamp) Len() int { return len(o) }
|
||||
func (o byCreationTimestamp) Swap(i, j int) { o[i], o[j] = o[j], o[i] }
|
||||
|
||||
func (o byCreationTimestamp) Less(i, j int) bool {
|
||||
if o[i].CreationTimestamp.Equal(o[j].CreationTimestamp) {
|
||||
return o[i].Name < o[j].Name
|
||||
}
|
||||
return o[i].CreationTimestamp.Before(o[j].CreationTimestamp)
|
||||
}
|
|
@ -0,0 +1,321 @@
|
|||
/*
|
||||
Copyright 2015 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package daemon
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/testapi"
|
||||
"k8s.io/kubernetes/pkg/apis/experimental"
|
||||
"k8s.io/kubernetes/pkg/client/cache"
|
||||
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
"k8s.io/kubernetes/pkg/securitycontext"
|
||||
)
|
||||
|
||||
var (
|
||||
simpleDaemonSetLabel = map[string]string{"name": "simple-daemon", "type": "production"}
|
||||
simpleDaemonSetLabel2 = map[string]string{"name": "simple-daemon", "type": "test"}
|
||||
simpleNodeLabel = map[string]string{"color": "blue", "speed": "fast"}
|
||||
simpleNodeLabel2 = map[string]string{"color": "red", "speed": "fast"}
|
||||
)
|
||||
|
||||
type FakePodControl struct {
|
||||
daemonSet []experimental.DaemonSet
|
||||
deletePodName []string
|
||||
lock sync.Mutex
|
||||
err error
|
||||
}
|
||||
|
||||
func init() {
|
||||
api.ForTesting_ReferencesAllowBlankSelfLinks = true
|
||||
}
|
||||
|
||||
func (f *FakePodControl) CreateReplica(namespace string, spec *api.ReplicationController) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *FakePodControl) CreateReplicaOnNode(namespace string, ds *experimental.DaemonSet, nodeName string) error {
|
||||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
if f.err != nil {
|
||||
return f.err
|
||||
}
|
||||
f.daemonSet = append(f.daemonSet, *ds)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *FakePodControl) DeletePod(namespace string, podName string) error {
|
||||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
if f.err != nil {
|
||||
return f.err
|
||||
}
|
||||
f.deletePodName = append(f.deletePodName, podName)
|
||||
return nil
|
||||
}
|
||||
func (f *FakePodControl) clear() {
|
||||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
f.deletePodName = []string{}
|
||||
f.daemonSet = []experimental.DaemonSet{}
|
||||
}
|
||||
|
||||
func newDaemonSet(name string) *experimental.DaemonSet {
|
||||
return &experimental.DaemonSet{
|
||||
TypeMeta: api.TypeMeta{APIVersion: testapi.Experimental.Version()},
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: name,
|
||||
Namespace: api.NamespaceDefault,
|
||||
},
|
||||
Spec: experimental.DaemonSetSpec{
|
||||
Selector: simpleDaemonSetLabel,
|
||||
Template: &api.PodTemplateSpec{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Labels: simpleDaemonSetLabel,
|
||||
},
|
||||
Spec: api.PodSpec{
|
||||
Containers: []api.Container{
|
||||
{
|
||||
Image: "foo/bar",
|
||||
TerminationMessagePath: api.TerminationMessagePathDefault,
|
||||
ImagePullPolicy: api.PullIfNotPresent,
|
||||
SecurityContext: securitycontext.ValidSecurityContextWithContainerDefaults(),
|
||||
},
|
||||
},
|
||||
DNSPolicy: api.DNSDefault,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func newNode(name string, label map[string]string) *api.Node {
|
||||
return &api.Node{
|
||||
TypeMeta: api.TypeMeta{APIVersion: testapi.Default.Version()},
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: name,
|
||||
Labels: label,
|
||||
Namespace: api.NamespaceDefault,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func addNodes(nodeStore cache.Store, startIndex, numNodes int, label map[string]string) {
|
||||
for i := startIndex; i < startIndex+numNodes; i++ {
|
||||
nodeStore.Add(newNode(fmt.Sprintf("node-%d", i), label))
|
||||
}
|
||||
}
|
||||
|
||||
func newPod(podName string, nodeName string, label map[string]string) *api.Pod {
|
||||
pod := &api.Pod{
|
||||
TypeMeta: api.TypeMeta{APIVersion: testapi.Default.Version()},
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
GenerateName: podName,
|
||||
Labels: label,
|
||||
Namespace: api.NamespaceDefault,
|
||||
},
|
||||
Spec: api.PodSpec{
|
||||
NodeName: nodeName,
|
||||
Containers: []api.Container{
|
||||
{
|
||||
Image: "foo/bar",
|
||||
TerminationMessagePath: api.TerminationMessagePathDefault,
|
||||
ImagePullPolicy: api.PullIfNotPresent,
|
||||
SecurityContext: securitycontext.ValidSecurityContextWithContainerDefaults(),
|
||||
},
|
||||
},
|
||||
DNSPolicy: api.DNSDefault,
|
||||
},
|
||||
}
|
||||
api.GenerateName(api.SimpleNameGenerator, &pod.ObjectMeta)
|
||||
return pod
|
||||
}
|
||||
|
||||
func addPods(podStore cache.Store, nodeName string, label map[string]string, number int) {
|
||||
for i := 0; i < number; i++ {
|
||||
podStore.Add(newPod(fmt.Sprintf("%s-", nodeName), nodeName, label))
|
||||
}
|
||||
}
|
||||
|
||||
func newTestController() (*DaemonSetsController, *FakePodControl) {
|
||||
client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Experimental.Version()})
|
||||
manager := NewDaemonSetsController(client)
|
||||
podControl := &FakePodControl{}
|
||||
manager.podControl = podControl
|
||||
return manager, podControl
|
||||
}
|
||||
|
||||
func validateSyncDaemonSets(t *testing.T, fakePodControl *FakePodControl, expectedCreates, expectedDeletes int) {
|
||||
if len(fakePodControl.daemonSet) != expectedCreates {
|
||||
t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", expectedCreates, len(fakePodControl.daemonSet))
|
||||
}
|
||||
if len(fakePodControl.deletePodName) != expectedDeletes {
|
||||
t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", expectedDeletes, len(fakePodControl.deletePodName))
|
||||
}
|
||||
}
|
||||
|
||||
func syncAndValidateDaemonSets(t *testing.T, manager *DaemonSetsController, ds *experimental.DaemonSet, podControl *FakePodControl, expectedCreates, expectedDeletes int) {
|
||||
key, err := controller.KeyFunc(ds)
|
||||
if err != nil {
|
||||
t.Errorf("Could not get key for daemon.")
|
||||
}
|
||||
manager.syncHandler(key)
|
||||
validateSyncDaemonSets(t, podControl, expectedCreates, expectedDeletes)
|
||||
}
|
||||
|
||||
// DaemonSets without node selectors should launch pods on every node.
|
||||
func TestSimpleDaemonSetLaunchesPods(t *testing.T) {
|
||||
manager, podControl := newTestController()
|
||||
addNodes(manager.nodeStore.Store, 0, 5, nil)
|
||||
ds := newDaemonSet("foo")
|
||||
manager.dsStore.Add(ds)
|
||||
syncAndValidateDaemonSets(t, manager, ds, podControl, 5, 0)
|
||||
}
|
||||
|
||||
// DaemonSets without node selectors should launch pods on every node.
|
||||
func TestNoNodesDoesNothing(t *testing.T) {
|
||||
manager, podControl := newTestController()
|
||||
ds := newDaemonSet("foo")
|
||||
manager.dsStore.Add(ds)
|
||||
syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0)
|
||||
}
|
||||
|
||||
// DaemonSets without node selectors should launch pods on every node.
|
||||
func TestOneNodeDaemonLaunchesPod(t *testing.T) {
|
||||
manager, podControl := newTestController()
|
||||
manager.nodeStore.Add(newNode("only-node", nil))
|
||||
ds := newDaemonSet("foo")
|
||||
manager.dsStore.Add(ds)
|
||||
syncAndValidateDaemonSets(t, manager, ds, podControl, 1, 0)
|
||||
}
|
||||
|
||||
// Controller should not create pods on nodes which have daemon pods, and should remove excess pods from nodes that have extra pods.
|
||||
func TestDealsWithExistingPods(t *testing.T) {
|
||||
manager, podControl := newTestController()
|
||||
addNodes(manager.nodeStore.Store, 0, 5, nil)
|
||||
addPods(manager.podStore.Store, "node-1", simpleDaemonSetLabel, 1)
|
||||
addPods(manager.podStore.Store, "node-2", simpleDaemonSetLabel, 2)
|
||||
addPods(manager.podStore.Store, "node-3", simpleDaemonSetLabel, 5)
|
||||
addPods(manager.podStore.Store, "node-4", simpleDaemonSetLabel2, 2)
|
||||
ds := newDaemonSet("foo")
|
||||
manager.dsStore.Add(ds)
|
||||
syncAndValidateDaemonSets(t, manager, ds, podControl, 2, 5)
|
||||
}
|
||||
|
||||
// Daemon with node selector should launch pods on nodes matching selector.
|
||||
func TestSelectorDaemonLaunchesPods(t *testing.T) {
|
||||
manager, podControl := newTestController()
|
||||
addNodes(manager.nodeStore.Store, 0, 4, nil)
|
||||
addNodes(manager.nodeStore.Store, 4, 3, simpleNodeLabel)
|
||||
daemon := newDaemonSet("foo")
|
||||
daemon.Spec.Template.Spec.NodeSelector = simpleNodeLabel
|
||||
manager.dsStore.Add(daemon)
|
||||
syncAndValidateDaemonSets(t, manager, daemon, podControl, 3, 0)
|
||||
}
|
||||
|
||||
// Daemon with node selector should delete pods from nodes that do not satisfy selector.
|
||||
func TestSelectorDaemonDeletesUnselectedPods(t *testing.T) {
|
||||
manager, podControl := newTestController()
|
||||
addNodes(manager.nodeStore.Store, 0, 5, nil)
|
||||
addNodes(manager.nodeStore.Store, 5, 5, simpleNodeLabel)
|
||||
addPods(manager.podStore.Store, "node-0", simpleDaemonSetLabel2, 2)
|
||||
addPods(manager.podStore.Store, "node-1", simpleDaemonSetLabel, 3)
|
||||
addPods(manager.podStore.Store, "node-1", simpleDaemonSetLabel2, 1)
|
||||
addPods(manager.podStore.Store, "node-4", simpleDaemonSetLabel, 1)
|
||||
daemon := newDaemonSet("foo")
|
||||
daemon.Spec.Template.Spec.NodeSelector = simpleNodeLabel
|
||||
manager.dsStore.Add(daemon)
|
||||
syncAndValidateDaemonSets(t, manager, daemon, podControl, 5, 4)
|
||||
}
|
||||
|
||||
// DaemonSet with node selector should launch pods on nodes matching selector, but also deal with existing pods on nodes.
|
||||
func TestSelectorDaemonDealsWithExistingPods(t *testing.T) {
|
||||
manager, podControl := newTestController()
|
||||
addNodes(manager.nodeStore.Store, 0, 5, nil)
|
||||
addNodes(manager.nodeStore.Store, 5, 5, simpleNodeLabel)
|
||||
addPods(manager.podStore.Store, "node-0", simpleDaemonSetLabel, 1)
|
||||
addPods(manager.podStore.Store, "node-1", simpleDaemonSetLabel, 3)
|
||||
addPods(manager.podStore.Store, "node-1", simpleDaemonSetLabel2, 2)
|
||||
addPods(manager.podStore.Store, "node-2", simpleDaemonSetLabel, 4)
|
||||
addPods(manager.podStore.Store, "node-6", simpleDaemonSetLabel, 13)
|
||||
addPods(manager.podStore.Store, "node-7", simpleDaemonSetLabel2, 4)
|
||||
addPods(manager.podStore.Store, "node-9", simpleDaemonSetLabel, 1)
|
||||
addPods(manager.podStore.Store, "node-9", simpleDaemonSetLabel2, 1)
|
||||
ds := newDaemonSet("foo")
|
||||
ds.Spec.Template.Spec.NodeSelector = simpleNodeLabel
|
||||
manager.dsStore.Add(ds)
|
||||
syncAndValidateDaemonSets(t, manager, ds, podControl, 3, 20)
|
||||
}
|
||||
|
||||
// DaemonSet with node selector which does not match any node labels should not launch pods.
|
||||
func TestBadSelectorDaemonDoesNothing(t *testing.T) {
|
||||
manager, podControl := newTestController()
|
||||
addNodes(manager.nodeStore.Store, 0, 4, nil)
|
||||
addNodes(manager.nodeStore.Store, 4, 3, simpleNodeLabel)
|
||||
ds := newDaemonSet("foo")
|
||||
ds.Spec.Template.Spec.NodeSelector = simpleNodeLabel2
|
||||
manager.dsStore.Add(ds)
|
||||
syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0)
|
||||
}
|
||||
|
||||
// DaemonSet with node name should launch pod on node with corresponding name.
|
||||
func TestNameDaemonSetLaunchesPods(t *testing.T) {
|
||||
manager, podControl := newTestController()
|
||||
addNodes(manager.nodeStore.Store, 0, 5, nil)
|
||||
ds := newDaemonSet("foo")
|
||||
ds.Spec.Template.Spec.NodeName = "node-0"
|
||||
manager.dsStore.Add(ds)
|
||||
syncAndValidateDaemonSets(t, manager, ds, podControl, 1, 0)
|
||||
}
|
||||
|
||||
// DaemonSet with node name that does not exist should not launch pods.
|
||||
func TestBadNameDaemonSetDoesNothing(t *testing.T) {
|
||||
manager, podControl := newTestController()
|
||||
addNodes(manager.nodeStore.Store, 0, 5, nil)
|
||||
ds := newDaemonSet("foo")
|
||||
ds.Spec.Template.Spec.NodeName = "node-10"
|
||||
manager.dsStore.Add(ds)
|
||||
syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0)
|
||||
}
|
||||
|
||||
// DaemonSet with node selector, and node name, matching a node, should launch a pod on the node.
|
||||
func TestNameAndSelectorDaemonSetLaunchesPods(t *testing.T) {
|
||||
manager, podControl := newTestController()
|
||||
addNodes(manager.nodeStore.Store, 0, 4, nil)
|
||||
addNodes(manager.nodeStore.Store, 4, 3, simpleNodeLabel)
|
||||
ds := newDaemonSet("foo")
|
||||
ds.Spec.Template.Spec.NodeSelector = simpleNodeLabel
|
||||
ds.Spec.Template.Spec.NodeName = "node-6"
|
||||
manager.dsStore.Add(ds)
|
||||
syncAndValidateDaemonSets(t, manager, ds, podControl, 1, 0)
|
||||
}
|
||||
|
||||
// DaemonSet with node selector that matches some nodes, and node name that matches a different node, should do nothing.
|
||||
func TestInconsistentNameSelectorDaemonSetDoesNothing(t *testing.T) {
|
||||
manager, podControl := newTestController()
|
||||
addNodes(manager.nodeStore.Store, 0, 4, nil)
|
||||
addNodes(manager.nodeStore.Store, 4, 3, simpleNodeLabel)
|
||||
ds := newDaemonSet("foo")
|
||||
ds.Spec.Template.Spec.NodeSelector = simpleNodeLabel
|
||||
ds.Spec.Template.Spec.NodeName = "node-0"
|
||||
manager.dsStore.Add(ds)
|
||||
syncAndValidateDaemonSets(t, manager, ds, podControl, 0, 0)
|
||||
}
|
|
@ -1,475 +0,0 @@
|
|||
/*
|
||||
Copyright 2015 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package daemon
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"time"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||
"k8s.io/kubernetes/pkg/client/unversioned/cache"
|
||||
"k8s.io/kubernetes/pkg/client/unversioned/record"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
"k8s.io/kubernetes/pkg/controller/framework"
|
||||
"k8s.io/kubernetes/pkg/expapi"
|
||||
"k8s.io/kubernetes/pkg/fields"
|
||||
"k8s.io/kubernetes/pkg/labels"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/util"
|
||||
"k8s.io/kubernetes/pkg/util/workqueue"
|
||||
"k8s.io/kubernetes/pkg/watch"
|
||||
)
|
||||
|
||||
const (
|
||||
// Daemons will periodically check that their daemon pods are running as expected.
|
||||
FullDaemonResyncPeriod = 30 * time.Second // TODO: Figure out if this time seems reasonable.
|
||||
// Nodes don't need relisting.
|
||||
FullNodeResyncPeriod = 0
|
||||
// Daemon pods don't need relisting.
|
||||
FullDaemonPodResyncPeriod = 0
|
||||
// If sending a status upate to API server fails, we retry a finite number of times.
|
||||
StatusUpdateRetries = 1
|
||||
)
|
||||
|
||||
type DaemonManager struct {
|
||||
kubeClient client.Interface
|
||||
podControl controller.PodControlInterface
|
||||
|
||||
// To allow injection of syncDaemon for testing.
|
||||
syncHandler func(dcKey string) error
|
||||
// A TTLCache of pod creates/deletes each dc expects to see
|
||||
expectations controller.ControllerExpectationsInterface
|
||||
// A store of daemons, populated by the podController.
|
||||
dcStore cache.StoreToDaemonSetLister
|
||||
// A store of pods, populated by the podController
|
||||
podStore cache.StoreToPodLister
|
||||
// A store of pods, populated by the podController
|
||||
nodeStore cache.StoreToNodeLister
|
||||
// Watches changes to all pods.
|
||||
dcController *framework.Controller
|
||||
// Watches changes to all pods
|
||||
podController *framework.Controller
|
||||
// Watches changes to all nodes.
|
||||
nodeController *framework.Controller
|
||||
// Controllers that need to be updated.
|
||||
queue *workqueue.Type
|
||||
}
|
||||
|
||||
func NewDaemonManager(kubeClient client.Interface) *DaemonManager {
|
||||
eventBroadcaster := record.NewBroadcaster()
|
||||
eventBroadcaster.StartLogging(glog.Infof)
|
||||
eventBroadcaster.StartRecordingToSink(kubeClient.Events(""))
|
||||
|
||||
dm := &DaemonManager{
|
||||
kubeClient: kubeClient,
|
||||
podControl: controller.RealPodControl{
|
||||
KubeClient: kubeClient,
|
||||
Recorder: eventBroadcaster.NewRecorder(api.EventSource{Component: "daemon"}),
|
||||
},
|
||||
expectations: controller.NewControllerExpectations(),
|
||||
queue: workqueue.New(),
|
||||
}
|
||||
// Manage addition/update of daemon controllers.
|
||||
dm.dcStore.Store, dm.dcController = framework.NewInformer(
|
||||
&cache.ListWatch{
|
||||
ListFunc: func() (runtime.Object, error) {
|
||||
return dm.kubeClient.Experimental().Daemons(api.NamespaceAll).List(labels.Everything())
|
||||
},
|
||||
WatchFunc: func(rv string) (watch.Interface, error) {
|
||||
return dm.kubeClient.Experimental().Daemons(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv)
|
||||
},
|
||||
},
|
||||
&expapi.DaemonSet{},
|
||||
FullDaemonResyncPeriod,
|
||||
framework.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(obj interface{}) {
|
||||
daemon := obj.(*expapi.DaemonSet)
|
||||
glog.V(4).Infof("Adding daemon %s", daemon.Name)
|
||||
dm.enqueueController(obj)
|
||||
},
|
||||
UpdateFunc: func(old, cur interface{}) {
|
||||
oldDaemon := old.(*expapi.DaemonSet)
|
||||
glog.V(4).Infof("Updating daemon %s", oldDaemon.Name)
|
||||
dm.enqueueController(cur)
|
||||
},
|
||||
DeleteFunc: func(obj interface{}) {
|
||||
daemon := obj.(*expapi.DaemonSet)
|
||||
glog.V(4).Infof("Deleting daemon %s", daemon.Name)
|
||||
dm.enqueueController(obj)
|
||||
},
|
||||
},
|
||||
)
|
||||
// Watch for creation/deletion of pods. The reason we watch is that we don't want a daemon controller to create/delete
|
||||
// more pods until all the effects (expectations) of a daemon controller's create/delete have been observed.
|
||||
dm.podStore.Store, dm.podController = framework.NewInformer(
|
||||
&cache.ListWatch{
|
||||
ListFunc: func() (runtime.Object, error) {
|
||||
return dm.kubeClient.Pods(api.NamespaceAll).List(labels.Everything(), fields.Everything())
|
||||
},
|
||||
WatchFunc: func(rv string) (watch.Interface, error) {
|
||||
return dm.kubeClient.Pods(api.NamespaceAll).Watch(labels.Everything(), fields.Everything(), rv)
|
||||
},
|
||||
},
|
||||
&api.Pod{},
|
||||
FullDaemonPodResyncPeriod,
|
||||
framework.ResourceEventHandlerFuncs{
|
||||
AddFunc: dm.addPod,
|
||||
UpdateFunc: dm.updatePod,
|
||||
DeleteFunc: dm.deletePod,
|
||||
},
|
||||
)
|
||||
// Watch for new nodes or updates to nodes - daemons are launched on new nodes, and possibly when labels on nodes change,
|
||||
dm.nodeStore.Store, dm.nodeController = framework.NewInformer(
|
||||
&cache.ListWatch{
|
||||
ListFunc: func() (runtime.Object, error) {
|
||||
return dm.kubeClient.Nodes().List(labels.Everything(), fields.Everything())
|
||||
},
|
||||
WatchFunc: func(rv string) (watch.Interface, error) {
|
||||
return dm.kubeClient.Nodes().Watch(labels.Everything(), fields.Everything(), rv)
|
||||
},
|
||||
},
|
||||
&api.Node{},
|
||||
FullNodeResyncPeriod,
|
||||
framework.ResourceEventHandlerFuncs{
|
||||
AddFunc: dm.addNode,
|
||||
UpdateFunc: dm.updateNode,
|
||||
DeleteFunc: func(node interface{}) {},
|
||||
},
|
||||
)
|
||||
dm.syncHandler = dm.syncDaemon
|
||||
return dm
|
||||
}
|
||||
|
||||
// Run begins watching and syncing daemons.
|
||||
func (dm *DaemonManager) Run(workers int, stopCh <-chan struct{}) {
|
||||
go dm.dcController.Run(stopCh)
|
||||
go dm.podController.Run(stopCh)
|
||||
go dm.nodeController.Run(stopCh)
|
||||
for i := 0; i < workers; i++ {
|
||||
go util.Until(dm.worker, time.Second, stopCh)
|
||||
}
|
||||
<-stopCh
|
||||
glog.Infof("Shutting down Daemon Controller Manager")
|
||||
dm.queue.ShutDown()
|
||||
}
|
||||
|
||||
func (dm *DaemonManager) worker() {
|
||||
for {
|
||||
func() {
|
||||
key, quit := dm.queue.Get()
|
||||
if quit {
|
||||
return
|
||||
}
|
||||
defer dm.queue.Done(key)
|
||||
err := dm.syncHandler(key.(string))
|
||||
if err != nil {
|
||||
glog.Errorf("Error syncing daemon controller with key %s: %v", key.(string), err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
}
|
||||
|
||||
func (dm *DaemonManager) enqueueAllDaemons() {
|
||||
glog.V(4).Infof("Enqueueing all daemons")
|
||||
daemons, err := dm.dcStore.List()
|
||||
if err != nil {
|
||||
glog.Errorf("Error enqueueing daemon controllers: %v", err)
|
||||
return
|
||||
}
|
||||
for i := range daemons {
|
||||
dm.enqueueController(&daemons[i])
|
||||
}
|
||||
}
|
||||
|
||||
func (dm *DaemonManager) enqueueController(obj interface{}) {
|
||||
key, err := controller.KeyFunc(obj)
|
||||
if err != nil {
|
||||
glog.Errorf("Couldn't get key for object %+v: %v", obj, err)
|
||||
return
|
||||
}
|
||||
dm.queue.Add(key)
|
||||
}
|
||||
|
||||
func (dm *DaemonManager) getPodDaemon(pod *api.Pod) *expapi.DaemonSet {
|
||||
controllers, err := dm.dcStore.GetPodDaemonSets(pod)
|
||||
if err != nil {
|
||||
glog.V(4).Infof("No controllers found for pod %v, daemon manager will avoid syncing", pod.Name)
|
||||
return nil
|
||||
}
|
||||
return &controllers[0]
|
||||
}
|
||||
|
||||
func (dm *DaemonManager) addPod(obj interface{}) {
|
||||
pod := obj.(*api.Pod)
|
||||
glog.V(4).Infof("Pod %s added.", pod.Name)
|
||||
if dc := dm.getPodDaemon(pod); dc != nil {
|
||||
dcKey, err := controller.KeyFunc(dc)
|
||||
if err != nil {
|
||||
glog.Errorf("Couldn't get key for object %+v: %v", dc, err)
|
||||
return
|
||||
}
|
||||
dm.expectations.CreationObserved(dcKey)
|
||||
dm.enqueueController(dc)
|
||||
}
|
||||
}
|
||||
|
||||
// When a pod is updated, figure out what controller/s manage it and wake them
|
||||
// up. If the labels of the pod have changed we need to awaken both the old
|
||||
// and new controller. old and cur must be *api.Pod types.
|
||||
func (dm *DaemonManager) updatePod(old, cur interface{}) {
|
||||
if api.Semantic.DeepEqual(old, cur) {
|
||||
// A periodic relist will send update events for all known pods.
|
||||
return
|
||||
}
|
||||
curPod := cur.(*api.Pod)
|
||||
glog.V(4).Infof("Pod %s updated.", curPod.Name)
|
||||
if dc := dm.getPodDaemon(curPod); dc != nil {
|
||||
dm.enqueueController(dc)
|
||||
}
|
||||
oldPod := old.(*api.Pod)
|
||||
// If the labels have not changed, then the daemon controller responsible for
|
||||
// the pod is the same as it was before. In that case we have enqueued the daemon
|
||||
// controller above, and do not have to enqueue the controller again.
|
||||
if !reflect.DeepEqual(curPod.Labels, oldPod.Labels) {
|
||||
// If the old and new dc are the same, the first one that syncs
|
||||
// will set expectations preventing any damage from the second.
|
||||
if oldRC := dm.getPodDaemon(oldPod); oldRC != nil {
|
||||
dm.enqueueController(oldRC)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (dm *DaemonManager) deletePod(obj interface{}) {
|
||||
pod, ok := obj.(*api.Pod)
|
||||
glog.V(4).Infof("Pod %s deleted.", pod.Name)
|
||||
// When a delete is dropped, the relist will notice a pod in the store not
|
||||
// in the list, leading to the insertion of a tombstone object which contains
|
||||
// the deleted key/value. Note that this value might be stale. If the pod
|
||||
// changed labels the new rc will not be woken up till the periodic resync.
|
||||
if !ok {
|
||||
tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
|
||||
if !ok {
|
||||
glog.Errorf("Couldn't get object from tombstone %+v", obj)
|
||||
return
|
||||
}
|
||||
pod, ok = tombstone.Obj.(*api.Pod)
|
||||
if !ok {
|
||||
glog.Errorf("Tombstone contained object that is not a pod %+v", obj)
|
||||
return
|
||||
}
|
||||
}
|
||||
if dc := dm.getPodDaemon(pod); dc != nil {
|
||||
dcKey, err := controller.KeyFunc(dc)
|
||||
if err != nil {
|
||||
glog.Errorf("Couldn't get key for object %+v: %v", dc, err)
|
||||
return
|
||||
}
|
||||
dm.expectations.DeletionObserved(dcKey)
|
||||
dm.enqueueController(dc)
|
||||
}
|
||||
}
|
||||
|
||||
func (dm *DaemonManager) addNode(obj interface{}) {
|
||||
// TODO: it'd be nice to pass a hint with these enqueues, so that each dc would only examine the added node (unless it has other work to do, too).
|
||||
dm.enqueueAllDaemons()
|
||||
}
|
||||
|
||||
func (dm *DaemonManager) updateNode(old, cur interface{}) {
|
||||
oldNode := old.(*api.Node)
|
||||
curNode := cur.(*api.Node)
|
||||
if api.Semantic.DeepEqual(oldNode.Name, curNode.Name) && api.Semantic.DeepEqual(oldNode.Namespace, curNode.Namespace) && api.Semantic.DeepEqual(oldNode.Labels, curNode.Labels) {
|
||||
// A periodic relist will send update events for all known pods.
|
||||
return
|
||||
}
|
||||
// TODO: it'd be nice to pass a hint with these enqueues, so that each dc would only examine the added node (unless it has other work to do, too).
|
||||
dm.enqueueAllDaemons()
|
||||
}
|
||||
|
||||
// getNodesToDaemonPods returns a map from nodes to daemon pods (corresponding to dc) running on the nodes.
|
||||
func (dm *DaemonManager) getNodesToDaemonPods(dc *expapi.DaemonSet) (map[string][]*api.Pod, error) {
|
||||
nodeToDaemonPods := make(map[string][]*api.Pod)
|
||||
daemonPods, err := dm.podStore.Pods(dc.Namespace).List(labels.Set(dc.Spec.Selector).AsSelector())
|
||||
if err != nil {
|
||||
return nodeToDaemonPods, err
|
||||
}
|
||||
for i := range daemonPods.Items {
|
||||
nodeName := daemonPods.Items[i].Spec.NodeName
|
||||
nodeToDaemonPods[nodeName] = append(nodeToDaemonPods[nodeName], &daemonPods.Items[i])
|
||||
}
|
||||
return nodeToDaemonPods, nil
|
||||
}
|
||||
|
||||
func (dm *DaemonManager) manageDaemons(dc *expapi.DaemonSet) {
|
||||
// Find out which nodes are running the daemon pods selected by dc.
|
||||
nodeToDaemonPods, err := dm.getNodesToDaemonPods(dc)
|
||||
if err != nil {
|
||||
glog.Errorf("Error getting node to daemon pod mapping for daemon controller %+v: %v", dc, err)
|
||||
}
|
||||
|
||||
// For each node, if the node is running the daemon pod but isn't supposed to, kill the daemon
|
||||
// pod. If the node is supposed to run the daemon, but isn't, create the daemon on the node.
|
||||
nodeList, err := dm.nodeStore.List()
|
||||
if err != nil {
|
||||
glog.Errorf("Couldn't get list of nodes when adding daemon controller %+v: %v", dc, err)
|
||||
}
|
||||
var nodesNeedingDaemons, podsToDelete []string
|
||||
for i := range nodeList.Items {
|
||||
// Check if the node satisfies the daemon's node selector.
|
||||
nodeSelector := labels.Set(dc.Spec.Template.Spec.NodeSelector).AsSelector()
|
||||
shouldRun := nodeSelector.Matches(labels.Set(nodeList.Items[i].Labels))
|
||||
// If the daemon specifies a node name, check that it matches with nodeName.
|
||||
nodeName := nodeList.Items[i].Name
|
||||
shouldRun = shouldRun && (dc.Spec.Template.Spec.NodeName == "" || dc.Spec.Template.Spec.NodeName == nodeName)
|
||||
daemonPods, isRunning := nodeToDaemonPods[nodeName]
|
||||
if shouldRun && !isRunning {
|
||||
// If daemon pod is supposed to be running on node, but isn't, create daemon pod.
|
||||
nodesNeedingDaemons = append(nodesNeedingDaemons, nodeName)
|
||||
} else if shouldRun && len(daemonPods) > 1 {
|
||||
// If daemon pod is supposed to be running on node, but more than 1 daemon pod is running, delete the excess daemon pods.
|
||||
// TODO: sort the daemon pods by creation time, so the the oldest is preserved.
|
||||
for i := 1; i < len(daemonPods); i++ {
|
||||
podsToDelete = append(podsToDelete, daemonPods[i].Name)
|
||||
}
|
||||
} else if !shouldRun && isRunning {
|
||||
// If daemon pod isn't supposed to run on node, but it is, delete all daemon pods on node.
|
||||
for i := range daemonPods {
|
||||
podsToDelete = append(podsToDelete, daemonPods[i].Name)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// We need to set expectations before creating/deleting pods to avoid race conditions.
|
||||
dcKey, err := controller.KeyFunc(dc)
|
||||
if err != nil {
|
||||
glog.Errorf("Couldn't get key for object %+v: %v", dc, err)
|
||||
return
|
||||
}
|
||||
dm.expectations.SetExpectations(dcKey, len(nodesNeedingDaemons), len(podsToDelete))
|
||||
|
||||
glog.V(4).Infof("Nodes needing daemons for daemon %s: %+v", dc.Name, nodesNeedingDaemons)
|
||||
for i := range nodesNeedingDaemons {
|
||||
if err := dm.podControl.CreateReplicaOnNode(dc.Namespace, dc, nodesNeedingDaemons[i]); err != nil {
|
||||
glog.V(2).Infof("Failed creation, decrementing expectations for controller %q/%q", dc.Namespace, dc.Name)
|
||||
dm.expectations.CreationObserved(dcKey)
|
||||
util.HandleError(err)
|
||||
}
|
||||
}
|
||||
|
||||
glog.V(4).Infof("Pods to delete for daemon %s: %+v", dc.Name, podsToDelete)
|
||||
for i := range podsToDelete {
|
||||
if err := dm.podControl.DeletePod(dc.Namespace, podsToDelete[i]); err != nil {
|
||||
glog.V(2).Infof("Failed deletion, decrementing expectations for controller %q/%q", dc.Namespace, dc.Name)
|
||||
dm.expectations.DeletionObserved(dcKey)
|
||||
util.HandleError(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func storeDaemonStatus(dcClient client.DaemonSetInterface, dc *expapi.DaemonSet, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled int) error {
|
||||
if dc.Status.DesiredNumberScheduled == desiredNumberScheduled && dc.Status.CurrentNumberScheduled == currentNumberScheduled && dc.Status.NumberMisscheduled == numberMisscheduled {
|
||||
return nil
|
||||
}
|
||||
|
||||
var updateErr, getErr error
|
||||
for i := 0; i <= StatusUpdateRetries; i++ {
|
||||
dc.Status.DesiredNumberScheduled = desiredNumberScheduled
|
||||
dc.Status.CurrentNumberScheduled = currentNumberScheduled
|
||||
dc.Status.NumberMisscheduled = numberMisscheduled
|
||||
_, updateErr := dcClient.Update(dc)
|
||||
if updateErr == nil {
|
||||
return updateErr
|
||||
}
|
||||
// Update the controller with the latest resource version for the next poll
|
||||
if dc, getErr = dcClient.Get(dc.Name); getErr != nil {
|
||||
// If the GET fails we can't trust status.Replicas anymore. This error
|
||||
// is bound to be more interesting than the update failure.
|
||||
return getErr
|
||||
}
|
||||
}
|
||||
// Failed 2 updates one of which was with the latest controller, return the update error
|
||||
return updateErr
|
||||
}
|
||||
|
||||
func (dm *DaemonManager) updateDaemonStatus(dc *expapi.DaemonSet) {
|
||||
glog.Infof("Updating daemon status")
|
||||
nodeToDaemonPods, err := dm.getNodesToDaemonPods(dc)
|
||||
if err != nil {
|
||||
glog.Errorf("Error getting node to daemon pod mapping for daemon %+v: %v", dc, err)
|
||||
}
|
||||
|
||||
nodeList, err := dm.nodeStore.List()
|
||||
if err != nil {
|
||||
glog.Errorf("Couldn't get list of nodes when adding daemon %+v: %v", dc, err)
|
||||
}
|
||||
|
||||
var desiredNumberScheduled, currentNumberScheduled, numberMisscheduled int
|
||||
for i := range nodeList.Items {
|
||||
nodeSelector := labels.Set(dc.Spec.Template.Spec.NodeSelector).AsSelector()
|
||||
shouldRun := nodeSelector.Matches(labels.Set(nodeList.Items[i].Labels))
|
||||
numDaemonPods := len(nodeToDaemonPods[nodeList.Items[i].Name])
|
||||
if shouldRun {
|
||||
desiredNumberScheduled++
|
||||
if numDaemonPods == 1 {
|
||||
currentNumberScheduled++
|
||||
}
|
||||
} else if numDaemonPods >= 1 {
|
||||
numberMisscheduled++
|
||||
}
|
||||
}
|
||||
|
||||
err = storeDaemonStatus(dm.kubeClient.Experimental().Daemons(dc.Namespace), dc, desiredNumberScheduled, currentNumberScheduled, numberMisscheduled)
|
||||
if err != nil {
|
||||
glog.Errorf("Error storing status for daemon %+v: %v", dc, err)
|
||||
}
|
||||
}
|
||||
|
||||
func (dm *DaemonManager) syncDaemon(key string) error {
|
||||
startTime := time.Now()
|
||||
defer func() {
|
||||
glog.V(4).Infof("Finished syncing daemon %q (%v)", key, time.Now().Sub(startTime))
|
||||
}()
|
||||
obj, exists, err := dm.dcStore.Store.GetByKey(key)
|
||||
if err != nil {
|
||||
glog.Infof("Unable to retrieve dc %v from store: %v", key, err)
|
||||
dm.queue.Add(key)
|
||||
return err
|
||||
}
|
||||
if !exists {
|
||||
glog.V(3).Infof("Daemon Controller has been deleted %v", key)
|
||||
dm.expectations.DeleteExpectations(key)
|
||||
return nil
|
||||
}
|
||||
dc := obj.(*expapi.DaemonSet)
|
||||
|
||||
// Don't process a daemon until all its creations and deletions have been processed.
|
||||
// For example if daemon foo asked for 3 new daemon pods in the previous call to manageDaemons,
|
||||
// then we do not want to call manageDaemons on foo until the daemon pods have been created.
|
||||
dcKey, err := controller.KeyFunc(dc)
|
||||
if err != nil {
|
||||
glog.Errorf("Couldn't get key for object %+v: %v", dc, err)
|
||||
return err
|
||||
}
|
||||
dcNeedsSync := dm.expectations.SatisfiedExpectations(dcKey)
|
||||
if dcNeedsSync {
|
||||
dm.manageDaemons(dc)
|
||||
}
|
||||
|
||||
dm.updateDaemonStatus(dc)
|
||||
return nil
|
||||
}
|
|
@ -1,321 +0,0 @@
|
|||
/*
|
||||
Copyright 2015 The Kubernetes Authors All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package daemon
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/testapi"
|
||||
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||
"k8s.io/kubernetes/pkg/client/unversioned/cache"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
"k8s.io/kubernetes/pkg/expapi"
|
||||
"k8s.io/kubernetes/pkg/securitycontext"
|
||||
)
|
||||
|
||||
var (
|
||||
simpleDaemonLabel = map[string]string{"name": "simple-daemon", "type": "production"}
|
||||
simpleDaemonLabel2 = map[string]string{"name": "simple-daemon", "type": "test"}
|
||||
simpleNodeLabel = map[string]string{"color": "blue", "speed": "fast"}
|
||||
simpleNodeLabel2 = map[string]string{"color": "red", "speed": "fast"}
|
||||
)
|
||||
|
||||
type FakePodControl struct {
|
||||
daemonSpec []expapi.DaemonSet
|
||||
deletePodName []string
|
||||
lock sync.Mutex
|
||||
err error
|
||||
}
|
||||
|
||||
func init() {
|
||||
api.ForTesting_ReferencesAllowBlankSelfLinks = true
|
||||
}
|
||||
|
||||
func (f *FakePodControl) CreateReplica(namespace string, spec *api.ReplicationController) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *FakePodControl) CreateReplicaOnNode(namespace string, daemon *expapi.DaemonSet, nodeName string) error {
|
||||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
if f.err != nil {
|
||||
return f.err
|
||||
}
|
||||
f.daemonSpec = append(f.daemonSpec, *daemon)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (f *FakePodControl) DeletePod(namespace string, podName string) error {
|
||||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
if f.err != nil {
|
||||
return f.err
|
||||
}
|
||||
f.deletePodName = append(f.deletePodName, podName)
|
||||
return nil
|
||||
}
|
||||
func (f *FakePodControl) clear() {
|
||||
f.lock.Lock()
|
||||
defer f.lock.Unlock()
|
||||
f.deletePodName = []string{}
|
||||
f.daemonSpec = []expapi.DaemonSet{}
|
||||
}
|
||||
|
||||
func newDaemon(name string) *expapi.DaemonSet {
|
||||
return &expapi.DaemonSet{
|
||||
TypeMeta: api.TypeMeta{APIVersion: testapi.Version()},
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: name,
|
||||
Namespace: api.NamespaceDefault,
|
||||
},
|
||||
Spec: expapi.DaemonSetSpec{
|
||||
Selector: simpleDaemonLabel,
|
||||
Template: &api.PodTemplateSpec{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Labels: simpleDaemonLabel,
|
||||
},
|
||||
Spec: api.PodSpec{
|
||||
Containers: []api.Container{
|
||||
{
|
||||
Image: "foo/bar",
|
||||
TerminationMessagePath: api.TerminationMessagePathDefault,
|
||||
ImagePullPolicy: api.PullIfNotPresent,
|
||||
SecurityContext: securitycontext.ValidSecurityContextWithContainerDefaults(),
|
||||
},
|
||||
},
|
||||
DNSPolicy: api.DNSDefault,
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func newNode(name string, label map[string]string) *api.Node {
|
||||
return &api.Node{
|
||||
TypeMeta: api.TypeMeta{APIVersion: testapi.Version()},
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: name,
|
||||
Labels: label,
|
||||
Namespace: api.NamespaceDefault,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func addNodes(nodeStore cache.Store, startIndex, numNodes int, label map[string]string) {
|
||||
for i := startIndex; i < startIndex+numNodes; i++ {
|
||||
nodeStore.Add(newNode(fmt.Sprintf("node-%d", i), label))
|
||||
}
|
||||
}
|
||||
|
||||
func newPod(podName string, nodeName string, label map[string]string) *api.Pod {
|
||||
pod := &api.Pod{
|
||||
TypeMeta: api.TypeMeta{APIVersion: testapi.Version()},
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
GenerateName: podName,
|
||||
Labels: label,
|
||||
Namespace: api.NamespaceDefault,
|
||||
},
|
||||
Spec: api.PodSpec{
|
||||
NodeName: nodeName,
|
||||
Containers: []api.Container{
|
||||
{
|
||||
Image: "foo/bar",
|
||||
TerminationMessagePath: api.TerminationMessagePathDefault,
|
||||
ImagePullPolicy: api.PullIfNotPresent,
|
||||
SecurityContext: securitycontext.ValidSecurityContextWithContainerDefaults(),
|
||||
},
|
||||
},
|
||||
DNSPolicy: api.DNSDefault,
|
||||
},
|
||||
}
|
||||
api.GenerateName(api.SimpleNameGenerator, &pod.ObjectMeta)
|
||||
return pod
|
||||
}
|
||||
|
||||
func addPods(podStore cache.Store, nodeName string, label map[string]string, number int) {
|
||||
for i := 0; i < number; i++ {
|
||||
podStore.Add(newPod(fmt.Sprintf("%s-", nodeName), nodeName, label))
|
||||
}
|
||||
}
|
||||
|
||||
func makeTestManager() (*DaemonManager, *FakePodControl) {
|
||||
client := client.NewOrDie(&client.Config{Host: "", Version: testapi.Version()})
|
||||
manager := NewDaemonManager(client)
|
||||
podControl := &FakePodControl{}
|
||||
manager.podControl = podControl
|
||||
return manager, podControl
|
||||
}
|
||||
|
||||
func validateSyncDaemons(t *testing.T, fakePodControl *FakePodControl, expectedCreates, expectedDeletes int) {
|
||||
if len(fakePodControl.daemonSpec) != expectedCreates {
|
||||
t.Errorf("Unexpected number of creates. Expected %d, saw %d\n", expectedCreates, len(fakePodControl.daemonSpec))
|
||||
}
|
||||
if len(fakePodControl.deletePodName) != expectedDeletes {
|
||||
t.Errorf("Unexpected number of deletes. Expected %d, saw %d\n", expectedDeletes, len(fakePodControl.deletePodName))
|
||||
}
|
||||
}
|
||||
|
||||
func syncAndValidateDaemons(t *testing.T, manager *DaemonManager, daemon *expapi.DaemonSet, podControl *FakePodControl, expectedCreates, expectedDeletes int) {
|
||||
key, err := controller.KeyFunc(daemon)
|
||||
if err != nil {
|
||||
t.Errorf("Could not get key for daemon.")
|
||||
}
|
||||
manager.syncHandler(key)
|
||||
validateSyncDaemons(t, podControl, expectedCreates, expectedDeletes)
|
||||
}
|
||||
|
||||
// Daemon without node selectors should launch pods on every node.
|
||||
func TestSimpleDaemonLaunchesPods(t *testing.T) {
|
||||
manager, podControl := makeTestManager()
|
||||
addNodes(manager.nodeStore.Store, 0, 5, nil)
|
||||
daemon := newDaemon("foo")
|
||||
manager.dcStore.Add(daemon)
|
||||
syncAndValidateDaemons(t, manager, daemon, podControl, 5, 0)
|
||||
}
|
||||
|
||||
// Daemon without node selectors should launch pods on every node.
|
||||
func TestNoNodesDoesNothing(t *testing.T) {
|
||||
manager, podControl := makeTestManager()
|
||||
daemon := newDaemon("foo")
|
||||
manager.dcStore.Add(daemon)
|
||||
syncAndValidateDaemons(t, manager, daemon, podControl, 0, 0)
|
||||
}
|
||||
|
||||
// Daemon without node selectors should launch pods on every node.
|
||||
func TestOneNodeDaemonLaunchesPod(t *testing.T) {
|
||||
manager, podControl := makeTestManager()
|
||||
manager.nodeStore.Add(newNode("only-node", nil))
|
||||
daemon := newDaemon("foo")
|
||||
manager.dcStore.Add(daemon)
|
||||
syncAndValidateDaemons(t, manager, daemon, podControl, 1, 0)
|
||||
}
|
||||
|
||||
// Manager should not create pods on nodes which have daemon pods, and should remove excess pods from nodes that have extra pods.
|
||||
func TestDealsWithExistingPods(t *testing.T) {
|
||||
manager, podControl := makeTestManager()
|
||||
addNodes(manager.nodeStore.Store, 0, 5, nil)
|
||||
addPods(manager.podStore.Store, "node-1", simpleDaemonLabel, 1)
|
||||
addPods(manager.podStore.Store, "node-2", simpleDaemonLabel, 2)
|
||||
addPods(manager.podStore.Store, "node-3", simpleDaemonLabel, 5)
|
||||
addPods(manager.podStore.Store, "node-4", simpleDaemonLabel2, 2)
|
||||
daemon := newDaemon("foo")
|
||||
manager.dcStore.Add(daemon)
|
||||
syncAndValidateDaemons(t, manager, daemon, podControl, 2, 5)
|
||||
}
|
||||
|
||||
// Daemon with node selector should launch pods on nodes matching selector.
|
||||
func TestSelectorDaemonLaunchesPods(t *testing.T) {
|
||||
manager, podControl := makeTestManager()
|
||||
addNodes(manager.nodeStore.Store, 0, 4, nil)
|
||||
addNodes(manager.nodeStore.Store, 4, 3, simpleNodeLabel)
|
||||
daemon := newDaemon("foo")
|
||||
daemon.Spec.Template.Spec.NodeSelector = simpleNodeLabel
|
||||
manager.dcStore.Add(daemon)
|
||||
syncAndValidateDaemons(t, manager, daemon, podControl, 3, 0)
|
||||
}
|
||||
|
||||
// Daemon with node selector should delete pods from nodes that do not satisfy selector.
|
||||
func TestSelectorDaemonDeletesUnselectedPods(t *testing.T) {
|
||||
manager, podControl := makeTestManager()
|
||||
addNodes(manager.nodeStore.Store, 0, 5, nil)
|
||||
addNodes(manager.nodeStore.Store, 5, 5, simpleNodeLabel)
|
||||
addPods(manager.podStore.Store, "node-0", simpleDaemonLabel2, 2)
|
||||
addPods(manager.podStore.Store, "node-1", simpleDaemonLabel, 3)
|
||||
addPods(manager.podStore.Store, "node-1", simpleDaemonLabel2, 1)
|
||||
addPods(manager.podStore.Store, "node-4", simpleDaemonLabel, 1)
|
||||
daemon := newDaemon("foo")
|
||||
daemon.Spec.Template.Spec.NodeSelector = simpleNodeLabel
|
||||
manager.dcStore.Add(daemon)
|
||||
syncAndValidateDaemons(t, manager, daemon, podControl, 5, 4)
|
||||
}
|
||||
|
||||
// Daemon with node selector should launch pods on nodes matching selector, but also deal with existing pods on nodes.
|
||||
func TestSelectorDaemonDealsWithExistingPods(t *testing.T) {
|
||||
manager, podControl := makeTestManager()
|
||||
addNodes(manager.nodeStore.Store, 0, 5, nil)
|
||||
addNodes(manager.nodeStore.Store, 5, 5, simpleNodeLabel)
|
||||
addPods(manager.podStore.Store, "node-0", simpleDaemonLabel, 1)
|
||||
addPods(manager.podStore.Store, "node-1", simpleDaemonLabel, 3)
|
||||
addPods(manager.podStore.Store, "node-1", simpleDaemonLabel2, 2)
|
||||
addPods(manager.podStore.Store, "node-2", simpleDaemonLabel, 4)
|
||||
addPods(manager.podStore.Store, "node-6", simpleDaemonLabel, 13)
|
||||
addPods(manager.podStore.Store, "node-7", simpleDaemonLabel2, 4)
|
||||
addPods(manager.podStore.Store, "node-9", simpleDaemonLabel, 1)
|
||||
addPods(manager.podStore.Store, "node-9", simpleDaemonLabel2, 1)
|
||||
daemon := newDaemon("foo")
|
||||
daemon.Spec.Template.Spec.NodeSelector = simpleNodeLabel
|
||||
manager.dcStore.Add(daemon)
|
||||
syncAndValidateDaemons(t, manager, daemon, podControl, 3, 20)
|
||||
}
|
||||
|
||||
// Daemon with node selector which does not match any node labels should not launch pods.
|
||||
func TestBadSelectorDaemonDoesNothing(t *testing.T) {
|
||||
manager, podControl := makeTestManager()
|
||||
addNodes(manager.nodeStore.Store, 0, 4, nil)
|
||||
addNodes(manager.nodeStore.Store, 4, 3, simpleNodeLabel)
|
||||
daemon := newDaemon("foo")
|
||||
daemon.Spec.Template.Spec.NodeSelector = simpleNodeLabel2
|
||||
manager.dcStore.Add(daemon)
|
||||
syncAndValidateDaemons(t, manager, daemon, podControl, 0, 0)
|
||||
}
|
||||
|
||||
// Daemon with node name should launch pod on node with corresponding name.
|
||||
func TestNameDaemonLaunchesPods(t *testing.T) {
|
||||
manager, podControl := makeTestManager()
|
||||
addNodes(manager.nodeStore.Store, 0, 5, nil)
|
||||
daemon := newDaemon("foo")
|
||||
daemon.Spec.Template.Spec.NodeName = "node-0"
|
||||
manager.dcStore.Add(daemon)
|
||||
syncAndValidateDaemons(t, manager, daemon, podControl, 1, 0)
|
||||
}
|
||||
|
||||
// Daemon with node name that does not exist should not launch pods.
|
||||
func TestBadNameDaemonDoesNothing(t *testing.T) {
|
||||
manager, podControl := makeTestManager()
|
||||
addNodes(manager.nodeStore.Store, 0, 5, nil)
|
||||
daemon := newDaemon("foo")
|
||||
daemon.Spec.Template.Spec.NodeName = "node-10"
|
||||
manager.dcStore.Add(daemon)
|
||||
syncAndValidateDaemons(t, manager, daemon, podControl, 0, 0)
|
||||
}
|
||||
|
||||
// Daemon with node selector, and node name, matching a node, should launch a pod on the node.
|
||||
func TestNameAndSelectorDaemonLaunchesPods(t *testing.T) {
|
||||
manager, podControl := makeTestManager()
|
||||
addNodes(manager.nodeStore.Store, 0, 4, nil)
|
||||
addNodes(manager.nodeStore.Store, 4, 3, simpleNodeLabel)
|
||||
daemon := newDaemon("foo")
|
||||
daemon.Spec.Template.Spec.NodeSelector = simpleNodeLabel
|
||||
daemon.Spec.Template.Spec.NodeName = "node-6"
|
||||
manager.dcStore.Add(daemon)
|
||||
syncAndValidateDaemons(t, manager, daemon, podControl, 1, 0)
|
||||
}
|
||||
|
||||
// Daemon with node selector that matches some nodes, and node name that matches a different node, should do nothing.
|
||||
func TestInconsistentNameSelectorDaemonDoesNothing(t *testing.T) {
|
||||
manager, podControl := makeTestManager()
|
||||
addNodes(manager.nodeStore.Store, 0, 4, nil)
|
||||
addNodes(manager.nodeStore.Store, 4, 3, simpleNodeLabel)
|
||||
daemon := newDaemon("foo")
|
||||
daemon.Spec.Template.Spec.NodeSelector = simpleNodeLabel
|
||||
daemon.Spec.Template.Spec.NodeName = "node-0"
|
||||
manager.dcStore.Add(daemon)
|
||||
syncAndValidateDaemons(t, manager, daemon, podControl, 0, 0)
|
||||
}
|
|
@ -27,11 +27,11 @@ import (
|
|||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/api/testapi"
|
||||
"k8s.io/kubernetes/pkg/apis/experimental"
|
||||
"k8s.io/kubernetes/pkg/client/cache"
|
||||
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||
"k8s.io/kubernetes/pkg/client/unversioned/testclient"
|
||||
"k8s.io/kubernetes/pkg/controller"
|
||||
"k8s.io/kubernetes/pkg/expapi"
|
||||
"k8s.io/kubernetes/pkg/labels"
|
||||
"k8s.io/kubernetes/pkg/runtime"
|
||||
"k8s.io/kubernetes/pkg/securitycontext"
|
||||
|
@ -70,7 +70,7 @@ func (f *FakePodControl) CreateReplica(namespace string, spec *api.ReplicationCo
|
|||
return nil
|
||||
}
|
||||
|
||||
func (f *FakePodControl) CreateReplicaOnNode(namespace string, daemon *expapi.DaemonSet, nodeName string) error {
|
||||
func (f *FakePodControl) CreateReplicaOnNode(namespace string, daemon *experimental.DaemonSet, nodeName string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
|
|
|
@ -21,8 +21,8 @@ import (
|
|||
"time"
|
||||
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/apis/experimental"
|
||||
client "k8s.io/kubernetes/pkg/client/unversioned"
|
||||
"k8s.io/kubernetes/pkg/expapi"
|
||||
"k8s.io/kubernetes/pkg/fields"
|
||||
"k8s.io/kubernetes/pkg/labels"
|
||||
"k8s.io/kubernetes/pkg/util/wait"
|
||||
|
@ -31,8 +31,8 @@ import (
|
|||
. "github.com/onsi/gomega"
|
||||
)
|
||||
|
||||
var _ = Describe("Daemon", func() {
|
||||
f := &Framework{BaseName: "daemons"}
|
||||
var _ = Describe("Daemon set", func() {
|
||||
f := &Framework{BaseName: "daemonsets"}
|
||||
|
||||
BeforeEach(func() {
|
||||
f.beforeEach()
|
||||
|
@ -47,7 +47,7 @@ var _ = Describe("Daemon", func() {
|
|||
})
|
||||
|
||||
It("should launch a daemon pod on every node of the cluster", func() {
|
||||
testDaemons(f)
|
||||
testDaemonSets(f)
|
||||
})
|
||||
})
|
||||
|
||||
|
@ -72,17 +72,13 @@ func clearNodeLabels(c *client.Client) error {
|
|||
}
|
||||
|
||||
func checkDaemonPodOnNodes(f *Framework, selector map[string]string, nodeNames []string) func() (bool, error) {
|
||||
// Don't return an error, because returning an error will abort wait.Poll, but
|
||||
// if there's an error, we want to try getting the daemon again.
|
||||
return func() (bool, error) {
|
||||
// Get list of pods satisfying selector.
|
||||
podList, err := f.Client.Pods(f.Namespace.Name).List(labels.Set(selector).AsSelector(), fields.Everything())
|
||||
if err != nil {
|
||||
return false, nil
|
||||
}
|
||||
pods := podList.Items
|
||||
|
||||
// Get a map of node names to number of daemon pods running on the node.
|
||||
nodesToPodCount := make(map[string]int)
|
||||
for _, pod := range pods {
|
||||
nodesToPodCount[pod.Spec.NodeName] += 1
|
||||
|
@ -103,8 +99,6 @@ func checkDaemonPodOnNodes(f *Framework, selector map[string]string, nodeNames [
|
|||
}
|
||||
|
||||
func checkRunningOnAllNodes(f *Framework, selector map[string]string) func() (bool, error) {
|
||||
// Don't return an error, because returning an error will abort wait.Poll, but
|
||||
// if there's an error, we want to try getting the daemon again.
|
||||
return func() (bool, error) {
|
||||
nodeList, err := f.Client.Nodes().List(labels.Everything(), fields.Everything())
|
||||
if err != nil {
|
||||
|
@ -122,21 +116,21 @@ func checkRunningOnNoNodes(f *Framework, selector map[string]string) func() (boo
|
|||
return checkDaemonPodOnNodes(f, selector, make([]string, 0))
|
||||
}
|
||||
|
||||
func testDaemons(f *Framework) {
|
||||
func testDaemonSets(f *Framework) {
|
||||
ns := f.Namespace.Name
|
||||
c := f.Client
|
||||
simpleDaemonName := "simple-daemon"
|
||||
simpleDSName := "simple-daemon-set"
|
||||
image := "gcr.io/google_containers/serve_hostname:1.1"
|
||||
label := map[string]string{"name": simpleDaemonName}
|
||||
label := map[string]string{"name": simpleDSName}
|
||||
retryTimeout := 1 * time.Minute
|
||||
retryInterval := 5 * time.Second
|
||||
|
||||
By(fmt.Sprintf("Creating simple daemon %s", simpleDaemonName))
|
||||
_, err := c.Daemons(ns).Create(&expapi.DaemonSet{
|
||||
By(fmt.Sprintf("Creating simple daemon set %s", simpleDSName))
|
||||
_, err := c.DaemonSets(ns).Create(&experimental.DaemonSet{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: simpleDaemonName,
|
||||
Name: simpleDSName,
|
||||
},
|
||||
Spec: expapi.DaemonSetSpec{
|
||||
Spec: experimental.DaemonSetSpec{
|
||||
Template: &api.PodTemplateSpec{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Labels: label,
|
||||
|
@ -144,7 +138,7 @@ func testDaemons(f *Framework) {
|
|||
Spec: api.PodSpec{
|
||||
Containers: []api.Container{
|
||||
{
|
||||
Name: simpleDaemonName,
|
||||
Name: simpleDSName,
|
||||
Image: image,
|
||||
Ports: []api.ContainerPort{{ContainerPort: 9376}},
|
||||
},
|
||||
|
@ -172,15 +166,15 @@ func testDaemons(f *Framework) {
|
|||
err = wait.Poll(retryInterval, retryTimeout, checkRunningOnAllNodes(f, label))
|
||||
Expect(err).NotTo(HaveOccurred(), "error waiting for daemon pod to revive")
|
||||
|
||||
complexDaemonName := "complex-daemon"
|
||||
complexLabel := map[string]string{"name": complexDaemonName}
|
||||
complexDSName := "complex-daemon-set"
|
||||
complexLabel := map[string]string{"name": complexDSName}
|
||||
nodeSelector := map[string]string{"color": "blue"}
|
||||
By(fmt.Sprintf("Creating daemon with a node selector %s", complexDaemonName))
|
||||
_, err = c.Daemons(ns).Create(&expapi.DaemonSet{
|
||||
By(fmt.Sprintf("Creating daemon with a node selector %s", complexDSName))
|
||||
_, err = c.DaemonSets(ns).Create(&experimental.DaemonSet{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: complexDaemonName,
|
||||
Name: complexDSName,
|
||||
},
|
||||
Spec: expapi.DaemonSetSpec{
|
||||
Spec: experimental.DaemonSetSpec{
|
||||
Selector: complexLabel,
|
||||
Template: &api.PodTemplateSpec{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
|
@ -190,7 +184,7 @@ func testDaemons(f *Framework) {
|
|||
NodeSelector: nodeSelector,
|
||||
Containers: []api.Container{
|
||||
{
|
||||
Name: complexDaemonName,
|
||||
Name: complexDSName,
|
||||
Image: image,
|
||||
Ports: []api.ContainerPort{{ContainerPort: 9376}},
|
||||
},
|
Loading…
Reference in New Issue