Merge pull request #13628 from MikaelCluseau/wip-hairpin

Auto commit by PR queue bot
pull/6/head
k8s-merge-robot 2015-09-11 17:45:46 -07:00
commit 5ed8400642
10 changed files with 245 additions and 7 deletions

View File

@ -130,7 +130,7 @@ func filterHTTPError(err error, image string) error {
jerr.Code == http.StatusServiceUnavailable ||
jerr.Code == http.StatusGatewayTimeout) {
glog.V(2).Infof("Pulling image %q failed: %v", image, err)
return fmt.Errorf("image pull failed for %s because the registry is temporarily unavailbe.", image)
return fmt.Errorf("image pull failed for %s because the registry is temporarily unavailable.", image)
} else {
return err
}

View File

@ -255,7 +255,7 @@ func TestPullWithJSONError(t *testing.T) {
"Bad gateway": {
"ubuntu",
&jsonmessage.JSONError{Code: 502, Message: "<!doctype html>\n<html class=\"no-js\" lang=\"\">\n <head>\n </head>\n <body>\n <h1>Oops, there was an error!</h1>\n <p>We have been contacted of this error, feel free to check out <a href=\"http://status.docker.com/\">status.docker.com</a>\n to see if there is a bigger issue.</p>\n\n </body>\n</html>"},
"because the registry is temporarily unavailbe",
"because the registry is temporarily unavailable",
},
}
for i, test := range tests {

View File

@ -42,6 +42,7 @@ import (
"k8s.io/kubernetes/pkg/kubelet/lifecycle"
"k8s.io/kubernetes/pkg/kubelet/metrics"
"k8s.io/kubernetes/pkg/kubelet/network"
"k8s.io/kubernetes/pkg/kubelet/network/hairpin"
"k8s.io/kubernetes/pkg/kubelet/prober"
"k8s.io/kubernetes/pkg/kubelet/qos"
kubeletTypes "k8s.io/kubernetes/pkg/kubelet/types"
@ -1734,6 +1735,16 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, runningPod kubecontainer.Pod, pod
glog.Errorf("Failed to create pod infra container: %v; Skipping pod %q", err, podFullName)
return err
}
// Setup the host interface (FIXME: move to networkPlugin when ready)
podInfraContainer, err := dm.client.InspectContainer(string(podInfraContainerID))
if err != nil {
glog.Errorf("Failed to inspect pod infra container: %v; Skipping pod %q", err, podFullName)
return err
}
if err = hairpin.SetUpContainer(podInfraContainer.State.Pid, "eth0"); err != nil {
glog.Warningf("Hairpin setup failed for pod %q: %v", podFullName, err)
}
}
// Start everything

View File

