mirror of https://github.com/k3s-io/k3s
Merge pull request #35490 from bprashanth/dockershim_probe_net
Automatic merge from submit-queue

Migrate Network Logic into runtime II

- [x] Probing
- [x] Bootstrap CIDR
- [x] Setup/Teardown
- [ ] ~~Hostports~~
- [ ] ~~BWShaping~~

https://github.com/kubernetes/kubernetes/pull/34780
commit
a02750dd8f
@ -118,6 +118,10 @@ type Runtime interface {
	ContainerAttacher
	// ImageService provides image-related methods.
	ImageService
	// UpdatePodCIDR sends a new podCIDR to the runtime.
	// This method just proxies a new runtimeConfig with the updated
	// CIDR value down to the runtime shim.
	UpdatePodCIDR(podCIDR string) error
}

type ImageService interface {
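For orientation, here is a minimal sketch (not part of this diff) of how the new Runtime method is meant to be driven: the caller only needs the generic kubecontainer.Runtime, and the CIDR is proxied down to the runtime shim as a runtime-config update. The pushPodCIDR helper is hypothetical; the real call site is Kubelet.updatePodCIDR further down in this diff.

```go
package example

import (
	"github.com/golang/glog"

	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)

// pushPodCIDR is a hypothetical helper: it forwards the node's pod CIDR
// through the generic Runtime interface added above.
func pushPodCIDR(rt kubecontainer.Runtime, cidr string) {
	if err := rt.UpdatePodCIDR(cidr); err != nil {
		// Non-fatal: the kubelet simply retries on its next sync.
		glog.Errorf("failed to update pod CIDR to %q: %v", cidr, err)
	}
}
```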
@ -118,6 +118,11 @@ func (f *FakeRuntime) ClearCalls() {
	f.StatusErr = nil
}

// UpdatePodCIDR fulfills the cri interface.
func (f *FakeRuntime) UpdatePodCIDR(c string) error {
	return nil
}

func (f *FakeRuntime) assertList(expect []string, test []string) error {
	if !reflect.DeepEqual(expect, test) {
		return fmt.Errorf("expected %#v, got %#v", expect, test)

@ -153,3 +153,8 @@ func (r *Mock) ImageStats() (*ImageStats, error) {
	args := r.Called()
	return args.Get(0).(*ImageStats), args.Error(1)
}

// UpdatePodCIDR fulfills the cri interface.
func (r *Mock) UpdatePodCIDR(c string) error {
	return nil
}
@ -27,11 +27,15 @@ go_library(
|
|||
tags = ["automanaged"],
|
||||
deps = [
|
||||
"//pkg/api:go_default_library",
|
||||
"//pkg/apis/componentconfig:go_default_library",
|
||||
"//pkg/kubelet/api:go_default_library",
|
||||
"//pkg/kubelet/api/v1alpha1/runtime:go_default_library",
|
||||
"//pkg/kubelet/container:go_default_library",
|
||||
"//pkg/kubelet/dockertools:go_default_library",
|
||||
"//pkg/kubelet/leaky:go_default_library",
|
||||
"//pkg/kubelet/network:go_default_library",
|
||||
"//pkg/kubelet/network/cni:go_default_library",
|
||||
"//pkg/kubelet/network/kubenet:go_default_library",
|
||||
"//pkg/kubelet/qos:go_default_library",
|
||||
"//pkg/kubelet/server/streaming:go_default_library",
|
||||
"//pkg/kubelet/types:go_default_library",
|
||||
|
@ -63,12 +67,16 @@ go_test(
|
|||
deps = [
|
||||
"//pkg/api:go_default_library",
|
||||
"//pkg/kubelet/api/v1alpha1/runtime:go_default_library",
|
||||
"//pkg/kubelet/container:go_default_library",
|
||||
"//pkg/kubelet/container/testing:go_default_library",
|
||||
"//pkg/kubelet/dockertools:go_default_library",
|
||||
"//pkg/kubelet/network:go_default_library",
|
||||
"//pkg/kubelet/network/mock_network:go_default_library",
|
||||
"//pkg/kubelet/types:go_default_library",
|
||||
"//pkg/security/apparmor:go_default_library",
|
||||
"//pkg/util/clock:go_default_library",
|
||||
"//vendor:github.com/docker/engine-api/types",
|
||||
"//vendor:github.com/golang/mock/gomock",
|
||||
"//vendor:github.com/stretchr/testify/assert",
|
||||
],
|
||||
)
|
||||
|
|
|
@ -25,6 +25,7 @@ import (
|
|||
"github.com/golang/glog"
|
||||
|
||||
runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
|
||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||
"k8s.io/kubernetes/pkg/kubelet/qos"
|
||||
"k8s.io/kubernetes/pkg/kubelet/types"
|
||||
)
|
||||
|
@ -37,6 +38,9 @@ const (
|
|||
|
||||
// Termination grace period
|
||||
defaultSandboxGracePeriod int = 10
|
||||
|
||||
// Name of the underlying container runtime
|
||||
runtimeName = "docker"
|
||||
)
|
||||
|
||||
// RunPodSandbox creates and starts a pod-level sandbox. Runtimes should ensure
|
||||
|
@ -72,12 +76,48 @@ func (ds *dockerService) RunPodSandbox(config *runtimeApi.PodSandboxConfig) (str
	// Assume kubelet's garbage collector would remove the sandbox later, if
	// startContainer failed.
	err = ds.client.StartContainer(createResp.ID)
	if err != nil {
		return createResp.ID, fmt.Errorf("failed to start sandbox container for pod %q: %v", config.Metadata.GetName(), err)
	}
	if config.GetLinux().GetNamespaceOptions().GetHostNetwork() {
		return createResp.ID, nil
	}

	// Step 4: Setup networking for the sandbox.
	// All pod networking is setup by a CNI plugin discovered at startup time.
	// This plugin assigns the pod ip, sets up routes inside the sandbox,
	// creates interfaces etc. In theory, its jurisdiction ends with pod
	// sandbox networking, but it might insert iptables rules or open ports
	// on the host as well, to satisfy parts of the pod spec that aren't
	// recognized by the CNI standard yet.
	cID := kubecontainer.BuildContainerID(runtimeName, createResp.ID)
	err = ds.networkPlugin.SetUpPod(config.GetMetadata().GetNamespace(), config.GetMetadata().GetName(), cID)
	// TODO: Do we need to teardown on failure or can we rely on a StopPodSandbox call with the given ID?
	return createResp.ID, err
}

// StopPodSandbox stops the sandbox. If there are any running containers in the
// sandbox, they should be force terminated.
// TODO: This function blocks sandbox teardown on networking teardown. Is it
// better to cut our losses assuming an out of band GC routine will cleanup
// after us?
func (ds *dockerService) StopPodSandbox(podSandboxID string) error {
	status, err := ds.PodSandboxStatus(podSandboxID)
	if err != nil {
		return fmt.Errorf("Failed to get sandbox status: %v", err)
	}
	if !status.GetLinux().GetNamespaces().GetOptions().GetHostNetwork() {
		m := status.GetMetadata()
		cID := kubecontainer.BuildContainerID(runtimeName, podSandboxID)
		if err := ds.networkPlugin.TearDownPod(m.GetNamespace(), m.GetName(), cID); err != nil {
			// TODO: Figure out a way to retry this error. We can't
			// right now because the plugin throws errors when it doesn't find
			// eth0, which might not exist for various reasons (setup failed,
			// conf changed etc). In theory, it should teardown everything else
			// so there's no need to retry.
			glog.Errorf("Failed to teardown sandbox %v for pod %v/%v: %v", m.GetNamespace(), m.GetName(), podSandboxID, err)
		}
	}
	return ds.client.StopContainer(podSandboxID, defaultSandboxGracePeriod)
	// TODO: Stop all running containers in the sandbox.
}
@ -89,6 +129,53 @@ func (ds *dockerService) RemovePodSandbox(podSandboxID string) error {
	// TODO: remove all containers in the sandbox.
}

// getIPFromPlugin interrogates the network plugin for an IP.
func (ds *dockerService) getIPFromPlugin(sandbox *dockertypes.ContainerJSON) (string, error) {
	metadata, err := parseSandboxName(sandbox.Name)
	if err != nil {
		return "", err
	}
	msg := fmt.Sprintf("Couldn't find network status for %s/%s through plugin", *metadata.Namespace, *metadata.Name)
	if sharesHostNetwork(sandbox) {
		return "", fmt.Errorf("%v: not responsible for host-network sandboxes", msg)
	}
	cID := kubecontainer.BuildContainerID(runtimeName, sandbox.ID)
	networkStatus, err := ds.networkPlugin.GetPodNetworkStatus(*metadata.Namespace, *metadata.Name, cID)
	if err != nil {
		// This might be a sandbox that somehow ended up without a default
		// interface (eth0). We can't distinguish this from a more serious
		// error, so callers should probably treat it as non-fatal.
		return "", fmt.Errorf("%v: %v", msg, err)
	}
	if networkStatus == nil {
		return "", fmt.Errorf("%v: invalid network status", msg)
	}
	return networkStatus.IP.String(), nil
}

// getIP returns the ip given the output of `docker inspect` on a pod sandbox,
// first interrogating any registered plugins, then simply trusting the ip
// in the sandbox itself. We look for an ipv4 address before ipv6.
func (ds *dockerService) getIP(sandbox *dockertypes.ContainerJSON) (string, error) {
	if sandbox.NetworkSettings == nil {
		return "", nil
	}
	if IP, err := ds.getIPFromPlugin(sandbox); err != nil {
		glog.Warningf("%v", err)
	} else if IP != "" {
		return IP, nil
	}
	// TODO: trusting the docker ip is not a great idea. However docker uses
	// eth0 by default and so does CNI, so if we find a docker IP here, we
	// conclude that the plugin must have failed setup, or forgotten its ip.
	// This is not a sensible assumption for plugins across the board, but if
	// a plugin doesn't want this behavior, it can throw an error.
	if sandbox.NetworkSettings.IPAddress != "" {
		return sandbox.NetworkSettings.IPAddress, nil
	}
	return sandbox.NetworkSettings.GlobalIPv6Address, nil
}

// PodSandboxStatus returns the status of the PodSandbox.
func (ds *dockerService) PodSandboxStatus(podSandboxID string) (*runtimeApi.PodSandboxStatus, error) {
	// Inspect the container.
@ -109,20 +196,9 @@ func (ds *dockerService) PodSandboxStatus(podSandboxID string) (*runtimeApi.PodS
|
|||
if r.State.Running {
|
||||
state = runtimeApi.PodSandBoxState_READY
|
||||
}
|
||||
|
||||
// TODO: We can't really get the IP address from the network plugin, which
|
||||
// is handled by kubelet as of now. Should we amend the interface? How is
|
||||
// this handled in the new remote runtime integration?
|
||||
// See DockerManager.determineContainerIP() for more details.
|
||||
// For now, just assume that there is no network plugin.
|
||||
// Related issue: https://github.com/kubernetes/kubernetes/issues/28667
|
||||
var IP string
|
||||
if r.NetworkSettings != nil {
|
||||
IP = r.NetworkSettings.IPAddress
|
||||
// Fall back to IPv6 address if no IPv4 address is present
|
||||
if IP == "" {
|
||||
IP = r.NetworkSettings.GlobalIPv6Address
|
||||
}
|
||||
IP, err := ds.getIP(r)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
network := &runtimeApi.PodSandboxNetworkStatus{Ip: &IP}
|
||||
netNS := getNetworkNamespace(r)
|
||||
|
@ -131,7 +207,7 @@ func (ds *dockerService) PodSandboxStatus(podSandboxID string) (*runtimeApi.PodS
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
hostNetwork := sharesHostNetwork(r)
|
||||
labels, annotations := extractLabels(r.Config.Labels)
|
||||
return &runtimeApi.PodSandboxStatus{
|
||||
Id: &r.ID,
|
||||
|
@ -141,7 +217,14 @@ func (ds *dockerService) PodSandboxStatus(podSandboxID string) (*runtimeApi.PodS
|
|||
Labels: labels,
|
||||
Annotations: annotations,
|
||||
Network: network,
|
||||
Linux: &runtimeApi.LinuxPodSandboxStatus{Namespaces: &runtimeApi.Namespace{Network: &netNS}},
|
||||
Linux: &runtimeApi.LinuxPodSandboxStatus{
|
||||
Namespaces: &runtimeApi.Namespace{
|
||||
Network: &netNS,
|
||||
Options: &runtimeApi.NamespaceOption{
|
||||
HostNetwork: &hostNetwork,
|
||||
},
|
||||
},
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
|
@ -279,6 +362,15 @@ func (ds *dockerService) makeSandboxDockerConfig(c *runtimeApi.PodSandboxConfig,
|
|||
return createConfig, nil
|
||||
}
|
||||
|
||||
// sharesHostNetwork returns true if the given container is sharing the host's
// network namespace.
|
||||
func sharesHostNetwork(container *dockertypes.ContainerJSON) bool {
|
||||
if container != nil && container.HostConfig != nil {
|
||||
return string(container.HostConfig.NetworkMode) == namespaceModeHost
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func setSandboxResources(hc *dockercontainer.HostConfig) {
|
||||
hc.Resources = dockercontainer.Resources{
|
||||
MemorySwap: -1, // Always disable memory swap.
|
||||
|
|
|
@ -25,6 +25,7 @@ import (
|
|||
"github.com/stretchr/testify/assert"
|
||||
|
||||
runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
|
||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||
"k8s.io/kubernetes/pkg/kubelet/types"
|
||||
)
|
||||
|
||||
|
@ -99,12 +100,13 @@ func TestSandboxStatus(t *testing.T) {
|
|||
|
||||
state := runtimeApi.PodSandBoxState_READY
|
||||
ct := int64(0)
|
||||
hostNetwork := false
|
||||
expected := &runtimeApi.PodSandboxStatus{
|
||||
State: &state,
|
||||
CreatedAt: &ct,
|
||||
Metadata: config.Metadata,
|
||||
Network: &runtimeApi.PodSandboxNetworkStatus{Ip: &fakeIP},
|
||||
Linux: &runtimeApi.LinuxPodSandboxStatus{Namespaces: &runtimeApi.Namespace{Network: &fakeNS}},
|
||||
Linux: &runtimeApi.LinuxPodSandboxStatus{Namespaces: &runtimeApi.Namespace{Network: &fakeNS, Options: &runtimeApi.NamespaceOption{HostNetwork: &hostNetwork}}},
|
||||
Labels: labels,
|
||||
Annotations: annotations,
|
||||
}
|
||||
|
@ -138,3 +140,57 @@ func TestSandboxStatus(t *testing.T) {
|
|||
status, err = ds.PodSandboxStatus(id)
|
||||
assert.Error(t, err, fmt.Sprintf("status of sandbox: %+v", status))
|
||||
}
|
||||
|
||||
// TestNetworkPluginInvocation checks that the right SetUpPod and TearDownPod
|
||||
// calls are made when we run/stop a sandbox.
|
||||
func TestNetworkPluginInvocation(t *testing.T) {
|
||||
ds, _, _ := newTestDockerService()
|
||||
mockPlugin := newTestNetworkPlugin(t)
|
||||
ds.networkPlugin = mockPlugin
|
||||
defer mockPlugin.Finish()
|
||||
|
||||
name := "foo0"
|
||||
ns := "bar0"
|
||||
c := makeSandboxConfigWithLabelsAndAnnotations(
|
||||
name, ns, "0", 0,
|
||||
map[string]string{"label": name},
|
||||
map[string]string{"annotation": ns},
|
||||
)
|
||||
cID := kubecontainer.ContainerID{Type: runtimeName, ID: fmt.Sprintf("/%v", makeSandboxName(c))}
|
||||
|
||||
setup := mockPlugin.EXPECT().SetUpPod(ns, name, cID)
|
||||
// StopPodSandbox performs a lookup on status to figure out if the sandbox
// is running with hostnetworking, as all it's given is the ID.
|
||||
mockPlugin.EXPECT().GetPodNetworkStatus(ns, name, cID)
|
||||
mockPlugin.EXPECT().TearDownPod(ns, name, cID).After(setup)
|
||||
|
||||
_, err := ds.RunPodSandbox(c)
|
||||
assert.NoError(t, err)
|
||||
err = ds.StopPodSandbox(cID.ID)
|
||||
assert.NoError(t, err)
|
||||
}
|
||||
|
||||
// TestHostNetworkPluginInvocation checks that *no* SetUp/TearDown calls happen
|
||||
// for host network sandboxes.
|
||||
func TestHostNetworkPluginInvocation(t *testing.T) {
|
||||
ds, _, _ := newTestDockerService()
|
||||
mockPlugin := newTestNetworkPlugin(t)
|
||||
ds.networkPlugin = mockPlugin
|
||||
defer mockPlugin.Finish()
|
||||
|
||||
name := "foo0"
|
||||
ns := "bar0"
|
||||
c := makeSandboxConfigWithLabelsAndAnnotations(
|
||||
name, ns, "0", 0,
|
||||
map[string]string{"label": name},
|
||||
map[string]string{"annotation": ns},
|
||||
)
|
||||
hostNetwork := true
|
||||
c.Linux = &runtimeApi.LinuxPodSandboxConfig{NamespaceOptions: &runtimeApi.NamespaceOption{HostNetwork: &hostNetwork}}
|
||||
cID := kubecontainer.ContainerID{Type: runtimeName, ID: fmt.Sprintf("/%v", makeSandboxName(c))}
|
||||
|
||||
// No calls to network plugin are expected
|
||||
_, err := ds.RunPodSandbox(c)
|
||||
assert.NoError(t, err)
|
||||
assert.NoError(t, ds.StopPodSandbox(cID.ID))
|
||||
}
|
||||
|
|
|
@ -20,11 +20,16 @@ import (
|
|||
"fmt"
|
||||
"io"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
"k8s.io/kubernetes/pkg/apis/componentconfig"
|
||||
internalApi "k8s.io/kubernetes/pkg/kubelet/api"
|
||||
runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
|
||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||
"k8s.io/kubernetes/pkg/kubelet/dockertools"
|
||||
"k8s.io/kubernetes/pkg/kubelet/network"
|
||||
"k8s.io/kubernetes/pkg/kubelet/network/cni"
|
||||
"k8s.io/kubernetes/pkg/kubelet/network/kubenet"
|
||||
"k8s.io/kubernetes/pkg/kubelet/server/streaming"
|
||||
"k8s.io/kubernetes/pkg/util/term"
|
||||
)
|
||||
|
@ -53,10 +58,41 @@ const (
	sandboxIDLabelKey = "io.kubernetes.sandbox.id"
)

// NetworkPluginSettings is the subset of kubelet runtime args we pass
// to the container runtime shim so it can probe for network plugins.
// In the future we will feed these directly to a standalone container
// runtime process.
type NetworkPluginSettings struct {
	// HairpinMode is best described by comments surrounding the kubelet arg
	HairpinMode componentconfig.HairpinMode
	// NonMasqueradeCIDR is the range of ips which should *not* be included
	// in any MASQUERADE rules applied by the plugin
	NonMasqueradeCIDR string
	// PluginName is the name of the plugin, runtime shim probes for
	PluginName string
	// PluginBinDir is the directory in which the binaries for the plugin with
	// PluginName is kept. The admin is responsible for provisioning these
	// binaries before-hand.
	PluginBinDir string
	// PluginConfDir is the directory in which the admin places a CNI conf.
	// Depending on the plugin, this may be an optional field, eg: kubenet
	// generates its own plugin conf.
	PluginConfDir string
	// MTU is the desired MTU for network devices created by the plugin.
	MTU int

	// LegacyRuntimeHost is an interface that serves as a trap-door from plugin back
	// into the kubelet.
	// TODO: This shouldn't be required, remove once we move host ports into CNI
	// and figure out bandwidth shaping. See corresponding comments above
	// network.Host interface.
	LegacyRuntimeHost network.LegacyHost
}

var internalLabelKeys []string = []string{containerTypeLabelKey, containerLogPathLabelKey, sandboxIDLabelKey}

// NOTE: Anything passed to DockerService should be eventually handled in another way when we switch to running the shim as a different process.
func NewDockerService(client dockertools.DockerInterface, seccompProfileRoot string, podSandboxImage string, streamingConfig *streaming.Config) (DockerService, error) {
func NewDockerService(client dockertools.DockerInterface, seccompProfileRoot string, podSandboxImage string, streamingConfig *streaming.Config, pluginSettings *NetworkPluginSettings) (DockerService, error) {
	ds := &dockerService{
		seccompProfileRoot: seccompProfileRoot,
		client:             dockertools.NewInstrumentedDockerInterface(client),

@ -76,6 +112,19 @@ func NewDockerService(client dockertools.DockerInterface, seccompProfileRoot str
			return nil, err
		}
	}
	// dockershim currently only supports CNI plugins.
	cniPlugins := cni.ProbeNetworkPlugins(pluginSettings.PluginConfDir, pluginSettings.PluginBinDir)
	cniPlugins = append(cniPlugins, kubenet.NewPlugin(pluginSettings.PluginBinDir))
	netHost := &dockerNetworkHost{
		pluginSettings.LegacyRuntimeHost,
		&namespaceGetter{ds},
	}
	plug, err := network.InitNetworkPlugin(cniPlugins, pluginSettings.PluginName, netHost, pluginSettings.HairpinMode, pluginSettings.NonMasqueradeCIDR, pluginSettings.MTU)
	if err != nil {
		return nil, fmt.Errorf("didn't find compatible CNI plugin with given settings %+v: %v", pluginSettings, err)
	}
	ds.networkPlugin = plug
	glog.Infof("Docker cri networking managed by %v", plug.Name())
	return ds, nil
}
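To make the new constructor wiring concrete, here is a rough sketch (not taken from this PR) of how a caller fills in NetworkPluginSettings and hands them to the shim. The paths, MTU, plugin name and pause image below are placeholder values; the real construction happens in NewMainKubelet later in this diff, where all values come from the KubeletConfiguration.

```go
package example

import (
	"k8s.io/kubernetes/pkg/apis/componentconfig"
	"k8s.io/kubernetes/pkg/kubelet/dockershim"
	"k8s.io/kubernetes/pkg/kubelet/dockertools"
	"k8s.io/kubernetes/pkg/kubelet/network"
)

// newShim shows the shape of the call; all literal values are placeholders.
func newShim(client dockertools.DockerInterface, legacyHost network.LegacyHost) (dockershim.DockerService, error) {
	settings := &dockershim.NetworkPluginSettings{
		HairpinMode:       componentconfig.HairpinVeth,
		NonMasqueradeCIDR: "10.0.0.0/8",
		PluginName:        "kubenet",
		PluginBinDir:      "/opt/cni/bin",
		PluginConfDir:     "/etc/cni/net.d",
		MTU:               1460,
		// The kubelet passes a no-op LegacyHost here, because the remote
		// shim cannot call back into the kubelet for hostports or shaping.
		LegacyRuntimeHost: legacyHost,
	}
	return dockershim.NewDockerService(client, "/seccomp", "gcr.io/google_containers/pause-amd64:3.0", nil, settings)
}
```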
@ -105,6 +154,7 @@ type dockerService struct {
|
|||
podSandboxImage string
|
||||
streamingRuntime *streamingRuntime
|
||||
streamingServer streaming.Server
|
||||
networkPlugin network.NetworkPlugin
|
||||
}
|
||||
|
||||
// Version returns the runtime name, runtime version and runtime API version
|
||||
|
@ -126,6 +176,41 @@ func (ds *dockerService) Version(_ string) (*runtimeApi.VersionResponse, error)
	}, nil
}

func (ds *dockerService) UpdateRuntimeConfig(runtimeConfig *runtimeApi.RuntimeConfig) error {
	return nil
// UpdateRuntimeConfig updates the runtime config. Currently only handles podCIDR updates.
func (ds *dockerService) UpdateRuntimeConfig(runtimeConfig *runtimeApi.RuntimeConfig) (err error) {
	if runtimeConfig == nil {
		return
	}
	glog.Infof("docker cri received runtime config %+v", runtimeConfig)
	if ds.networkPlugin != nil && runtimeConfig.NetworkConfig.PodCidr != nil {
		event := make(map[string]interface{})
		event[network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE_DETAIL_CIDR] = *runtimeConfig.NetworkConfig.PodCidr
		ds.networkPlugin.Event(network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE, event)
	}
	return
}

// namespaceGetter is a wrapper around the dockerService that implements
// the network.NamespaceGetter interface.
type namespaceGetter struct {
	*dockerService
}

// GetNetNS returns the network namespace of the given containerID. The ID
// supplied is typically the ID of a pod sandbox. This getter doesn't try
// to map non-sandbox IDs to their respective sandboxes.
func (ds *dockerService) GetNetNS(podSandboxID string) (string, error) {
	r, err := ds.client.InspectContainer(podSandboxID)
	if err != nil {
		return "", err
	}
	return getNetworkNamespace(r), nil
}

// dockerNetworkHost implements network.Host by wrapping the legacy host
// passed in by the kubelet and adding NamespaceGetter methods. The legacy
// host methods are slated for deletion.
type dockerNetworkHost struct {
	network.LegacyHost
	*namespaceGetter
}
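The shim above raises a pod-CIDR-change event on its network plugin; the receiving side is not part of this hunk, so for context here is a simplified sketch of what a plugin's Event hook does with it. It is modeled loosely on kubenet; the examplePlugin type and its fields are illustrative only.

```go
package example

import (
	"sync"

	"github.com/golang/glog"

	"k8s.io/kubernetes/pkg/kubelet/network"
)

// examplePlugin is a stand-in for a real plugin such as kubenet.
type examplePlugin struct {
	mu      sync.Mutex
	podCIDR string
}

// Event receives the pod-CIDR-change notification raised by the shim above.
func (p *examplePlugin) Event(name string, details map[string]interface{}) {
	if name != network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE {
		return
	}
	cidr, ok := details[network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE_DETAIL_CIDR].(string)
	if !ok {
		glog.Warningf("%s event did not contain a pod CIDR", name)
		return
	}
	p.mu.Lock()
	defer p.mu.Unlock()
	// kubenet uses this value to generate its bridge CNI config; here we
	// just record it.
	p.podCIDR = cidr
}
```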
@ -17,15 +17,25 @@ limitations under the License.
|
|||
package dockershim
|
||||
|
||||
import (
|
||||
"github.com/golang/mock/gomock"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
containertest "k8s.io/kubernetes/pkg/kubelet/container/testing"
|
||||
"k8s.io/kubernetes/pkg/kubelet/dockertools"
|
||||
"k8s.io/kubernetes/pkg/kubelet/network"
|
||||
"k8s.io/kubernetes/pkg/kubelet/network/mock_network"
|
||||
"k8s.io/kubernetes/pkg/util/clock"
|
||||
)
|
||||
|
||||
// newTestNetworkPlugin returns a mock plugin that implements network.NetworkPlugin
|
||||
func newTestNetworkPlugin(t *testing.T) *mock_network.MockNetworkPlugin {
|
||||
ctrl := gomock.NewController(t)
|
||||
return mock_network.NewMockNetworkPlugin(ctrl)
|
||||
}
|
||||
|
||||
func newTestDockerService() (*dockerService, *dockertools.FakeDockerClient, *clock.FakeClock) {
|
||||
fakeClock := clock.NewFakeClock(time.Time{})
|
||||
c := dockertools.NewFakeDockerClientWithClock(fakeClock)
|
||||
return &dockerService{client: c, os: &containertest.FakeOS{}}, c, fakeClock
|
||||
return &dockerService{client: c, os: &containertest.FakeOS{}, networkPlugin: &network.NoopNetworkPlugin{}}, c, fakeClock
|
||||
}
|
||||
|
|
|
@ -1321,6 +1321,12 @@ func (dm *DockerManager) PortForward(pod *kubecontainer.Pod, port uint16, stream
|
|||
return PortForward(dm.client, podInfraContainer.ID.ID, port, stream)
|
||||
}
|
||||
|
||||
// UpdatePodCIDR updates the podCIDR for the runtime.
|
||||
// Currently no-ops, just implemented to satisfy the cri.
|
||||
func (dm *DockerManager) UpdatePodCIDR(podCIDR string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// Temporarily export this function to share with dockershim.
|
||||
func PortForward(client DockerInterface, podInfraContainerID string, port uint16, stream io.ReadWriteCloser) error {
|
||||
container, err := client.InspectContainer(podInfraContainerID)
|
||||
|
|
|
@ -456,7 +456,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
|
|||
}
|
||||
glog.Infof("Hairpin mode set to %q", klet.hairpinMode)
|
||||
|
||||
if plug, err := network.InitNetworkPlugin(kubeDeps.NetworkPlugins, kubeCfg.NetworkPluginName, &networkHost{klet}, klet.hairpinMode, klet.nonMasqueradeCIDR, int(kubeCfg.NetworkPluginMTU)); err != nil {
|
||||
if plug, err := network.InitNetworkPlugin(kubeDeps.NetworkPlugins, kubeCfg.NetworkPluginName, &criNetworkHost{&networkHost{klet}}, klet.hairpinMode, klet.nonMasqueradeCIDR, int(kubeCfg.NetworkPluginMTU)); err != nil {
|
||||
return nil, err
|
||||
} else {
|
||||
klet.networkPlugin = plug
|
||||
|
@ -482,6 +482,26 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
		}
	}

	// TODO: These need to become arguments to a standalone docker shim.
	binDir := kubeCfg.CNIBinDir
	if binDir == "" {
		binDir = kubeCfg.NetworkPluginDir
	}
	pluginSettings := dockershim.NetworkPluginSettings{
		HairpinMode:       klet.hairpinMode,
		NonMasqueradeCIDR: klet.nonMasqueradeCIDR,
		PluginName:        kubeCfg.NetworkPluginName,
		PluginConfDir:     kubeCfg.CNIConfDir,
		PluginBinDir:      binDir,
		MTU:               int(kubeCfg.NetworkPluginMTU),
	}

	// Remote runtime shim just cannot talk back to kubelet, so it doesn't
	// support bandwidth shaping or hostports till #35457. To enable legacy
	// features, replace with networkHost.
	var nl *noOpLegacyHost
	pluginSettings.LegacyRuntimeHost = nl

	// Initialize the runtime.
	switch kubeCfg.ContainerRuntime {
	case "docker":
@ -489,10 +509,7 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
|
|||
case "cri":
|
||||
// Use the new CRI shim for docker. This is needed for testing the
|
||||
// docker integration through CRI, and may be removed in the future.
|
||||
dockerService, err := dockershim.NewDockerService(klet.dockerClient, kubeCfg.SeccompProfileRoot, kubeCfg.PodInfraContainerImage, nil)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
dockerService, err := dockershim.NewDockerService(klet.dockerClient, kubeCfg.SeccompProfileRoot, kubeCfg.PodInfraContainerImage, nil, &pluginSettings)
|
||||
runtimeService := dockerService.(internalApi.RuntimeService)
|
||||
imageService := dockerService.(internalApi.ImageManagerService)
|
||||
|
||||
|
@ -520,6 +537,13 @@ func NewMainKubelet(kubeCfg *componentconfig.KubeletConfiguration, kubeDeps *Kub
|
|||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
// kubelet defers to the runtime shim to setup networking. Setting
|
||||
// this to nil will prevent it from trying to invoke the plugin.
|
||||
// It's easier to always probe and initialize plugins till cri
|
||||
// becomes the default.
|
||||
klet.networkPlugin = nil
|
||||
|
||||
klet.containerRuntime, err = kuberuntime.NewKubeGenericRuntimeManager(
|
||||
kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
|
||||
klet.livenessManager,
|
||||
|
@ -1202,6 +1226,13 @@ func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
|
|||
kl.syncLoop(updates, kl)
|
||||
}
|
||||
|
||||
// GetKubeClient returns the Kubernetes client.
|
||||
// TODO: This is currently only required by network plugins. Replace
|
||||
// with more specific methods.
|
||||
func (kl *Kubelet) GetKubeClient() clientset.Interface {
|
||||
return kl.kubeClient
|
||||
}
|
||||
|
||||
// GetClusterDNS returns a list of the DNS servers and a list of the DNS search
|
||||
// domains of the cluster.
|
||||
func (kl *Kubelet) GetClusterDNS(pod *api.Pod) ([]string, []string, error) {
|
||||
|
|
|
@ -191,7 +191,20 @@ func (kl *Kubelet) cleanupBandwidthLimits(allPods []*api.Pod) error {

// syncNetworkStatus updates the network state
func (kl *Kubelet) syncNetworkStatus() {
	kl.runtimeState.setNetworkState(kl.networkPlugin.Status())
	// TODO(#35701): cri shim handles network plugin but we currently
	// don't have a cri status hook, so network plugin status isn't
	// reported if --experimental-runtime-integration=cri. This isn't
	// too bad, because kubenet is the only network plugin that
	// implements status(), and it just checks for plugin binaries
	// on the filesystem.
	if kl.networkPlugin != nil {
		kl.runtimeState.setNetworkState(kl.networkPlugin.Status())
	} else if kl.runtimeState.podCIDR() != "" {
		// Don't mark the node ready till we've successfully executed
		// the first UpdatePodCIDR call through cri. See comment above
		// setPodCIDR call.
		kl.runtimeState.setNetworkState(nil)
	}
}

// updatePodCIDR updates the pod CIDR in the runtime state if it is different
@ -203,14 +216,26 @@ func (kl *Kubelet) updatePodCIDR(cidr string) {
|
|||
return
|
||||
}
|
||||
|
||||
glog.Infof("Setting Pod CIDR: %v -> %v", podCIDR, cidr)
|
||||
kl.runtimeState.setPodCIDR(cidr)
|
||||
|
||||
// kubelet -> network plugin
|
||||
// cri runtime shims are responsible for their own network plugins
|
||||
if kl.networkPlugin != nil {
|
||||
details := make(map[string]interface{})
|
||||
details[network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE_DETAIL_CIDR] = cidr
|
||||
kl.networkPlugin.Event(network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE, details)
|
||||
}
|
||||
|
||||
// kubelet -> generic runtime -> runtime shim -> network plugin
|
||||
// docker/rkt non-cri implementations have a passthrough UpdatePodCIDR
|
||||
if err := kl.GetRuntime().UpdatePodCIDR(cidr); err != nil {
|
||||
glog.Errorf("Failed to update pod CIDR: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
// We need to be careful about setting podCIDR. Till #35839 lands we're
|
||||
// using it to indicate network plugin status for cri shims. See comment
|
||||
// in syncNetworkStatus.
|
||||
glog.Infof("Setting Pod CIDR: %v -> %v", podCIDR, cidr)
|
||||
kl.runtimeState.setPodCIDR(cidr)
|
||||
}
|
||||
|
||||
// shapingEnabled returns whether traffic shaping is enabled.
|
||||
|
@ -219,6 +244,16 @@ func (kl *Kubelet) shapingEnabled() bool {
|
|||
if kl.networkPlugin != nil && kl.networkPlugin.Capabilities().Has(network.NET_PLUGIN_CAPABILITY_SHAPING) {
|
||||
return false
|
||||
}
|
||||
// This is not strictly true but we need to figure out how to handle
|
||||
// bandwidth shaping anyway. If the kubelet doesn't have a networkPlugin,
|
||||
// it could mean:
|
||||
// a. the kubelet is responsible for bandwidth shaping
|
||||
// b. the kubelet is using cri, and the cri has a network plugin
|
||||
// Today, the only plugin that understands bandwidth shaping is kubenet, and
|
||||
// it doesn't support bandwidth shaping when invoked through cri, so it
|
||||
// effectively boils down to letting the kubelet decide how to handle
|
||||
// shaping annotations. The combination of (cri + network plugin that
|
||||
// handles bandwidth shaping) may not work because of this.
|
||||
return true
|
||||
}
|
||||
|
||||
|
|
|
@ -641,41 +641,16 @@ func (m *kubeGenericRuntimeManager) SyncPod(pod *api.Pod, _ api.PodStatus, podSt
|
|||
return
|
||||
}
|
||||
|
||||
setupNetworkResult := kubecontainer.NewSyncResult(kubecontainer.SetupNetwork, podSandboxID)
|
||||
result.AddSyncResult(setupNetworkResult)
|
||||
if !kubecontainer.IsHostNetworkPod(pod) {
|
||||
glog.V(3).Infof("Calling network plugin %s to setup pod for %s", m.networkPlugin.Name(), format.Pod(pod))
|
||||
// Setup pod network plugin with sandbox id
|
||||
// TODO: rename the last param to sandboxID
|
||||
err = m.networkPlugin.SetUpPod(pod.Namespace, pod.Name, kubecontainer.ContainerID{
|
||||
Type: m.runtimeName,
|
||||
ID: podSandboxID,
|
||||
})
|
||||
if err != nil {
|
||||
message := fmt.Sprintf("Failed to setup network for pod %q using network plugins %q: %v", format.Pod(pod), m.networkPlugin.Name(), err)
|
||||
setupNetworkResult.Fail(kubecontainer.ErrSetupNetwork, message)
|
||||
glog.Error(message)
|
||||
|
||||
killPodSandboxResult := kubecontainer.NewSyncResult(kubecontainer.KillPodSandbox, format.Pod(pod))
|
||||
result.AddSyncResult(killPodSandboxResult)
|
||||
if err := m.runtimeService.StopPodSandbox(podSandboxID); err != nil {
|
||||
killPodSandboxResult.Fail(kubecontainer.ErrKillPodSandbox, err.Error())
|
||||
glog.Errorf("Kill sandbox %q failed for pod %q: %v", podSandboxID, format.Pod(pod), err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
podSandboxStatus, err := m.runtimeService.PodSandboxStatus(podSandboxID)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to get pod sandbox status: %v; Skipping pod %q", err, format.Pod(pod))
|
||||
result.Fail(err)
|
||||
return
|
||||
}
|
||||
|
||||
// Overwrite the podIP passed in the pod status, since we just started the infra container.
|
||||
podIP = m.determinePodSandboxIP(pod.Namespace, pod.Name, podSandboxStatus)
|
||||
glog.V(4).Infof("Determined the ip %q for pod %q after sandbox changed", podIP, format.Pod(pod))
|
||||
podSandboxStatus, err := m.runtimeService.PodSandboxStatus(podSandboxID)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to get pod sandbox status: %v; Skipping pod %q", err, format.Pod(pod))
|
||||
result.Fail(err)
|
||||
return
|
||||
}
|
||||
|
||||
// Overwrite the podIP passed in the pod status, since we just started the pod sandbox.
|
||||
podIP = m.determinePodSandboxIP(pod.Namespace, pod.Name, podSandboxStatus)
|
||||
glog.V(4).Infof("Determined the ip %q for pod %q after sandbox changed", podIP, format.Pod(pod))
|
||||
}
|
||||
|
||||
// Get podSandboxConfig for containers to start.
|
||||
|
@ -815,33 +790,6 @@ func (m *kubeGenericRuntimeManager) killPodWithSyncResult(pod *api.Pod, runningP
|
|||
result.AddSyncResult(containerResult)
|
||||
}
|
||||
|
||||
// Teardown network plugin
|
||||
if len(runningPod.Sandboxes) == 0 {
|
||||
glog.V(4).Infof("Can not find pod sandbox by UID %q, assuming already removed.", runningPod.ID)
|
||||
return
|
||||
}
|
||||
|
||||
sandboxID := runningPod.Sandboxes[0].ID.ID
|
||||
isHostNetwork, err := m.isHostNetwork(sandboxID, pod)
|
||||
if err != nil {
|
||||
result.Fail(err)
|
||||
return
|
||||
}
|
||||
if !isHostNetwork {
|
||||
teardownNetworkResult := kubecontainer.NewSyncResult(kubecontainer.TeardownNetwork, runningPod.ID)
|
||||
result.AddSyncResult(teardownNetworkResult)
|
||||
// Tear down network plugin with sandbox id
|
||||
if err := m.networkPlugin.TearDownPod(runningPod.Namespace, runningPod.Name, kubecontainer.ContainerID{
|
||||
Type: m.runtimeName,
|
||||
ID: sandboxID,
|
||||
}); err != nil {
|
||||
message := fmt.Sprintf("Failed to teardown network for pod %s_%s(%s) using network plugins %q: %v",
|
||||
runningPod.Name, runningPod.Namespace, runningPod.ID, m.networkPlugin.Name(), err)
|
||||
teardownNetworkResult.Fail(kubecontainer.ErrTeardownNetwork, message)
|
||||
glog.Error(message)
|
||||
}
|
||||
}
|
||||
|
||||
// stop sandbox, the sandbox will be removed in GarbageCollect
|
||||
killSandboxResult := kubecontainer.NewSyncResult(kubecontainer.KillPodSandbox, runningPod.ID)
|
||||
result.AddSyncResult(killSandboxResult)
|
||||
|
@ -1006,3 +954,17 @@ func (m *kubeGenericRuntimeManager) PortForward(pod *kubecontainer.Pod, port uin

	return fmt.Errorf("not implemented")
}

// UpdatePodCIDR is just a passthrough method to update the runtimeConfig of the shim
// with the podCIDR supplied by the kubelet.
func (m *kubeGenericRuntimeManager) UpdatePodCIDR(podCIDR string) error {
	// TODO(#35531): do we really want to write a method on this manager for each
	// field of the config?
	glog.Infof("updating runtime config through cri with podcidr %v", podCIDR)
	return m.runtimeService.UpdateRuntimeConfig(
		&runtimeApi.RuntimeConfig{
			NetworkConfig: &runtimeApi.NetworkConfig{
				PodCidr: &podCIDR,
			},
		})
}
@ -18,13 +18,13 @@ package kuberuntime
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"sort"
|
||||
|
||||
"github.com/golang/glog"
|
||||
"k8s.io/kubernetes/pkg/api"
|
||||
runtimeApi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
|
||||
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
|
||||
"k8s.io/kubernetes/pkg/kubelet/network"
|
||||
"k8s.io/kubernetes/pkg/kubelet/types"
|
||||
"k8s.io/kubernetes/pkg/kubelet/util/format"
|
||||
)
|
||||
|
@ -180,27 +180,16 @@ func (m *kubeGenericRuntimeManager) getKubeletSandboxes(all bool) ([]*runtimeApi
|
|||
}
|
||||
|
||||
// determinePodSandboxIP determines the IP address of the given pod sandbox.
|
||||
// TODO: remove determinePodSandboxIP after networking is delegated to the container runtime.
|
||||
func (m *kubeGenericRuntimeManager) determinePodSandboxIP(podNamespace, podName string, podSandbox *runtimeApi.PodSandboxStatus) string {
|
||||
ip := ""
|
||||
|
||||
if podSandbox.Network != nil {
|
||||
ip = podSandbox.Network.GetIp()
|
||||
if podSandbox.Network == nil {
|
||||
glog.Warningf("Pod Sandbox status doesn't have network information, cannot report IP")
|
||||
return ""
|
||||
}
|
||||
|
||||
if m.networkPlugin.Name() != network.DefaultPluginName {
|
||||
// TODO: podInfraContainerID in GetPodNetworkStatus() interface should be renamed to sandboxID
|
||||
netStatus, err := m.networkPlugin.GetPodNetworkStatus(podNamespace, podName, kubecontainer.ContainerID{
|
||||
Type: m.runtimeName,
|
||||
ID: podSandbox.GetId(),
|
||||
})
|
||||
if err != nil {
|
||||
glog.Errorf("NetworkPlugin %s failed on the status hook for pod '%s' - %v", m.networkPlugin.Name(), kubecontainer.BuildPodFullName(podName, podNamespace), err)
|
||||
} else if netStatus != nil {
|
||||
ip = netStatus.IP.String()
|
||||
}
|
||||
ip := podSandbox.Network.GetIp()
|
||||
if net.ParseIP(ip) == nil {
|
||||
glog.Warningf("Pod Sandbox reported an unparseable IP %v", ip)
|
||||
return ""
|
||||
}
|
||||
|
||||
return ip
|
||||
}
|
||||
|
||||
|
|
|
@ -193,7 +193,7 @@ func (plugin *cniNetworkPlugin) SetUpPod(namespace string, name string, id kubec
|
|||
if err := plugin.checkInitialized(); err != nil {
|
||||
return err
|
||||
}
|
||||
netnsPath, err := plugin.host.GetRuntime().GetNetNS(id)
|
||||
netnsPath, err := plugin.host.GetNetNS(id.ID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("CNI failed to retrieve network namespace path: %v", err)
|
||||
}
|
||||
|
@ -217,7 +217,7 @@ func (plugin *cniNetworkPlugin) TearDownPod(namespace string, name string, id ku
|
|||
if err := plugin.checkInitialized(); err != nil {
|
||||
return err
|
||||
}
|
||||
netnsPath, err := plugin.host.GetRuntime().GetNetNS(id)
|
||||
netnsPath, err := plugin.host.GetNetNS(id.ID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("CNI failed to retrieve network namespace path: %v", err)
|
||||
}
|
||||
|
@ -228,7 +228,7 @@ func (plugin *cniNetworkPlugin) TearDownPod(namespace string, name string, id ku
|
|||
// TODO: Use the addToNetwork function to obtain the IP of the Pod. That will assume idempotent ADD call to the plugin.
|
||||
// Also fix the runtime's call to Status function to be done only in the case that the IP is lost, no need to do periodic calls
|
||||
func (plugin *cniNetworkPlugin) GetPodNetworkStatus(namespace string, name string, id kubecontainer.ContainerID) (*network.PodNetworkStatus, error) {
|
||||
netnsPath, err := plugin.host.GetRuntime().GetNetNS(id)
|
||||
netnsPath, err := plugin.host.GetNetNS(id.ID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("CNI failed to retrieve network namespace path: %v", err)
|
||||
}
|
||||
|
|
|
@ -138,6 +138,14 @@ func (fnh *fakeNetworkHost) GetRuntime() kubecontainer.Runtime {
|
|||
return fnh.runtime
|
||||
}
|
||||
|
||||
func (fnh *fakeNetworkHost) GetNetNS(containerID string) (string, error) {
|
||||
return fnh.GetRuntime().GetNetNS(kubecontainer.ContainerID{Type: "test", ID: containerID})
|
||||
}
|
||||
|
||||
func (fnh *fakeNetworkHost) SupportsLegacyFeatures() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func TestCNIPlugin(t *testing.T) {
|
||||
// install some random plugin
|
||||
pluginName := fmt.Sprintf("test%d", rand.Intn(1000))
|
||||
|
|
|
@ -334,6 +334,9 @@ func (plugin *kubenetNetworkPlugin) Capabilities() utilsets.Int {
|
|||
return utilsets.NewInt(network.NET_PLUGIN_CAPABILITY_SHAPING)
|
||||
}
|
||||
|
||||
// setup sets up networking through CNI using the given ns/name and sandbox ID.
|
||||
// TODO: Don't pass the pod to this method, it only needs it for bandwidth
|
||||
// shaping and hostport management.
|
||||
func (plugin *kubenetNetworkPlugin) setup(namespace string, name string, id kubecontainer.ContainerID, pod *api.Pod) error {
|
||||
// Bring up container loopback interface
|
||||
if _, err := plugin.addContainerToNetwork(plugin.loConfig, "lo", namespace, name, id); err != nil {
|
||||
|
@ -384,6 +387,14 @@ func (plugin *kubenetNetworkPlugin) setup(namespace string, name string, id kube
|
|||
plugin.syncEbtablesDedupRules(macAddr)
|
||||
}
|
||||
|
||||
plugin.podIPs[id] = ip4.String()
|
||||
|
||||
// The host can choose to not support "legacy" features. The remote
|
||||
// shim doesn't support it (#35457), but the kubelet does.
|
||||
if !plugin.host.SupportsLegacyFeatures() {
|
||||
return nil
|
||||
}
|
||||
|
||||
// The first SetUpPod call creates the bridge; get a shaper for the sake of
|
||||
// initialization
|
||||
shaper := plugin.shaper()
|
||||
|
@ -398,8 +409,6 @@ func (plugin *kubenetNetworkPlugin) setup(namespace string, name string, id kube
|
|||
}
|
||||
}
|
||||
|
||||
plugin.podIPs[id] = ip4.String()
|
||||
|
||||
// Open any hostports the pod's containers want
|
||||
activePods, err := plugin.getActivePods()
|
||||
if err != nil {
|
||||
|
@ -423,6 +432,7 @@ func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id k
|
|||
glog.V(4).Infof("SetUpPod took %v for %s/%s", time.Since(start), namespace, name)
|
||||
}()
|
||||
|
||||
// TODO: Entire pod object only required for bw shaping and hostport.
|
||||
pod, ok := plugin.host.GetPodByName(namespace, name)
|
||||
if !ok {
|
||||
return fmt.Errorf("pod %q cannot be found", name)
|
||||
|
@ -440,15 +450,20 @@ func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id k
|
|||
glog.V(4).Infof("Failed to clean up %s/%s after SetUpPod failure: %v", namespace, name, err)
|
||||
}
|
||||
|
||||
// TODO: Remove this hack once we've figured out how to retrieve the netns
|
||||
// of an exited container. Currently, restarting docker will leak a bunch of
|
||||
// ips. This will exhaust available ip space unless we cleanup old ips. At the
|
||||
// same time we don't want to try GC'ing them periodically as that could lead
|
||||
// to a performance regression in starting pods. So on each setup failure, try
|
||||
// GC on the assumption that the kubelet is going to retry pod creation, and
|
||||
// when it does, there will be ips.
|
||||
plugin.ipamGarbageCollection()
|
||||
// TODO(#34278): Figure out if we need IP GC through the cri.
|
||||
// The cri should always send us teardown events for stale sandboxes,
|
||||
// this obviates the need for GC in the common case, for kubenet.
|
||||
if plugin.host.SupportsLegacyFeatures() {
|
||||
|
||||
// TODO: Remove this hack once we've figured out how to retrieve the netns
|
||||
// of an exited container. Currently, restarting docker will leak a bunch of
|
||||
// ips. This will exhaust available ip space unless we cleanup old ips. At the
|
||||
// same time we don't want to try GC'ing them periodically as that could lead
|
||||
// to a performance regression in starting pods. So on each setup failure, try
|
||||
// GC on the assumption that the kubelet is going to retry pod creation, and
|
||||
// when it does, there will be ips.
|
||||
plugin.ipamGarbageCollection()
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -485,6 +500,12 @@ func (plugin *kubenetNetworkPlugin) teardown(namespace string, name string, id k
|
|||
}
|
||||
}
|
||||
|
||||
// The host can choose to not support "legacy" features. The remote
|
||||
// shim doesn't support it (#35457), but the kubelet does.
|
||||
if !plugin.host.SupportsLegacyFeatures() {
|
||||
return utilerrors.NewAggregate(errList)
|
||||
}
|
||||
|
||||
activePods, err := plugin.getActivePods()
|
||||
if err == nil {
|
||||
err = plugin.hostportHandler.SyncHostports(BridgeName, activePods)
|
||||
|
@ -533,7 +554,7 @@ func (plugin *kubenetNetworkPlugin) GetPodNetworkStatus(namespace string, name s
|
|||
return &network.PodNetworkStatus{IP: net.ParseIP(podIP)}, nil
|
||||
}
|
||||
|
||||
netnsPath, err := plugin.host.GetRuntime().GetNetNS(id)
|
||||
netnsPath, err := plugin.host.GetNetNS(id.ID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Kubenet failed to retrieve network namespace path: %v", err)
|
||||
}
|
||||
|
@ -722,7 +743,7 @@ func podIsExited(p *kubecontainer.Pod) bool {
|
|||
}
|
||||
|
||||
func (plugin *kubenetNetworkPlugin) buildCNIRuntimeConf(ifName string, id kubecontainer.ContainerID) (*libcni.RuntimeConf, error) {
|
||||
netnsPath, err := plugin.host.GetRuntime().GetNetNS(id)
|
||||
netnsPath, err := plugin.host.GetNetNS(id.ID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("Kubenet failed to retrieve network namespace path: %v", err)
|
||||
}
|
||||
|
|
|
@ -229,4 +229,40 @@ func TestGenerateMacAddress(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
// TestInvocationWithoutRuntime invokes the plugin without a runtime.
|
||||
// This is how kubenet is invoked from the cri.
|
||||
func TestTearDownWithoutRuntime(t *testing.T) {
|
||||
fhost := nettest.NewFakeHost(nil)
|
||||
fhost.Legacy = false
|
||||
fhost.Runtime = nil
|
||||
mockcni := &mock_cni.MockCNI{}
|
||||
|
||||
fexec := &exec.FakeExec{
|
||||
CommandScript: []exec.FakeCommandAction{},
|
||||
LookPathFunc: func(file string) (string, error) {
|
||||
return fmt.Sprintf("/fake-bin/%s", file), nil
|
||||
},
|
||||
}
|
||||
|
||||
kubenet := newFakeKubenetPlugin(map[kubecontainer.ContainerID]string{}, fexec, fhost)
|
||||
kubenet.cniConfig = mockcni
|
||||
kubenet.iptables = ipttest.NewFake()
|
||||
|
||||
details := make(map[string]interface{})
|
||||
details[network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE_DETAIL_CIDR] = "10.0.0.1/24"
|
||||
kubenet.Event(network.NET_PLUGIN_EVENT_POD_CIDR_CHANGE, details)
|
||||
|
||||
existingContainerID := kubecontainer.BuildContainerID("docker", "123")
|
||||
kubenet.podIPs[existingContainerID] = "10.0.0.1"
|
||||
|
||||
mockcni.On("DelNetwork", mock.AnythingOfType("*libcni.NetworkConfig"), mock.AnythingOfType("*libcni.RuntimeConf")).Return(nil)
|
||||
|
||||
if err := kubenet.TearDownPod("namespace", "name", existingContainerID); err != nil {
|
||||
t.Fatalf("Unexpected error in TearDownPod: %v", err)
|
||||
}
|
||||
// Assert that the CNI DelNetwork made it through and we didn't crash
|
||||
// without a runtime.
|
||||
mockcni.AssertExpectations(t)
|
||||
}
|
||||
|
||||
//TODO: add unit test for each implementation of network plugin interface
|
||||
|
|
|
@ -55,6 +55,10 @@ func (_m *MockNetworkPlugin) Capabilities() sets.Int {
|
|||
return ret0
|
||||
}
|
||||
|
||||
func (_m *MockNetworkPlugin) Finish() {
|
||||
_m.ctrl.Finish()
|
||||
}
|
||||
|
||||
func (_mr *_MockNetworkPluginRecorder) Capabilities() *gomock.Call {
|
||||
return _mr.mock.ctrl.RecordCall(_mr.mock, "Capabilities")
|
||||
}
|
||||
|
|
|
@ -95,16 +95,53 @@ type PodNetworkStatus struct {
	IP net.IP `json:"ip" description:"Primary IP address of the pod"`
}

// Host is an interface that plugins can use to access the kubelet.
type Host interface {
// LegacyHost implements the methods required by network plugins that
// were directly invoked by the kubelet. Implementations of this interface
// that do not wish to support these features can simply return false
// to SupportsLegacyFeatures.
type LegacyHost interface {
	// Get the pod structure by its name, namespace
	// Only used for hostport management and bw shaping
	GetPodByName(namespace, name string) (*api.Pod, bool)

	// GetKubeClient returns a client interface
	// Only used in testing
	GetKubeClient() clientset.Interface

	// GetContainerRuntime returns the container runtime that implements the containers (e.g. docker/rkt)
	// Only used for hostport management
	GetRuntime() kubecontainer.Runtime

	// SupportsLegacyFeatures returns true if this host can support hostports
	// and bandwidth shaping. Both will either get added to CNI or dropped,
	// so different implementations can choose to ignore them.
	SupportsLegacyFeatures() bool
}

// Host is an interface that plugins can use to access the kubelet.
// TODO(#35457): get rid of this backchannel to the kubelet. The scope of
// the back channel is restricted to host-ports/testing, and restricted
// to kubenet. No other network plugin wrapper needs it. Other plugins
// only require a way to access namespace information, which they can do
// directly through the embedded NamespaceGetter.
type Host interface {
	// NamespaceGetter is a getter for sandbox namespace information.
	// It's the only part of this interface that isn't currently deprecated.
	NamespaceGetter

	// LegacyHost contains methods that trap back into the Kubelet; *do not*
	// add more dependencies to this interface. In a post-cri world,
	// network plugins will be invoked by the runtime shim, and should only
	// require NamespaceGetter.
	LegacyHost
}

// NamespaceGetter is an interface to retrieve namespace information for a given
// sandboxID. Typically implemented by runtime shims that are closely coupled to
// CNI plugin wrappers like kubenet.
type NamespaceGetter interface {
	// GetNetNS returns network namespace information for the given containerID.
	GetNetNS(containerID string) (string, error)
}

// InitNetworkPlugin inits the plugin that matches networkPluginName. Plugins must have unique names.
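To make the split concrete, here is a small sketch (not from this PR) of a CRI-style implementation of the new Host interface: a NamespaceGetter backed by the runtime shim, composed with whatever LegacyHost the caller supplies. dockershim's dockerNetworkHost earlier in this diff, and the kubelet's noOpLegacyHost below, play exactly these roles; the lookup function here is illustrative.

```go
package example

import (
	"k8s.io/kubernetes/pkg/kubelet/network"
)

// shimNamespaces satisfies network.NamespaceGetter by delegating to a
// runtime-shim provided lookup (illustrative only).
type shimNamespaces struct {
	lookup func(containerID string) (string, error)
}

func (s *shimNamespaces) GetNetNS(containerID string) (string, error) {
	return s.lookup(containerID)
}

// criHost composes a LegacyHost (possibly a no-op one) with sandbox
// namespace lookup, mirroring dockershim's dockerNetworkHost.
type criHost struct {
	network.LegacyHost
	*shimNamespaces
}

// Compile-time check that criHost satisfies the new network.Host.
var _ network.Host = &criHost{}
```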
@ -27,11 +27,14 @@ import (
|
|||
)
|
||||
|
||||
type fakeNetworkHost struct {
|
||||
fakeNamespaceGetter
|
||||
kubeClient clientset.Interface
|
||||
Legacy bool
|
||||
Runtime *containertest.FakeRuntime
|
||||
}
|
||||
|
||||
func NewFakeHost(kubeClient clientset.Interface) *fakeNetworkHost {
|
||||
host := &fakeNetworkHost{kubeClient: kubeClient}
|
||||
host := &fakeNetworkHost{kubeClient: kubeClient, Legacy: true, Runtime: &containertest.FakeRuntime{}}
|
||||
return host
|
||||
}
|
||||
|
||||
|
@ -44,5 +47,17 @@ func (fnh *fakeNetworkHost) GetKubeClient() clientset.Interface {
|
|||
}
|
||||
|
||||
func (nh *fakeNetworkHost) GetRuntime() kubecontainer.Runtime {
|
||||
return &containertest.FakeRuntime{}
|
||||
return nh.Runtime
|
||||
}
|
||||
|
||||
func (nh *fakeNetworkHost) SupportsLegacyFeatures() bool {
|
||||
return nh.Legacy
|
||||
}
|
||||
|
||||
type fakeNamespaceGetter struct {
|
||||
ns string
|
||||
}
|
||||
|
||||
func (nh *fakeNamespaceGetter) GetNetNS(containerID string) (string, error) {
|
||||
return nh.ns, nil
|
||||
}
|
||||
|
|
|
@ -24,6 +24,11 @@ import (
|
|||
|
||||
// This just exports required functions from kubelet proper, for use by network
|
||||
// plugins.
|
||||
// TODO(#35457): get rid of this backchannel to the kubelet. The scope of
|
||||
// the back channel is restricted to host-ports/testing, and restricted
|
||||
// to kubenet. No other network plugin wrapper needs it. Other plugins
|
||||
// only require a way to access namespace information, which they can do
|
||||
// directly through the methods implemented by criNetworkHost.
|
||||
type networkHost struct {
|
||||
kubelet *Kubelet
|
||||
}
|
||||
|
@ -39,3 +44,45 @@ func (nh *networkHost) GetKubeClient() clientset.Interface {
|
|||
func (nh *networkHost) GetRuntime() kubecontainer.Runtime {
|
||||
return nh.kubelet.GetRuntime()
|
||||
}
|
||||
|
||||
func (nh *networkHost) SupportsLegacyFeatures() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
// criNetworkHost implements the part of network.Host required by the
// cri (NamespaceGetter). It leeches off networkHost for all other
// methods, because networkHost is slated for deletion.
|
||||
type criNetworkHost struct {
|
||||
*networkHost
|
||||
}
|
||||
|
||||
// GetNetNS returns the network namespace of the given containerID.
|
||||
// This method satisfies the network.NamespaceGetter interface for
|
||||
// networkHost. It's only meant to be used from network plugins
|
||||
// that are directly invoked by the kubelet (aka: legacy, pre-cri).
|
||||
// Any network plugin invoked by a cri must implement NamespaceGetter
|
||||
// to talk directly to the runtime instead.
|
||||
func (c *criNetworkHost) GetNetNS(containerID string) (string, error) {
|
||||
return c.kubelet.GetRuntime().GetNetNS(kubecontainer.ContainerID{Type: "", ID: containerID})
|
||||
}
|
||||
|
||||
// noOpLegacyHost implements the network.LegacyHost interface for the remote
|
||||
// runtime shim by just returning empties. It doesn't support legacy features
|
||||
// like host port and bandwidth shaping.
|
||||
type noOpLegacyHost struct{}
|
||||
|
||||
func (n *noOpLegacyHost) GetPodByName(namespace, name string) (*api.Pod, bool) {
|
||||
return nil, true
|
||||
}
|
||||
|
||||
func (n *noOpLegacyHost) GetKubeClient() clientset.Interface {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (n *noOpLegacyHost) GetRuntime() kubecontainer.Runtime {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (nh *noOpLegacyHost) SupportsLegacyFeatures() bool {
|
||||
return false
|
||||
}
|
||||
|
|
|
@ -36,7 +36,7 @@ type RemoteRuntimeService struct {
|
|||
|
||||
// NewRemoteRuntimeService creates a new internalApi.RuntimeService.
|
||||
func NewRemoteRuntimeService(addr string, connectionTimout time.Duration) (internalApi.RuntimeService, error) {
|
||||
glog.V(3).Infof("Connecting to runtime service %s", addr)
|
||||
glog.Infof("Connecting to runtime service %s", addr)
|
||||
conn, err := grpc.Dial(addr, grpc.WithInsecure(), grpc.WithTimeout(connectionTimout), grpc.WithDialer(dial))
|
||||
if err != nil {
|
||||
glog.Errorf("Connect remote runtime %s failed: %v", addr, err)
|
||||
|
@ -319,6 +319,23 @@ func (r *RemoteRuntimeService) PortForward(req *runtimeApi.PortForwardRequest) (
|
|||
return resp, nil
|
||||
}
|
||||
|
||||
// UpdateRuntimeConfig updates the config of a runtime service. The only
|
||||
// update payload currently supported is the pod CIDR assigned to a node,
|
||||
// and the runtime service just proxies it down to the network plugin.
|
||||
func (r *RemoteRuntimeService) UpdateRuntimeConfig(runtimeConfig *runtimeApi.RuntimeConfig) error {
|
||||
ctx, cancel := getContextWithTimeout(r.timeout)
|
||||
defer cancel()
|
||||
|
||||
// Response doesn't contain anything of interest. This translates to an
|
||||
// Event notification to the network plugin, which can't fail, so we're
|
||||
// really looking to surface destination unreachable.
|
||||
_, err := r.runtimeClient.UpdateRuntimeConfig(ctx, &runtimeApi.UpdateRuntimeConfigRequest{
|
||||
RuntimeConfig: runtimeConfig,
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -2179,6 +2179,12 @@ func (r *Runtime) PortForward(pod *kubecontainer.Pod, port uint16, stream io.Rea
|
|||
return command.Run()
|
||||
}
|
||||
|
||||
// UpdatePodCIDR updates the runtimeconfig with the podCIDR.
|
||||
// Currently no-ops, just implemented to satisfy the cri.
|
||||
func (r *Runtime) UpdatePodCIDR(podCIDR string) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// appStateToContainerState converts rktapi.AppState to kubecontainer.ContainerState.
|
||||
func appStateToContainerState(state rktapi.AppState) kubecontainer.ContainerState {
|
||||
switch state {
|
||||
|
|