Merge pull request #33988 from Random-Liu/add-remote-docker-shim

Automatic merge from submit-queue

CRI: Add dockershim grpc server.

This PR adds a in-process grpc server for dockershim.

Flags change:
1. `container-runtime` will not be automatically set to remote when `container-runtime-endpoint` is set. @feiskyer 
2. set kubelet flag `--experimental-runtime-integration-type=remote --container-runtime-endpoint=UNIX_SOCKET_FILE_PATH` to enable the in-process dockershim grpc server.
3. set node e2e test flag `--runtime-integration-type=remote -container-runtime-endpoint=UNIX_SOCKET_FILE_PATH` to run node e2e test against in-process dockershim grpc server.

I've run node e2e test against the remote cri integration, tests which don't rely on stream and log functions can pass.

This unblocks the following work:
1) CRI conformance test.
2) Performance comparison between in-process integration and in-process grpc integration.

@yujuhong @feiskyer 
/cc @kubernetes/sig-node
pull/6/head
Kubernetes Submit Queue 2016-10-25 15:36:29 -07:00 committed by GitHub
commit 67d947996c
11 changed files with 384 additions and 11 deletions

View File

@ -195,7 +195,7 @@ func (s *KubeletServer) AddFlags(fs *pflag.FlagSet) {
fs.StringSliceVar(&s.AllowedUnsafeSysctls, "experimental-allowed-unsafe-sysctls", s.AllowedUnsafeSysctls, "Comma-separated whitelist of unsafe sysctls or unsafe sysctl patterns (ending in *). Use these at your own risk.")
// Flags intended for testing, not recommended used in production environments.
fs.StringVar(&s.RemoteRuntimeEndpoint, "container-runtime-endpoint", s.RemoteRuntimeEndpoint, "The unix socket endpoint of remote runtime service. If not empty, this option will override --container-runtime. This is an experimental feature. Intended for testing only.")
fs.StringVar(&s.RemoteRuntimeEndpoint, "container-runtime-endpoint", s.RemoteRuntimeEndpoint, "The unix socket endpoint of remote runtime service. This is an experimental feature. Intended for testing only.")
fs.StringVar(&s.RemoteImageEndpoint, "image-service-endpoint", s.RemoteImageEndpoint, "The unix socket endpoint of remote image service. If not specified, it will be the same with container-runtime-endpoint by default. This is an experimental feature. Intended for testing only.")
fs.BoolVar(&s.ReallyCrashForTesting, "really-crash-for-testing", s.ReallyCrashForTesting, "If true, when panics occur crash. Intended for testing.")
fs.Float64Var(&s.ChaosChance, "chaos-chance", s.ChaosChance, "If > 0.0, introduce random client errors and latency. Intended for testing. [default=0.0]")

View File

@ -53,11 +53,13 @@ go_library(
"//pkg/cloudprovider:go_default_library",
"//pkg/fieldpath:go_default_library",
"//pkg/fields:go_default_library",
"//pkg/kubelet/api:go_default_library",
"//pkg/kubelet/cadvisor:go_default_library",
"//pkg/kubelet/cm:go_default_library",
"//pkg/kubelet/config:go_default_library",
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/dockershim:go_default_library",
"//pkg/kubelet/dockershim/remote:go_default_library",
"//pkg/kubelet/dockertools:go_default_library",
"//pkg/kubelet/envvars:go_default_library",
"//pkg/kubelet/events:go_default_library",

View File

@ -55,7 +55,7 @@ const (
var internalLabelKeys []string = []string{containerTypeLabelKey, containerLogPathLabelKey, sandboxIDLabelKey}
// NOTE: Anything passed to DockerService should be eventually handled in another way when we switch to running the shim as a different process.
func NewDockerService(client dockertools.DockerInterface, seccompProfileRoot string, podSandboxImage string) DockerLegacyService {
func NewDockerService(client dockertools.DockerInterface, seccompProfileRoot string, podSandboxImage string) DockerService {
return &dockerService{
seccompProfileRoot: seccompProfileRoot,
client: dockertools.NewInstrumentedDockerInterface(client),
@ -64,13 +64,18 @@ func NewDockerService(client dockertools.DockerInterface, seccompProfileRoot str
}
}
// DockerLegacyService is an interface that embeds both the new
// RuntimeService and ImageService interfaces, while including legacy methods
// for backward compatibility.
type DockerLegacyService interface {
// DockerService is an interface that embeds both the new RuntimeService and
// ImageService interfaces, while including DockerLegacyService for backward
// compatibility.
type DockerService interface {
internalApi.RuntimeService
internalApi.ImageManagerService
DockerLegacyService
}
// DockerLegacyService is an interface that embeds all legacy methods for
// backward compatibility.
type DockerLegacyService interface {
// Supporting legacy methods for docker.
GetContainerLogs(pod *api.Pod, containerID kubecontainer.ContainerID, logOptions *api.PodLogOptions, stdout, stderr io.Writer) (err error)
kubecontainer.ContainerAttacher

View File

@ -0,0 +1,29 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_binary",
"go_library",
"go_test",
"cgo_library",
)
go_library(
name = "go_default_library",
srcs = [
"docker_server.go",
"docker_service.go",
],
tags = ["automanaged"],
deps = [
"//pkg/kubelet/api:go_default_library",
"//pkg/kubelet/api/v1alpha1/runtime:go_default_library",
"//pkg/kubelet/dockershim:go_default_library",
"//pkg/util/interrupt:go_default_library",
"//vendor:github.com/golang/glog",
"//vendor:golang.org/x/net/context",
"//vendor:google.golang.org/grpc",
],
)

View File

@ -0,0 +1,89 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package remote
import (
"fmt"
"net"
"os"
"syscall"
"github.com/golang/glog"
"google.golang.org/grpc"
runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
"k8s.io/kubernetes/pkg/kubelet/dockershim"
"k8s.io/kubernetes/pkg/util/interrupt"
)
const (
// defaultEndpoint is the default address of dockershim grpc server socket.
defaultAddress = "/var/run/dockershim.sock"
// unixProtocol is the network protocol of unix socket.
unixProtocol = "unix"
)
// DockerServer is the grpc server of dockershim.
type DockerServer struct {
// addr is the address to serve on.
addr string
// service is the docker service which implements runtime and image services.
service DockerService
// server is the grpc server.
server *grpc.Server
}
// NewDockerServer creates the dockershim grpc server.
func NewDockerServer(addr string, s dockershim.DockerService) *DockerServer {
return &DockerServer{
addr: addr,
service: NewDockerService(s),
}
}
// Start starts the dockershim grpc server.
func (s *DockerServer) Start() error {
glog.V(2).Infof("Start dockershim grpc server")
// Unlink to cleanup the previous socket file.
err := syscall.Unlink(s.addr)
if err != nil && !os.IsNotExist(err) {
return fmt.Errorf("failed to unlink socket file %q: %v", s.addr, err)
}
l, err := net.Listen(unixProtocol, s.addr)
if err != nil {
return fmt.Errorf("failed to listen on %q: %v", s.addr, err)
}
// Create the grpc server and register runtime and image services.
s.server = grpc.NewServer()
runtimeApi.RegisterRuntimeServiceServer(s.server, s.service)
runtimeApi.RegisterImageServiceServer(s.server, s.service)
go func() {
// Use interrupt handler to make sure the server to be stopped properly.
h := interrupt.New(nil, s.Stop)
err := h.Run(func() error { return s.server.Serve(l) })
if err != nil {
glog.Errorf("Failed to serve connections: %v", err)
}
}()
return nil
}
// Stop stops the dockershim grpc server.
func (s *DockerServer) Stop() {
glog.V(2).Infof("Stop docker server")
s.server.Stop()
}

View File

@ -0,0 +1,194 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package remote
import (
"fmt"
"golang.org/x/net/context"
internalApi "k8s.io/kubernetes/pkg/kubelet/api"
runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
"k8s.io/kubernetes/pkg/kubelet/dockershim"
)
// DockerService is the interface implement CRI remote service server.
type DockerService interface {
runtimeApi.RuntimeServiceServer
runtimeApi.ImageServiceServer
}
// dockerService uses dockershim service to implement DockerService.
// Notice that the contexts in the functions are not used now.
// TODO(random-liu): Change the dockershim service to support context, and implement
// internal services and remote services with the dockershim service.
type dockerService struct {
runtimeService internalApi.RuntimeService
imageService internalApi.ImageManagerService
}
func NewDockerService(s dockershim.DockerService) DockerService {
return &dockerService{runtimeService: s, imageService: s}
}
func (d *dockerService) Version(ctx context.Context, r *runtimeApi.VersionRequest) (*runtimeApi.VersionResponse, error) {
return d.runtimeService.Version(r.GetVersion())
}
func (d *dockerService) RunPodSandbox(ctx context.Context, r *runtimeApi.RunPodSandboxRequest) (*runtimeApi.RunPodSandboxResponse, error) {
podSandboxId, err := d.runtimeService.RunPodSandbox(r.GetConfig())
if err != nil {
return nil, err
}
return &runtimeApi.RunPodSandboxResponse{PodSandboxId: &podSandboxId}, nil
}
func (d *dockerService) StopPodSandbox(ctx context.Context, r *runtimeApi.StopPodSandboxRequest) (*runtimeApi.StopPodSandboxResponse, error) {
err := d.runtimeService.StopPodSandbox(r.GetPodSandboxId())
if err != nil {
return nil, err
}
return &runtimeApi.StopPodSandboxResponse{}, nil
}
func (d *dockerService) RemovePodSandbox(ctx context.Context, r *runtimeApi.RemovePodSandboxRequest) (*runtimeApi.RemovePodSandboxResponse, error) {
err := d.runtimeService.RemovePodSandbox(r.GetPodSandboxId())
if err != nil {
return nil, err
}
return &runtimeApi.RemovePodSandboxResponse{}, nil
}
func (d *dockerService) PodSandboxStatus(ctx context.Context, r *runtimeApi.PodSandboxStatusRequest) (*runtimeApi.PodSandboxStatusResponse, error) {
podSandboxStatus, err := d.runtimeService.PodSandboxStatus(r.GetPodSandboxId())
if err != nil {
return nil, err
}
return &runtimeApi.PodSandboxStatusResponse{Status: podSandboxStatus}, nil
}
func (d *dockerService) ListPodSandbox(ctx context.Context, r *runtimeApi.ListPodSandboxRequest) (*runtimeApi.ListPodSandboxResponse, error) {
items, err := d.runtimeService.ListPodSandbox(r.GetFilter())
if err != nil {
return nil, err
}
return &runtimeApi.ListPodSandboxResponse{Items: items}, nil
}
func (d *dockerService) CreateContainer(ctx context.Context, r *runtimeApi.CreateContainerRequest) (*runtimeApi.CreateContainerResponse, error) {
containerId, err := d.runtimeService.CreateContainer(r.GetPodSandboxId(), r.GetConfig(), r.GetSandboxConfig())
if err != nil {
return nil, err
}
return &runtimeApi.CreateContainerResponse{ContainerId: &containerId}, nil
}
func (d *dockerService) StartContainer(ctx context.Context, r *runtimeApi.StartContainerRequest) (*runtimeApi.StartContainerResponse, error) {
err := d.runtimeService.StartContainer(r.GetContainerId())
if err != nil {
return nil, err
}
return &runtimeApi.StartContainerResponse{}, nil
}
func (d *dockerService) StopContainer(ctx context.Context, r *runtimeApi.StopContainerRequest) (*runtimeApi.StopContainerResponse, error) {
err := d.runtimeService.StopContainer(r.GetContainerId(), r.GetTimeout())
if err != nil {
return nil, err
}
return &runtimeApi.StopContainerResponse{}, nil
}
func (d *dockerService) RemoveContainer(ctx context.Context, r *runtimeApi.RemoveContainerRequest) (*runtimeApi.RemoveContainerResponse, error) {
err := d.runtimeService.RemoveContainer(r.GetContainerId())
if err != nil {
return nil, err
}
return &runtimeApi.RemoveContainerResponse{}, nil
}
func (d *dockerService) ListContainers(ctx context.Context, r *runtimeApi.ListContainersRequest) (*runtimeApi.ListContainersResponse, error) {
containers, err := d.runtimeService.ListContainers(r.GetFilter())
if err != nil {
return nil, err
}
return &runtimeApi.ListContainersResponse{Containers: containers}, nil
}
func (d *dockerService) ContainerStatus(ctx context.Context, r *runtimeApi.ContainerStatusRequest) (*runtimeApi.ContainerStatusResponse, error) {
status, err := d.runtimeService.ContainerStatus(r.GetContainerId())
if err != nil {
return nil, err
}
return &runtimeApi.ContainerStatusResponse{Status: status}, nil
}
func (d *dockerService) ExecSync(ctx context.Context, r *runtimeApi.ExecSyncRequest) (*runtimeApi.ExecSyncResponse, error) {
return nil, fmt.Errorf("not implemented")
}
func (d *dockerService) Exec(ctx context.Context, r *runtimeApi.ExecRequest) (*runtimeApi.ExecResponse, error) {
return nil, fmt.Errorf("not implemented")
}
func (d *dockerService) Attach(ctx context.Context, r *runtimeApi.AttachRequest) (*runtimeApi.AttachResponse, error) {
return nil, fmt.Errorf("not implemented")
}
func (d *dockerService) PortForward(ctx context.Context, r *runtimeApi.PortForwardRequest) (*runtimeApi.PortForwardResponse, error) {
return nil, fmt.Errorf("not implemented")
}
func (d *dockerService) UpdateRuntimeConfig(ctx context.Context, r *runtimeApi.UpdateRuntimeConfigRequest) (*runtimeApi.UpdateRuntimeConfigResponse, error) {
err := d.runtimeService.UpdateRuntimeConfig(r.GetRuntimeConfig())
if err != nil {
return nil, err
}
return &runtimeApi.UpdateRuntimeConfigResponse{}, nil
}
func (d *dockerService) ListImages(ctx context.Context, r *runtimeApi.ListImagesRequest) (*runtimeApi.ListImagesResponse, error) {
images, err := d.imageService.ListImages(r.GetFilter())
if err != nil {
return nil, err
}
return &runtimeApi.ListImagesResponse{Images: images}, nil
}
func (d *dockerService) ImageStatus(ctx context.Context, r *runtimeApi.ImageStatusRequest) (*runtimeApi.ImageStatusResponse, error) {
image, err := d.imageService.ImageStatus(r.GetImage())
if err != nil {
return nil, err
}
return &runtimeApi.ImageStatusResponse{Image: image}, nil
}
func (d *dockerService) PullImage(ctx context.Context, r *runtimeApi.PullImageRequest) (*runtimeApi.PullImageResponse, error) {
err := d.imageService.PullImage(r.GetImage(), r.GetAuth())
if err != nil {
return nil, err
}
return &runtimeApi.PullImageResponse{}, nil
}
func (d *dockerService) RemoveImage(ctx context.Context, r *runtimeApi.RemoveImageRequest) (*runtimeApi.RemoveImageResponse, error) {
err := d.imageService.RemoveImage(r.GetImage())
if err != nil {
return nil, err
}
return &runtimeApi.RemoveImageResponse{}, nil
}

View File

@ -39,11 +39,13 @@ import (
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/cloudprovider"
"k8s.io/kubernetes/pkg/fields"
internalApi "k8s.io/kubernetes/pkg/kubelet/api"
"k8s.io/kubernetes/pkg/kubelet/cadvisor"
"k8s.io/kubernetes/pkg/kubelet/cm"
"k8s.io/kubernetes/pkg/kubelet/config"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockershim"
dockerremote "k8s.io/kubernetes/pkg/kubelet/dockershim/remote"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
"k8s.io/kubernetes/pkg/kubelet/events"
"k8s.io/kubernetes/pkg/kubelet/eviction"
@ -475,8 +477,6 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
klet.podManager = kubepod.NewBasicPodManager(kubepod.NewBasicMirrorClient(klet.kubeClient))
if kubeCfg.RemoteRuntimeEndpoint != "" {
kubeCfg.ContainerRuntime = "remote"
// kubeCfg.RemoteImageEndpoint is same as kubeCfg.RemoteRuntimeEndpoint if not explicitly specified
if kubeCfg.RemoteImageEndpoint == "" {
kubeCfg.RemoteImageEndpoint = kubeCfg.RemoteRuntimeEndpoint
@ -488,7 +488,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
case "docker":
switch kubeCfg.ExperimentalRuntimeIntegrationType {
case "cri":
// Use the new CRI shim for docker. This is need for testing the
// Use the new CRI shim for docker. This is needed for testing the
// docker integration through CRI, and may be removed in the future.
dockerService := dockershim.NewDockerService(klet.dockerClient, kubeCfg.SeccompProfileRoot, kubeCfg.PodInfraContainerImage)
klet.containerRuntime, err = kuberuntime.NewKubeGenericRuntimeManager(
@ -512,6 +512,53 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
if err != nil {
return nil, err
}
case "remote":
// kubelet will talk to the shim over a unix socket using grpc. This may become the default in the near future.
dockerService := dockershim.NewDockerService(klet.dockerClient, kubeCfg.SeccompProfileRoot, kubeCfg.PodInfraContainerImage)
// Start the in process dockershim grpc server.
server := dockerremote.NewDockerServer(kubeCfg.RemoteRuntimeEndpoint, dockerService)
if err := server.Start(); err != nil {
return nil, err
}
// Start the remote kuberuntime manager.
remoteRuntimeService, err := remote.NewRemoteRuntimeService(kubeCfg.RemoteRuntimeEndpoint, kubeCfg.RuntimeRequestTimeout.Duration)
if err != nil {
return nil, err
}
remoteImageService, err := remote.NewRemoteImageService(kubeCfg.RemoteImageEndpoint, kubeCfg.RuntimeRequestTimeout.Duration)
if err != nil {
return nil, err
}
klet.containerRuntime, err = kuberuntime.NewKubeGenericRuntimeManager(
kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
klet.livenessManager,
containerRefManager,
machineInfo,
klet.podManager,
kubeDeps.OSInterface,
klet.networkPlugin,
klet,
klet.httpClient,
imageBackOff,
kubeCfg.SerializeImagePulls,
float32(kubeCfg.RegistryPullQPS),
int(kubeCfg.RegistryBurst),
klet.cpuCFSQuota,
// Use DockerLegacyService directly to workaround unimplemented functions.
// We add short hack here to keep other code clean.
// TODO: Remove this hack after CRI is fully designed and implemented.
&struct {
internalApi.RuntimeService
dockershim.DockerLegacyService
}{
RuntimeService: remoteRuntimeService,
DockerLegacyService: dockerService,
},
remoteImageService,
)
if err != nil {
return nil, err
}
default:
// Only supported one for now, continue.
klet.containerRuntime = dockertools.NewDockerManager(

View File

@ -34,7 +34,7 @@ type RemoteImageService struct {
// NewRemoteImageService creates a new internalApi.ImageManagerService.
func NewRemoteImageService(addr string, connectionTimout time.Duration) (internalApi.ImageManagerService, error) {
glog.V(3).Infof("Connecting to image service %s", addr)
conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithDialer(dial))
conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithTimeout(connectionTimout), grpc.WithDialer(dial))
if err != nil {
glog.Errorf("Connect remote image service %s failed: %v", addr, err)
return nil, err

View File

@ -36,7 +36,7 @@ type RemoteRuntimeService struct {
// NewRemoteRuntimeService creates a new internalApi.RuntimeService.
func NewRemoteRuntimeService(addr string, connectionTimout time.Duration) (internalApi.RuntimeService, error) {
glog.V(3).Infof("Connecting to runtime service %s", addr)
conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithDialer(dial))
conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithTimeout(connectionTimout), grpc.WithDialer(dial))
if err != nil {
glog.Errorf("Connect remote runtime %s failed: %v", addr, err)
return nil, err

View File

@ -114,6 +114,9 @@ type NodeTestContextType struct {
PrepullImages bool
// RuntimeIntegrationType indicates how runtime is integrated with Kubelet. This is mainly used for CRI validation test.
RuntimeIntegrationType string
// ContainerRuntimeEndpoint is the endpoint of remote container runtime grpc server. This is mainly used for Remote CRI
// validation test.
ContainerRuntimeEndpoint string
// MounterPath is the path to the program to run to perform a mount
MounterPath string
}
@ -211,6 +214,7 @@ func RegisterNodeFlags() {
flag.StringVar(&TestContext.ManifestPath, "manifest-path", "", "The path to the static pod manifest file.")
flag.BoolVar(&TestContext.PrepullImages, "prepull-images", true, "If true, prepull images so image pull failures do not cause test failures.")
flag.StringVar(&TestContext.RuntimeIntegrationType, "runtime-integration-type", "", "Choose the integration path for the container runtime, mainly used for CRI validation.")
flag.StringVar(&TestContext.ContainerRuntimeEndpoint, "container-runtime-endpoint", "", "The endpoint of remote container runtime grpc server, mainly used for Remote CRI validation.")
flag.StringVar(&TestContext.MounterPath, "mounter-path", "", "Path of mounter binary. Leave empty to use the default mount.")
}

View File

@ -218,6 +218,9 @@ func (e *E2EServices) startKubelet() (*server, error) {
cmdArgs = append(cmdArgs, "--experimental-runtime-integration-type",
framework.TestContext.RuntimeIntegrationType) // Whether to use experimental cri integration.
}
if framework.TestContext.ContainerRuntimeEndpoint != "" {
cmdArgs = append(cmdArgs, "--container-runtime-endpoint", framework.TestContext.ContainerRuntimeEndpoint)
}
if framework.TestContext.CgroupsPerQOS {
// TODO: enable this when the flag is stable and available in kubelet.
// cmdArgs = append(cmdArgs,