@ -885,7 +885,7 @@ func TestSyncPodCreateNetAndContainer(t *testing.T) {
runSyncPod(t, dm, fakeDocker, pod, nil)
verifyCalls(t, fakeDocker, []string{
// Create pod infra container.
"create", "start", "inspect_container",
"create", "start", "inspect_container", "inspect_container",
// Create container.
"create", "start", "inspect_container",
})
@ -934,7 +934,7 @@ func TestSyncPodCreatesNetAndContainerPullsImage(t *testing.T) {
verifyCalls(t, fakeDocker, []string{
// Create pod infra container.
"create", "start", "inspect_container",
"create", "start", "inspect_container", "inspect_container",
// Create container.
"create", "start", "inspect_container",
})
@ -1027,7 +1027,7 @@ func TestSyncPodDeletesWithNoPodInfraContainer(t *testing.T) {
// Kill the container since pod infra container is not running.
"stop",
// Create pod infra container.
"create", "start", "inspect_container",
"create", "start", "inspect_container", "inspect_container",
// Create container.
"create", "start", "inspect_container",
})
@ -2093,7 +2093,7 @@ func TestSyncPodWithTerminationLog(t *testing.T) {
runSyncPod(t, dm, fakeDocker, pod, nil)
verifyCalls(t, fakeDocker, []string{
// Create pod infra container.
"create", "start", "inspect_container",
"create", "start", "inspect_container", "inspect_container",
// Create container.
"create", "start", "inspect_container",
})
@ -2132,7 +2132,7 @@ func TestSyncPodWithHostNetwork(t *testing.T) {
verifyCalls(t, fakeDocker, []string{
// Create pod infra container.
"create", "start", "inspect_container",
"create", "start", "inspect_container", "inspect_container",
// Create container.
"create", "start", "inspect_container",
})

View File

@ -0,0 +1,100 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package hairpin
import (
"fmt"
"io/ioutil"
"net"
"path"
"regexp"
"strconv"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/util/exec"
)
const (
sysfsNetPath = "/sys/devices/virtual/net"
hairpinModeRelativePath = "brport/hairpin_mode"
hairpinEnable = "1"
)
var (
ethtoolOutputRegex = regexp.MustCompile("peer_ifindex: (\\d+)")
)
func SetUpContainer(containerPid int, containerInterfaceName string) error {
e := exec.New()
return setUpContainerInternal(e, containerPid, containerInterfaceName)
}
func setUpContainerInternal(e exec.Interface, containerPid int, containerInterfaceName string) error {
hostIfName, err := findPairInterfaceOfContainerInterface(e, containerPid, containerInterfaceName)
if err != nil {
glog.Infof("Unable to find pair interface, setting up all interfaces: %v", err)
return setUpAllInterfaces()
}
return setUpInterface(hostIfName)
}
func findPairInterfaceOfContainerInterface(e exec.Interface, containerPid int, containerInterfaceName string) (string, error) {
nsenterPath, err := e.LookPath("nsenter")
if err != nil {
return "", err
}
ethtoolPath, err := e.LookPath("ethtool")
if err != nil {
return "", err
}
// Get container's interface index
output, err := e.Command(nsenterPath, "-t", fmt.Sprintf("%d", containerPid), "-n", "-F", "--", ethtoolPath, "--statistics", containerInterfaceName).CombinedOutput()
if err != nil {
return "", fmt.Errorf("Unable to query interface %s of container %d: %v", containerInterfaceName, containerPid, err)
}
// look for peer_ifindex
match := ethtoolOutputRegex.FindSubmatch(output)
if match == nil {
return "", fmt.Errorf("No peer_ifindex in interface statistics for %s of container %d", containerInterfaceName, containerPid)
}
peerIfIndex, err := strconv.Atoi(string(match[1]))
if err != nil { // seems impossible (\d+ not numeric)
return "", fmt.Errorf("peer_ifindex wasn't numeric: %s: %v", match[1], err)
}
iface, err := net.InterfaceByIndex(peerIfIndex)
if err != nil {
return "", err
}
return iface.Name, nil
}
func setUpAllInterfaces() error {
interfaces, err := net.Interfaces()
if err != nil {
return err
}
for _, netIf := range interfaces {
setUpInterface(netIf.Name) // ignore errors
}
return nil
}
func setUpInterface(ifName string) error {
glog.V(3).Infof("Enabling hairpin on interface %s", ifName)
hairpinModeFile := path.Join(sysfsNetPath, ifName, hairpinModeRelativePath)
return ioutil.WriteFile(hairpinModeFile, []byte(hairpinEnable), 0644)
}

View File

@ -0,0 +1,96 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package hairpin
import (
"errors"
"fmt"
"net"
"strings"
"testing"
"k8s.io/kubernetes/pkg/util/exec"
)
func TestFindPairInterfaceOfContainerInterface(t *testing.T) {
// there should be at least "lo" on any system
interfaces, _ := net.Interfaces()
validOutput := fmt.Sprintf("garbage\n peer_ifindex: %d", interfaces[0].Index)
invalidOutput := fmt.Sprintf("garbage\n unknown: %d", interfaces[0].Index)
tests := []struct {
output string
err error
expectedName string
expectErr bool
}{
{
output: validOutput,
expectedName: interfaces[0].Name,
},
{
output: invalidOutput,
expectErr: true,
},
{
output: validOutput,
err: errors.New("error"),
expectErr: true,
},
}
for _, test := range tests {
fcmd := exec.FakeCmd{
CombinedOutputScript: []exec.FakeCombinedOutputAction{
func() ([]byte, error) { return []byte(test.output), test.err },
},
}
fexec := exec.FakeExec{
CommandScript: []exec.FakeCommandAction{
func(cmd string, args ...string) exec.Cmd {
return exec.InitFakeCmd(&fcmd, cmd, args...)
},
},
LookPathFunc: func(file string) (string, error) {
return fmt.Sprintf("/fake-bin/%s", file), nil
},
}
name, err := findPairInterfaceOfContainerInterface(&fexec, 123, "eth0")
if test.expectErr {
if err == nil {
t.Errorf("unexpected non-error")
}
} else {
if err != nil {
t.Errorf("unexpected error: %v", err)
}
}
if name != test.expectedName {
t.Errorf("unexpected name: %s (expected: %s)", name, test.expectedName)
}
}
}
func TestSetUpInterface(t *testing.T) {
err := setUpInterface("non-existent")
if err == nil {
t.Errorf("unexpected non-error")
}
hairpinModeFile := fmt.Sprintf("%s/%s/%s", sysfsNetPath, "non-existent", hairpinModeRelativePath)
if !strings.Contains(fmt.Sprintf("%v", err), hairpinModeFile) {
t.Errorf("should have tried to open %s", hairpinModeFile)
}
}

View File

@ -147,6 +147,13 @@ func TestRunOnce(t *testing.T) {
State: docker.State{Running: true, Pid: 42},
},
},
{
label: "syncPod",
container: docker.Container{
Config: &docker.Config{Image: "someimage"},
State: docker.State{Running: true, Pid: 42},
},
},
},
t: t,
}

View File

@ -27,6 +27,9 @@ type Interface interface {
// Command returns a Cmd instance which can be used to run a single command.
// This follows the pattern of package os/exec.
Command(cmd string, args ...string) Cmd
// LookPath wraps os/exec.LookPath
LookPath(file string) (string, error)
}
// Cmd is an interface that presents an API that is very similar to Cmd from os/exec.
@ -62,6 +65,11 @@ func (executor *executor) Command(cmd string, args ...string) Cmd {
return (*cmdWrapper)(osexec.Command(cmd, args...))
}
// LookPath is part of the Interface interface
func (executor *executor) LookPath(file string) (string, error) {
return osexec.LookPath(file)
}
// Wraps exec.Cmd so we can capture errors.
type cmdWrapper osexec.Cmd

View File

@ -17,6 +17,7 @@ limitations under the License.
package exec
import (
osexec "os/exec"
"testing"
)
@ -81,3 +82,13 @@ func TestExecutorWithArgs(t *testing.T) {
t.Errorf("unexpected output: %q", string(out))
}
}
func TestLookPath(t *testing.T) {
ex := New()
shExpected, _ := osexec.LookPath("sh")
sh, _ := ex.LookPath("sh")
if sh != shExpected {
t.Errorf("unexpected result for LookPath: got %s, expected %s", sh, shExpected)
}
}

View File

@ -24,6 +24,7 @@ import (
type FakeExec struct {
CommandScript []FakeCommandAction
CommandCalls int
LookPathFunc func(string) (string, error)
}
type FakeCommandAction func(cmd string, args ...string) Cmd
@ -37,6 +38,10 @@ func (fake *FakeExec) Command(cmd string, args ...string) Cmd {
return fake.CommandScript[i](cmd, args...)
}
func (fake *FakeExec) LookPath(file string) (string, error) {
return fake.LookPathFunc(file)
}
// A simple scripted Cmd type.
type FakeCmd struct {
Argv []string