From 72aa1bdd2533ed569c7b720e345d935ca73b03c5 Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Wed, 4 Nov 2015 13:48:25 +0100 Subject: [PATCH] Make queue private again in Queuer --- .../components/deleter/deleter_test.go | 131 +--------------- .../pkg/scheduler/queuer/deleter_test.go | 148 ++++++++++++++++++ contrib/mesos/pkg/scheduler/queuer/queuer.go | 28 ++-- 3 files changed, 165 insertions(+), 142 deletions(-) create mode 100644 contrib/mesos/pkg/scheduler/queuer/deleter_test.go diff --git a/contrib/mesos/pkg/scheduler/components/deleter/deleter_test.go b/contrib/mesos/pkg/scheduler/components/deleter/deleter_test.go index cfa03db0c3..f9c822073e 100644 --- a/contrib/mesos/pkg/scheduler/components/deleter/deleter_test.go +++ b/contrib/mesos/pkg/scheduler/components/deleter/deleter_test.go @@ -14,132 +14,7 @@ See the License for the specific language governing permissions and limitations under the License. */ -package deleter +package deleter_test -import ( - "testing" - - "github.com/stretchr/testify/assert" - "k8s.io/kubernetes/contrib/mesos/pkg/queue" - types "k8s.io/kubernetes/contrib/mesos/pkg/scheduler" - "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors" - "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" - "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer" - "k8s.io/kubernetes/pkg/api" -) - -func TestDeleteOne_NonexistentPod(t *testing.T) { - assert := assert.New(t) - obj := &types.MockScheduler{} - reg := podtask.NewInMemoryRegistry() - obj.On("Tasks").Return(reg) - - qr := queuer.New(nil) - assert.Equal(0, len(qr.PodQueue.List())) - d := New(obj, qr) - pod := &queuer.Pod{Pod: &api.Pod{ - ObjectMeta: api.ObjectMeta{ - Name: "foo", - Namespace: api.NamespaceDefault, - }}} - err := d.DeleteOne(pod) - assert.Equal(err, errors.NoSuchPodErr) - obj.AssertExpectations(t) -} - -func TestDeleteOne_PendingPod(t *testing.T) { - assert := assert.New(t) - obj := &types.MockScheduler{} - reg := podtask.NewInMemoryRegistry() - obj.On("Tasks").Return(reg) - - pod := &queuer.Pod{Pod: &api.Pod{ - ObjectMeta: api.ObjectMeta{ - Name: "foo", - UID: "foo0", - Namespace: api.NamespaceDefault, - }}} - _, err := reg.Register(podtask.New(api.NewDefaultContext(), "bar", pod.Pod)) - if err != nil { - t.Fatalf("failed to create task: %v", err) - } - - // preconditions - qr := queuer.New(nil) - qr.PodQueue.Add(pod, queue.ReplaceExisting) - assert.Equal(1, len(qr.PodQueue.List())) - _, found := qr.PodQueue.Get("default/foo") - assert.True(found) - - // exec & post conditions - d := New(obj, qr) - err = d.DeleteOne(pod) - assert.Nil(err) - _, found = qr.PodQueue.Get("foo0") - assert.False(found) - assert.Equal(0, len(qr.PodQueue.List())) - obj.AssertExpectations(t) -} - -func TestDeleteOne_Running(t *testing.T) { - assert := assert.New(t) - obj := &types.MockScheduler{} - reg := podtask.NewInMemoryRegistry() - obj.On("Tasks").Return(reg) - - pod := &queuer.Pod{Pod: &api.Pod{ - ObjectMeta: api.ObjectMeta{ - Name: "foo", - UID: "foo0", - Namespace: api.NamespaceDefault, - }}} - task, err := reg.Register(podtask.New(api.NewDefaultContext(), "bar", pod.Pod)) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - task.Set(podtask.Launched) - err = reg.Update(task) - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - // preconditions - qr := queuer.New(nil) - qr.PodQueue.Add(pod, queue.ReplaceExisting) - assert.Equal(1, len(qr.PodQueue.List())) - _, found := qr.PodQueue.Get("default/foo") - assert.True(found) - - obj.On("KillTask", task.ID).Return(nil) - - // exec & post conditions - d := New(obj, qr) - err = d.DeleteOne(pod) - assert.Nil(err) - _, found = qr.PodQueue.Get("foo0") - assert.False(found) - assert.Equal(0, len(qr.PodQueue.List())) - obj.AssertExpectations(t) -} - -func TestDeleteOne_badPodNaming(t *testing.T) { - assert := assert.New(t) - obj := &types.MockScheduler{} - pod := &queuer.Pod{Pod: &api.Pod{}} - d := New(obj, queuer.New(nil)) - - err := d.DeleteOne(pod) - assert.NotNil(err) - - pod.Pod.ObjectMeta.Name = "foo" - err = d.DeleteOne(pod) - assert.NotNil(err) - - pod.Pod.ObjectMeta.Name = "" - pod.Pod.ObjectMeta.Namespace = "bar" - err = d.DeleteOne(pod) - assert.NotNil(err) - - obj.AssertExpectations(t) -} +// Due to access to private members of Queuer the deleter tests are moved to the +// queuer package. diff --git a/contrib/mesos/pkg/scheduler/queuer/deleter_test.go b/contrib/mesos/pkg/scheduler/queuer/deleter_test.go new file mode 100644 index 0000000000..a6731ad0c1 --- /dev/null +++ b/contrib/mesos/pkg/scheduler/queuer/deleter_test.go @@ -0,0 +1,148 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package queuer + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "k8s.io/kubernetes/contrib/mesos/pkg/queue" + types "k8s.io/kubernetes/contrib/mesos/pkg/scheduler" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/deleter" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors" + "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" + "k8s.io/kubernetes/pkg/api" +) + +// The following deleter tests are here in queuer package because they require +// private access to the Queuer. + +func TestDeleteOne_NonexistentPod(t *testing.T) { + assert := assert.New(t) + obj := &types.MockScheduler{} + reg := podtask.NewInMemoryRegistry() + obj.On("Tasks").Return(reg) + + qr := New(nil) + assert.Equal(0, len(qr.queue.List())) + d := deleter.New(obj, qr) + pod := &Pod{Pod: &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + Namespace: api.NamespaceDefault, + }}} + err := d.DeleteOne(pod) + assert.Equal(err, errors.NoSuchPodErr) + obj.AssertExpectations(t) +} + +func TestDeleteOne_PendingPod(t *testing.T) { + assert := assert.New(t) + obj := &types.MockScheduler{} + reg := podtask.NewInMemoryRegistry() + obj.On("Tasks").Return(reg) + + pod := &Pod{Pod: &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + UID: "foo0", + Namespace: api.NamespaceDefault, + }}} + _, err := reg.Register(podtask.New(api.NewDefaultContext(), "bar", pod.Pod)) + if err != nil { + t.Fatalf("failed to create task: %v", err) + } + + // preconditions + qr := New(nil) + qr.queue.Add(pod, queue.ReplaceExisting) + assert.Equal(1, len(qr.queue.List())) + _, found := qr.queue.Get("default/foo") + assert.True(found) + + // exec & post conditions + d := deleter.New(obj, qr) + err = d.DeleteOne(pod) + assert.Nil(err) + _, found = qr.queue.Get("foo0") + assert.False(found) + assert.Equal(0, len(qr.queue.List())) + obj.AssertExpectations(t) +} + +func TestDeleteOne_Running(t *testing.T) { + assert := assert.New(t) + obj := &types.MockScheduler{} + reg := podtask.NewInMemoryRegistry() + obj.On("Tasks").Return(reg) + + pod := &Pod{Pod: &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "foo", + UID: "foo0", + Namespace: api.NamespaceDefault, + }}} + task, err := reg.Register(podtask.New(api.NewDefaultContext(), "bar", pod.Pod)) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + task.Set(podtask.Launched) + err = reg.Update(task) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + // preconditions + qr := New(nil) + qr.queue.Add(pod, queue.ReplaceExisting) + assert.Equal(1, len(qr.queue.List())) + _, found := qr.queue.Get("default/foo") + assert.True(found) + + obj.On("KillTask", task.ID).Return(nil) + + // exec & post conditions + d := deleter.New(obj, qr) + err = d.DeleteOne(pod) + assert.Nil(err) + _, found = qr.queue.Get("foo0") + assert.False(found) + assert.Equal(0, len(qr.queue.List())) + obj.AssertExpectations(t) +} + +func TestDeleteOne_badPodNaming(t *testing.T) { + assert := assert.New(t) + obj := &types.MockScheduler{} + pod := &Pod{Pod: &api.Pod{}} + d := deleter.New(obj, New(nil)) + + err := d.DeleteOne(pod) + assert.NotNil(err) + + pod.Pod.ObjectMeta.Name = "foo" + err = d.DeleteOne(pod) + assert.NotNil(err) + + pod.Pod.ObjectMeta.Name = "" + pod.Pod.ObjectMeta.Namespace = "bar" + err = d.DeleteOne(pod) + assert.NotNil(err) + + obj.AssertExpectations(t) +} diff --git a/contrib/mesos/pkg/scheduler/queuer/queuer.go b/contrib/mesos/pkg/scheduler/queuer/queuer.go index a29e4d0066..f6256570a8 100644 --- a/contrib/mesos/pkg/scheduler/queuer/queuer.go +++ b/contrib/mesos/pkg/scheduler/queuer/queuer.go @@ -40,16 +40,16 @@ const ( type Queuer struct { lock sync.Mutex // shared by condition variables of this struct - podUpdates queue.FIFO // queue of pod updates to be processed - PodQueue *queue.DelayFIFO // queue of pods to be scheduled + updates queue.FIFO // queue of pod updates to be processed + queue *queue.DelayFIFO // queue of pods to be scheduled deltaCond sync.Cond // pod changes are available for processing unscheduledCond sync.Cond // there are unscheduled pods for processing } -func New(store queue.FIFO) *Queuer { +func New(updates queue.FIFO) *Queuer { q := &Queuer{ - PodQueue: queue.NewDelayFIFO(), - podUpdates: store, + queue: queue.NewDelayFIFO(), + updates: updates, } q.deltaCond.L = &q.lock q.unscheduledCond.L = &q.lock @@ -58,14 +58,14 @@ func New(store queue.FIFO) *Queuer { func (q *Queuer) InstallDebugHandlers(mux *http.ServeMux) { mux.HandleFunc("/debug/scheduler/podqueue", func(w http.ResponseWriter, r *http.Request) { - for _, x := range q.PodQueue.List() { + for _, x := range q.queue.List() { if _, err := io.WriteString(w, fmt.Sprintf("%+v\n", x)); err != nil { break } } }) mux.HandleFunc("/debug/scheduler/podstore", func(w http.ResponseWriter, r *http.Request) { - for _, x := range q.podUpdates.List() { + for _, x := range q.updates.List() { if _, err := io.WriteString(w, fmt.Sprintf("%+v\n", x)); err != nil { break } @@ -80,7 +80,7 @@ func (q *Queuer) UpdatesAvailable() { // delete a pod from the to-be-scheduled queue func (q *Queuer) Dequeue(id string) { - q.PodQueue.Delete(id) + q.queue.Delete(id) } // re-add a pod to the to-be-scheduled queue, will not overwrite existing pod data (that @@ -88,7 +88,7 @@ func (q *Queuer) Dequeue(id string) { func (q *Queuer) Requeue(pod *Pod) { // use KeepExisting in case the pod has already been updated (can happen if binding fails // due to constraint voilations); we don't want to overwrite a newer entry with stale data. - q.PodQueue.Add(pod, queue.KeepExisting) + q.queue.Add(pod, queue.KeepExisting) q.unscheduledCond.Broadcast() } @@ -96,7 +96,7 @@ func (q *Queuer) Requeue(pod *Pod) { func (q *Queuer) Reoffer(pod *Pod) { // use KeepExisting in case the pod has already been updated (can happen if binding fails // due to constraint voilations); we don't want to overwrite a newer entry with stale data. - if q.PodQueue.Offer(pod, queue.KeepExisting) { + if q.queue.Offer(pod, queue.KeepExisting) { q.unscheduledCond.Broadcast() } } @@ -112,7 +112,7 @@ func (q *Queuer) Run(done <-chan struct{}) { for { // limit blocking here for short intervals so that scheduling // may proceed even if there have been no recent pod changes - p := q.podUpdates.Await(enqueuePopTimeout) + p := q.updates.Await(enqueuePopTimeout) if p == nil { signalled := runtime.After(q.deltaCond.Wait) // we've yielded the lock @@ -136,7 +136,7 @@ func (q *Queuer) Run(done <-chan struct{}) { // use ReplaceExisting because we are always pushing the latest state now := time.Now() pod.deadline = &now - if q.PodQueue.Offer(pod, queue.ReplaceExisting) { + if q.queue.Offer(pod, queue.ReplaceExisting) { q.unscheduledCond.Broadcast() log.V(3).Infof("queued pod for scheduling: %v", pod.Pod.Name) } else { @@ -156,7 +156,7 @@ func (q *Queuer) Yield() *api.Pod { for { // limit blocking here to short intervals so that we don't block the // enqueuer Run() routine for very long - kpod := q.PodQueue.Await(yieldPopTimeout) + kpod := q.queue.Await(yieldPopTimeout) if kpod == nil { signalled := runtime.After(q.unscheduledCond.Wait) // lock is yielded at this point and we're going to wait for either @@ -176,7 +176,7 @@ func (q *Queuer) Yield() *api.Pod { pod := kpod.(*Pod).Pod if podName, err := cache.MetaNamespaceKeyFunc(pod); err != nil { log.Warningf("yield unable to understand pod object %+v, will skip: %v", pod, err) - } else if !q.podUpdates.Poll(podName, queue.POP_EVENT) { + } else if !q.updates.Poll(podName, queue.POP_EVENT) { log.V(1).Infof("yield popped a transitioning pod, skipping: %+v", pod) } else if recoverAssignedSlave(pod) != "" { // should never happen if enqueuePods is filtering properly