diff --git a/cmd/kubelet/kubelet.go b/cmd/kubelet/kubelet.go index 036fe3da17..45922774e7 100644 --- a/cmd/kubelet/kubelet.go +++ b/cmd/kubelet/kubelet.go @@ -64,6 +64,7 @@ var ( allowPrivileged = flag.Bool("allow_privileged", false, "If true, allow containers to request privileged mode. [default=false]") registryPullQPS = flag.Float64("registry_qps", 0.0, "If > 0, limit registry pull QPS to this value. If 0, unlimited. [default=0.0]") registryBurst = flag.Int("registry_burst", 10, "Maximum size of a bursty pulls, temporarily allows pulls to burst to this number, while still not exceeding registry_qps. Only used if --registry_qps > 0") + runonce = flag.Bool("runonce", false, "If true, exit after spawning pods from local manifests or remote urls. Exclusive with --etcd_servers and --enable-server") ) func init() { @@ -106,6 +107,17 @@ func main() { verflag.PrintAndExitIfRequested() + if *runonce { + exclusiveFlag := "invalid option: --runonce and %s are mutually exclusive" + if len(etcdServerList) > 0 { + glog.Fatalf(exclusiveFlag, "--etcd_servers") + } + if *enableServer { + glog.Infof("--runonce is set, disabling server") + *enableServer = false + } + } + etcd.SetLogger(util.NewLogger("etcd ")) capabilities.Initialize(capabilities.Capabilities{ @@ -128,7 +140,9 @@ func main() { glog.Fatal("Invalid root directory path.") } *rootDirectory = path.Clean(*rootDirectory) - os.MkdirAll(*rootDirectory, 0750) + if err := os.MkdirAll(*rootDirectory, 0750); err != nil { + glog.Warningf("Error creating root directory: %v", err) + } // source of all configuration cfg := kconfig.NewPodConfig(kconfig.PodConfigNotificationSnapshotAndUpdates) @@ -171,6 +185,14 @@ func main() { health.AddHealthChecker(health.NewHTTPHealthChecker(&http.Client{})) health.AddHealthChecker(&health.TCPHealthChecker{}) + // process pods and exit. + if *runonce { + if _, err := k.RunOnce(cfg.Updates()); err != nil { + glog.Fatalf("--runonce failed: %v", err) + } + return + } + // start the kubelet go util.Forever(func() { k.Run(cfg.Updates()) }, 0) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index e210038368..039be7b890 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -634,9 +634,8 @@ func (kl *Kubelet) SyncPods(pods []Pod) error { } // Check for any containers that need starting - for i := range pods { - pod := &pods[i] - podFullName := GetPodFullName(pod) + for _, pod := range pods { + podFullName := GetPodFullName(&pod) uuid := pod.Manifest.UUID // Add all containers (including net) to the map. @@ -647,7 +646,7 @@ func (kl *Kubelet) SyncPods(pods []Pod) error { // Run the sync in an async manifest worker. kl.podWorkers.Run(podFullName, func() { - err := kl.syncPod(pod, dockerContainers) + err := kl.syncPod(&pod, dockerContainers) if err != nil { glog.Errorf("Error syncing pod: %v skipping.", err) } diff --git a/pkg/kubelet/runonce.go b/pkg/kubelet/runonce.go new file mode 100644 index 0000000000..f1f48d25b3 --- /dev/null +++ b/pkg/kubelet/runonce.go @@ -0,0 +1,126 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package kubelet + +import ( + "fmt" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools" + "github.com/golang/glog" +) + +const ( + RunOnceManifestDelay = 1 * time.Second + RunOnceMaxRetries = 1 + RunOnceRetryDelay = 1 * time.Second + RunOnceRetryDelayBackoff = 2 +) + +type RunPodResult struct { + Pod *Pod + Info api.PodInfo + Err error +} + +// RunOnce polls from one configuration update and run the associated pods. +func (kl *Kubelet) RunOnce(updates <-chan PodUpdate) ([]RunPodResult, error) { + select { + case u := <-updates: + glog.Infof("processing manifest with %d pods", len(u.Pods)) + result, err := kl.runOnce(u.Pods) + glog.Infof("finished processing %d pods", len(u.Pods)) + return result, err + case <-time.After(RunOnceManifestDelay): + return nil, fmt.Errorf("no pod manifest update after %v", RunOnceManifestDelay) + } +} + +// runOnce runs a given set of pods and returns their status. +func (kl *Kubelet) runOnce(pods []Pod) (results []RunPodResult, err error) { + if kl.dockerPuller == nil { + kl.dockerPuller = dockertools.NewDockerPuller(kl.dockerClient, kl.pullQPS, kl.pullBurst) + } + pods = filterHostPortConflicts(pods) + + ch := make(chan RunPodResult) + for _, pod := range pods { + go func() { + info, err := kl.runPod(pod) + ch <- RunPodResult{&pod, info, err} + }() + } + + glog.Infof("waiting for %d pods", len(pods)) + failedPods := []string{} + for i := 0; i < len(pods); i++ { + res := <-ch + results = append(results, res) + if res.Err != nil { + glog.Infof("failed to start pod %q: %v", res.Pod.Name, res.Err) + failedPods = append(failedPods, res.Pod.Name) + } else { + glog.Infof("started pod %q: %#v", res.Pod.Name, res.Info) + } + } + if len(failedPods) > 0 { + return results, fmt.Errorf("error running pods: %v", failedPods) + } + glog.Infof("%d pods started", len(pods)) + return results, err +} + +// Run a single pod and wait until all containers are running. +func (kl *Kubelet) runPod(pod Pod) (api.PodInfo, error) { + dockerContainers, err := dockertools.GetKubeletDockerContainers(kl.dockerClient, false) + if err != nil { + return nil, fmt.Errorf("failed to get kubelet docker containers: %v", err) + } + + delay := RunOnceRetryDelay + for i := 0; i < RunOnceMaxRetries; i++ { + err := kl.syncPod(&pod, dockerContainers) + if err != nil { + return nil, fmt.Errorf("error syncing pod: %v", err) + } + info, err := kl.GetPodInfo(GetPodFullName(&pod), pod.Manifest.UUID) + if err != nil { + return nil, fmt.Errorf("error getting pod info: %v", err) + } + if podInfo(info).isRunning() { + return info, nil + } + glog.Infof("pod %q containers not running, waiting for %v", pod.Name, delay) + <-time.After(delay) + delay *= RunOnceRetryDelayBackoff + } + return nil, fmt.Errorf("timeout error: pod %q containers not running after %d retries", pod.Name, RunOnceMaxRetries) +} + +// Alias PodInfo for internal usage. +type podInfo api.PodInfo + +// Check if all containers of a pod are running. +func (info podInfo) isRunning() bool { + for _, container := range info { + if container.State.Running == nil { + return false + } + } + return true +}