Make queue private again in Queuer

pull/6/head
Dr. Stefan Schimanski 2015-11-04 13:48:25 +01:00
parent 6f5d40e5de
commit 72aa1bdd25
3 changed files with 165 additions and 142 deletions

View File

@ -14,132 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/
package deleter
package deleter_test
import (
"testing"
"github.com/stretchr/testify/assert"
"k8s.io/kubernetes/contrib/mesos/pkg/queue"
types "k8s.io/kubernetes/contrib/mesos/pkg/scheduler"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer"
"k8s.io/kubernetes/pkg/api"
)
func TestDeleteOne_NonexistentPod(t *testing.T) {
assert := assert.New(t)
obj := &types.MockScheduler{}
reg := podtask.NewInMemoryRegistry()
obj.On("Tasks").Return(reg)
qr := queuer.New(nil)
assert.Equal(0, len(qr.PodQueue.List()))
d := New(obj, qr)
pod := &queuer.Pod{Pod: &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Namespace: api.NamespaceDefault,
}}}
err := d.DeleteOne(pod)
assert.Equal(err, errors.NoSuchPodErr)
obj.AssertExpectations(t)
}
func TestDeleteOne_PendingPod(t *testing.T) {
assert := assert.New(t)
obj := &types.MockScheduler{}
reg := podtask.NewInMemoryRegistry()
obj.On("Tasks").Return(reg)
pod := &queuer.Pod{Pod: &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "foo",
UID: "foo0",
Namespace: api.NamespaceDefault,
}}}
_, err := reg.Register(podtask.New(api.NewDefaultContext(), "bar", pod.Pod))
if err != nil {
t.Fatalf("failed to create task: %v", err)
}
// preconditions
qr := queuer.New(nil)
qr.PodQueue.Add(pod, queue.ReplaceExisting)
assert.Equal(1, len(qr.PodQueue.List()))
_, found := qr.PodQueue.Get("default/foo")
assert.True(found)
// exec & post conditions
d := New(obj, qr)
err = d.DeleteOne(pod)
assert.Nil(err)
_, found = qr.PodQueue.Get("foo0")
assert.False(found)
assert.Equal(0, len(qr.PodQueue.List()))
obj.AssertExpectations(t)
}
func TestDeleteOne_Running(t *testing.T) {
assert := assert.New(t)
obj := &types.MockScheduler{}
reg := podtask.NewInMemoryRegistry()
obj.On("Tasks").Return(reg)
pod := &queuer.Pod{Pod: &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "foo",
UID: "foo0",
Namespace: api.NamespaceDefault,
}}}
task, err := reg.Register(podtask.New(api.NewDefaultContext(), "bar", pod.Pod))
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
task.Set(podtask.Launched)
err = reg.Update(task)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
// preconditions
qr := queuer.New(nil)
qr.PodQueue.Add(pod, queue.ReplaceExisting)
assert.Equal(1, len(qr.PodQueue.List()))
_, found := qr.PodQueue.Get("default/foo")
assert.True(found)
obj.On("KillTask", task.ID).Return(nil)
// exec & post conditions
d := New(obj, qr)
err = d.DeleteOne(pod)
assert.Nil(err)
_, found = qr.PodQueue.Get("foo0")
assert.False(found)
assert.Equal(0, len(qr.PodQueue.List()))
obj.AssertExpectations(t)
}
func TestDeleteOne_badPodNaming(t *testing.T) {
assert := assert.New(t)
obj := &types.MockScheduler{}
pod := &queuer.Pod{Pod: &api.Pod{}}
d := New(obj, queuer.New(nil))
err := d.DeleteOne(pod)
assert.NotNil(err)
pod.Pod.ObjectMeta.Name = "foo"
err = d.DeleteOne(pod)
assert.NotNil(err)
pod.Pod.ObjectMeta.Name = ""
pod.Pod.ObjectMeta.Namespace = "bar"
err = d.DeleteOne(pod)
assert.NotNil(err)
obj.AssertExpectations(t)
}
// Due to access to private members of Queuer the deleter tests are moved to the
// queuer package.

View File

