mirror of https://github.com/k3s-io/k3s
Make each pod synchronization async.
parent
7999983311
commit
1798e0fea3
|
@ -639,11 +639,11 @@ func (kl *Kubelet) createNetworkContainer(manifest *api.ContainerManifest) (Dock
|
|||
return kl.runContainer(manifest, container, "")
|
||||
}
|
||||
|
||||
func (kl *Kubelet) syncManifest(manifest *api.ContainerManifest, dockerIdsToKeep *map[DockerId]bool, mapLock *sync.Mutex) error {
|
||||
func (kl *Kubelet) syncManifest(manifest *api.ContainerManifest, keepChannel chan<- DockerId) error {
|
||||
// Make sure we have a network container
|
||||
netId, err := kl.getNetworkContainerId(manifest)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to introspect network container. (%v) Skipping container %s", err, manifest.Id)
|
||||
glog.Errorf("Failed to introspect network container. (%v) Skipping container %s", err, manifest.ID)
|
||||
return err
|
||||
}
|
||||
if netId == "" {
|
||||
|
@ -654,22 +654,18 @@ func (kl *Kubelet) syncManifest(manifest *api.ContainerManifest, dockerIdsToKeep
|
|||
return err
|
||||
}
|
||||
}
|
||||
{
|
||||
mapLock.Lock()
|
||||
defer mapLock.Unlock()
|
||||
(*dockerIdsToKeep)[netId] = true
|
||||
}
|
||||
keepChannel <- netId
|
||||
for _, container := range manifest.Containers {
|
||||
containerId, err := kl.getContainerId(manifest, &container)
|
||||
if err != nil {
|
||||
glog.Errorf("Error finding container: %v skipping.", err)
|
||||
glog.Errorf("Error finding container: %v skipping id %s.", err, manifest.ID)
|
||||
continue
|
||||
}
|
||||
if containerId == "" {
|
||||
glog.Infof("%+v doesn't exist, creating", container)
|
||||
kl.DockerPuller.Pull(container.Image)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to create network container: %v Skipping container %s", err, manifest.ID)
|
||||
glog.Errorf("Failed to create container: %v Skipping container %s", err, manifest.ID)
|
||||
continue
|
||||
}
|
||||
containerId, err = kl.runContainer(manifest, &container, "container:"+string(netId))
|
||||
|
@ -681,11 +677,7 @@ func (kl *Kubelet) syncManifest(manifest *api.ContainerManifest, dockerIdsToKeep
|
|||
} else {
|
||||
glog.V(1).Infof("%s exists as %v", container.Name, containerId)
|
||||
}
|
||||
{
|
||||
mapLock.Lock()
|
||||
defer mapLock.Unlock()
|
||||
(*dockerIdsToKeep)[containerId] = true
|
||||
}
|
||||
keepChannel <- containerId
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -695,21 +687,30 @@ func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error {
|
|||
glog.Infof("Desired: %+v", config)
|
||||
var err error
|
||||
dockerIdsToKeep := map[DockerId]bool{}
|
||||
mapLock := sync.Mutex{}
|
||||
keepChannel := make(chan DockerId)
|
||||
waitGroup := sync.WaitGroup{}
|
||||
|
||||
// Check for any containers that need starting
|
||||
for _, manifest := range config {
|
||||
waitGroup.Add(1)
|
||||
go func() {
|
||||
err := kl.syncManifest(&manifest, &dockerIdsToKeep, &mapLock)
|
||||
defer util.HandleCrash()
|
||||
defer waitGroup.Done()
|
||||
err := kl.syncManifest(&manifest, keepChannel)
|
||||
if err != nil {
|
||||
glog.Errorf("Error syncing manifest: %v skipping.", err)
|
||||
}
|
||||
waitGroup.Done()
|
||||
}()
|
||||
}
|
||||
waitGroup.Wait()
|
||||
go func() {
|
||||
for id := range keepChannel {
|
||||
dockerIdsToKeep[id] = true
|
||||
}
|
||||
}()
|
||||
if len(config) > 0 {
|
||||
waitGroup.Wait()
|
||||
close(keepChannel)
|
||||
}
|
||||
|
||||
// Kill any containers we don't need
|
||||
existingContainers, err := kl.getDockerContainers()
|
||||
|
|
Loading…
Reference in New Issue