mirror of https://github.com/k3s-io/k3s
commit
a0bb739609
|
@ -64,6 +64,7 @@ var (
|
|||
allowPrivileged = flag.Bool("allow_privileged", false, "If true, allow containers to request privileged mode. [default=false]")
|
||||
registryPullQPS = flag.Float64("registry_qps", 0.0, "If > 0, limit registry pull QPS to this value. If 0, unlimited. [default=0.0]")
|
||||
registryBurst = flag.Int("registry_burst", 10, "Maximum size of a bursty pulls, temporarily allows pulls to burst to this number, while still not exceeding registry_qps. Only used if --registry_qps > 0")
|
||||
runonce = flag.Bool("runonce", false, "If true, exit after spawning pods from local manifests or remote urls. Exclusive with --etcd_servers and --enable-server")
|
||||
)
|
||||
|
||||
func init() {
|
||||
|
@ -106,6 +107,17 @@ func main() {
|
|||
|
||||
verflag.PrintAndExitIfRequested()
|
||||
|
||||
if *runonce {
|
||||
exclusiveFlag := "invalid option: --runonce and %s are mutually exclusive"
|
||||
if len(etcdServerList) > 0 {
|
||||
glog.Fatalf(exclusiveFlag, "--etcd_servers")
|
||||
}
|
||||
if *enableServer {
|
||||
glog.Infof("--runonce is set, disabling server")
|
||||
*enableServer = false
|
||||
}
|
||||
}
|
||||
|
||||
etcd.SetLogger(util.NewLogger("etcd "))
|
||||
|
||||
capabilities.Initialize(capabilities.Capabilities{
|
||||
|
@ -128,7 +140,9 @@ func main() {
|
|||
glog.Fatal("Invalid root directory path.")
|
||||
}
|
||||
*rootDirectory = path.Clean(*rootDirectory)
|
||||
os.MkdirAll(*rootDirectory, 0750)
|
||||
if err := os.MkdirAll(*rootDirectory, 0750); err != nil {
|
||||
glog.Warningf("Error creating root directory: %v", err)
|
||||
}
|
||||
|
||||
// source of all configuration
|
||||
cfg := kconfig.NewPodConfig(kconfig.PodConfigNotificationSnapshotAndUpdates)
|
||||
|
@ -171,6 +185,14 @@ func main() {
|
|||
health.AddHealthChecker(health.NewHTTPHealthChecker(&http.Client{}))
|
||||
health.AddHealthChecker(&health.TCPHealthChecker{})
|
||||
|
||||
// process pods and exit.
|
||||
if *runonce {
|
||||
if _, err := k.RunOnce(cfg.Updates()); err != nil {
|
||||
glog.Fatalf("--runonce failed: %v", err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// start the kubelet
|
||||
go util.Forever(func() { k.Run(cfg.Updates()) }, 0)
|
||||
|
||||
|
|
|
@ -634,9 +634,8 @@ func (kl *Kubelet) SyncPods(pods []Pod) error {
|
|||
}
|
||||
|
||||
// Check for any containers that need starting
|
||||
for i := range pods {
|
||||
pod := &pods[i]
|
||||
podFullName := GetPodFullName(pod)
|
||||
for _, pod := range pods {
|
||||
podFullName := GetPodFullName(&pod)
|
||||
uuid := pod.Manifest.UUID
|
||||
|
||||
// Add all containers (including net) to the map.
|
||||
|
@ -647,7 +646,7 @@ func (kl *Kubelet) SyncPods(pods []Pod) error {
|
|||
|
||||
// Run the sync in an async manifest worker.
|
||||
kl.podWorkers.Run(podFullName, func() {
|
||||
err := kl.syncPod(pod, dockerContainers)
|
||||
err := kl.syncPod(&pod, dockerContainers)
|
||||
if err != nil {
|
||||
glog.Errorf("Error syncing pod: %v skipping.", err)
|
||||
}
|
||||
|
|
|
@ -0,0 +1,131 @@
|
|||
/*
|
||||
Copyright 2014 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package kubelet
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
const (
|
||||
RunOnceManifestDelay = 1 * time.Second
|
||||
RunOnceMaxRetries = 10
|
||||
RunOnceRetryDelay = 1 * time.Second
|
||||
RunOnceRetryDelayBackoff = 2
|
||||
)
|
||||
|
||||
type RunPodResult struct {
|
||||
Pod *Pod
|
||||
Info api.PodInfo
|
||||
Err error
|
||||
}
|
||||
|
||||
// RunOnce polls from one configuration update and run the associated pods.
|
||||
func (kl *Kubelet) RunOnce(updates <-chan PodUpdate) ([]RunPodResult, error) {
|
||||
select {
|
||||
case u := <-updates:
|
||||
glog.Infof("processing manifest with %d pods", len(u.Pods))
|
||||
result, err := kl.runOnce(u.Pods)
|
||||
glog.Infof("finished processing %d pods", len(u.Pods))
|
||||
return result, err
|
||||
case <-time.After(RunOnceManifestDelay):
|
||||
return nil, fmt.Errorf("no pod manifest update after %v", RunOnceManifestDelay)
|
||||
}
|
||||
}
|
||||
|
||||
// runOnce runs a given set of pods and returns their status.
|
||||
func (kl *Kubelet) runOnce(pods []Pod) (results []RunPodResult, err error) {
|
||||
if kl.dockerPuller == nil {
|
||||
kl.dockerPuller = dockertools.NewDockerPuller(kl.dockerClient, kl.pullQPS, kl.pullBurst)
|
||||
}
|
||||
pods = filterHostPortConflicts(pods)
|
||||
|
||||
ch := make(chan RunPodResult)
|
||||
for _, pod := range pods {
|
||||
go func() {
|
||||
info, err := kl.runPod(pod)
|
||||
ch <- RunPodResult{&pod, info, err}
|
||||
}()
|
||||
}
|
||||
|
||||
glog.Infof("waiting for %d pods", len(pods))
|
||||
failedPods := []string{}
|
||||
for i := 0; i < len(pods); i++ {
|
||||
res := <-ch
|
||||
results = append(results, res)
|
||||
if res.Err != nil {
|
||||
glog.Infof("failed to start pod %q: %v", res.Pod.Name, res.Err)
|
||||
failedPods = append(failedPods, res.Pod.Name)
|
||||
} else {
|
||||
glog.Infof("started pod %q: %#v", res.Pod.Name, res.Info)
|
||||
}
|
||||
}
|
||||
if len(failedPods) > 0 {
|
||||
return results, fmt.Errorf("error running pods: %v", failedPods)
|
||||
}
|
||||
glog.Infof("%d pods started", len(pods))
|
||||
return results, err
|
||||
}
|
||||
|
||||
// Run a single pod and wait until all containers are running.
|
||||
func (kl *Kubelet) runPod(pod Pod) (api.PodInfo, error) {
|
||||
dockerContainers, err := dockertools.GetKubeletDockerContainers(kl.dockerClient, false)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get kubelet docker containers: %v", err)
|
||||
}
|
||||
|
||||
delay := RunOnceRetryDelay
|
||||
retry := 0
|
||||
for {
|
||||
glog.Infof("syncing pod")
|
||||
err := kl.syncPod(&pod, dockerContainers)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error syncing pod: %v", err)
|
||||
}
|
||||
info, err := kl.GetPodInfo(GetPodFullName(&pod), pod.Manifest.UUID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error getting pod info: %v", err)
|
||||
}
|
||||
if podInfo(info).isRunning() {
|
||||
return info, nil
|
||||
}
|
||||
if retry >= RunOnceMaxRetries {
|
||||
return nil, fmt.Errorf("timeout error: pod %q containers not running after %d retries", pod.Name, RunOnceMaxRetries)
|
||||
}
|
||||
glog.Infof("pod %q containers not running, waiting for %v", pod.Name, delay)
|
||||
<-time.After(delay)
|
||||
retry++
|
||||
delay *= RunOnceRetryDelayBackoff
|
||||
}
|
||||
}
|
||||
|
||||
// Alias PodInfo for internal usage.
|
||||
type podInfo api.PodInfo
|
||||
|
||||
// Check if all containers of a pod are running.
|
||||
func (info podInfo) isRunning() bool {
|
||||
for _, container := range info {
|
||||
if container.State.Running == nil {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
|
@ -0,0 +1,115 @@
|
|||
/*
|
||||
Copyright 2014 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package kubelet
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strconv"
|
||||
"testing"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
|
||||
docker "github.com/fsouza/go-dockerclient"
|
||||
)
|
||||
|
||||
type listContainersResult struct {
|
||||
label string
|
||||
containers []docker.APIContainers
|
||||
err error
|
||||
}
|
||||
|
||||
type inspectContainersResult struct {
|
||||
label string
|
||||
container docker.Container
|
||||
err error
|
||||
}
|
||||
|
||||
type testDocker struct {
|
||||
listContainersResults []listContainersResult
|
||||
inspectContainersResults []inspectContainersResult
|
||||
dockertools.FakeDockerClient
|
||||
t *testing.T
|
||||
}
|
||||
|
||||
func (d *testDocker) ListContainers(options docker.ListContainersOptions) ([]docker.APIContainers, error) {
|
||||
if len(d.listContainersResults) > 0 {
|
||||
result := d.listContainersResults[0]
|
||||
d.listContainersResults = d.listContainersResults[1:]
|
||||
d.t.Logf("ListContainers: %q, returning: (%v, %v)", result.label, result.containers, result.err)
|
||||
return result.containers, result.err
|
||||
}
|
||||
return nil, fmt.Errorf("ListContainers error: no more test results")
|
||||
}
|
||||
|
||||
func (d *testDocker) InspectContainer(id string) (*docker.Container, error) {
|
||||
if len(d.inspectContainersResults) > 0 {
|
||||
result := d.inspectContainersResults[0]
|
||||
d.inspectContainersResults = d.inspectContainersResults[1:]
|
||||
d.t.Logf("InspectContainers: %q, returning: (%v, %v)", result.label, result.container, result.err)
|
||||
return &result.container, result.err
|
||||
}
|
||||
return nil, fmt.Errorf("InspectContainer error: no more test results")
|
||||
}
|
||||
|
||||
func TestRunOnce(t *testing.T) {
|
||||
kb := &Kubelet{}
|
||||
container := api.Container{Name: "bar"}
|
||||
kb.dockerClient = &testDocker{
|
||||
listContainersResults: []listContainersResult{
|
||||
{label: "pre syncPod", containers: []docker.APIContainers{}},
|
||||
{label: "syncPod #1", containers: []docker.APIContainers{}},
|
||||
{label: "syncPod #2", containers: []docker.APIContainers{}},
|
||||
{label: "post syncPod", containers: []docker.APIContainers{
|
||||
{
|
||||
Names: []string{"/k8s_bar." + strconv.FormatUint(dockertools.HashContainer(&container), 16) + "_foo.test"},
|
||||
ID: "1234",
|
||||
},
|
||||
{
|
||||
Names: []string{"/k8s_net_foo.test_"},
|
||||
ID: "9876",
|
||||
},
|
||||
}},
|
||||
},
|
||||
inspectContainersResults: []inspectContainersResult{
|
||||
{label: "syncPod", container: docker.Container{State: docker.State{Running: true}}},
|
||||
{label: "syncPod", container: docker.Container{State: docker.State{Running: true}}},
|
||||
},
|
||||
t: t,
|
||||
}
|
||||
kb.dockerPuller = &dockertools.FakeDockerPuller{}
|
||||
results, err := kb.runOnce([]Pod{
|
||||
{
|
||||
Name: "foo",
|
||||
Namespace: "test",
|
||||
Manifest: api.ContainerManifest{
|
||||
ID: "foo",
|
||||
Containers: []api.Container{
|
||||
{Name: "bar"},
|
||||
},
|
||||
},
|
||||
},
|
||||
})
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
if results[0].Err != nil {
|
||||
t.Errorf("unexpected run pod error: %v", results[0].Err)
|
||||
}
|
||||
if results[0].Pod.Name != "foo" {
|
||||
t.Errorf("unexpected pod: %q", results[0].Pod.Name)
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue