kubenet: clean up networking when setup errors occur

Relying on the runtime to later call cleanup is fragile, so make sure
that everything gets nicely cleaned up when setup errors occur.
pull/6/head
Dan Williams 2016-06-13 17:19:04 -05:00
parent 965492fdd0
commit a519e8a403
1 changed files with 83 additions and 49 deletions

View File

@ -31,6 +31,7 @@ import (
"github.com/golang/glog"
"github.com/vishvananda/netlink"
"github.com/vishvananda/netlink/nl"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/apis/componentconfig"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockertools"
@ -290,29 +291,7 @@ func (plugin *kubenetNetworkPlugin) Capabilities() utilsets.Int {
return utilsets.NewInt(network.NET_PLUGIN_CAPABILITY_SHAPING)
}
func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID) error {
plugin.mu.Lock()
defer plugin.mu.Unlock()
start := time.Now()
defer func() {
glog.V(4).Infof("SetUpPod took %v for %s/%s", time.Since(start), namespace, name)
}()
pod, ok := plugin.host.GetPodByName(namespace, name)
if !ok {
return fmt.Errorf("pod %q cannot be found", name)
}
ingress, egress, err := bandwidth.ExtractPodBandwidthResources(pod.Annotations)
if err != nil {
return fmt.Errorf("Error reading pod bandwidth annotations: %v", err)
}
if err := plugin.Status(); err != nil {
return fmt.Errorf("Kubenet cannot SetUpPod: %v", err)
}
func (plugin *kubenetNetworkPlugin) setup(namespace string, name string, id kubecontainer.ContainerID, pod *api.Pod) error {
// Bring up container loopback interface
if _, err := plugin.addContainerToNetwork(plugin.loConfig, "lo", namespace, name, id); err != nil {
return err
@ -348,6 +327,10 @@ func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id k
// initialization
shaper := plugin.shaper()
ingress, egress, err := bandwidth.ExtractPodBandwidthResources(pod.Annotations)
if err != nil {
return fmt.Errorf("Error reading pod bandwidth annotations: %v", err)
}
if egress != nil || ingress != nil {
if err := shaper.ReconcileCIDR(fmt.Sprintf("%s/32", ip4.String()), egress, ingress); err != nil {
return fmt.Errorf("Failed to add pod to shaper: %v", err)
@ -358,16 +341,86 @@ func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id k
// Open any hostports the pod's containers want
runningPods, err := plugin.getRunningPods()
if err == nil {
err = plugin.hostportHandler.OpenPodHostportsAndSync(pod, BridgeName, runningPods)
if err != nil {
return err
}
if err := plugin.hostportHandler.OpenPodHostportsAndSync(pod, BridgeName, runningPods); err != nil {
return err
}
return nil
}
func (plugin *kubenetNetworkPlugin) SetUpPod(namespace string, name string, id kubecontainer.ContainerID) error {
plugin.mu.Lock()
defer plugin.mu.Unlock()
start := time.Now()
defer func() {
glog.V(4).Infof("SetUpPod took %v for %s/%s", time.Since(start), namespace, name)
}()
pod, ok := plugin.host.GetPodByName(namespace, name)
if !ok {
return fmt.Errorf("pod %q cannot be found", name)
}
if err := plugin.Status(); err != nil {
return fmt.Errorf("Kubenet cannot SetUpPod: %v", err)
}
if err := plugin.setup(namespace, name, id, pod); err != nil {
// Make sure everything gets cleaned up on errors
podIP, _ := plugin.podIPs[id]
if err := plugin.teardown(namespace, name, id, podIP); err != nil {
// Not a hard error or warning
glog.V(4).Infof("Failed to clean up %s/%s after SetUpPod failure: %v", namespace, name, err)
}
return err
}
// Need to SNAT outbound traffic from cluster
if err = plugin.ensureMasqRule(); err != nil {
if err := plugin.ensureMasqRule(); err != nil {
glog.Errorf("Failed to ensure MASQ rule: %v", err)
}
return err
return nil
}
// Tears down as much of a pod's network as it can even if errors occur. Returns
// an aggregate error composed of all errors encountered during the teardown.
func (plugin *kubenetNetworkPlugin) teardown(namespace string, name string, id kubecontainer.ContainerID, podIP string) error {
errList := []error{}
if podIP != "" {
glog.V(5).Infof("Removing pod IP %s from shaper", podIP)
// shaper wants /32
if err := plugin.shaper().Reset(fmt.Sprintf("%s/32", podIP)); err != nil {
// Possible bandwidth shaping wasn't enabled for this pod anyways
glog.V(4).Infof("Failed to remove pod IP %s from shaper: %v", podIP, err)
}
delete(plugin.podIPs, id)
}
if err := plugin.delContainerFromNetwork(plugin.netConfig, network.DefaultInterfaceName, namespace, name, id); err != nil {
// This is to prevent returning error when TearDownPod is called twice on the same pod. This helps to reduce event pollution.
if podIP != "" {
glog.Warningf("Failed to delete container from kubenet: %v", err)
} else {
errList = append(errList, err)
}
}
runningPods, err := plugin.getRunningPods()
if err == nil {
err = plugin.hostportHandler.SyncHostports(BridgeName, runningPods)
}
if err != nil {
errList = append(errList, err)
}
return utilerrors.NewAggregate(errList)
}
func (plugin *kubenetNetworkPlugin) TearDownPod(namespace string, name string, id kubecontainer.ContainerID) error {
@ -384,36 +437,17 @@ func (plugin *kubenetNetworkPlugin) TearDownPod(namespace string, name string, i
}
// no cached IP is Ok during teardown
podIP, hasIP := plugin.podIPs[id]
if hasIP {
glog.V(5).Infof("Removing pod IP %s from shaper", podIP)
// shaper wants /32
if err := plugin.shaper().Reset(fmt.Sprintf("%s/32", podIP)); err != nil {
// Possible bandwidth shaping wasn't enabled for this pod anyways
glog.V(4).Infof("Failed to remove pod IP %s from shaper: %v", podIP, err)
}
}
if err := plugin.delContainerFromNetwork(plugin.netConfig, network.DefaultInterfaceName, namespace, name, id); err != nil {
// This is to prevent returning error when TearDownPod is called twice on the same pod. This helps to reduce event pollution.
if !hasIP {
glog.Warningf("Failed to delete container from kubenet: %v", err)
return nil
}
podIP, _ := plugin.podIPs[id]
if err := plugin.teardown(namespace, name, id, podIP); err != nil {
return err
}
delete(plugin.podIPs, id)
runningPods, err := plugin.getRunningPods()
if err == nil {
err = plugin.hostportHandler.SyncHostports(BridgeName, runningPods)
}
// Need to SNAT outbound traffic from cluster
if err := plugin.ensureMasqRule(); err != nil {
glog.Errorf("Failed to ensure MASQ rule: %v", err)
}
return err
return nil
}
// TODO: Use the addToNetwork function to obtain the IP of the Pod. That will assume idempotent ADD call to the plugin.