Make each pod synchronization async.

pull/6/head
Brendan Burns 2014-07-01 09:37:45 -07:00
parent 7999983311
commit 1798e0fea3
1 changed files with 19 additions and 18 deletions

View File

@ -639,11 +639,11 @@ func (kl *Kubelet) createNetworkContainer(manifest *api.ContainerManifest) (Dock
return kl.runContainer(manifest, container, "") return kl.runContainer(manifest, container, "")
} }
func (kl *Kubelet) syncManifest(manifest *api.ContainerManifest, dockerIdsToKeep *map[DockerId]bool, mapLock *sync.Mutex) error { func (kl *Kubelet) syncManifest(manifest *api.ContainerManifest, keepChannel chan<- DockerId) error {
// Make sure we have a network container // Make sure we have a network container
netId, err := kl.getNetworkContainerId(manifest) netId, err := kl.getNetworkContainerId(manifest)
if err != nil { if err != nil {
glog.Errorf("Failed to introspect network container. (%v) Skipping container %s", err, manifest.Id) glog.Errorf("Failed to introspect network container. (%v) Skipping container %s", err, manifest.ID)
return err return err
} }
if netId == "" { if netId == "" {
@ -654,22 +654,18 @@ func (kl *Kubelet) syncManifest(manifest *api.ContainerManifest, dockerIdsToKeep
return err return err
} }
} }
{ keepChannel <- netId
mapLock.Lock()
defer mapLock.Unlock()
(*dockerIdsToKeep)[netId] = true
}
for _, container := range manifest.Containers { for _, container := range manifest.Containers {
containerId, err := kl.getContainerId(manifest, &container) containerId, err := kl.getContainerId(manifest, &container)
if err != nil { if err != nil {
glog.Errorf("Error finding container: %v skipping.", err) glog.Errorf("Error finding container: %v skipping id %s.", err, manifest.ID)
continue continue
} }
if containerId == "" { if containerId == "" {
glog.Infof("%+v doesn't exist, creating", container) glog.Infof("%+v doesn't exist, creating", container)
kl.DockerPuller.Pull(container.Image) kl.DockerPuller.Pull(container.Image)
if err != nil { if err != nil {
glog.Errorf("Failed to create network container: %v Skipping container %s", err, manifest.ID) glog.Errorf("Failed to create container: %v Skipping container %s", err, manifest.ID)
continue continue
} }
containerId, err = kl.runContainer(manifest, &container, "container:"+string(netId)) containerId, err = kl.runContainer(manifest, &container, "container:"+string(netId))
@ -681,11 +677,7 @@ func (kl *Kubelet) syncManifest(manifest *api.ContainerManifest, dockerIdsToKeep
} else { } else {
glog.V(1).Infof("%s exists as %v", container.Name, containerId) glog.V(1).Infof("%s exists as %v", container.Name, containerId)
} }
{ keepChannel <- containerId
mapLock.Lock()
defer mapLock.Unlock()
(*dockerIdsToKeep)[containerId] = true
}
} }
return nil return nil
} }
@ -695,21 +687,30 @@ func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error {
glog.Infof("Desired: %+v", config) glog.Infof("Desired: %+v", config)
var err error var err error
dockerIdsToKeep := map[DockerId]bool{} dockerIdsToKeep := map[DockerId]bool{}
mapLock := sync.Mutex{} keepChannel := make(chan DockerId)
waitGroup := sync.WaitGroup{} waitGroup := sync.WaitGroup{}
// Check for any containers that need starting // Check for any containers that need starting
for _, manifest := range config { for _, manifest := range config {
waitGroup.Add(1) waitGroup.Add(1)
go func() { go func() {
err := kl.syncManifest(&manifest, &dockerIdsToKeep, &mapLock) defer util.HandleCrash()
defer waitGroup.Done()
err := kl.syncManifest(&manifest, keepChannel)
if err != nil { if err != nil {
glog.Errorf("Error syncing manifest: %v skipping.", err) glog.Errorf("Error syncing manifest: %v skipping.", err)
} }
waitGroup.Done()
}() }()
} }
waitGroup.Wait() go func() {
for id := range keepChannel {
dockerIdsToKeep[id] = true
}
}()
if len(config) > 0 {
waitGroup.Wait()
close(keepChannel)
}
// Kill any containers we don't need // Kill any containers we don't need
existingContainers, err := kl.getDockerContainers() existingContainers, err := kl.getDockerContainers()