@ -0,0 +1,148 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package queuer
import (
"testing"
"github.com/stretchr/testify/assert"
"k8s.io/kubernetes/contrib/mesos/pkg/queue"
types "k8s.io/kubernetes/contrib/mesos/pkg/scheduler"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/components/deleter"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/errors"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
"k8s.io/kubernetes/pkg/api"
)
// The following deleter tests are here in queuer package because they require
// private access to the Queuer.
func TestDeleteOne_NonexistentPod(t *testing.T) {
assert := assert.New(t)
obj := &types.MockScheduler{}
reg := podtask.NewInMemoryRegistry()
obj.On("Tasks").Return(reg)
qr := New(nil)
assert.Equal(0, len(qr.queue.List()))
d := deleter.New(obj, qr)
pod := &Pod{Pod: &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "foo",
Namespace: api.NamespaceDefault,
}}}
err := d.DeleteOne(pod)
assert.Equal(err, errors.NoSuchPodErr)
obj.AssertExpectations(t)
}
func TestDeleteOne_PendingPod(t *testing.T) {
assert := assert.New(t)
obj := &types.MockScheduler{}
reg := podtask.NewInMemoryRegistry()
obj.On("Tasks").Return(reg)
pod := &Pod{Pod: &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "foo",
UID: "foo0",
Namespace: api.NamespaceDefault,
}}}
_, err := reg.Register(podtask.New(api.NewDefaultContext(), "bar", pod.Pod))
if err != nil {
t.Fatalf("failed to create task: %v", err)
}
// preconditions
qr := New(nil)
qr.queue.Add(pod, queue.ReplaceExisting)
assert.Equal(1, len(qr.queue.List()))
_, found := qr.queue.Get("default/foo")
assert.True(found)
// exec & post conditions
d := deleter.New(obj, qr)
err = d.DeleteOne(pod)
assert.Nil(err)
_, found = qr.queue.Get("foo0")
assert.False(found)
assert.Equal(0, len(qr.queue.List()))
obj.AssertExpectations(t)
}
func TestDeleteOne_Running(t *testing.T) {
assert := assert.New(t)
obj := &types.MockScheduler{}
reg := podtask.NewInMemoryRegistry()
obj.On("Tasks").Return(reg)
pod := &Pod{Pod: &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "foo",
UID: "foo0",
Namespace: api.NamespaceDefault,
}}}
task, err := reg.Register(podtask.New(api.NewDefaultContext(), "bar", pod.Pod))
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
task.Set(podtask.Launched)
err = reg.Update(task)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
// preconditions
qr := New(nil)
qr.queue.Add(pod, queue.ReplaceExisting)
assert.Equal(1, len(qr.queue.List()))
_, found := qr.queue.Get("default/foo")
assert.True(found)
obj.On("KillTask", task.ID).Return(nil)
// exec & post conditions
d := deleter.New(obj, qr)
err = d.DeleteOne(pod)
assert.Nil(err)
_, found = qr.queue.Get("foo0")
assert.False(found)
assert.Equal(0, len(qr.queue.List()))
obj.AssertExpectations(t)
}
func TestDeleteOne_badPodNaming(t *testing.T) {
assert := assert.New(t)
obj := &types.MockScheduler{}
pod := &Pod{Pod: &api.Pod{}}
d := deleter.New(obj, New(nil))
err := d.DeleteOne(pod)
assert.NotNil(err)
pod.Pod.ObjectMeta.Name = "foo"
err = d.DeleteOne(pod)
assert.NotNil(err)
pod.Pod.ObjectMeta.Name = ""
pod.Pod.ObjectMeta.Namespace = "bar"
err = d.DeleteOne(pod)
assert.NotNil(err)
obj.AssertExpectations(t)
}

View File

