Move fcfs and types to scheduler.algorithm module

pull/6/head
Dr. Stefan Schimanski 2015-10-25 11:52:11 -07:00
parent 985ebecd8c
commit beaaf81ee3
7 changed files with 34 additions and 27 deletions

View File

@ -101,5 +101,5 @@ func (fps *fcfsPodScheduler) SchedulePod(r offers.Registry, unused SlaveIndex, t
return nil, err return nil, err
} }
log.V(2).Infof("failed to find a fit for pod: %s", podName) log.V(2).Infof("failed to find a fit for pod: %s", podName)
return nil, noSuitableOffersErr return nil, NoSuitableOffersErr
} }

View File

@ -51,11 +51,9 @@ type PodScheduler interface {
type empty struct{} type empty struct{}
var ( var (
noSuitableOffersErr = errors.New("No suitable offers for pod/task") NoSuitableOffersErr = errors.New("No suitable offers for pod/task")
noSuchPodErr = errors.New("No such pod exists")
noSuchTaskErr = errors.New("No such task exists")
) )
type SlaveIndex interface { type SlaveIndex interface {
slaveHostNameFor(id string) string SlaveHostNameFor(id string) string
} }

View File

@ -23,6 +23,7 @@ import (
mesos "github.com/mesos/mesos-go/mesosproto" mesos "github.com/mesos/mesos-go/mesosproto"
"github.com/stretchr/testify/mock" "github.com/stretchr/testify/mock"
"k8s.io/kubernetes/contrib/mesos/pkg/offers" "k8s.io/kubernetes/contrib/mesos/pkg/offers"
malgorithm "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/algorithm"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
) )
@ -33,7 +34,7 @@ type MockScheduler struct {
mock.Mock mock.Mock
} }
func (m *MockScheduler) slaveHostNameFor(id string) (hostName string) { func (m *MockScheduler) SlaveHostNameFor(id string) (hostName string) {
args := m.Called(id) args := m.Called(id)
x := args.Get(0) x := args.Get(0)
if x != nil { if x != nil {
@ -41,11 +42,11 @@ func (m *MockScheduler) slaveHostNameFor(id string) (hostName string) {
} }
return return
} }
func (m *MockScheduler) algorithm() (f PodScheduler) { func (m *MockScheduler) algorithm() (f malgorithm.PodScheduler) {
args := m.Called() args := m.Called()
x := args.Get(0) x := args.Get(0)
if x != nil { if x != nil {
f = x.(PodScheduler) f = x.(malgorithm.PodScheduler)
} }
return return
} }
@ -86,8 +87,8 @@ func (m *MockScheduler) launchTask(task *podtask.T) error {
// @deprecated this is a placeholder for me to test the mock package // @deprecated this is a placeholder for me to test the mock package
func TestNoSlavesYet(t *testing.T) { func TestNoSlavesYet(t *testing.T) {
obj := &MockScheduler{} obj := &MockScheduler{}
obj.On("slaveHostNameFor", "foo").Return(nil) obj.On("SlaveHostNameFor", "foo").Return(nil)
obj.slaveHostNameFor("foo") obj.SlaveHostNameFor("foo")
obj.AssertExpectations(t) obj.AssertExpectations(t)
} }

View File

@ -17,6 +17,7 @@ limitations under the License.
package scheduler package scheduler
import ( import (
"errors"
"fmt" "fmt"
"net/http" "net/http"
"sync" "sync"
@ -29,10 +30,11 @@ import (
"k8s.io/kubernetes/contrib/mesos/pkg/offers" "k8s.io/kubernetes/contrib/mesos/pkg/offers"
"k8s.io/kubernetes/contrib/mesos/pkg/queue" "k8s.io/kubernetes/contrib/mesos/pkg/queue"
"k8s.io/kubernetes/contrib/mesos/pkg/runtime" "k8s.io/kubernetes/contrib/mesos/pkg/runtime"
malgorithm "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/algorithm"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/podtask"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/queuer"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors" apierrors "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/client/record"
client "k8s.io/kubernetes/pkg/client/unversioned" client "k8s.io/kubernetes/pkg/client/unversioned"
@ -44,19 +46,22 @@ import (
const ( const (
pluginRecoveryDelay = 100 * time.Millisecond // delay after scheduler plugin crashes, before we resume scheduling pluginRecoveryDelay = 100 * time.Millisecond // delay after scheduler plugin crashes, before we resume scheduling
)
const (
FailedScheduling = "FailedScheduling" FailedScheduling = "FailedScheduling"
Scheduled = "Scheduled" Scheduled = "Scheduled"
) )
var (
noSuchPodErr = errors.New("No such pod exists")
noSuchTaskErr = errors.New("No such task exists")
)
// scheduler abstraction to allow for easier unit testing // scheduler abstraction to allow for easier unit testing
type schedulerInterface interface { type schedulerInterface interface {
sync.Locker // synchronize scheduler plugin operations sync.Locker // synchronize scheduler plugin operations
SlaveIndex malgorithm.SlaveIndex
algorithm() PodScheduler algorithm() malgorithm.PodScheduler
offers() offers.Registry offers() offers.Registry
tasks() podtask.Registry tasks() podtask.Registry
@ -75,7 +80,7 @@ type k8smScheduler struct {
internal *KubernetesMesosScheduler internal *KubernetesMesosScheduler
} }
func (k *k8smScheduler) algorithm() PodScheduler { func (k *k8smScheduler) algorithm() malgorithm.PodScheduler {
return k.internal return k.internal
} }
@ -91,7 +96,7 @@ func (k *k8smScheduler) createPodTask(ctx api.Context, pod *api.Pod) (*podtask.T
return podtask.New(ctx, "", *pod, k.internal.executor) return podtask.New(ctx, "", *pod, k.internal.executor)
} }
func (k *k8smScheduler) slaveHostNameFor(id string) string { func (k *k8smScheduler) SlaveHostNameFor(id string) string {
return k.internal.slaveHostNames.HostName(id) return k.internal.slaveHostNames.HostName(id)
} }
@ -197,7 +202,7 @@ func (k *kubeScheduler) doSchedule(task *podtask.T, err error) (string, error) {
return "", fmt.Errorf("offer already invalid/expired for task %v", task.ID) return "", fmt.Errorf("offer already invalid/expired for task %v", task.ID)
} }
slaveId := details.GetSlaveId().GetValue() slaveId := details.GetSlaveId().GetValue()
if slaveHostName := k.api.slaveHostNameFor(slaveId); slaveHostName == "" { if slaveHostName := k.api.SlaveHostNameFor(slaveId); slaveHostName == "" {
// not much sense in Release()ing the offer here since its owner died // not much sense in Release()ing the offer here since its owner died
offer.Release() offer.Release()
k.api.offers().Invalidate(details.Id.GetValue()) k.api.offers().Invalidate(details.Id.GetValue())
@ -259,7 +264,7 @@ func (k *errorHandler) handleSchedulingError(pod *api.Pod, schedulingErr error)
return return
} }
breakoutEarly := queue.BreakChan(nil) breakoutEarly := queue.BreakChan(nil)
if schedulingErr == noSuitableOffersErr { if schedulingErr == malgorithm.NoSuitableOffersErr {
log.V(3).Infof("adding backoff breakout handler for pod %v", podKey) log.V(3).Infof("adding backoff breakout handler for pod %v", podKey)
breakoutEarly = queue.BreakChan(k.api.offers().Listen(podKey, func(offer *mesos.Offer) bool { breakoutEarly = queue.BreakChan(k.api.offers().Listen(podKey, func(offer *mesos.Offer) bool {
k.api.Lock() k.api.Lock()
@ -433,7 +438,7 @@ func (s *schedulingPlugin) reconcileTask(t *podtask.T) {
ctx := api.WithNamespace(api.NewDefaultContext(), t.Pod.Namespace) ctx := api.WithNamespace(api.NewDefaultContext(), t.Pod.Namespace)
pod, err := s.client.Pods(api.NamespaceValue(ctx)).Get(t.Pod.Name) pod, err := s.client.Pods(api.NamespaceValue(ctx)).Get(t.Pod.Name)
if err != nil { if err != nil {
if errors.IsNotFound(err) { if apierrors.IsNotFound(err) {
// attempt to delete // attempt to delete
if err = s.deleter.deleteOne(&queuer.Pod{Pod: &t.Pod}); err != nil && err != noSuchPodErr && err != noSuchTaskErr { if err = s.deleter.deleteOne(&queuer.Pod{Pod: &t.Pod}); err != nil && err != noSuchPodErr && err != noSuchTaskErr {
log.Errorf("failed to delete pod: %v: %v", t.Pod.Name, err) log.Errorf("failed to delete pod: %v: %v", t.Pod.Name, err)

View File

@ -42,6 +42,7 @@ import (
assertext "k8s.io/kubernetes/contrib/mesos/pkg/assert" assertext "k8s.io/kubernetes/contrib/mesos/pkg/assert"
"k8s.io/kubernetes/contrib/mesos/pkg/executor/messages" "k8s.io/kubernetes/contrib/mesos/pkg/executor/messages"
"k8s.io/kubernetes/contrib/mesos/pkg/queue" "k8s.io/kubernetes/contrib/mesos/pkg/queue"
malgorithm "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/algorithm"
schedcfg "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config" schedcfg "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/ha" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/ha"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
@ -463,7 +464,7 @@ func newLifecycleTest(t *testing.T) lifecycleTest {
ei.Data = []byte{0, 1, 2} ei.Data = []byte{0, 1, 2}
// create scheduler // create scheduler
strategy := NewAllocationStrategy( strategy := malgorithm.NewAllocationStrategy(
podtask.NewDefaultPredicate( podtask.NewDefaultPredicate(
mresource.DefaultDefaultContainerCPULimit, mresource.DefaultDefaultContainerCPULimit,
mresource.DefaultDefaultContainerMemLimit, mresource.DefaultDefaultContainerMemLimit,
@ -480,7 +481,7 @@ func newLifecycleTest(t *testing.T) lifecycleTest {
Host: apiServer.server.URL, Host: apiServer.server.URL,
Version: testapi.Default.Version(), Version: testapi.Default.Version(),
}), }),
PodScheduler: NewFCFSPodScheduler(strategy, apiServer.LookupNode), PodScheduler: malgorithm.NewFCFSPodScheduler(strategy, apiServer.LookupNode),
Schedcfg: *schedcfg.CreateDefaultConfig(), Schedcfg: *schedcfg.CreateDefaultConfig(),
LookupNode: apiServer.LookupNode, LookupNode: apiServer.LookupNode,
}) })

View File

@ -35,6 +35,7 @@ import (
offermetrics "k8s.io/kubernetes/contrib/mesos/pkg/offers/metrics" offermetrics "k8s.io/kubernetes/contrib/mesos/pkg/offers/metrics"
"k8s.io/kubernetes/contrib/mesos/pkg/proc" "k8s.io/kubernetes/contrib/mesos/pkg/proc"
"k8s.io/kubernetes/contrib/mesos/pkg/runtime" "k8s.io/kubernetes/contrib/mesos/pkg/runtime"
malgorithm "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/algorithm"
schedcfg "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config" schedcfg "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/metrics" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/metrics"
@ -71,7 +72,7 @@ type KubernetesMesosScheduler struct {
// and the invoking the pod registry interfaces. // and the invoking the pod registry interfaces.
// In particular, changes to podtask.T objects are currently guarded by this lock. // In particular, changes to podtask.T objects are currently guarded by this lock.
*sync.RWMutex *sync.RWMutex
PodScheduler malgorithm.PodScheduler
// Config related, write-once // Config related, write-once
@ -111,7 +112,7 @@ type KubernetesMesosScheduler struct {
type Config struct { type Config struct {
Schedcfg schedcfg.Config Schedcfg schedcfg.Config
Executor *mesos.ExecutorInfo Executor *mesos.ExecutorInfo
PodScheduler PodScheduler PodScheduler malgorithm.PodScheduler
Client *client.Client Client *client.Client
EtcdClient tools.EtcdClient EtcdClient tools.EtcdClient
FailoverTimeout float64 FailoverTimeout float64

View File

@ -55,6 +55,7 @@ import (
"k8s.io/kubernetes/contrib/mesos/pkg/profile" "k8s.io/kubernetes/contrib/mesos/pkg/profile"
"k8s.io/kubernetes/contrib/mesos/pkg/runtime" "k8s.io/kubernetes/contrib/mesos/pkg/runtime"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler"
malgorithm "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/algorithm"
schedcfg "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config" schedcfg "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/config"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/ha" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/ha"
"k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta" "k8s.io/kubernetes/contrib/mesos/pkg/scheduler/meta"
@ -681,7 +682,7 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config
log.Fatalf("misconfigured etcd: %v", err) log.Fatalf("misconfigured etcd: %v", err)
} }
as := scheduler.NewAllocationStrategy( as := malgorithm.NewAllocationStrategy(
podtask.NewDefaultPredicate( podtask.NewDefaultPredicate(
s.DefaultContainerCPULimit, s.DefaultContainerCPULimit,
s.DefaultContainerMemLimit, s.DefaultContainerMemLimit,
@ -694,7 +695,7 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config
// downgrade allocation strategy if user disables "account-for-pod-resources" // downgrade allocation strategy if user disables "account-for-pod-resources"
if !s.AccountForPodResources { if !s.AccountForPodResources {
as = scheduler.NewAllocationStrategy( as = malgorithm.NewAllocationStrategy(
podtask.DefaultMinimalPredicate, podtask.DefaultMinimalPredicate,
podtask.DefaultMinimalProcurement) podtask.DefaultMinimalProcurement)
} }
@ -716,7 +717,7 @@ func (s *SchedulerServer) bootstrap(hks hyperkube.Interface, sc *schedcfg.Config
return n.(*api.Node) return n.(*api.Node)
} }
fcfs := scheduler.NewFCFSPodScheduler(as, lookupNode) fcfs := malgorithm.NewFCFSPodScheduler(as, lookupNode)
mesosPodScheduler := scheduler.New(scheduler.Config{ mesosPodScheduler := scheduler.New(scheduler.Config{
Schedcfg: *sc, Schedcfg: *sc,
Executor: executor, Executor: executor,