From 3512975c31f410aec6ecea8734046bff1aa5a136 Mon Sep 17 00:00:00 2001 From: Pengfei Ni Date: Fri, 5 Aug 2016 00:29:19 +0800 Subject: [PATCH] Kubelet: generate sandbox/container config for new runtime API --- pkg/kubelet/kuberuntime/helpers.go | 160 +++++++++++++++++ .../kuberuntime/kuberuntime_container.go | 166 ++++++++++++++++++ .../kuberuntime/kuberuntime_sandbox.go | 107 +++++++++++ 3 files changed, 433 insertions(+) create mode 100644 pkg/kubelet/kuberuntime/helpers.go create mode 100644 pkg/kubelet/kuberuntime/kuberuntime_sandbox.go diff --git a/pkg/kubelet/kuberuntime/helpers.go b/pkg/kubelet/kuberuntime/helpers.go new file mode 100644 index 0000000000..cfbc9540af --- /dev/null +++ b/pkg/kubelet/kuberuntime/helpers.go @@ -0,0 +1,160 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/
+
+package kuberuntime
+
+import (
+	"fmt"
+	"math/rand"
+	"strconv"
+	"strings"
+
+	"github.com/golang/glog"
+	"k8s.io/kubernetes/pkg/api"
+	runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
+	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
+)
+
+const (
+	// kubePrefix is used to identify the containers/sandboxes on the node managed by kubelet
+	kubePrefix = "k8s"
+	// kubeSandboxNamePrefix is used to keep sandbox name consistent with old podInfraContainer name
+	kubeSandboxNamePrefix = "POD"
+
+	// Taken from lmctfy https://github.com/google/lmctfy/blob/master/lmctfy/controllers/cpu_controller.cc
+	minShares     = 2
+	sharesPerCPU  = 1024
+	milliCPUToCPU = 1000
+
+	// 100000 is equivalent to 100ms
+	quotaPeriod    = 100000
+	minQuotaPeriod = 1000
+)
+
+// buildSandboxName returns the unique sandbox name for pod; parseSandboxName can unpack it again.
+func buildSandboxName(pod *api.Pod) string {
+	_, sandboxName, _ := buildKubeGenericName(pod, kubeSandboxNamePrefix)
+	return sandboxName
+}
+
+// parseSandboxName unpacks a sandbox full name, returning the pod name, namespace and uid.
+func parseSandboxName(name string) (string, string, string, error) {
+	podName, podNamespace, podUID, _, _, err := parseContainerName(name)
+	if err != nil {
+		return "", "", "", err
+	}
+
+	return podName, podNamespace, podUID, nil
+}
+
+// buildContainerName creates a name which can be reversed to identify the container name.
+// It returns the stable name, the unique name and a unique id (see buildKubeGenericName).
+func buildContainerName(pod *api.Pod, container *api.Container) (string, string, string) {
+	// kubelet uses hash to determine whether an existing container matches the desired spec.
+	containerName := container.Name + "." + strconv.FormatUint(kubecontainer.HashContainer(container), 16)
+	return buildKubeGenericName(pod, containerName)
+}
+
+// buildKubeGenericName creates a name which can be reversed to identify container/sandbox name.
+// It returns the stable name, the unique name (stable name plus "_<id>") and the random 8-hex-digit id.
+func buildKubeGenericName(pod *api.Pod, containerName string) (string, string, string) {
+	stableName := fmt.Sprintf("%s_%s_%s_%s_%s",
+		kubePrefix,
+		containerName,
+		pod.Name,
+		pod.Namespace,
+		string(pod.UID),
+	)
+	UID := fmt.Sprintf("%08x", rand.Uint32())
+	return stableName, fmt.Sprintf("%s_%s", stableName, UID), UID
+}
+
+// parseContainerName unpacks a container name into pod name, namespace, UID, container name and hash (0 if absent or invalid).
+func parseContainerName(name string) (podName, podNamespace, podUID, containerName string, hash uint64, err error) {
+	parts := strings.Split(name, "_")
+	if parts[0] != kubePrefix { // strings.Split always yields at least one element
+		return "", "", "", "", 0, fmt.Errorf("failed to parse container name %q into parts", name)
+	}
+	if len(parts) < 6 {
+		glog.Warningf("Found a container with the %q prefix, but too few fields (%d): %q", kubePrefix, len(parts), name)
+		return "", "", "", "", 0, fmt.Errorf("container name %q has fewer parts than expected %v", name, parts)
+	}
+
+	nameParts := strings.Split(parts[1], ".")
+	containerName = nameParts[0]
+	if len(nameParts) > 1 {
+		hash, err = strconv.ParseUint(nameParts[1], 16, 32)
+		if err != nil {
+			// A bad hash is only logged, but ParseUint yields a clamped value on range errors: drop it.
+			glog.Warningf("Invalid container hash %q in container %q", nameParts[1], name)
+			hash = 0
+		}
+	}
+
+	return parts[2], parts[3], parts[4], containerName, hash, nil
+}
+
+// toRuntimeProtocol converts api.Protocol to runtimeApi.Protocol, falling back to TCP for unknown values.
+func toRuntimeProtocol(protocol api.Protocol) runtimeApi.Protocol {
+	switch protocol {
+	case api.ProtocolTCP:
+		return runtimeApi.Protocol_TCP
+	case api.ProtocolUDP:
+		return runtimeApi.Protocol_UDP
+	}
+
+	glog.Warningf("Unknown protocol %q: defaulting to TCP", protocol)
+	return runtimeApi.Protocol_TCP
+}
+
+// milliCPUToShares converts milliCPU to CPU shares, never returning fewer than minShares.
+func milliCPUToShares(milliCPU int64) int64 {
+	if milliCPU == 0 {
+		// Return 2 here to really match kernel default for zero milliCPU.
+		return minShares
+	}
+	// Conceptually (milliCPU / milliCPUToCPU) * sharesPerCPU, but factored to improve rounding.
+	shares := (milliCPU * sharesPerCPU) / milliCPUToCPU
+	if shares < minShares {
+		return minShares
+	}
+	return shares
+}
+
+// milliCPUToQuota converts milliCPU to CFS quota and period values; both are 0 when milliCPU is 0.
+func milliCPUToQuota(milliCPU int64) (quota int64, period int64) {
+	// CFS quota is measured in two values:
+	//  - cfs_period_us=100ms (the amount of time to measure usage across)
+	//  - cfs_quota=20ms (the amount of cpu time allowed to be used across a period)
+	// so in the above example, you are limited to 20% of a single CPU
+	// for multi-cpu environments, you just scale equivalent amounts
+	if milliCPU == 0 {
+		return
+	}
+
+	// we set the period to 100ms by default
+	period = quotaPeriod
+
+	// we then convert your milliCPU to a value normalized over a period
+	quota = (milliCPU * quotaPeriod) / milliCPUToCPU
+
+	// quota needs to be a minimum of 1ms.
+	if quota < minQuotaPeriod {
+		quota = minQuotaPeriod
+	}
+
+	return
+}
diff --git a/pkg/kubelet/kuberuntime/kuberuntime_container.go b/pkg/kubelet/kuberuntime/kuberuntime_container.go
index 08ce278aa1..6bea1abde3 100644
--- a/pkg/kubelet/kuberuntime/kuberuntime_container.go
+++ b/pkg/kubelet/kuberuntime/kuberuntime_container.go
@@ -19,12 +19,178 @@ package kuberuntime
 import (
 	"fmt"
 	"io"
+	"os"
+	"path"
 
+	"github.com/golang/glog"
 	"k8s.io/kubernetes/pkg/api"
+	runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
 	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
+	"k8s.io/kubernetes/pkg/types"
 	"k8s.io/kubernetes/pkg/util/term"
 )
 
