kubelet: add network plugin manager with per-pod operation locking

The PluginManager almost duplicates the network plugin interface, but
not quite since the Init() function should be called by whatever
actually finds and creates the network plugin instance.  Only then
does it get passed off to the PluginManager.

The Manager synchronizes pod-specific network operations like setup,
teardown, and pod network status.  It passes through all other
operations so that runtimes don't have to cache the network plugin
directly, but can use the PluginManager as a wrapper.
pull/6/head
Dan Williams 2016-12-12 18:44:03 -06:00
parent 2509ab0c7a
commit 5633d7423a
5 changed files with 354 additions and 51 deletions

View File

@ -5,7 +5,6 @@ licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_library(
@ -31,17 +30,6 @@ go_library(
],
)
go_test(
name = "go_default_test",
srcs = ["plugins_test.go"],
library = ":go_default_library",
tags = ["automanaged"],
deps = [
"//pkg/apis/componentconfig:go_default_library",
"//pkg/kubelet/network/testing:go_default_library",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
@ -57,7 +45,6 @@ filegroup(
"//pkg/kubelet/network/hairpin:all-srcs",
"//pkg/kubelet/network/hostport:all-srcs",
"//pkg/kubelet/network/kubenet:all-srcs",
"//pkg/kubelet/network/mock_network:all-srcs",
"//pkg/kubelet/network/testing:all-srcs",
],
tags = ["automanaged"],

View File

@ -20,6 +20,7 @@ import (
"fmt"
"net"
"strings"
"sync"
"github.com/golang/glog"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -293,3 +294,123 @@ type NoopPortMappingGetter struct{}
func (*NoopPortMappingGetter) GetPodPortMappings(containerID string) ([]*hostport.PortMapping, error) {
return nil, nil
}
// PluginManager wraps a kubelet network plugin and provides synchronization
// for a given pod's network operations.  Each pod's setup/teardown/status operations
// are synchronized against each other, but network operations of other pods can
// proceed in parallel.
type PluginManager struct {
	// plugin is the wrapped network plugin that every operation is
	// forwarded to.
	plugin NetworkPlugin

	// podsLock guards the pods map and the refcount stored in each of its
	// entries.
	podsLock sync.Mutex
	// pods maps a full pod name to the lock entry used to serialize that
	// pod's network operations.  Entries are added on demand by podLock()
	// and removed by podUnlock() once their refcount drops to zero.
	pods map[string]*podLock
}
// NewPluginManager returns a PluginManager that forwards to the given plugin
// and starts out with an empty per-pod lock table.  The plugin is stored as
// given; this constructor does not call Init() on it.
func NewPluginManager(plugin NetworkPlugin) *PluginManager {
	manager := new(PluginManager)
	manager.plugin = plugin
	manager.pods = map[string]*podLock{}
	return manager
}
// PluginName returns the name reported by the wrapped network plugin.
func (pm *PluginManager) PluginName() string {
	return pm.plugin.Name()
}
// Event forwards the named event and its details to the wrapped network
// plugin unchanged.  No per-pod lock is taken for this call.
func (pm *PluginManager) Event(name string, details map[string]interface{}) {
	pm.plugin.Event(name, details)
}
// Status returns whatever the wrapped network plugin's Status() returns.
// No per-pod lock is taken for this call.
func (pm *PluginManager) Status() error {
	return pm.plugin.Status()
}
// podLock is the per-pod entry kept in PluginManager.pods.
type podLock struct {
	// Count of in-flight operations for this pod; when this reaches zero
	// the lock can be removed from the pod map.  It is only read and
	// written by podLock()/podUnlock() while PluginManager.podsLock is held.
	refcount uint

	// Lock to synchronize operations for this specific pod
	mu sync.Mutex
}
// podLock looks up the lock entry for the given pod, creating and registering
// one if the pod is not yet in the pod map, increments the entry's reference
// count, and returns a pointer to the entry's mutex.
//
// The returned mutex is NOT locked by this function; the caller locks it
// after this function has returned (and therefore after the pod map lock has
// been released).  Every call is expected to be balanced by a later call to
// podUnlock() with the same pod name.
func (pm *PluginManager) podLock(fullPodName string) *sync.Mutex {
	pm.podsLock.Lock()
	defer pm.podsLock.Unlock()

	entry, found := pm.pods[fullPodName]
	if !found {
		entry = new(podLock)
		pm.pods[fullPodName] = entry
	}

	entry.refcount++
	return &entry.mu
}
// podUnlock releases the network operation lock for the given pod and drops
// one reference from its entry.  When the reference count reaches zero the
// entry is removed from the pod map.
//
// Two inconsistent states are logged and otherwise ignored: the pod has no
// entry in the map at all, or its entry is present with a zero reference
// count (in which case the stale entry is also deleted).  In both of those
// cases the per-pod mutex is left untouched.
func (pm *PluginManager) podUnlock(fullPodName string) {
	pm.podsLock.Lock()
	defer pm.podsLock.Unlock()

	entry, found := pm.pods[fullPodName]
	switch {
	case !found:
		glog.Warningf("Unbalanced pod lock unref for %s", fullPodName)
		return
	case entry.refcount == 0:
		// This should never ever happen, but handle it anyway
		delete(pm.pods, fullPodName)
		glog.Warningf("Pod lock for %s still in map with zero refcount", fullPodName)
		return
	}

	// Normal path: drop our reference, release the pod's mutex, and clean
	// up the map entry if nobody else is using or waiting on it.
	entry.refcount--
	entry.mu.Unlock()
	if entry.refcount == 0 {
		delete(pm.pods, fullPodName)
	}
}
// GetPodNetworkStatus asks the wrapped plugin for the pod's network status
// while holding that pod's network operation lock, so the query is serialized
// against other locked operations on the same pod.
//
// On success the plugin's status is returned as-is.  On failure the status is
// nil and the returned error names the plugin and the pod and includes the
// plugin's error text.
func (pm *PluginManager) GetPodNetworkStatus(podNamespace, podName string, id kubecontainer.ContainerID) (*PodNetworkStatus, error) {
	podFullName := kubecontainer.BuildPodFullName(podName, podNamespace)

	podMutex := pm.podLock(podFullName)
	podMutex.Lock()
	defer pm.podUnlock(podFullName)

	status, err := pm.plugin.GetPodNetworkStatus(podNamespace, podName, id)
	if err == nil {
		return status, nil
	}
	return nil, fmt.Errorf("NetworkPlugin %s failed on the status hook for pod %q: %v", pm.plugin.Name(), podFullName, err)
}
// SetUpPod calls the wrapped plugin's SetUpPod while holding the pod's
// network operation lock, so it is serialized against other locked
// operations on the same pod.
//
// It returns nil on success; otherwise it returns an error that names the
// plugin and the pod and includes the plugin's error text.
func (pm *PluginManager) SetUpPod(podNamespace, podName string, id kubecontainer.ContainerID) error {
	podFullName := kubecontainer.BuildPodFullName(podName, podNamespace)

	podMutex := pm.podLock(podFullName)
	podMutex.Lock()
	defer pm.podUnlock(podFullName)

	glog.V(3).Infof("Calling network plugin %s to set up pod %q", pm.plugin.Name(), podFullName)
	err := pm.plugin.SetUpPod(podNamespace, podName, id)
	if err != nil {
		return fmt.Errorf("NetworkPlugin %s failed to set up pod %q network: %v", pm.plugin.Name(), podFullName, err)
	}
	return nil
}
// TearDownPod calls the wrapped plugin's TearDownPod while holding the pod's
// network operation lock, so it is serialized against other locked
// operations on the same pod.
//
// It returns nil on success; otherwise it returns an error that names the
// plugin and the pod and includes the plugin's error text.
func (pm *PluginManager) TearDownPod(podNamespace, podName string, id kubecontainer.ContainerID) error {
	podFullName := kubecontainer.BuildPodFullName(podName, podNamespace)

	podMutex := pm.podLock(podFullName)
	podMutex.Lock()
	defer pm.podUnlock(podFullName)

	glog.V(3).Infof("Calling network plugin %s to tear down pod %q", pm.plugin.Name(), podFullName)
	err := pm.plugin.TearDownPod(podNamespace, podName, id)
	if err != nil {
		return fmt.Errorf("NetworkPlugin %s failed to teardown pod %q network: %v", pm.plugin.Name(), podFullName, err)
	}
	return nil
}

View File

@ -1,38 +0,0 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package network
import (
"testing"
"k8s.io/kubernetes/pkg/apis/componentconfig"
nettest "k8s.io/kubernetes/pkg/kubelet/network/testing"
)
// TestSelectDefaultPlugin checks that InitNetworkPlugin, given an empty
// plugin list and an empty plugin name, succeeds and returns a non-nil plugin
// whose name is DefaultPluginName.
func TestSelectDefaultPlugin(t *testing.T) {
	noPlugins := []NetworkPlugin{}
	host := nettest.NewFakeHost(nil)

	plug, err := InitNetworkPlugin(noPlugins, "", host, componentconfig.HairpinNone, "10.0.0.0/8", UseDefaultMTU)
	switch {
	case err != nil:
		t.Fatalf("Unexpected error in selecting default plugin: %v", err)
	case plug == nil:
		t.Fatalf("Failed to select the default plugin.")
	}

	if plug.Name() != DefaultPluginName {
		t.Errorf("Failed to select the default plugin. Expected %s. Got %s", DefaultPluginName, plug.Name())
	}
}

View File

@ -5,6 +5,7 @@ licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
"go_test",
)
go_library(
@ -20,6 +21,20 @@ go_library(
],
)
go_test(
name = "go_default_test",
srcs = ["plugins_test.go"],
library = ":go_default_library",
tags = ["automanaged"],
deps = [
"//pkg/apis/componentconfig:go_default_library",
"//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/network:go_default_library",
"//vendor:github.com/golang/mock/gomock",
"//vendor:k8s.io/apimachinery/pkg/util/sets",
],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),

View File

@ -0,0 +1,218 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package testing
import (
"fmt"
"net"
"sync"
"testing"
utilsets "k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/apis/componentconfig"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/network"
"github.com/golang/mock/gomock"
)
// TestSelectDefaultPlugin checks that network.InitNetworkPlugin, given an
// empty plugin list and an empty plugin name, succeeds and returns a non-nil
// plugin whose name is network.DefaultPluginName.
func TestSelectDefaultPlugin(t *testing.T) {
	noPlugins := []network.NetworkPlugin{}
	host := NewFakeHost(nil)

	plug, err := network.InitNetworkPlugin(noPlugins, "", host, componentconfig.HairpinNone, "10.0.0.0/8", network.UseDefaultMTU)
	switch {
	case err != nil:
		t.Fatalf("Unexpected error in selecting default plugin: %v", err)
	case plug == nil:
		t.Fatalf("Failed to select the default plugin.")
	}

	if plug.Name() != network.DefaultPluginName {
		t.Errorf("Failed to select the default plugin. Expected %s. Got %s", network.DefaultPluginName, plug.Name())
	}
}
// TestPluginManager runs the setup/status/teardown sequence for 10 pods,
// 4 times per pod, all from separate goroutines that are released at the same
// moment.  The mock plugin expects exactly 4 calls of each operation per pod,
// so the test exercises the per-pod locking and the pod map under concurrent
// use.
func TestPluginManager(t *testing.T) {
	const (
		podCount   = 10
		runsPerPod = 4
	)

	ctrl := gomock.NewController(t)
	fnp := NewMockNetworkPlugin(ctrl)
	defer fnp.Finish()
	pm := network.NewPluginManager(fnp)

	fnp.EXPECT().Name().Return("someNetworkPlugin").AnyTimes()

	// startGate keeps every worker parked until all of them exist;
	// finished tracks worker completion.
	var startGate, finished sync.WaitGroup
	startGate.Add(1)

	// One full network lifecycle for a single pod; stops at the first
	// failing step.
	podLifecycle := func(name string, id kubecontainer.ContainerID) {
		defer finished.Done()

		// Block all goroutines from running until all have
		// been created and are ready.  This ensures we
		// have more pod network operations running
		// concurrently.
		startGate.Wait()

		if err := pm.SetUpPod("", name, id); err != nil {
			t.Errorf("Failed to set up pod %q: %v", name, err)
			return
		}
		if _, err := pm.GetPodNetworkStatus("", name, id); err != nil {
			t.Errorf("Failed to inspect pod %q: %v", name, err)
			return
		}
		if err := pm.TearDownPod("", name, id); err != nil {
			t.Errorf("Failed to tear down pod %q: %v", name, err)
			return
		}
	}

	// 10 pods, 4 setup/status/teardown runs each.  Ensure that network locking
	// works and the pod map isn't concurrently accessed
	for i := 0; i < podCount; i++ {
		podName := fmt.Sprintf("pod%d", i)
		containerID := kubecontainer.ContainerID{ID: podName}

		fnp.EXPECT().SetUpPod("", podName, containerID).Return(nil).Times(runsPerPod)
		fnp.EXPECT().GetPodNetworkStatus("", podName, containerID).Return(&network.PodNetworkStatus{IP: net.ParseIP("1.2.3.4")}, nil).Times(runsPerPod)
		fnp.EXPECT().TearDownPod("", podName, containerID).Return(nil).Times(runsPerPod)

		for run := 0; run < runsPerPod; run++ {
			finished.Add(1)
			go podLifecycle(podName, containerID)
		}
	}

	// All workers have been created; open the gate so they start together.
	startGate.Done()
	// Wait for them all to finish
	finished.Wait()
}
// hookableFakeNetworkPluginSetupHook is the callback signature that
// hookableFakeNetworkPlugin.SetUpPod invokes with the arguments it received.
type hookableFakeNetworkPluginSetupHook func(namespace, name string, id kubecontainer.ContainerID)

// hookableFakeNetworkPlugin is a hand-written fake network plugin for tests.
// Every method is a no-op that reports success, except that SetUpPod first
// runs the optional setupHook.
type hookableFakeNetworkPlugin struct {
	// setupHook, when non-nil, is called at the start of every SetUpPod.
	setupHook hookableFakeNetworkPluginSetupHook
}

// newHookableFakeNetworkPlugin returns a fake plugin that calls setupHook
// (which may be nil) from SetUpPod.
func newHookableFakeNetworkPlugin(setupHook hookableFakeNetworkPluginSetupHook) *hookableFakeNetworkPlugin {
	fake := new(hookableFakeNetworkPlugin)
	fake.setupHook = setupHook
	return fake
}

// Init ignores its arguments and always succeeds.
func (p *hookableFakeNetworkPlugin) Init(host network.Host, hairpinMode componentconfig.HairpinMode, nonMasqueradeCIDR string, mtu int) error {
	return nil
}

// Event does nothing.
func (p *hookableFakeNetworkPlugin) Event(name string, details map[string]interface{}) {
}

// Name returns the fixed plugin name "fakeplugin".
func (p *hookableFakeNetworkPlugin) Name() string {
	return "fakeplugin"
}

// Capabilities returns a new, empty capability set.
func (p *hookableFakeNetworkPlugin) Capabilities() utilsets.Int {
	return utilsets.NewInt()
}

// SetUpPod runs the setup hook, if one was supplied, and then reports
// success.
func (p *hookableFakeNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID) error {
	if hook := p.setupHook; hook != nil {
		hook(namespace, name, id)
	}
	return nil
}

// TearDownPod ignores its arguments and always succeeds.
func (p *hookableFakeNetworkPlugin) TearDownPod(string, string, kubecontainer.ContainerID) error {
	return nil
}

// GetPodNetworkStatus ignores its arguments and always reports the fixed
// IP 10.1.2.3.
func (p *hookableFakeNetworkPlugin) GetPodNetworkStatus(string, string, kubecontainer.ContainerID) (*network.PodNetworkStatus, error) {
	fixedIP := net.ParseIP("10.1.2.3")
	return &network.PodNetworkStatus{IP: fixedIP}, nil
}

// Status always reports a healthy plugin.
func (p *hookableFakeNetworkPlugin) Status() error {
	return nil
}
// Ensure that one pod's network operations don't block another's. If the
// test is successful (eg, first pod doesn't block on second) the test
// will complete. If unsuccessful, it will hang and get killed.
func TestMultiPodParallelNetworkOps(t *testing.T) {
podWg := sync.WaitGroup{}
podWg.Add(1)
// Can't do this with MockNetworkPlugin because the gomock controller
// has its own locks which don't allow the parallel network operation
// to proceed.
didWait := false
fakePlugin := newHookableFakeNetworkPlugin(func(podNamespace, podName string, id kubecontainer.ContainerID) {
if podName == "waiter" {
podWg.Wait()
didWait = true
}
})
pm := network.NewPluginManager(fakePlugin)
opsWg := sync.WaitGroup{}
// Start the pod that will wait for the other to complete
opsWg.Add(1)
go func() {
defer opsWg.Done()
podName := "waiter"
containerID := kubecontainer.ContainerID{ID: podName}
// Setup will block on the runner pod completing. If network
// operations locking isn't correct (eg pod network operations
// block other pods) setUpPod() will never return.
if err := pm.SetUpPod("", podName, containerID); err != nil {
t.Errorf("Failed to set up waiter pod: %v", err)
return
}
if err := pm.TearDownPod("", podName, containerID); err != nil {
t.Errorf("Failed to tear down waiter pod: %v", err)
return
}
}()
opsWg.Add(1)
go func() {
defer opsWg.Done()
// Let other pod proceed
defer podWg.Done()
podName := "runner"
containerID := kubecontainer.ContainerID{ID: podName}
if err := pm.SetUpPod("", podName, containerID); err != nil {
t.Errorf("Failed to set up runner pod: %v", err)
return
}
if err := pm.TearDownPod("", podName, containerID); err != nil {
t.Errorf("Failed to tear down runner pod: %v", err)
return
}
}()
opsWg.Wait()
if !didWait {
t.Errorf("waiter pod didn't wait for runner pod!")
}
}