Merge pull request #619 from brendandburns/controller

Make individual controller actions asynchronous.
pull/6/head
Daniel Smith 2014-07-28 15:35:19 -07:00
commit 31a78c3e2b
3 changed files with 30 additions and 8 deletions

View File

@ -183,15 +183,27 @@ func (rm *ReplicationManager) syncReplicationController(controllerSpec api.Repli
diff := len(filteredList) - controllerSpec.DesiredState.Replicas
if diff < 0 {
diff *= -1
wait := sync.WaitGroup{}
wait.Add(diff)
glog.Infof("Too few replicas, creating %d\n", diff)
for i := 0; i < diff; i++ {
rm.podControl.createReplica(controllerSpec)
go func() {
defer wait.Done()
rm.podControl.createReplica(controllerSpec)
}()
}
wait.Wait()
} else if diff > 0 {
glog.Infof("Too many replicas, deleting %d\n", diff)
wait := sync.WaitGroup{}
wait.Add(diff)
for i := 0; i < diff; i++ {
rm.podControl.deletePod(filteredList[i].ID)
go func(ix int) {
defer wait.Done()
rm.podControl.deletePod(filteredList[ix].ID)
}(i)
}
wait.Wait()
}
return nil
}

View File

@ -36,6 +36,8 @@ type FakeDockerClient struct {
}
func (f *FakeDockerClient) clearCalls() {
f.lock.Lock()
defer f.lock.Unlock()
f.called = []string{}
}

View File

@ -19,6 +19,7 @@ package registry
import (
"fmt"
"strings"
"sync"
"time"
"code.google.com/p/go-uuid/uuid"
@ -40,6 +41,7 @@ type PodRegistryStorage struct {
minionLister scheduler.MinionLister
cloud cloudprovider.Interface
podPollPeriod time.Duration
lock sync.Mutex
}
// MakePodRegistryStorage makes a RESTStorage object for a pod registry.
@ -193,6 +195,17 @@ func (storage *PodRegistryStorage) Extract(body []byte) (interface{}, error) {
return pod, err
}
func (storage *PodRegistryStorage) scheduleAndCreatePod(pod api.Pod) error {
storage.lock.Lock()
defer storage.lock.Unlock()
// TODO(lavalamp): Separate scheduler more cleanly.
machine, err := storage.scheduler.Schedule(pod, storage.minionLister)
if err != nil {
return err
}
return storage.registry.CreatePod(machine, pod)
}
func (storage *PodRegistryStorage) Create(obj interface{}) (<-chan interface{}, error) {
pod := obj.(api.Pod)
if len(pod.ID) == 0 {
@ -201,12 +214,7 @@ func (storage *PodRegistryStorage) Create(obj interface{}) (<-chan interface{},
pod.DesiredState.Manifest.ID = pod.ID
return apiserver.MakeAsync(func() (interface{}, error) {
// TODO(lavalamp): Separate scheduler more cleanly.
machine, err := storage.scheduler.Schedule(pod, storage.minionLister)
if err != nil {
return nil, err
}
err = storage.registry.CreatePod(machine, pod)
err := storage.scheduleAndCreatePod(pod)
if err != nil {
return nil, err
}