+// getContainerLogsPath returns <podLogsRootDirectory>/<podUID>/<containerName>.log.
+func getContainerLogsPath(containerName string, podUID types.UID) string {
+	return path.Join(podLogsRootDirectory, string(podUID), fmt.Sprintf("%s.log", containerName))
+}
+
+// generateContainerConfig generates container config for kubelet runtime api.
+func (m *kubeGenericRuntimeManager) generateContainerConfig(container *api.Container, pod *api.Pod, restartCount int, podIP string) (*runtimeApi.ContainerConfig, error) {
+	opts, err := m.runtimeHelper.GenerateRunContainerOptions(pod, container, podIP)
+	if err != nil {
+		return nil, err
+	}
+
+	// The unique name labels the container and its log file; the unique id is handed to makeMounts.
+	_, uniqueName, uniqueID := buildContainerName(pod, container)
+	command, args := kubecontainer.ExpandContainerCommandAndArgs(container, opts.Envs)
+	logPath := getContainerLogsPath(uniqueName, pod.UID)
+	podHasSELinuxLabel := pod.Spec.SecurityContext != nil && pod.Spec.SecurityContext.SELinuxOptions != nil
+	config := &runtimeApi.ContainerConfig{
+		Name:        &uniqueName,
+		Image:       &runtimeApi.ImageSpec{Image: &container.Image},
+		Command:     command,
+		Args:        args,
+		WorkingDir:  &container.WorkingDir,
+		Labels:      newContainerLabels(container, pod),
+		Annotations: newContainerAnnotations(container, pod, restartCount),
+		Mounts:      makeMounts(uniqueID, opts, container, podHasSELinuxLabel),
+		LogPath:     &logPath,
+		Stdin:       &container.Stdin,
+		StdinOnce:   &container.StdinOnce,
+		Tty:         &container.TTY,
+		Linux:       m.generateLinuxContainerConfig(container),
+	}
+
+	// Carry over the privileged and read-only rootfs settings, but only the ones explicitly set.
+	if sc := container.SecurityContext; sc != nil {
+		if sc.Privileged != nil {
+			config.Privileged = sc.Privileged
+		}
+		if sc.ReadOnlyRootFilesystem != nil {
+			config.ReadonlyRootfs = sc.ReadOnlyRootFilesystem
+		}
+	}
+
+	// Translate the environment variables into runtime API key/value pairs.
+	envs := make([]*runtimeApi.KeyValue, 0, len(opts.Envs))
+	for idx := range opts.Envs {
+		env := opts.Envs[idx]
+		envs = append(envs, &runtimeApi.KeyValue{
+			Key:   &env.Name,
+			Value: &env.Value,
+		})
+	}
+	config.Envs = envs
+
+	return config, nil
+}
+
+// generateLinuxContainerConfig generates linux container config for kubelet runtime api.
+func (m *kubeGenericRuntimeManager) generateLinuxContainerConfig(container *api.Container) *runtimeApi.LinuxContainerConfig {
+	linuxConfig := &runtimeApi.LinuxContainerConfig{
+		Resources: &runtimeApi.LinuxContainerResources{},
+	}
+
+	// set linux container resources
+	var cpuShares int64
+	cpuRequest := container.Resources.Requests.Cpu()
+	cpuLimit := container.Resources.Limits.Cpu()
+	memoryLimit := container.Resources.Limits.Memory().Value()
+	// If request is not specified, but limit is, we want request to default to limit.
+	// API server does this for new containers, but we repeat this logic in Kubelet
+	// for containers running on existing Kubernetes clusters.
+	if cpuRequest.IsZero() && !cpuLimit.IsZero() {
+		cpuShares = milliCPUToShares(cpuLimit.MilliValue())
+	} else {
+		// Otherwise use the request; a zero request makes milliCPUToShares return its
+		// minimum, minShares.
+		cpuShares = milliCPUToShares(cpuRequest.MilliValue())
+	}
+	linuxConfig.Resources.CpuShares = &cpuShares
+	if memoryLimit != 0 {
+		linuxConfig.Resources.MemoryLimitInBytes = &memoryLimit
+	}
+	if m.cpuCFSQuota {
+		// milliCPUToQuota returns 0 for both values when the limit is zero;
+		// presumably the runtime treats that as "no CFS limit" (TODO confirm).
+		cpuQuota, cpuPeriod := milliCPUToQuota(cpuLimit.MilliValue())
+		linuxConfig.Resources.CpuQuota = &cpuQuota
+		linuxConfig.Resources.CpuPeriod = &cpuPeriod
+	}
+
+	// set security context options
+	if container.SecurityContext != nil {
+		securityContext := container.SecurityContext
+		if capabilities := securityContext.Capabilities; capabilities != nil {
+			// Full-length slices: indexed assignment panics on a zero-length slice, whatever its capacity.
+			addCaps := make([]string, len(capabilities.Add))
+			for index, value := range capabilities.Add {
+				addCaps[index] = string(value)
+			}
+			dropCaps := make([]string, len(capabilities.Drop))
+			for index, value := range capabilities.Drop {
+				dropCaps[index] = string(value)
+			}
+			linuxConfig.Capabilities = &runtimeApi.Capability{AddCapabilities: addCaps, DropCapabilities: dropCaps}
+		}
+
+		if securityContext.SELinuxOptions != nil {
+			linuxConfig.SelinuxOptions = &runtimeApi.SELinuxOption{
+				User:  &securityContext.SELinuxOptions.User,
+				Role:  &securityContext.SELinuxOptions.Role,
+				Type:  &securityContext.SELinuxOptions.Type,
+				Level: &securityContext.SELinuxOptions.Level,
+			}
+		}
+	}
+
+	return linuxConfig
+}
+
+// makeMounts generates container volume mounts for kubelet runtime api: one per entry in
+// opts.Mounts plus, when it can be created, a mount of the termination message file.
+func makeMounts(cid string, opts *kubecontainer.RunContainerOptions, container *api.Container, podHasSELinuxLabel bool) []*runtimeApi.Mount {
+	volumeMounts := []*runtimeApi.Mount{}
+
+	for idx := range opts.Mounts {
+		v := opts.Mounts[idx]
+		m := &runtimeApi.Mount{
+			Name:          &v.Name,
+			HostPath:      &v.HostPath,
+			ContainerPath: &v.ContainerPath,
+			Readonly:      &v.ReadOnly,
+		}
+		if podHasSELinuxLabel && v.SELinuxRelabel {
+			m.SelinuxRelabel = &v.SELinuxRelabel
+		}
+
+		volumeMounts = append(volumeMounts, m)
+	}
+
+	// The termination message file is created and mounted here (not in kubelet) because its
+	// location depends on the container id and it must exist before the container starts.
+	if opts.PodContainerDir != "" && len(container.TerminationMessagePath) != 0 {
+		// PodContainerDir already contains the pod uid and container name, so adding the unique
+		// container id makes the path unique for different instances of the same container.
+		containerLogPath := path.Join(opts.PodContainerDir, cid)
+		fs, err := os.Create(containerLogPath)
+		if err != nil {
+			glog.Errorf("Error on creating termination-log file %q: %v", containerLogPath, err)
+		} else {
+			// Only the file's existence matters and nothing was written, so the Close result is ignored.
+			fs.Close()
+			volumeMounts = append(volumeMounts, &runtimeApi.Mount{
+				HostPath:      &containerLogPath,
+				ContainerPath: &container.TerminationMessagePath,
+			})
+		}
+	}
+
+	return volumeMounts
+}
+
 // AttachContainer attaches to the container's console
 func (m *kubeGenericRuntimeManager) AttachContainer(id kubecontainer.ContainerID, stdin io.Reader, stdout, stderr io.WriteCloser, tty bool, resize <-chan term.Size) (err error) {
 	return fmt.Errorf("not implemented")
diff --git a/pkg/kubelet/kuberuntime/kuberuntime_sandbox.go b/pkg/kubelet/kuberuntime/kuberuntime_sandbox.go
new file mode 100644
index 0000000000..b6a4709de7
--- /dev/null
+++ b/pkg/kubelet/kuberuntime/kuberuntime_sandbox.go
@@ -0,0 +1,107 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package kuberuntime
+
+import (
+	"k8s.io/kubernetes/pkg/api"
+	runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
+	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
+)
+
+// generatePodSandboxConfig builds the runtime API sandbox config for pod: name, labels,
+// annotations, DNS and hostname (non-host-network pods only), port mappings and linux options.
+func (m *kubeGenericRuntimeManager) generatePodSandboxConfig(pod *api.Pod, podIP string) (*runtimeApi.PodSandboxConfig, error) {
+	name := buildSandboxName(pod)
+	// TODO: deprecating podsandbox resource requirements in favor of the pod level cgroup
+	// Refer https://github.com/kubernetes/kubernetes/issues/29871
+	sandboxConfig := &runtimeApi.PodSandboxConfig{
+		Name:        &name,
+		Labels:      newPodLabels(pod),
+		Annotations: newPodAnnotations(pod),
+	}
+
+	if !kubecontainer.IsHostNetworkPod(pod) {
+		servers, searches, err := m.runtimeHelper.GetClusterDNS(pod)
+		if err != nil {
+			return nil, err
+		}
+		sandboxConfig.DnsOptions = &runtimeApi.DNSOption{
+			Servers:  servers,
+			Searches: searches,
+		}
+		// TODO: Add domain support in new runtime interface
+		hostname, _, err := m.runtimeHelper.GeneratePodHostNameAndDomain(pod)
+		if err != nil {
+			return nil, err
+		}
+		sandboxConfig.Hostname = &hostname
+	}
+
+	var cgroupParent string
+	mappings := []*runtimeApi.PortMapping{}
+	for _, c := range pod.Spec.Containers {
+		opts, err := m.runtimeHelper.GenerateRunContainerOptions(pod, &c, podIP)
+		if err != nil {
+			return nil, err
+		}
+		for i := range opts.PortMappings {
+			mapping := opts.PortMappings[i]
+			hostPort := int32(mapping.HostPort)
+			containerPort := int32(mapping.ContainerPort)
+			protocol := toRuntimeProtocol(mapping.Protocol)
+			mappings = append(mappings, &runtimeApi.PortMapping{
+				HostIp:        &mapping.HostIP,
+				HostPort:      &hostPort,
+				ContainerPort: &containerPort,
+				Protocol:      &protocol,
+				Name:          &mapping.Name,
+			})
+		}
+
+		// TODO: refactor kubelet to get cgroup parent for pod instead of containers
+		cgroupParent = opts.CgroupParent
+	}
+	sandboxConfig.Linux = generatePodSandboxLinuxConfig(pod, cgroupParent)
+	if len(mappings) > 0 {
+		sandboxConfig.PortMappings = mappings
+	}
+
+	return sandboxConfig, nil
+}
+
+// generatePodSandboxLinuxConfig generates LinuxPodSandboxConfig from api.Pod; it returns nil
+// when the pod has no security context and cgroupParent is empty.
+func generatePodSandboxLinuxConfig(pod *api.Pod, cgroupParent string) *runtimeApi.LinuxPodSandboxConfig {
+	securityContext := pod.Spec.SecurityContext
+	if securityContext == nil && cgroupParent == "" {
+		return nil
+	}
+
+	config := &runtimeApi.LinuxPodSandboxConfig{}
+	if securityContext != nil {
+		config.NamespaceOptions = &runtimeApi.NamespaceOption{
+			HostNetwork: &securityContext.HostNetwork,
+			HostIpc:     &securityContext.HostIPC,
+			HostPid:     &securityContext.HostPID,
+		}
+	}
+	if cgroupParent != "" {
+		config.CgroupParent = &cgroupParent
+	}
+
+	return config
+}