From 192ffdfb25ccd3e5c07e9e37bd55d6d0fb28c000 Mon Sep 17 00:00:00 2001
From: Brendan Burns
Date: Fri, 19 Jun 2015 22:49:18 -0700
Subject: [PATCH] Fix the container bridge so that it can create cbr0

Fix the kubelet so that it tries to sync status, even if Docker is down
---
 cluster/saltbase/salt/docker/docker-defaults |  6 +-
 cluster/saltbase/salt/kubelet/default        |  7 ++-
 cmd/kubelet/app/server.go                    |  6 +-
 pkg/kubelet/container_bridge.go              | 66 +++++++++++++-------
 pkg/kubelet/kubelet.go                       | 47 ++++++++++----
 pkg/kubelet/status_manager.go                |  7 +++
 6 files changed, 98 insertions(+), 41 deletions(-)

diff --git a/cluster/saltbase/salt/docker/docker-defaults b/cluster/saltbase/salt/docker/docker-defaults
index 43e8fb9eaf..7e5725064d 100644
--- a/cluster/saltbase/salt/docker/docker-defaults
+++ b/cluster/saltbase/salt/docker/docker-defaults
@@ -3,9 +3,5 @@ DOCKER_OPTS=""
   DOCKER_OPTS="${DOCKER_OPTS} {{grains.docker_opts}}"
 {% endif %}
 
-{% set docker_bridge = "" %}
-{% if grains['roles'][0] == 'kubernetes-pool' %}
-  {% set docker_bridge = "--bridge cbr0" %}
-{% endif %}
-DOCKER_OPTS="${DOCKER_OPTS} {{docker_bridge}} --iptables=false --ip-masq=false"
+DOCKER_OPTS="${DOCKER_OPTS} --bridge=cbr0 --iptables=false --ip-masq=false"
 DOCKER_NOFILE=1000000
diff --git a/cluster/saltbase/salt/kubelet/default b/cluster/saltbase/salt/kubelet/default
index 76eb4497b4..a1c1dccc1f 100644
--- a/cluster/saltbase/salt/kubelet/default
+++ b/cluster/saltbase/salt/kubelet/default
@@ -76,4 +76,9 @@
   {% set cgroup_root = "--cgroup_root=/" -%}
 {% endif -%}
 
-DAEMON_ARGS="{{daemon_args}} {{api_servers_with_port}} {{debugging_handlers}} {{hostname_override}} {{cloud_provider}} {{config}} --allow_privileged={{pillar['allow_privileged']}} {{pillar['log_level']}} {{cluster_dns}} {{cluster_domain}} {{docker_root}} {{kubelet_root}} {{configure_cbr0}} {{cgroup_root}} {{system_container}}"
+{% set pod_cidr = "" %}
+{% if grains['roles'][0] == 'kubernetes-master' %}
+  {% set pod_cidr = "--pod-cidr=" + grains['cbr-cidr'] %}
+{% endif %}
+
+DAEMON_ARGS="{{daemon_args}} {{api_servers_with_port}} {{debugging_handlers}} {{hostname_override}} {{cloud_provider}} {{config}} --allow_privileged={{pillar['allow_privileged']}} {{pillar['log_level']}} {{cluster_dns}} {{cluster_domain}} {{docker_root}} {{kubelet_root}} {{configure_cbr0}} {{cgroup_root}} {{system_container}} {{pod_cidr}}"
diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go
index b50179b480..1cd1362da1 100644
--- a/cmd/kubelet/app/server.go
+++ b/cmd/kubelet/app/server.go
@@ -115,6 +115,7 @@ type KubeletServer struct {
     DockerDaemonContainer string
     SystemContainer       string
     ConfigureCBR0         bool
+    PodCIDR               string
     MaxPods               int
     DockerExecHandlerName string
 
@@ -241,7 +242,7 @@ func (s *KubeletServer) AddFlags(fs *pflag.FlagSet) {
     fs.BoolVar(&s.ConfigureCBR0, "configure-cbr0", s.ConfigureCBR0, "If true, kubelet will configure cbr0 based on Node.Spec.PodCIDR.")
     fs.IntVar(&s.MaxPods, "max-pods", 100, "Number of Pods that can run on this Kubelet.")
     fs.StringVar(&s.DockerExecHandlerName, "docker-exec-handler", s.DockerExecHandlerName, "Handler to use when executing a command in a container. Valid values are 'native' and 'nsenter'. Defaults to 'native'.")
-
+    fs.StringVar(&s.PodCIDR, "pod-cidr", "", "The CIDR to use for pod IP addresses, only used in standalone mode. In cluster mode, this is obtained from the master.")
     // Flags intended for testing, not recommended used in production environments.
     fs.BoolVar(&s.ReallyCrashForTesting, "really-crash-for-testing", s.ReallyCrashForTesting, "If true, when panics occur crash. Intended for testing.")
     fs.Float64Var(&s.ChaosChance, "chaos-chance", s.ChaosChance, "If > 0.0, introduce random client errors and latency. Intended for testing. [default=0.0]")
@@ -361,6 +362,7 @@ func (s *KubeletServer) Run(_ []string) error {
         DockerDaemonContainer: s.DockerDaemonContainer,
         SystemContainer:       s.SystemContainer,
         ConfigureCBR0:         s.ConfigureCBR0,
+        PodCIDR:               s.PodCIDR,
         MaxPods:               s.MaxPods,
         DockerExecHandler:     dockerExecHandler,
     }
@@ -714,6 +716,7 @@ type KubeletConfig struct {
     DockerDaemonContainer string
     SystemContainer       string
     ConfigureCBR0         bool
+    PodCIDR               string
     MaxPods               int
     DockerExecHandler     dockertools.ExecHandler
 }
@@ -771,6 +774,7 @@ func createAndInitKubelet(kc *KubeletConfig) (k KubeletBootstrap, pc *config.Pod
         kc.DockerDaemonContainer,
         kc.SystemContainer,
         kc.ConfigureCBR0,
+        kc.PodCIDR,
         kc.MaxPods,
         kc.DockerExecHandler)
 
diff --git a/pkg/kubelet/container_bridge.go b/pkg/kubelet/container_bridge.go
index a4a30a5301..0aaa885195 100644
--- a/pkg/kubelet/container_bridge.go
+++ b/pkg/kubelet/container_bridge.go
@@ -19,6 +19,7 @@ package kubelet
 import (
     "bytes"
     "net"
+    "os"
     "os/exec"
     "regexp"
 
@@ -27,8 +28,39 @@ import (
 
 var cidrRegexp = regexp.MustCompile(`inet ([0-9a-fA-F.:]*/[0-9]*)`)
 
+func createCBR0(wantCIDR *net.IPNet) error {
+    // recreate cbr0 with wantCIDR
+    if err := exec.Command("brctl", "addbr", "cbr0").Run(); err != nil {
+        glog.Error(err)
+        return err
+    }
+    if err := exec.Command("ip", "addr", "add", wantCIDR.String(), "dev", "cbr0").Run(); err != nil {
+        glog.Error(err)
+        return err
+    }
+    if err := exec.Command("ip", "link", "set", "dev", "cbr0", "up").Run(); err != nil {
+        glog.Error(err)
+        return err
+    }
+    // restart docker
+    if err := exec.Command("service", "docker", "restart").Run(); err != nil {
+        glog.Error(err)
+        // For now just log the error. The containerRuntime check will catch docker failures.
+        // TODO (dawnchen) figure out what we should do for rkt here.
+    }
+    glog.V(2).Info("Recreated cbr0 and restarted docker")
+    return nil
+}
+
 func ensureCbr0(wantCIDR *net.IPNet) error {
-    if !cbr0CidrCorrect(wantCIDR) {
+    exists, err := cbr0Exists()
+    if err != nil {
+        return err
+    }
+    if !exists {
+        glog.V(2).Infof("CBR0 doesn't exist, attempting to create it with range: %s", wantCIDR)
+        return createCBR0(wantCIDR)
+    } else if !cbr0CidrCorrect(wantCIDR) {
         glog.V(2).Infof("Attempting to recreate cbr0 with address range: %s", wantCIDR)
 
         // delete cbr0
@@ -40,30 +72,22 @@ func ensureCbr0(wantCIDR *net.IPNet) error {
             glog.Error(err)
             return err
         }
-        // recreate cbr0 with wantCIDR
-        if err := exec.Command("brctl", "addbr", "cbr0").Run(); err != nil {
-            glog.Error(err)
-            return err
-        }
-        if err := exec.Command("ip", "addr", "add", wantCIDR.String(), "dev", "cbr0").Run(); err != nil {
-            glog.Error(err)
-            return err
-        }
-        if err := exec.Command("ip", "link", "set", "dev", "cbr0", "up").Run(); err != nil {
-            glog.Error(err)
-            return err
-        }
-        // restart docker
-        if err := exec.Command("service", "docker", "restart").Run(); err != nil {
-            glog.Error(err)
-            // For now just log the error. The containerRuntime check will catch docker failures.
-            // TODO (dawnchen) figure out what we should do for rkt here.
-        }
-        glog.V(2).Info("Recreated cbr0 and restarted docker")
+        return createCBR0(wantCIDR)
     }
     return nil
 }
 
+func cbr0Exists() (bool, error) {
+    _, err := os.Stat("/sys/class/net/cbr0")
+    if err != nil {
+        if os.IsNotExist(err) {
+            return false, nil
+        }
+        return false, err
+    }
+    return true, nil
+}
+
 func cbr0CidrCorrect(wantCIDR *net.IPNet) bool {
     output, err := exec.Command("ip", "addr", "show", "cbr0").Output()
     if err != nil {
diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go
index 2553c6bbae..537c95bed4 100644
--- a/pkg/kubelet/kubelet.go
+++ b/pkg/kubelet/kubelet.go
@@ -147,6 +147,7 @@ func NewMainKubelet(
     dockerDaemonContainer string,
     systemContainer string,
     configureCBR0 bool,
+    podCIDR string,
     pods int,
     dockerExecHandler dockertools.ExecHandler) (*Kubelet, error) {
     if rootDirectory == "" {
@@ -261,6 +262,7 @@ func NewMainKubelet(
         cgroupRoot:      cgroupRoot,
         mounter:         mounter,
         configureCBR0:   configureCBR0,
+        podCIDR:         podCIDR,
         pods:            pods,
         syncLoopMonitor: util.AtomicValue{},
     }
@@ -318,6 +320,10 @@ func NewMainKubelet(
     }
     klet.containerManager = containerManager
 
+    // Start syncing node status immediately, this may set up things the runtime needs to run.
+    go klet.syncNodeStatus()
+    go klet.syncNetworkStatus()
+
     // Wait for the runtime to be up with a timeout.
     if err := waitUntilRuntimeIsUp(klet.containerRuntime, maxWaitForContainerRuntime); err != nil {
         return nil, fmt.Errorf("timed out waiting for %q to come up: %v", containerRuntime, err)
@@ -412,6 +418,9 @@ type Kubelet struct {
     runtimeUpThreshold     time.Duration
     lastTimestampRuntimeUp time.Time
 
+    // Network Status information
+    networkConfigured bool
+
     // Volume plugins.
     volumePluginMgr volume.VolumePluginMgr
 
@@ -489,6 +498,7 @@ type Kubelet struct {
     // Whether or not kubelet should take responsibility for keeping cbr0 in
     // the correct state.
     configureCBR0 bool
+    podCIDR       string
 
     // Number of Pods which can be run by this Kubelet
     pods int
@@ -707,7 +717,6 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) {
     }
 
     go util.Until(kl.updateRuntimeUp, 5*time.Second, util.NeverStop)
-    go kl.syncNodeStatus()
     // Run the system oom watcher forever.
     kl.statusManager.Start()
     kl.syncLoop(updates, kl)
@@ -1705,6 +1714,10 @@ func (kl *Kubelet) syncLoopIteration(updates <-chan PodUpdate, handler SyncHandl
         glog.Infof("Skipping pod synchronization, container runtime is not up.")
         return
     }
+    if !kl.networkConfigured {
+        time.Sleep(5 * time.Second)
+        glog.Infof("Skipping pod synchronization, network is not configured")
+    }
     unsyncedPod := false
     podSyncTypes := make(map[types.UID]SyncPodType)
     select {
@@ -1892,6 +1905,22 @@ func (kl *Kubelet) recordNodeStatusEvent(event string) {
 // Maintains Node.Spec.Unschedulable value from previous run of tryUpdateNodeStatus()
 var oldNodeUnschedulable bool
 
+func (kl *Kubelet) syncNetworkStatus() {
+    for {
+        networkConfigured := true
+        if kl.configureCBR0 {
+            if len(kl.podCIDR) == 0 {
+                networkConfigured = false
+            } else if err := kl.reconcileCBR0(kl.podCIDR); err != nil {
+                networkConfigured = false
+                glog.Errorf("Error configuring cbr0: %v", err)
+            }
+        }
+        kl.networkConfigured = networkConfigured
+        time.Sleep(30 * time.Second)
+    }
+}
+
 // setNodeStatus fills in the Status fields of the given Node, overwriting
 // any fields that are currently set.
 func (kl *Kubelet) setNodeStatus(node *api.Node) error {
@@ -1925,16 +1954,6 @@ func (kl *Kubelet) setNodeStatus(node *api.Node) error {
         }
     }
 
-    networkConfigured := true
-    if kl.configureCBR0 {
-        if len(node.Spec.PodCIDR) == 0 {
-            networkConfigured = false
-        } else if err := kl.reconcileCBR0(node.Spec.PodCIDR); err != nil {
-            networkConfigured = false
-            glog.Errorf("Error configuring cbr0: %v", err)
-        }
-    }
-
     // TODO: Post NotReady if we cannot get MachineInfo from cAdvisor. This needs to start
     // cAdvisor locally, e.g. for test-cmd.sh, and in integration test.
     info, err := kl.GetCachedMachineInfo()
@@ -1982,7 +2001,7 @@ func (kl *Kubelet) setNodeStatus(node *api.Node) error {
     currentTime := util.Now()
     var newNodeReadyCondition api.NodeCondition
     var oldNodeReadyConditionStatus api.ConditionStatus
-    if containerRuntimeUp && networkConfigured {
+    if containerRuntimeUp && kl.networkConfigured {
         newNodeReadyCondition = api.NodeCondition{
             Type:   api.NodeReady,
             Status: api.ConditionTrue,
@@ -1994,7 +2013,7 @@ func (kl *Kubelet) setNodeStatus(node *api.Node) error {
         if !containerRuntimeUp {
             reasons = append(reasons, "container runtime is down")
         }
-        if !networkConfigured {
+        if !kl.networkConfigured {
             reasons = append(reasons, "network not configured correctly")
         }
         newNodeReadyCondition = api.NodeCondition{
@@ -2056,6 +2075,8 @@ func (kl *Kubelet) tryUpdateNodeStatus() error {
     if node == nil {
         return fmt.Errorf("no node instance returned for %q", kl.nodeName)
     }
+    kl.podCIDR = node.Spec.PodCIDR
+
     if err := kl.setNodeStatus(node); err != nil {
         return err
     }
diff --git a/pkg/kubelet/status_manager.go b/pkg/kubelet/status_manager.go
index 4df39d7c08..d8bf9bbb4e 100644
--- a/pkg/kubelet/status_manager.go
+++ b/pkg/kubelet/status_manager.go
@@ -17,9 +17,11 @@ limitations under the License.
 package kubelet
 
 import (
+    "errors"
     "fmt"
     "reflect"
     "sync"
+    "time"
 
     "github.com/GoogleCloudPlatform/kubernetes/pkg/api"
     "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
@@ -58,6 +60,8 @@ func (s *statusManager) Start() {
         err := s.syncBatch()
         if err != nil {
             glog.Warningf("Failed to updated pod status: %v", err)
+            // Errors and tight-looping are bad, m-kay
+            time.Sleep(30 * time.Second)
         }
     }, 0)
 }
@@ -124,6 +128,9 @@ func (s *statusManager) RemoveOrphanedStatuses(podFullNames map[string]bool) {
 
 // syncBatch syncs pods statuses with the apiserver.
 func (s *statusManager) syncBatch() error {
+    if s.kubeClient == nil {
+        return errors.New("Kubernetes client is nil, skipping pod status updates")
+    }
     syncRequest := <-s.podStatusChannel
     pod := syncRequest.pod
     podFullName := kubecontainer.GetPodFullName(pod)
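
Note: the new syncNetworkStatus loop delegates to kl.reconcileCBR0, which is pre-existing code that this patch does not show. As a rough, hypothetical sketch of that call path (written as it would sit inside pkg/kubelet, where glog, net, and ensureCbr0 are already available; the exact body is an assumption, not taken from the patch), it parses the pod CIDR string and hands the resulting *net.IPNet to ensureCbr0:

    // Hypothetical sketch only -- reconcileCBR0 is not part of this patch.
    // It is assumed to turn the textual pod CIDR into a *net.IPNet and
    // delegate to ensureCbr0, which now creates cbr0 if it is missing.
    func (kl *Kubelet) reconcileCBR0(podCIDR string) error {
        if podCIDR == "" {
            glog.V(5).Info("PodCIDR not set. Will not configure cbr0.")
            return nil
        }
        _, cidr, err := net.ParseCIDR(podCIDR)
        if err != nil {
            return err
        }
        // The real implementation may also adjust the address to the bridge's
        // gateway IP (e.g. the first usable address in the range) before this call.
        return ensureCbr0(cidr)
    }

With this patch, the podCIDR string fed into that path comes either from the new --pod-cidr flag (standalone mode) or from Node.Spec.PodCIDR, which tryUpdateNodeStatus now copies into kl.podCIDR on each status sync.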
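
Note: the existence check added in cbr0Exists relies on the kernel exposing each network interface under /sys/class/net. A minimal, standalone sketch of the same idea (the bridgeExists helper and the main wrapper are illustrative names only, not part of the patch):

    package main

    import (
        "fmt"
        "os"
    )

    // bridgeExists reports whether a network interface with the given name exists,
    // using the same /sys/class/net check that cbr0Exists performs for "cbr0".
    func bridgeExists(name string) (bool, error) {
        _, err := os.Stat("/sys/class/net/" + name)
        if err != nil {
            if os.IsNotExist(err) {
                return false, nil
            }
            return false, err
        }
        return true, nil
    }

    func main() {
        exists, err := bridgeExists("cbr0")
        if err != nil {
            fmt.Fprintln(os.Stderr, err)
            os.Exit(1)
        }
        fmt.Println("cbr0 exists:", exists)
    }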