mirror of https://github.com/k3s-io/k3s
Revert "Networking test rework to support modular, layered service soak"
parent dfb400e2e9
commit e86d5c9e7d
@@ -18,25 +18,25 @@ package e2e
 import (
 	"fmt"
-	. "github.com/onsi/ginkgo"
-	. "github.com/onsi/gomega"
+	"net/http"
+	"strings"
+	"time"
 
 	"k8s.io/kubernetes/pkg/api"
 	"k8s.io/kubernetes/pkg/api/unversioned"
 	"k8s.io/kubernetes/pkg/fields"
 	"k8s.io/kubernetes/pkg/labels"
 	"k8s.io/kubernetes/pkg/util"
-	"k8s.io/kubernetes/pkg/util/wait"
-	"net/http"
-	"strconv"
-	"strings"
-	"time"
-)
 
-//versions ~ 1.3 (original RO test), 1.6 uses newer services/tokens,...
-const nettestVersion = "1.6"
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+)
 
 var _ = Describe("Networking", func() {
 	f := NewFramework("nettest")
 
+	var svcname = "nettest"
+
 	BeforeEach(func() {
 		//Assert basic external connectivity.
 		//Since this is not really a test of kubernetes in any way, we
@@ -82,7 +82,7 @@ var _ = Describe("Networking", func() {
 	})
 
 	// First test because it has no dependencies on variables created later on.
-	It("should provide unchanging, static URL paths for kubernetes api services [Conformance].", func() {
+	It("should provide unchanging, static URL paths for kubernetes api services [Conformance]", func() {
 		tests := []struct {
 			path string
 		}{
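The static-URL test above is table-driven: a slice of anonymous structs holds the paths (the actual list is elided by this hunk) and the test body iterates over them. A generic, self-contained sketch of the pattern follows; the paths, base URL, and plain net/http client are illustrative stand-ins, not the test's fixtures.

package main

import (
	"fmt"
	"net/http"
)

func main() {
	// Hypothetical entries; the real path list is elided in the hunk above.
	tests := []struct {
		path string
	}{
		{"/healthz"},
		{"/version"},
	}
	for _, test := range tests {
		// Illustrative base URL; the real test goes through the framework's client.
		resp, err := http.Get("http://localhost:8080" + test.path)
		if err != nil {
			fmt.Printf("GET %s failed: %v\n", test.path, err)
			continue
		}
		resp.Body.Close()
		fmt.Printf("GET %s -> %d\n", test.path, resp.StatusCode)
	}
}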
@@ -102,221 +102,153 @@ var _ = Describe("Networking", func() {
 		}
 	})
 
-	//Each tuple defined in this struct array represents
-	//a number of services, and a timeout. So for example,
-	//{1, 300} defines a test where 1 service is created and
-	//we give it 300 seconds to timeout. This is how we test
-	//services in parallel... we can create a tuple like {5,400}
-	//to confirm that services over 5 ports all pass the networking test.
-	serviceSoakTests := []struct {
-		service        int
-		timeoutSeconds time.Duration
-	}{
-		//These are very liberal, once this test is running regularly,
-		//We will DECREASE the timeout value.
-		//Replace this with --scale constants eventually https://github.com/kubernetes/kubernetes/issues/10479.
-		{1, time.Duration(100 * time.Second)},
-		{3, time.Duration(200 * time.Second)}, //Test that parallel nettests running on different ports complete.
-	}
-
-	for _, svcSoak := range serviceSoakTests {
-		//copy to local to avoid range overwriting
-		timeoutSeconds := svcSoak.timeoutSeconds
-		serviceNum := svcSoak.service
-		It(fmt.Sprintf("should function for intrapod communication between all hosts in %v parallel services [Conformance]", serviceNum),
-			func() {
-				Logf("running service test with timeout = %v for %v", timeout, serviceNum)
-				runNetTest(timeoutSeconds, f, makePorts(serviceNum), nettestVersion)
-			})
-	}
-})
-
-//pollPeerStatus will either fail, pass, or continue polling.
-//When completed, it will write the service name to the channel provided, thus
-//facilitating parallel service testing.
-func pollPeerStatus(serviceDoneChannel chan string, f *Framework, svc *api.Service, pollTimeoutSeconds time.Duration) {
-	Logf("Begin polling " + svc.Name)
-	getDetails := func() ([]byte, error) {
-		return f.Client.Get().
-			Namespace(f.Namespace.Name).
-			Prefix("proxy").
-			Resource("services").
-			Name(svc.Name).
-			Suffix("read").
-			DoRaw()
-	}
-
-	getStatus := func() ([]byte, error) {
-		return f.Client.Get().
-			Namespace(f.Namespace.Name).
-			Prefix("proxy").
-			Resource("services").
-			Name(svc.Name).
-			Suffix("status").
-			DoRaw()
-	}
-
-	passed := false
-
-	expectNoError(wait.Poll(2*time.Second, pollTimeoutSeconds, func() (bool, error) {
-		body, err := getStatus()
+	//Now we can proceed with the test.
+	It("should function for intra-pod communication [Conformance]", func() {
+
+		By(fmt.Sprintf("Creating a service named %q in namespace %q", svcname, f.Namespace.Name))
+		svc, err := f.Client.Services(f.Namespace.Name).Create(&api.Service{
+			ObjectMeta: api.ObjectMeta{
+				Name: svcname,
+				Labels: map[string]string{
+					"name": svcname,
+				},
+			},
+			Spec: api.ServiceSpec{
+				Ports: []api.ServicePort{{
+					Protocol:   "TCP",
+					Port:       8080,
+					TargetPort: util.NewIntOrStringFromInt(8080),
+				}},
+				Selector: map[string]string{
+					"name": svcname,
+				},
+			},
+		})
+		if err != nil {
+			Failf("unable to create test service named [%s] %v", svc.Name, err)
+		}
+
+		// Clean up service
+		defer func() {
+			By("Cleaning up the service")
+			if err = f.Client.Services(f.Namespace.Name).Delete(svc.Name); err != nil {
+				Failf("unable to delete svc %v: %v", svc.Name, err)
+			}
+		}()
+
+		By("Creating a webserver (pending) pod on each node")
+
+		nodes, err := f.Client.Nodes().List(labels.Everything(), fields.Everything())
 		if err != nil {
 			Failf("Failed to list nodes: %v", err)
 		}
-
-		// Finally, we pass/fail the test based on if the container's response body, as to whether or not it was able to find peers.
-		switch {
-		case string(body) == "pass":
-			passed = true
-			return true, nil
-		case string(body) == "running":
-		case string(body) == "fail":
-			if body, err = getDetails(); err != nil {
-				Failf("Failed on attempt. Error reading details: %v", err)
-				return false, err
-			} else {
-				Failf("Failed on attempt. Details:\n%s", string(body))
-				return false, nil
+		// previous tests may have cause failures of some nodes. Let's skip
+		// 'Not Ready' nodes, just in case (there is no need to fail the test).
+		filterNodes(nodes, func(node api.Node) bool {
+			return isNodeReadySetAsExpected(&node, true)
+		})
+
+		if len(nodes.Items) == 0 {
+			Failf("No Ready nodes found.")
+		}
+		if len(nodes.Items) == 1 {
+			// in general, the test requires two nodes. But for local development, often a one node cluster
+			// is created, for simplicity and speed. (see issue #10012). We permit one-node test
+			// only in some cases
+			if !providerIs("local") {
+				Failf(fmt.Sprintf("The test requires two Ready nodes on %s, but found just one.", testContext.Provider))
 			}
-		case strings.Contains(string(body), "no endpoints available"):
-			Logf("Attempt: waiting on service/endpoints")
-		default:
-			Logf("Unexpected response:\n%s", body)
+			Logf("Only one ready node is detected. The test has limited scope in such setting. " +
+				"Rerun it with at least two nodes to get complete coverage.")
 		}
-		return false, nil
-	}))
-
-	if !passed {
-		if body, err := getDetails(); err != nil {
-			Failf("Timed out. Major error : Couldn't read service details: %v", err)
-		} else {
-			Failf("Timed out. Service details :\n%s", string(body))
+
+		podNames := LaunchNetTestPodPerNode(f, nodes, svcname, "1.6")
+
+		// Clean up the pods
+		defer func() {
+			By("Cleaning up the webserver pods")
+			for _, podName := range podNames {
+				if err = f.Client.Pods(f.Namespace.Name).Delete(podName, nil); err != nil {
+					Logf("Failed to delete pod %s: %v", podName, err)
+				}
+			}
+		}()
+
+		By("Waiting for the webserver pods to transition to Running state")
+		for _, podName := range podNames {
+			err = f.WaitForPodRunning(podName)
+			Expect(err).NotTo(HaveOccurred())
 		}
-	}
-	serviceDoneChannel <- svc.Name
-}
-
-//runNetTest Creates a single pod on each host which serves
-//on a unique port in the cluster. It then binds a service to
-//that port, so that there are "n" nodes to balance traffic to -
-//finally, each node reaches out to ping every other node in
-//the cluster on the given port.
-//The more ports given, the more services will be spun up,
-//i.e. one service per port.
-//To test basic pod networking, send a single port.
-//To soak test the services, we can send a range (i.e. 8000-9000).
-func runNetTest(timeoutSeconds time.Duration, f *Framework, ports []int, nettestVersion string) {
-	nodes, err := f.Client.Nodes().List(labels.Everything(), fields.Everything())
-	expectNoError(err, "Failed to list nodes")
-
-	// previous tests may have caused failures of some nodes. Let's skip
-	// 'Not Ready' nodes, just in case (there is no need to fail the test).
-	filterNodes(nodes, func(node api.Node) bool {
-		return isNodeReadySetAsExpected(&node, true)
+
+		By("Waiting for connectivity to be verified")
+		passed := false
+
+		//once response OK, evaluate response body for pass/fail.
+		var body []byte
+		getDetails := func() ([]byte, error) {
+			return f.Client.Get().
+				Namespace(f.Namespace.Name).
+				Prefix("proxy").
+				Resource("services").
+				Name(svc.Name).
+				Suffix("read").
+				DoRaw()
+		}
+
+		getStatus := func() ([]byte, error) {
+			return f.Client.Get().
+				Namespace(f.Namespace.Name).
+				Prefix("proxy").
+				Resource("services").
+				Name(svc.Name).
+				Suffix("status").
+				DoRaw()
+		}
+
+		timeout := time.Now().Add(2 * time.Minute)
+		for i := 0; !passed && timeout.After(time.Now()); i++ {
+			time.Sleep(2 * time.Second)
+			Logf("About to make a proxy status call")
+			start := time.Now()
+			body, err = getStatus()
+			Logf("Proxy status call returned in %v", time.Since(start))
+			if err != nil {
+				Logf("Attempt %v: service/pod still starting. (error: '%v')", i, err)
+				continue
+			}
+			// Finally, we pass/fail the test based on if the container's response body, as to whether or not it was able to find peers.
+			switch {
+			case string(body) == "pass":
+				Logf("Passed on attempt %v. Cleaning up.", i)
+				passed = true
+			case string(body) == "running":
+				Logf("Attempt %v: test still running", i)
+			case string(body) == "fail":
+				if body, err = getDetails(); err != nil {
+					Failf("Failed on attempt %v. Cleaning up. Error reading details: %v", i, err)
+				} else {
+					Failf("Failed on attempt %v. Cleaning up. Details:\n%s", i, string(body))
+				}
+			case strings.Contains(string(body), "no endpoints available"):
+				Logf("Attempt %v: waiting on service/endpoints", i)
+			default:
+				Logf("Unexpected response:\n%s", body)
+			}
+		}
+
+		if !passed {
+			if body, err = getDetails(); err != nil {
+				Failf("Timed out. Cleaning up. Error reading details: %v", err)
+			} else {
+				Failf("Timed out. Cleaning up. Details:\n%s", string(body))
+			}
+		}
+		Expect(string(body)).To(Equal("pass"))
 	})
-
-	if len(nodes.Items) == 0 {
-		Failf("No Ready nodes found.")
-	}
-	if len(nodes.Items) == 1 {
-		// in general, the test requires two nodes. But for local development, often a one node cluster
-		// is created, for simplicity and speed. (see issue #10012). We permit one-node test
-		// only in some cases
-		if !providerIs("local") {
-			Failf("The test requires two Ready nodes on %s, but found just one.", testContext.Provider)
-		}
-		Logf("Only one ready node is detected. The test has limited scope in such setting. " +
-			"Rerun it with at least two nodes to get complete coverage.")
-	}
-
-	portCompleteChannel := make(chan string, len(ports))
-
-	for p := range ports {
-		go func(nodes *api.NodeList, port int) {
-			var svcname = fmt.Sprintf("nettest-%v", port)
-
-			defer GinkgoRecover()
-
-			By(fmt.Sprintf("creating a service named %q in namespace %q", svcname, f.Namespace.Name))
-			svc, err := f.Client.Services(f.Namespace.Name).Create(&api.Service{
-				ObjectMeta: api.ObjectMeta{
-					Name: svcname,
-					Labels: map[string]string{
-						"name": svcname,
-					},
-				},
-				Spec: api.ServiceSpec{
-					Ports: []api.ServicePort{{
-						Protocol:   "TCP",
-						Port:       port,
-						TargetPort: util.NewIntOrStringFromInt(port),
-					}},
-					Selector: map[string]string{
-						"name": svcname,
-					},
-				},
-			})
-
-			if err != nil {
-				Failf("unable to create test service named [%s] %v on port %v", svc.Name, err, port)
-			} else {
-				Logf("Created service successfully [%s]", svc.Name)
-			}
-
-			// Clean up service
-			defer func() {
-				By("Cleaning up the service")
-				if err := f.Client.Services(f.Namespace.Name).Delete(svc.Name); err != nil {
-					Failf("unable to delete svc %v: %v", svc.Name, err)
-				}
-			}()
-
-			Logf("launching pod per node.....")
-			podNames := launchNetTestPodPerNode(port, nettestVersion, f, nodes, svcname)
-			// Clean up the pods
-			defer func() {
-				By("Cleaning up the webserver pods")
-				for _, podName := range podNames {
-					if err = f.Client.Pods(f.Namespace.Name).Delete(podName, nil); err != nil {
-						Logf("Failed to delete pod %s: %v", podName, err)
-					}
-				}
-			}()
-
-			Logf("Launched test pods for %v", port)
-			By("waiting for all of the webserver pods to transition to Running + reaching the Passing state.")
-
-			for _, podName := range podNames {
-				err = f.WaitForPodRunning(podName)
-				Expect(err).NotTo(HaveOccurred())
-				By(fmt.Sprintf("waiting for connectivity to be verified [ port = %v ] ", port))
-				//once response OK, evaluate response body for pass/fail.
-				pollPeerStatus(portCompleteChannel, f, svc, timeoutSeconds)
-			}
-
-			Logf("Finished test pods for %v", port)
-		}(nodes, ports[p])
-	}
-	//now wait for the all X nettests to complete...
-	for pReturned := range ports {
-		Logf("Waiting on ports to report back. So far %v have been discovered...", pReturned)
-		Logf("... Another service has successfully been discovered: %v ( %v ) ", pReturned, <-portCompleteChannel)
-	}
-	Logf("Completed test on %v port/services", len(ports))
-}
-
-//makePorts makes a bunch of ports from 8080->8080+n
-func makePorts(n int) []int {
-	m := make([]int, n)
-	for i := 0; i < n; i++ {
-		m[i] = 8080 + i
-	}
-	return m
-}
-
-//launchNetTestPodPerNode launches nettest pods, and returns their names.
-func launchNetTestPodPerNode(port int, version string, f *Framework, nodes *api.NodeList, name string) []string {
+})
+
+func LaunchNetTestPodPerNode(f *Framework, nodes *api.NodeList, name, version string) []string {
 	podNames := []string{}
 
 	totalPods := len(nodes.Items)
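For reference, the soak driver removed above follows a standard fan-out/join shape: one goroutine per port, each reporting completion on a buffered channel that the main loop drains. Below is a minimal, self-contained sketch of that shape, with a stub in place of the real service and pod setup; apart from makePorts, which mirrors the deleted helper, none of these names are the e2e framework's.

package main

import (
	"fmt"
	"time"
)

// makePorts mirrors the deleted helper: ports 8080 .. 8080+n-1.
func makePorts(n int) []int {
	m := make([]int, n)
	for i := 0; i < n; i++ {
		m[i] = 8080 + i
	}
	return m
}

func main() {
	ports := makePorts(3)
	// Buffered completion channel, like the deleted portCompleteChannel.
	done := make(chan int, len(ports))

	for _, port := range ports {
		go func(port int) {
			// Stand-in for the real work: create service, launch pods, poll peers.
			time.Sleep(10 * time.Millisecond)
			done <- port
		}(port) // pass the loop variable by value to avoid reuse across iterations
	}

	// Join: block until every port reports back, as the deleted loop did.
	for range ports {
		fmt.Printf("service on port %v completed\n", <-done)
	}
	fmt.Printf("completed test on %v port/services\n", len(ports))
}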
@@ -337,13 +269,12 @@ func launchNetTestPodPerNode(port int, version string, f *Framework, nodes *api.
 					Name:  "webserver",
 					Image: "gcr.io/google_containers/nettest:" + version,
 					Args: []string{
-						"-port=" + strconv.Itoa(port),
 						"-service=" + name,
 						//peers >= totalPods should be asserted by the container.
 						//the nettest container finds peers by looking up list of svc endpoints.
 						fmt.Sprintf("-peers=%d", totalPods),
 						"-namespace=" + f.Namespace.Name},
-					Ports: []api.ContainerPort{{ContainerPort: port}},
+					Ports: []api.ContainerPort{{ContainerPort: 8080}},
 				},
 			},
 			NodeName: node.Name,
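The restored test also swaps wait.Poll for an explicit deadline loop: hit the proxied status endpoint every two seconds for up to two minutes, then branch on the response body. A standalone sketch of that loop follows; getStatus is a hypothetical stub standing in for the framework's proxied GET, and the messages are simplified.

package main

import (
	"fmt"
	"strings"
	"time"
)

// getStatus stands in for the framework's proxied GET on the service's
// "status" endpoint; it is not part of the e2e framework.
func getStatus() (string, error) { return "pass", nil }

func main() {
	passed := false
	var body string
	// Same shape as the restored code: a two-minute deadline, 2s between polls.
	deadline := time.Now().Add(2 * time.Minute)
	for i := 0; !passed && deadline.After(time.Now()); i++ {
		time.Sleep(2 * time.Second)
		var err error
		body, err = getStatus()
		if err != nil {
			fmt.Printf("attempt %v: service/pod still starting: %v\n", i, err)
			continue
		}
		switch {
		case body == "pass":
			passed = true
		case body == "running":
			fmt.Printf("attempt %v: test still running\n", i)
		case strings.Contains(body, "no endpoints available"):
			fmt.Printf("attempt %v: waiting on endpoints\n", i)
		default:
			fmt.Printf("attempt %v: fail or unexpected response: %q\n", i, body)
		}
	}
	fmt.Println("passed:", passed)
}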