diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index a4f8dc8ef7..7633c6c9dc 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -369,6 +369,7 @@ var iptablesJumpChains = []iptablesJumpChain{ {utiliptables.TableFilter, kubeExternalServicesChain, utiliptables.ChainInput, "kubernetes externally-visible service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}}, {utiliptables.TableFilter, kubeServicesChain, utiliptables.ChainForward, "kubernetes service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}}, {utiliptables.TableFilter, kubeServicesChain, utiliptables.ChainOutput, "kubernetes service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}}, + {utiliptables.TableFilter, kubeServicesChain, utiliptables.ChainInput, "kubernetes service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}}, {utiliptables.TableFilter, kubeForwardChain, utiliptables.ChainForward, "kubernetes forwarding rules", nil}, {utiliptables.TableNAT, kubeServicesChain, utiliptables.ChainOutput, "kubernetes service portals", nil}, {utiliptables.TableNAT, kubeServicesChain, utiliptables.ChainPrerouting, "kubernetes service portals", nil}, @@ -847,6 +848,7 @@ func (proxier *Proxier) syncProxyRules() { } writeLine(proxier.natRules, append(args, "-j", string(svcChain))...) } else { + // No endpoints. writeLine(proxier.filterRules, "-A", string(kubeServicesChain), "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), @@ -917,6 +919,7 @@ func (proxier *Proxier) syncProxyRules() { // This covers cases like GCE load-balancers which get added to the local routing table. writeLine(proxier.natRules, append(dstLocalOnlyArgs, "-j", string(svcChain))...) } else { + // No endpoints. writeLine(proxier.filterRules, "-A", string(kubeExternalServicesChain), "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), @@ -929,10 +932,10 @@ func (proxier *Proxier) syncProxyRules() { } // Capture load-balancer ingress. - if hasEndpoints { - fwChain := svcInfo.serviceFirewallChainName - for _, ingress := range svcInfo.LoadBalancerStatus.Ingress { - if ingress.IP != "" { + fwChain := svcInfo.serviceFirewallChainName + for _, ingress := range svcInfo.LoadBalancerStatus.Ingress { + if ingress.IP != "" { + if hasEndpoints { // create service firewall chain if chain, ok := existingNATChains[fwChain]; ok { writeBytesLine(proxier.natChains, chain) @@ -993,10 +996,19 @@ func (proxier *Proxier) syncProxyRules() { // If the packet was able to reach the end of firewall chain, then it did not get DNATed. // It means the packet cannot go thru the firewall, then mark it for DROP writeLine(proxier.natRules, append(args, "-j", string(KubeMarkDropChain))...) + } else { + // No endpoints. + writeLine(proxier.filterRules, + "-A", string(kubeServicesChain), + "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), + "-m", protocol, "-p", protocol, + "-d", utilproxy.ToCIDR(net.ParseIP(ingress.IP)), + "--dport", strconv.Itoa(svcInfo.Port), + "-j", "REJECT", + ) } } } - // FIXME: do we need REJECT rules for load-balancer ingress if !hasEndpoints? // Capture nodeports. If we had more than 2 rules it might be // worthwhile to make a new per-service chain for nodeport rules, but @@ -1078,6 +1090,7 @@ func (proxier *Proxier) syncProxyRules() { writeLine(proxier.natRules, append(args, "-j", string(svcXlbChain))...) } } else { + // No endpoints. writeLine(proxier.filterRules, "-A", string(kubeExternalServicesChain), "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), diff --git a/test/e2e/framework/networking_utils.go b/test/e2e/framework/networking_utils.go index 7d43570aa9..acaf74553f 100644 --- a/test/e2e/framework/networking_utils.go +++ b/test/e2e/framework/networking_utils.go @@ -17,7 +17,6 @@ limitations under the License. package framework import ( - "bytes" "encoding/json" "fmt" "io/ioutil" @@ -708,13 +707,131 @@ func CheckReachabilityFromPod(expectToBeReachable bool, timeout time.Duration, n ExpectNoError(err) } -// Does an HTTP GET, but does not reuse TCP connections -// This masks problems where the iptables rule has changed, but we don't see it -// This is intended for relatively quick requests (status checks), so we set a short (5 seconds) timeout -func httpGetNoConnectionPool(url string) (*http.Response, error) { - return httpGetNoConnectionPoolTimeout(url, 5*time.Second) +type HTTPPokeParams struct { + Timeout time.Duration + ExpectCode int // default = 200 + BodyContains string + RetriableCodes []int } +type HTTPPokeResult struct { + Status HTTPPokeStatus + Code int // HTTP code: 0 if the connection was not made + Error error // if there was any error + Body []byte // if code != 0 +} + +type HTTPPokeStatus string + +const ( + HTTPSuccess HTTPPokeStatus = "Success" + HTTPError HTTPPokeStatus = "UnknownError" + // Any time we add new errors, we should audit all callers of this. + HTTPTimeout HTTPPokeStatus = "TimedOut" + HTTPRefused HTTPPokeStatus = "ConnectionRefused" + HTTPRetryCode HTTPPokeStatus = "RetryCode" + HTTPWrongCode HTTPPokeStatus = "WrongCode" + HTTPBadResponse HTTPPokeStatus = "BadResponse" +) + +// PokeHTTP tries to connect to a host on a port for a given URL path. Callers +// can specify additional success parameters, if desired. +// +// The result status will be characterized as precisely as possible, given the +// known users of this. +// +// The result code will be zero in case of any failure to connect, or non-zero +// if the HTTP transaction completed (even if the other test params make this a +// failure). +// +// The result error will be populated for any status other than Success. +// +// The result body will be populated if the HTTP transaction was completed, even +// if the other test params make this a failure). +func PokeHTTP(host string, port int, path string, params *HTTPPokeParams) HTTPPokeResult { + hostPort := net.JoinHostPort(host, strconv.Itoa(port)) + url := fmt.Sprintf("http://%s%s", hostPort, path) + + ret := HTTPPokeResult{} + + // Sanity check inputs, because it has happened. These are the only things + // that should hard fail the test - they are basically ASSERT()s. + if host == "" { + Failf("Got empty host for HTTP poke (%s)", url) + return ret + } + if port == 0 { + Failf("Got port==0 for HTTP poke (%s)", url) + return ret + } + + // Set default params. + if params == nil { + params = &HTTPPokeParams{} + } + if params.ExpectCode == 0 { + params.ExpectCode = http.StatusOK + } + + Logf("Poking %q", url) + + resp, err := httpGetNoConnectionPoolTimeout(url, params.Timeout) + if err != nil { + ret.Error = err + neterr, ok := err.(net.Error) + if ok && neterr.Timeout() { + ret.Status = HTTPTimeout + } else if strings.Contains(err.Error(), "connection refused") { + ret.Status = HTTPRefused + } else { + ret.Status = HTTPError + } + Logf("Poke(%q): %v", url, err) + return ret + } + + ret.Code = resp.StatusCode + + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + ret.Status = HTTPError + ret.Error = fmt.Errorf("error reading HTTP body: %v", err) + Logf("Poke(%q): %v", url, ret.Error) + return ret + } + ret.Body = make([]byte, len(body)) + copy(ret.Body, body) + + if resp.StatusCode != params.ExpectCode { + for _, code := range params.RetriableCodes { + if resp.StatusCode == code { + ret.Error = fmt.Errorf("retriable status code: %d", resp.StatusCode) + ret.Status = HTTPRetryCode + Logf("Poke(%q): %v", url, ret.Error) + return ret + } + } + ret.Status = HTTPWrongCode + ret.Error = fmt.Errorf("bad status code: %d", resp.StatusCode) + Logf("Poke(%q): %v", url, ret.Error) + return ret + } + + if params.BodyContains != "" && !strings.Contains(string(body), params.BodyContains) { + ret.Status = HTTPBadResponse + ret.Error = fmt.Errorf("response does not contain expected substring: %q", string(body)) + Logf("Poke(%q): %v", url, ret.Error) + return ret + } + + ret.Status = HTTPSuccess + Logf("Poke(%q): success", url) + return ret +} + +// Does an HTTP GET, but does not reuse TCP connections +// This masks problems where the iptables rule has changed, but we don't see it func httpGetNoConnectionPoolTimeout(url string, timeout time.Duration) (*http.Response, error) { tr := utilnet.SetTransportDefaults(&http.Transport{ DisableKeepAlives: true, @@ -727,178 +844,126 @@ func httpGetNoConnectionPoolTimeout(url string, timeout time.Duration) (*http.Re return client.Get(url) } -func TestReachableHTTP(ip string, port int, request string, expect string) (bool, error) { - return TestReachableHTTPWithContent(ip, port, request, expect, nil) +type UDPPokeParams struct { + Timeout time.Duration + Response string } -func TestReachableHTTPWithRetriableErrorCodes(ip string, port int, request string, expect string, retriableErrCodes []int) (bool, error) { - return TestReachableHTTPWithContentTimeoutWithRetriableErrorCodes(ip, port, request, expect, nil, retriableErrCodes, time.Second*5) +type UDPPokeResult struct { + Status UDPPokeStatus + Error error // if there was any error + Response []byte // if code != 0 } -func TestReachableHTTPWithContent(ip string, port int, request string, expect string, content *bytes.Buffer) (bool, error) { - return TestReachableHTTPWithContentTimeout(ip, port, request, expect, content, 5*time.Second) -} +type UDPPokeStatus string -func TestReachableHTTPWithContentTimeout(ip string, port int, request string, expect string, content *bytes.Buffer, timeout time.Duration) (bool, error) { - return TestReachableHTTPWithContentTimeoutWithRetriableErrorCodes(ip, port, request, expect, content, []int{}, timeout) -} +const ( + UDPSuccess UDPPokeStatus = "Success" + UDPError UDPPokeStatus = "UnknownError" + // Any time we add new errors, we should audit all callers of this. + UDPTimeout UDPPokeStatus = "TimedOut" + UDPRefused UDPPokeStatus = "ConnectionRefused" + UDPBadResponse UDPPokeStatus = "BadResponse" +) -func TestReachableHTTPWithContentTimeoutWithRetriableErrorCodes(ip string, port int, request string, expect string, content *bytes.Buffer, retriableErrCodes []int, timeout time.Duration) (bool, error) { +// PokeUDP tries to connect to a host on a port and send the given request. Callers +// can specify additional success parameters, if desired. +// +// The result status will be characterized as precisely as possible, given the +// known users of this. +// +// The result error will be populated for any status other than Success. +// +// The result response will be populated if the UDP transaction was completed, even +// if the other test params make this a failure). +func PokeUDP(host string, port int, request string, params *UDPPokeParams) UDPPokeResult { + hostPort := net.JoinHostPort(host, strconv.Itoa(port)) + url := fmt.Sprintf("udp://%s", hostPort) - ipPort := net.JoinHostPort(ip, strconv.Itoa(port)) - url := fmt.Sprintf("http://%s%s", ipPort, request) - if ip == "" { - Failf("Got empty IP for reachability check (%s)", url) - return false, nil + ret := UDPPokeResult{} + + // Sanity check inputs, because it has happened. These are the only things + // that should hard fail the test - they are basically ASSERT()s. + if host == "" { + Failf("Got empty host for UDP poke (%s)", url) + return ret } if port == 0 { - Failf("Got port==0 for reachability check (%s)", url) - return false, nil + Failf("Got port==0 for UDP poke (%s)", url) + return ret } - Logf("Testing HTTP reachability of %v", url) + // Set default params. + if params == nil { + params = &UDPPokeParams{} + } - resp, err := httpGetNoConnectionPoolTimeout(url, timeout) + Logf("Poking %v", url) + + con, err := net.Dial("udp", hostPort) if err != nil { - Logf("Got error testing for reachability of %s: %v", url, err) - return false, nil + ret.Status = UDPError + ret.Error = err + Logf("Poke(%q): %v", url, err) + return ret } - defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) + + _, err = con.Write([]byte(fmt.Sprintf("%s\n", request))) if err != nil { - Logf("Got error reading response from %s: %v", url, err) - return false, nil - } - if resp.StatusCode != 200 { - for _, code := range retriableErrCodes { - if resp.StatusCode == code { - Logf("Got non-success status %q when trying to access %s, but the error code is retriable", resp.Status, url) - return false, nil - } + ret.Error = err + neterr, ok := err.(net.Error) + if ok && neterr.Timeout() { + ret.Status = UDPTimeout + } else if strings.Contains(err.Error(), "connection refused") { + ret.Status = UDPRefused + } else { + ret.Status = UDPError } - return false, fmt.Errorf("received non-success return status %q trying to access %s; got body: %s", - resp.Status, url, string(body)) - } - if !strings.Contains(string(body), expect) { - return false, fmt.Errorf("received response body without expected substring %q: %s", expect, string(body)) - } - if content != nil { - content.Write(body) - } - return true, nil -} - -func TestNotReachableHTTP(ip string, port int) (bool, error) { - return TestNotReachableHTTPTimeout(ip, port, 5*time.Second) -} - -func TestNotReachableHTTPTimeout(ip string, port int, timeout time.Duration) (bool, error) { - ipPort := net.JoinHostPort(ip, strconv.Itoa(port)) - url := fmt.Sprintf("http://%s", ipPort) - if ip == "" { - Failf("Got empty IP for non-reachability check (%s)", url) - return false, nil - } - if port == 0 { - Failf("Got port==0 for non-reachability check (%s)", url) - return false, nil + Logf("Poke(%q): %v", url, err) + return ret } - Logf("Testing HTTP non-reachability of %v", url) + if params.Timeout != 0 { + err = con.SetDeadline(time.Now().Add(params.Timeout)) + if err != nil { + ret.Status = UDPError + ret.Error = err + Logf("Poke(%q): %v", url, err) + return ret + } + } - resp, err := httpGetNoConnectionPoolTimeout(url, timeout) + bufsize := len(params.Response) + 1 + if bufsize == 0 { + bufsize = 4096 + } + var buf []byte = make([]byte, bufsize) + n, err := con.Read(buf) if err != nil { - Logf("Confirmed that %s is not reachable", url) - return true, nil + ret.Error = err + neterr, ok := err.(net.Error) + if ok && neterr.Timeout() { + ret.Status = UDPTimeout + } else if strings.Contains(err.Error(), "connection refused") { + ret.Status = UDPRefused + } else { + ret.Status = UDPError + } + Logf("Poke(%q): %v", url, err) + return ret } - resp.Body.Close() - return false, nil -} + ret.Response = buf[0:n] -func TestReachableUDP(ip string, port int, request string, expect string) (bool, error) { - ipPort := net.JoinHostPort(ip, strconv.Itoa(port)) - uri := fmt.Sprintf("udp://%s", ipPort) - if ip == "" { - Failf("Got empty IP for reachability check (%s)", uri) - return false, nil - } - if port == 0 { - Failf("Got port==0 for reachability check (%s)", uri) - return false, nil + if params.Response != "" && string(ret.Response) != params.Response { + ret.Status = UDPBadResponse + ret.Error = fmt.Errorf("response does not match expected string: %q", string(ret.Response)) + Logf("Poke(%q): %v", url, ret.Error) + return ret } - Logf("Testing UDP reachability of %v", uri) - - con, err := net.Dial("udp", ipPort) - if err != nil { - return false, fmt.Errorf("Failed to dial %s: %v", ipPort, err) - } - - _, err = con.Write([]byte(fmt.Sprintf("%s\n", request))) - if err != nil { - return false, fmt.Errorf("Failed to send request: %v", err) - } - - var buf []byte = make([]byte, len(expect)+1) - - err = con.SetDeadline(time.Now().Add(3 * time.Second)) - if err != nil { - return false, fmt.Errorf("Failed to set deadline: %v", err) - } - - _, err = con.Read(buf) - if err != nil { - return false, nil - } - - if !strings.Contains(string(buf), expect) { - return false, fmt.Errorf("Failed to retrieve %q, got %q", expect, string(buf)) - } - - Logf("Successfully reached %v", uri) - return true, nil -} - -func TestNotReachableUDP(ip string, port int, request string) (bool, error) { - ipPort := net.JoinHostPort(ip, strconv.Itoa(port)) - uri := fmt.Sprintf("udp://%s", ipPort) - if ip == "" { - Failf("Got empty IP for reachability check (%s)", uri) - return false, nil - } - if port == 0 { - Failf("Got port==0 for reachability check (%s)", uri) - return false, nil - } - - Logf("Testing UDP non-reachability of %v", uri) - - con, err := net.Dial("udp", ipPort) - if err != nil { - Logf("Confirmed that %s is not reachable", uri) - return true, nil - } - - _, err = con.Write([]byte(fmt.Sprintf("%s\n", request))) - if err != nil { - Logf("Confirmed that %s is not reachable", uri) - return true, nil - } - - var buf []byte = make([]byte, 1) - - err = con.SetDeadline(time.Now().Add(3 * time.Second)) - if err != nil { - return false, fmt.Errorf("Failed to set deadline: %v", err) - } - - _, err = con.Read(buf) - if err != nil { - Logf("Confirmed that %s is not reachable", uri) - return true, nil - } - - return false, nil + ret.Status = UDPSuccess + Logf("Poke(%q): success", url) + return ret } func TestHitNodesFromOutside(externalIP string, httpPort int32, timeout time.Duration, expectedHosts sets.String) error { @@ -911,13 +976,12 @@ func TestHitNodesFromOutsideWithCount(externalIP string, httpPort int32, timeout hittedHosts := sets.NewString() count := 0 condition := func() (bool, error) { - var respBody bytes.Buffer - reached, err := TestReachableHTTPWithContentTimeout(externalIP, int(httpPort), "/hostname", "", &respBody, - 1*time.Second) - if err != nil || !reached { + result := PokeHTTP(externalIP, int(httpPort), "/hostname", &HTTPPokeParams{Timeout: 1 * time.Second}) + if result.Status != HTTPSuccess { return false, nil } - hittedHost := strings.TrimSpace(respBody.String()) + + hittedHost := strings.TrimSpace(string(result.Body)) if !expectedHosts.Has(hittedHost) { Logf("Error hitting unexpected host: %v, reset counter: %v", hittedHost, count) count = 0 diff --git a/test/e2e/framework/service_util.go b/test/e2e/framework/service_util.go index 7ef9d711cd..b98ddba72e 100644 --- a/test/e2e/framework/service_util.go +++ b/test/e2e/framework/service_util.go @@ -623,6 +623,7 @@ func (j *ServiceTestJig) waitForConditionOrFail(namespace, name string, timeout // name as the jig and runs the "netexec" container. func (j *ServiceTestJig) newRCTemplate(namespace string) *v1.ReplicationController { var replicas int32 = 1 + var grace int64 = 3 // so we don't race with kube-proxy when scaling up/down rc := &v1.ReplicationController{ ObjectMeta: metav1.ObjectMeta{ @@ -654,7 +655,7 @@ func (j *ServiceTestJig) newRCTemplate(namespace string) *v1.ReplicationControll }, }, }, - TerminationGracePeriodSeconds: new(int64), + TerminationGracePeriodSeconds: &grace, }, }, }, @@ -737,6 +738,28 @@ func (j *ServiceTestJig) RunOrFail(namespace string, tweak func(rc *v1.Replicati return result } +func (j *ServiceTestJig) Scale(namespace string, replicas int) { + rc := j.Name + scale, err := j.Client.CoreV1().ReplicationControllers(namespace).GetScale(rc, metav1.GetOptions{}) + if err != nil { + Failf("Failed to get scale for RC %q: %v", rc, err) + } + + scale.Spec.Replicas = int32(replicas) + _, err = j.Client.CoreV1().ReplicationControllers(namespace).UpdateScale(rc, scale) + if err != nil { + Failf("Failed to scale RC %q: %v", rc, err) + } + pods, err := j.waitForPodsCreated(namespace, replicas) + if err != nil { + Failf("Failed waiting for pods: %v", err) + } + if err := j.waitForPodsReady(namespace, pods); err != nil { + Failf("Failed waiting for pods to be running: %v", err) + } + return +} + func (j *ServiceTestJig) waitForPdbReady(namespace string) error { timeout := 2 * time.Minute for start := time.Now(); time.Since(start) < timeout; time.Sleep(2 * time.Second) { @@ -875,9 +898,19 @@ func (j *ServiceTestJig) TestReachableHTTP(host string, port int, timeout time.D } func (j *ServiceTestJig) TestReachableHTTPWithRetriableErrorCodes(host string, port int, retriableErrCodes []int, timeout time.Duration) { - if err := wait.PollImmediate(Poll, timeout, func() (bool, error) { - return TestReachableHTTPWithRetriableErrorCodes(host, port, "/echo?msg=hello", "hello", retriableErrCodes) - }); err != nil { + pollfn := func() (bool, error) { + result := PokeHTTP(host, port, "/echo?msg=hello", + &HTTPPokeParams{ + BodyContains: "hello", + RetriableCodes: retriableErrCodes, + }) + if result.Status == HTTPSuccess { + return true, nil + } + return false, nil // caller can retry + } + + if err := wait.PollImmediate(Poll, timeout, pollfn); err != nil { if err == wait.ErrWaitTimeout { Failf("Could not reach HTTP service through %v:%v after %v", host, port, timeout) } else { @@ -887,36 +920,87 @@ func (j *ServiceTestJig) TestReachableHTTPWithRetriableErrorCodes(host string, p } func (j *ServiceTestJig) TestNotReachableHTTP(host string, port int, timeout time.Duration) { - if err := wait.PollImmediate(Poll, timeout, func() (bool, error) { return TestNotReachableHTTP(host, port) }); err != nil { - Failf("Could still reach HTTP service through %v:%v after %v: %v", host, port, timeout, err) + pollfn := func() (bool, error) { + result := PokeHTTP(host, port, "/", nil) + if result.Code == 0 { + return true, nil + } + return false, nil // caller can retry + } + + if err := wait.PollImmediate(Poll, timeout, pollfn); err != nil { + Failf("HTTP service %v:%v reachable after %v: %v", host, port, timeout, err) + } +} + +func (j *ServiceTestJig) TestRejectedHTTP(host string, port int, timeout time.Duration) { + pollfn := func() (bool, error) { + result := PokeHTTP(host, port, "/", nil) + if result.Status == HTTPRefused { + return true, nil + } + return false, nil // caller can retry + } + + if err := wait.PollImmediate(Poll, timeout, pollfn); err != nil { + Failf("HTTP service %v:%v not rejected: %v", host, port, err) } } func (j *ServiceTestJig) TestReachableUDP(host string, port int, timeout time.Duration) { - if err := wait.PollImmediate(Poll, timeout, func() (bool, error) { return TestReachableUDP(host, port, "echo hello", "hello") }); err != nil { + pollfn := func() (bool, error) { + result := PokeUDP(host, port, "echo hello", &UDPPokeParams{ + Timeout: 3 * time.Second, + Response: "hello", + }) + if result.Status == UDPSuccess { + return true, nil + } + return false, nil // caller can retry + } + + if err := wait.PollImmediate(Poll, timeout, pollfn); err != nil { Failf("Could not reach UDP service through %v:%v after %v: %v", host, port, timeout, err) } } func (j *ServiceTestJig) TestNotReachableUDP(host string, port int, timeout time.Duration) { - if err := wait.PollImmediate(Poll, timeout, func() (bool, error) { return TestNotReachableUDP(host, port, "echo hello") }); err != nil { - Failf("Could still reach UDP service through %v:%v after %v: %v", host, port, timeout, err) + pollfn := func() (bool, error) { + result := PokeUDP(host, port, "echo hello", &UDPPokeParams{Timeout: 3 * time.Second}) + if result.Status != UDPSuccess && result.Status != UDPError { + return true, nil + } + return false, nil // caller can retry + } + if err := wait.PollImmediate(Poll, timeout, pollfn); err != nil { + Failf("UDP service %v:%v reachable after %v: %v", host, port, timeout, err) + } +} + +func (j *ServiceTestJig) TestRejectedUDP(host string, port int, timeout time.Duration) { + pollfn := func() (bool, error) { + result := PokeUDP(host, port, "echo hello", &UDPPokeParams{Timeout: 3 * time.Second}) + if result.Status == UDPRefused { + return true, nil + } + return false, nil // caller can retry + } + if err := wait.PollImmediate(Poll, timeout, pollfn); err != nil { + Failf("UDP service %v:%v not rejected: %v", host, port, err) } } func (j *ServiceTestJig) GetHTTPContent(host string, port int, timeout time.Duration, url string) bytes.Buffer { var body bytes.Buffer - var err error if pollErr := wait.PollImmediate(Poll, timeout, func() (bool, error) { - var result bool - result, err = TestReachableHTTPWithContent(host, port, url, "", &body) - if err != nil { - Logf("Error hitting %v:%v%v, retrying: %v", host, port, url, err) - return false, nil + result := PokeHTTP(host, port, url, nil) + if result.Status == HTTPSuccess { + body.Write(result.Body) + return true, nil } - return result, nil + return false, nil }); pollErr != nil { - Failf("Could not reach HTTP service through %v:%v%v after %v: %v", host, port, url, timeout, err) + Failf("Could not reach HTTP service through %v:%v%v after %v: %v", host, port, url, timeout, pollErr) } return body } @@ -929,7 +1013,7 @@ func testHTTPHealthCheckNodePort(ip string, port int, request string) (bool, err return false, fmt.Errorf("Invalid input ip or port") } Logf("Testing HTTP health check on %v", url) - resp, err := httpGetNoConnectionPool(url) + resp, err := httpGetNoConnectionPoolTimeout(url, 5*time.Second) if err != nil { Logf("Got error testing for reachability of %s: %v", url, err) return false, err diff --git a/test/e2e/network/firewall.go b/test/e2e/network/firewall.go index db43181ab7..cfa3ffcaef 100644 --- a/test/e2e/network/firewall.go +++ b/test/e2e/network/firewall.go @@ -188,11 +188,11 @@ var _ = SIGDescribe("Firewall rule", func() { }) func assertNotReachableHTTPTimeout(ip string, port int, timeout time.Duration) { - unreachable, err := framework.TestNotReachableHTTPTimeout(ip, port, timeout) - if err != nil { - framework.Failf("Unexpected error checking for reachability of %s:%d: %v", ip, port, err) + result := framework.PokeHTTP(ip, port, "/", &framework.HTTPPokeParams{Timeout: timeout}) + if result.Status == framework.HTTPError { + framework.Failf("Unexpected error checking for reachability of %s:%d: %v", ip, port, result.Error) } - if !unreachable { + if result.Code != 0 { framework.Failf("Was unexpectedly able to reach %s:%d", ip, port) } } diff --git a/test/e2e/network/service.go b/test/e2e/network/service.go index 3df7628386..55f743992d 100644 --- a/test/e2e/network/service.go +++ b/test/e2e/network/service.go @@ -791,11 +791,47 @@ var _ = SIGDescribe("Services", func() { jig.TestReachableUDP(nodeIP, udpNodePort, framework.KubeProxyLagTimeout) By("hitting the TCP service's LoadBalancer") - jig.TestReachableHTTP(tcpIngressIP, svcPort, loadBalancerCreateTimeout) // this may actually recreate the LB + jig.TestReachableHTTP(tcpIngressIP, svcPort, loadBalancerCreateTimeout) if loadBalancerSupportsUDP { By("hitting the UDP service's LoadBalancer") - jig.TestReachableUDP(udpIngressIP, svcPort, loadBalancerCreateTimeout) // this may actually recreate the LB) + jig.TestReachableUDP(udpIngressIP, svcPort, loadBalancerCreateTimeout) + } + + By("Scaling the pods to 0") + jig.Scale(ns1, 0) + jig.Scale(ns2, 0) + + By("looking for ICMP REJECT on the TCP service's NodePort") + jig.TestRejectedHTTP(nodeIP, tcpNodePort, framework.KubeProxyLagTimeout) + + By("looking for ICMP REJECT on the UDP service's NodePort") + jig.TestRejectedUDP(nodeIP, udpNodePort, framework.KubeProxyLagTimeout) + + By("looking for ICMP REJECT on the TCP service's LoadBalancer") + jig.TestRejectedHTTP(tcpIngressIP, svcPort, loadBalancerCreateTimeout) + + if loadBalancerSupportsUDP { + By("looking for ICMP REJECT on the UDP service's LoadBalancer") + jig.TestRejectedUDP(udpIngressIP, svcPort, loadBalancerCreateTimeout) + } + + By("Scaling the pods to 1") + jig.Scale(ns1, 1) + jig.Scale(ns2, 1) + + By("hitting the TCP service's NodePort") + jig.TestReachableHTTP(nodeIP, tcpNodePort, framework.KubeProxyLagTimeout) + + By("hitting the UDP service's NodePort") + jig.TestReachableUDP(nodeIP, udpNodePort, framework.KubeProxyLagTimeout) + + By("hitting the TCP service's LoadBalancer") + jig.TestReachableHTTP(tcpIngressIP, svcPort, loadBalancerCreateTimeout) + + if loadBalancerSupportsUDP { + By("hitting the UDP service's LoadBalancer") + jig.TestReachableUDP(udpIngressIP, svcPort, loadBalancerCreateTimeout) } // Change the services back to ClusterIP. @@ -2063,14 +2099,18 @@ var _ = SIGDescribe("ESIPP [Slow] [DisabledForLargeClusters]", func() { for nodeName, nodeIPs := range endpointNodeMap { By(fmt.Sprintf("checking kube-proxy health check fails on node with endpoint (%s), public IP %s", nodeName, nodeIPs[0])) var body bytes.Buffer - var result bool - var err error - if pollErr := wait.PollImmediate(framework.Poll, framework.ServiceTestTimeout, func() (bool, error) { - result, err = framework.TestReachableHTTPWithContent(nodeIPs[0], healthCheckNodePort, "/healthz", "", &body) - return !result, nil - }); pollErr != nil { - framework.Failf("Kube-proxy still exposing health check on node %v:%v, after ESIPP was turned off. Last err %v, last body %v", - nodeName, healthCheckNodePort, err, body.String()) + pollfn := func() (bool, error) { + result := framework.PokeHTTP(nodeIPs[0], healthCheckNodePort, "/healthz", nil) + if result.Code == 0 { + return true, nil + } + body.Reset() + body.Write(result.Body) + return false, nil + } + if pollErr := wait.PollImmediate(framework.Poll, framework.ServiceTestTimeout, pollfn); pollErr != nil { + framework.Failf("Kube-proxy still exposing health check on node %v:%v, after ESIPP was turned off. body %s", + nodeName, healthCheckNodePort, body.String()) } }