@ -40,16 +40,16 @@ const (
type Queuer struct {
lock sync.Mutex // shared by condition variables of this struct
podUpdates queue.FIFO // queue of pod updates to be processed
PodQueue *queue.DelayFIFO // queue of pods to be scheduled
updates queue.FIFO // queue of pod updates to be processed
queue *queue.DelayFIFO // queue of pods to be scheduled
deltaCond sync.Cond // pod changes are available for processing
unscheduledCond sync.Cond // there are unscheduled pods for processing
}
func New(store queue.FIFO) *Queuer {
func New(updates queue.FIFO) *Queuer {
q := &Queuer{
PodQueue: queue.NewDelayFIFO(),
podUpdates: store,
queue: queue.NewDelayFIFO(),
updates: updates,
}
q.deltaCond.L = &q.lock
q.unscheduledCond.L = &q.lock
@ -58,14 +58,14 @@ func New(store queue.FIFO) *Queuer {
func (q *Queuer) InstallDebugHandlers(mux *http.ServeMux) {
mux.HandleFunc("/debug/scheduler/podqueue", func(w http.ResponseWriter, r *http.Request) {
for _, x := range q.PodQueue.List() {
for _, x := range q.queue.List() {
if _, err := io.WriteString(w, fmt.Sprintf("%+v\n", x)); err != nil {
break
}
}
})
mux.HandleFunc("/debug/scheduler/podstore", func(w http.ResponseWriter, r *http.Request) {
for _, x := range q.podUpdates.List() {
for _, x := range q.updates.List() {
if _, err := io.WriteString(w, fmt.Sprintf("%+v\n", x)); err != nil {
break
}
@ -80,7 +80,7 @@ func (q *Queuer) UpdatesAvailable() {
// delete a pod from the to-be-scheduled queue
func (q *Queuer) Dequeue(id string) {
q.PodQueue.Delete(id)
q.queue.Delete(id)
}
// re-add a pod to the to-be-scheduled queue, will not overwrite existing pod data (that
@ -88,7 +88,7 @@ func (q *Queuer) Dequeue(id string) {
func (q *Queuer) Requeue(pod *Pod) {
// use KeepExisting in case the pod has already been updated (can happen if binding fails
// due to constraint voilations); we don't want to overwrite a newer entry with stale data.
q.PodQueue.Add(pod, queue.KeepExisting)
q.queue.Add(pod, queue.KeepExisting)
q.unscheduledCond.Broadcast()
}
@ -96,7 +96,7 @@ func (q *Queuer) Requeue(pod *Pod) {
func (q *Queuer) Reoffer(pod *Pod) {
// use KeepExisting in case the pod has already been updated (can happen if binding fails
// due to constraint voilations); we don't want to overwrite a newer entry with stale data.
if q.PodQueue.Offer(pod, queue.KeepExisting) {
if q.queue.Offer(pod, queue.KeepExisting) {
q.unscheduledCond.Broadcast()
}
}
@ -112,7 +112,7 @@ func (q *Queuer) Run(done <-chan struct{}) {
for {
// limit blocking here for short intervals so that scheduling
// may proceed even if there have been no recent pod changes
p := q.podUpdates.Await(enqueuePopTimeout)
p := q.updates.Await(enqueuePopTimeout)
if p == nil {
signalled := runtime.After(q.deltaCond.Wait)
// we've yielded the lock
@ -136,7 +136,7 @@ func (q *Queuer) Run(done <-chan struct{}) {
// use ReplaceExisting because we are always pushing the latest state
now := time.Now()
pod.deadline = &now
if q.PodQueue.Offer(pod, queue.ReplaceExisting) {
if q.queue.Offer(pod, queue.ReplaceExisting) {
q.unscheduledCond.Broadcast()
log.V(3).Infof("queued pod for scheduling: %v", pod.Pod.Name)
} else {
@ -156,7 +156,7 @@ func (q *Queuer) Yield() *api.Pod {
for {
// limit blocking here to short intervals so that we don't block the
// enqueuer Run() routine for very long
kpod := q.PodQueue.Await(yieldPopTimeout)
kpod := q.queue.Await(yieldPopTimeout)
if kpod == nil {
signalled := runtime.After(q.unscheduledCond.Wait)
// lock is yielded at this point and we're going to wait for either
@ -176,7 +176,7 @@ func (q *Queuer) Yield() *api.Pod {
pod := kpod.(*Pod).Pod
if podName, err := cache.MetaNamespaceKeyFunc(pod); err != nil {
log.Warningf("yield unable to understand pod object %+v, will skip: %v", pod, err)
} else if !q.podUpdates.Poll(podName, queue.POP_EVENT) {
} else if !q.updates.Poll(podName, queue.POP_EVENT) {
log.V(1).Infof("yield popped a transitioning pod, skipping: %+v", pod)
} else if recoverAssignedSlave(pod) != "" {
// should never happen if enqueuePods is filtering properly