From 19e333b5cc4ce36a1bae2c4d4ed2b97a837afcab Mon Sep 17 00:00:00 2001 From: Tim Hockin Date: Thu, 7 Mar 2019 17:14:18 -0800 Subject: [PATCH 1/3] Fix small race in e2e Occasionally we get spurious errors about "no route to host" when we race with kube-proxy. This should reduce that. It's mostly just log noise. --- test/e2e/framework/service_util.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/test/e2e/framework/service_util.go b/test/e2e/framework/service_util.go index 7ef9d711cd..a0d65f00d2 100644 --- a/test/e2e/framework/service_util.go +++ b/test/e2e/framework/service_util.go @@ -623,6 +623,7 @@ func (j *ServiceTestJig) waitForConditionOrFail(namespace, name string, timeout // name as the jig and runs the "netexec" container. func (j *ServiceTestJig) newRCTemplate(namespace string) *v1.ReplicationController { var replicas int32 = 1 + var grace int64 = 3 // so we don't race with kube-proxy when scaling up/down rc := &v1.ReplicationController{ ObjectMeta: metav1.ObjectMeta{ @@ -654,7 +655,7 @@ func (j *ServiceTestJig) newRCTemplate(namespace string) *v1.ReplicationControll }, }, }, - TerminationGracePeriodSeconds: new(int64), + TerminationGracePeriodSeconds: &grace, }, }, }, From 382f5c83c0d02bdb981d0be0597ae4ea6de64f46 Mon Sep 17 00:00:00 2001 From: Tim Hockin Date: Thu, 7 Mar 2019 17:08:44 -0800 Subject: [PATCH 2/3] Retool HTTP and UDP e2e utils This is a prefactoring for followup changes that need to use very similar but subtly different test. Now it is more generic, though it pushes a little logic up the stack. That makes sense to me. --- test/e2e/framework/networking_utils.go | 382 +++++++++++++++---------- test/e2e/framework/service_util.go | 68 +++-- test/e2e/network/firewall.go | 8 +- test/e2e/network/service.go | 20 +- 4 files changed, 290 insertions(+), 188 deletions(-) diff --git a/test/e2e/framework/networking_utils.go b/test/e2e/framework/networking_utils.go index 7d43570aa9..acaf74553f 100644 --- a/test/e2e/framework/networking_utils.go +++ b/test/e2e/framework/networking_utils.go @@ -17,7 +17,6 @@ limitations under the License. package framework import ( - "bytes" "encoding/json" "fmt" "io/ioutil" @@ -708,13 +707,131 @@ func CheckReachabilityFromPod(expectToBeReachable bool, timeout time.Duration, n ExpectNoError(err) } -// Does an HTTP GET, but does not reuse TCP connections -// This masks problems where the iptables rule has changed, but we don't see it -// This is intended for relatively quick requests (status checks), so we set a short (5 seconds) timeout -func httpGetNoConnectionPool(url string) (*http.Response, error) { - return httpGetNoConnectionPoolTimeout(url, 5*time.Second) +type HTTPPokeParams struct { + Timeout time.Duration + ExpectCode int // default = 200 + BodyContains string + RetriableCodes []int } +type HTTPPokeResult struct { + Status HTTPPokeStatus + Code int // HTTP code: 0 if the connection was not made + Error error // if there was any error + Body []byte // if code != 0 +} + +type HTTPPokeStatus string + +const ( + HTTPSuccess HTTPPokeStatus = "Success" + HTTPError HTTPPokeStatus = "UnknownError" + // Any time we add new errors, we should audit all callers of this. + HTTPTimeout HTTPPokeStatus = "TimedOut" + HTTPRefused HTTPPokeStatus = "ConnectionRefused" + HTTPRetryCode HTTPPokeStatus = "RetryCode" + HTTPWrongCode HTTPPokeStatus = "WrongCode" + HTTPBadResponse HTTPPokeStatus = "BadResponse" +) + +// PokeHTTP tries to connect to a host on a port for a given URL path. Callers +// can specify additional success parameters, if desired. +// +// The result status will be characterized as precisely as possible, given the +// known users of this. +// +// The result code will be zero in case of any failure to connect, or non-zero +// if the HTTP transaction completed (even if the other test params make this a +// failure). +// +// The result error will be populated for any status other than Success. +// +// The result body will be populated if the HTTP transaction was completed, even +// if the other test params make this a failure). +func PokeHTTP(host string, port int, path string, params *HTTPPokeParams) HTTPPokeResult { + hostPort := net.JoinHostPort(host, strconv.Itoa(port)) + url := fmt.Sprintf("http://%s%s", hostPort, path) + + ret := HTTPPokeResult{} + + // Sanity check inputs, because it has happened. These are the only things + // that should hard fail the test - they are basically ASSERT()s. + if host == "" { + Failf("Got empty host for HTTP poke (%s)", url) + return ret + } + if port == 0 { + Failf("Got port==0 for HTTP poke (%s)", url) + return ret + } + + // Set default params. + if params == nil { + params = &HTTPPokeParams{} + } + if params.ExpectCode == 0 { + params.ExpectCode = http.StatusOK + } + + Logf("Poking %q", url) + + resp, err := httpGetNoConnectionPoolTimeout(url, params.Timeout) + if err != nil { + ret.Error = err + neterr, ok := err.(net.Error) + if ok && neterr.Timeout() { + ret.Status = HTTPTimeout + } else if strings.Contains(err.Error(), "connection refused") { + ret.Status = HTTPRefused + } else { + ret.Status = HTTPError + } + Logf("Poke(%q): %v", url, err) + return ret + } + + ret.Code = resp.StatusCode + + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + ret.Status = HTTPError + ret.Error = fmt.Errorf("error reading HTTP body: %v", err) + Logf("Poke(%q): %v", url, ret.Error) + return ret + } + ret.Body = make([]byte, len(body)) + copy(ret.Body, body) + + if resp.StatusCode != params.ExpectCode { + for _, code := range params.RetriableCodes { + if resp.StatusCode == code { + ret.Error = fmt.Errorf("retriable status code: %d", resp.StatusCode) + ret.Status = HTTPRetryCode + Logf("Poke(%q): %v", url, ret.Error) + return ret + } + } + ret.Status = HTTPWrongCode + ret.Error = fmt.Errorf("bad status code: %d", resp.StatusCode) + Logf("Poke(%q): %v", url, ret.Error) + return ret + } + + if params.BodyContains != "" && !strings.Contains(string(body), params.BodyContains) { + ret.Status = HTTPBadResponse + ret.Error = fmt.Errorf("response does not contain expected substring: %q", string(body)) + Logf("Poke(%q): %v", url, ret.Error) + return ret + } + + ret.Status = HTTPSuccess + Logf("Poke(%q): success", url) + return ret +} + +// Does an HTTP GET, but does not reuse TCP connections +// This masks problems where the iptables rule has changed, but we don't see it func httpGetNoConnectionPoolTimeout(url string, timeout time.Duration) (*http.Response, error) { tr := utilnet.SetTransportDefaults(&http.Transport{ DisableKeepAlives: true, @@ -727,178 +844,126 @@ func httpGetNoConnectionPoolTimeout(url string, timeout time.Duration) (*http.Re return client.Get(url) } -func TestReachableHTTP(ip string, port int, request string, expect string) (bool, error) { - return TestReachableHTTPWithContent(ip, port, request, expect, nil) +type UDPPokeParams struct { + Timeout time.Duration + Response string } -func TestReachableHTTPWithRetriableErrorCodes(ip string, port int, request string, expect string, retriableErrCodes []int) (bool, error) { - return TestReachableHTTPWithContentTimeoutWithRetriableErrorCodes(ip, port, request, expect, nil, retriableErrCodes, time.Second*5) +type UDPPokeResult struct { + Status UDPPokeStatus + Error error // if there was any error + Response []byte // if code != 0 } -func TestReachableHTTPWithContent(ip string, port int, request string, expect string, content *bytes.Buffer) (bool, error) { - return TestReachableHTTPWithContentTimeout(ip, port, request, expect, content, 5*time.Second) -} +type UDPPokeStatus string -func TestReachableHTTPWithContentTimeout(ip string, port int, request string, expect string, content *bytes.Buffer, timeout time.Duration) (bool, error) { - return TestReachableHTTPWithContentTimeoutWithRetriableErrorCodes(ip, port, request, expect, content, []int{}, timeout) -} +const ( + UDPSuccess UDPPokeStatus = "Success" + UDPError UDPPokeStatus = "UnknownError" + // Any time we add new errors, we should audit all callers of this. + UDPTimeout UDPPokeStatus = "TimedOut" + UDPRefused UDPPokeStatus = "ConnectionRefused" + UDPBadResponse UDPPokeStatus = "BadResponse" +) -func TestReachableHTTPWithContentTimeoutWithRetriableErrorCodes(ip string, port int, request string, expect string, content *bytes.Buffer, retriableErrCodes []int, timeout time.Duration) (bool, error) { +// PokeUDP tries to connect to a host on a port and send the given request. Callers +// can specify additional success parameters, if desired. +// +// The result status will be characterized as precisely as possible, given the +// known users of this. +// +// The result error will be populated for any status other than Success. +// +// The result response will be populated if the UDP transaction was completed, even +// if the other test params make this a failure). +func PokeUDP(host string, port int, request string, params *UDPPokeParams) UDPPokeResult { + hostPort := net.JoinHostPort(host, strconv.Itoa(port)) + url := fmt.Sprintf("udp://%s", hostPort) - ipPort := net.JoinHostPort(ip, strconv.Itoa(port)) - url := fmt.Sprintf("http://%s%s", ipPort, request) - if ip == "" { - Failf("Got empty IP for reachability check (%s)", url) - return false, nil + ret := UDPPokeResult{} + + // Sanity check inputs, because it has happened. These are the only things + // that should hard fail the test - they are basically ASSERT()s. + if host == "" { + Failf("Got empty host for UDP poke (%s)", url) + return ret } if port == 0 { - Failf("Got port==0 for reachability check (%s)", url) - return false, nil + Failf("Got port==0 for UDP poke (%s)", url) + return ret } - Logf("Testing HTTP reachability of %v", url) + // Set default params. + if params == nil { + params = &UDPPokeParams{} + } - resp, err := httpGetNoConnectionPoolTimeout(url, timeout) + Logf("Poking %v", url) + + con, err := net.Dial("udp", hostPort) if err != nil { - Logf("Got error testing for reachability of %s: %v", url, err) - return false, nil + ret.Status = UDPError + ret.Error = err + Logf("Poke(%q): %v", url, err) + return ret } - defer resp.Body.Close() - body, err := ioutil.ReadAll(resp.Body) + + _, err = con.Write([]byte(fmt.Sprintf("%s\n", request))) if err != nil { - Logf("Got error reading response from %s: %v", url, err) - return false, nil - } - if resp.StatusCode != 200 { - for _, code := range retriableErrCodes { - if resp.StatusCode == code { - Logf("Got non-success status %q when trying to access %s, but the error code is retriable", resp.Status, url) - return false, nil - } + ret.Error = err + neterr, ok := err.(net.Error) + if ok && neterr.Timeout() { + ret.Status = UDPTimeout + } else if strings.Contains(err.Error(), "connection refused") { + ret.Status = UDPRefused + } else { + ret.Status = UDPError } - return false, fmt.Errorf("received non-success return status %q trying to access %s; got body: %s", - resp.Status, url, string(body)) - } - if !strings.Contains(string(body), expect) { - return false, fmt.Errorf("received response body without expected substring %q: %s", expect, string(body)) - } - if content != nil { - content.Write(body) - } - return true, nil -} - -func TestNotReachableHTTP(ip string, port int) (bool, error) { - return TestNotReachableHTTPTimeout(ip, port, 5*time.Second) -} - -func TestNotReachableHTTPTimeout(ip string, port int, timeout time.Duration) (bool, error) { - ipPort := net.JoinHostPort(ip, strconv.Itoa(port)) - url := fmt.Sprintf("http://%s", ipPort) - if ip == "" { - Failf("Got empty IP for non-reachability check (%s)", url) - return false, nil - } - if port == 0 { - Failf("Got port==0 for non-reachability check (%s)", url) - return false, nil + Logf("Poke(%q): %v", url, err) + return ret } - Logf("Testing HTTP non-reachability of %v", url) + if params.Timeout != 0 { + err = con.SetDeadline(time.Now().Add(params.Timeout)) + if err != nil { + ret.Status = UDPError + ret.Error = err + Logf("Poke(%q): %v", url, err) + return ret + } + } - resp, err := httpGetNoConnectionPoolTimeout(url, timeout) + bufsize := len(params.Response) + 1 + if bufsize == 0 { + bufsize = 4096 + } + var buf []byte = make([]byte, bufsize) + n, err := con.Read(buf) if err != nil { - Logf("Confirmed that %s is not reachable", url) - return true, nil + ret.Error = err + neterr, ok := err.(net.Error) + if ok && neterr.Timeout() { + ret.Status = UDPTimeout + } else if strings.Contains(err.Error(), "connection refused") { + ret.Status = UDPRefused + } else { + ret.Status = UDPError + } + Logf("Poke(%q): %v", url, err) + return ret } - resp.Body.Close() - return false, nil -} + ret.Response = buf[0:n] -func TestReachableUDP(ip string, port int, request string, expect string) (bool, error) { - ipPort := net.JoinHostPort(ip, strconv.Itoa(port)) - uri := fmt.Sprintf("udp://%s", ipPort) - if ip == "" { - Failf("Got empty IP for reachability check (%s)", uri) - return false, nil - } - if port == 0 { - Failf("Got port==0 for reachability check (%s)", uri) - return false, nil + if params.Response != "" && string(ret.Response) != params.Response { + ret.Status = UDPBadResponse + ret.Error = fmt.Errorf("response does not match expected string: %q", string(ret.Response)) + Logf("Poke(%q): %v", url, ret.Error) + return ret } - Logf("Testing UDP reachability of %v", uri) - - con, err := net.Dial("udp", ipPort) - if err != nil { - return false, fmt.Errorf("Failed to dial %s: %v", ipPort, err) - } - - _, err = con.Write([]byte(fmt.Sprintf("%s\n", request))) - if err != nil { - return false, fmt.Errorf("Failed to send request: %v", err) - } - - var buf []byte = make([]byte, len(expect)+1) - - err = con.SetDeadline(time.Now().Add(3 * time.Second)) - if err != nil { - return false, fmt.Errorf("Failed to set deadline: %v", err) - } - - _, err = con.Read(buf) - if err != nil { - return false, nil - } - - if !strings.Contains(string(buf), expect) { - return false, fmt.Errorf("Failed to retrieve %q, got %q", expect, string(buf)) - } - - Logf("Successfully reached %v", uri) - return true, nil -} - -func TestNotReachableUDP(ip string, port int, request string) (bool, error) { - ipPort := net.JoinHostPort(ip, strconv.Itoa(port)) - uri := fmt.Sprintf("udp://%s", ipPort) - if ip == "" { - Failf("Got empty IP for reachability check (%s)", uri) - return false, nil - } - if port == 0 { - Failf("Got port==0 for reachability check (%s)", uri) - return false, nil - } - - Logf("Testing UDP non-reachability of %v", uri) - - con, err := net.Dial("udp", ipPort) - if err != nil { - Logf("Confirmed that %s is not reachable", uri) - return true, nil - } - - _, err = con.Write([]byte(fmt.Sprintf("%s\n", request))) - if err != nil { - Logf("Confirmed that %s is not reachable", uri) - return true, nil - } - - var buf []byte = make([]byte, 1) - - err = con.SetDeadline(time.Now().Add(3 * time.Second)) - if err != nil { - return false, fmt.Errorf("Failed to set deadline: %v", err) - } - - _, err = con.Read(buf) - if err != nil { - Logf("Confirmed that %s is not reachable", uri) - return true, nil - } - - return false, nil + ret.Status = UDPSuccess + Logf("Poke(%q): success", url) + return ret } func TestHitNodesFromOutside(externalIP string, httpPort int32, timeout time.Duration, expectedHosts sets.String) error { @@ -911,13 +976,12 @@ func TestHitNodesFromOutsideWithCount(externalIP string, httpPort int32, timeout hittedHosts := sets.NewString() count := 0 condition := func() (bool, error) { - var respBody bytes.Buffer - reached, err := TestReachableHTTPWithContentTimeout(externalIP, int(httpPort), "/hostname", "", &respBody, - 1*time.Second) - if err != nil || !reached { + result := PokeHTTP(externalIP, int(httpPort), "/hostname", &HTTPPokeParams{Timeout: 1 * time.Second}) + if result.Status != HTTPSuccess { return false, nil } - hittedHost := strings.TrimSpace(respBody.String()) + + hittedHost := strings.TrimSpace(string(result.Body)) if !expectedHosts.Has(hittedHost) { Logf("Error hitting unexpected host: %v, reset counter: %v", hittedHost, count) count = 0 diff --git a/test/e2e/framework/service_util.go b/test/e2e/framework/service_util.go index a0d65f00d2..172f52bd89 100644 --- a/test/e2e/framework/service_util.go +++ b/test/e2e/framework/service_util.go @@ -876,9 +876,19 @@ func (j *ServiceTestJig) TestReachableHTTP(host string, port int, timeout time.D } func (j *ServiceTestJig) TestReachableHTTPWithRetriableErrorCodes(host string, port int, retriableErrCodes []int, timeout time.Duration) { - if err := wait.PollImmediate(Poll, timeout, func() (bool, error) { - return TestReachableHTTPWithRetriableErrorCodes(host, port, "/echo?msg=hello", "hello", retriableErrCodes) - }); err != nil { + pollfn := func() (bool, error) { + result := PokeHTTP(host, port, "/echo?msg=hello", + &HTTPPokeParams{ + BodyContains: "hello", + RetriableCodes: retriableErrCodes, + }) + if result.Status == HTTPSuccess { + return true, nil + } + return false, nil // caller can retry + } + + if err := wait.PollImmediate(Poll, timeout, pollfn); err != nil { if err == wait.ErrWaitTimeout { Failf("Could not reach HTTP service through %v:%v after %v", host, port, timeout) } else { @@ -888,36 +898,60 @@ func (j *ServiceTestJig) TestReachableHTTPWithRetriableErrorCodes(host string, p } func (j *ServiceTestJig) TestNotReachableHTTP(host string, port int, timeout time.Duration) { - if err := wait.PollImmediate(Poll, timeout, func() (bool, error) { return TestNotReachableHTTP(host, port) }); err != nil { - Failf("Could still reach HTTP service through %v:%v after %v: %v", host, port, timeout, err) + pollfn := func() (bool, error) { + result := PokeHTTP(host, port, "/", nil) + if result.Code == 0 { + return true, nil + } + return false, nil // caller can retry + } + + if err := wait.PollImmediate(Poll, timeout, pollfn); err != nil { + Failf("HTTP service %v:%v reachable after %v: %v", host, port, timeout, err) } } func (j *ServiceTestJig) TestReachableUDP(host string, port int, timeout time.Duration) { - if err := wait.PollImmediate(Poll, timeout, func() (bool, error) { return TestReachableUDP(host, port, "echo hello", "hello") }); err != nil { + pollfn := func() (bool, error) { + result := PokeUDP(host, port, "echo hello", &UDPPokeParams{ + Timeout: 3 * time.Second, + Response: "hello", + }) + if result.Status == UDPSuccess { + return true, nil + } + return false, nil // caller can retry + } + + if err := wait.PollImmediate(Poll, timeout, pollfn); err != nil { Failf("Could not reach UDP service through %v:%v after %v: %v", host, port, timeout, err) } } func (j *ServiceTestJig) TestNotReachableUDP(host string, port int, timeout time.Duration) { - if err := wait.PollImmediate(Poll, timeout, func() (bool, error) { return TestNotReachableUDP(host, port, "echo hello") }); err != nil { - Failf("Could still reach UDP service through %v:%v after %v: %v", host, port, timeout, err) + pollfn := func() (bool, error) { + result := PokeUDP(host, port, "echo hello", &UDPPokeParams{Timeout: 3 * time.Second}) + if result.Status != UDPSuccess && result.Status != UDPError { + return true, nil + } + return false, nil // caller can retry + } + if err := wait.PollImmediate(Poll, timeout, pollfn); err != nil { + Failf("UDP service %v:%v reachable after %v: %v", host, port, timeout, err) } } func (j *ServiceTestJig) GetHTTPContent(host string, port int, timeout time.Duration, url string) bytes.Buffer { var body bytes.Buffer - var err error if pollErr := wait.PollImmediate(Poll, timeout, func() (bool, error) { - var result bool - result, err = TestReachableHTTPWithContent(host, port, url, "", &body) - if err != nil { - Logf("Error hitting %v:%v%v, retrying: %v", host, port, url, err) - return false, nil + result := PokeHTTP(host, port, url, nil) + if result.Status == HTTPSuccess { + body.Write(result.Body) + return true, nil } - return result, nil + return false, nil }); pollErr != nil { - Failf("Could not reach HTTP service through %v:%v%v after %v: %v", host, port, url, timeout, err) + Failf("Could not reach HTTP service through %v:%v%v after %v: %v", host, port, url, timeout, pollErr) } return body } @@ -930,7 +964,7 @@ func testHTTPHealthCheckNodePort(ip string, port int, request string) (bool, err return false, fmt.Errorf("Invalid input ip or port") } Logf("Testing HTTP health check on %v", url) - resp, err := httpGetNoConnectionPool(url) + resp, err := httpGetNoConnectionPoolTimeout(url, 5*time.Second) if err != nil { Logf("Got error testing for reachability of %s: %v", url, err) return false, err diff --git a/test/e2e/network/firewall.go b/test/e2e/network/firewall.go index db43181ab7..cfa3ffcaef 100644 --- a/test/e2e/network/firewall.go +++ b/test/e2e/network/firewall.go @@ -188,11 +188,11 @@ var _ = SIGDescribe("Firewall rule", func() { }) func assertNotReachableHTTPTimeout(ip string, port int, timeout time.Duration) { - unreachable, err := framework.TestNotReachableHTTPTimeout(ip, port, timeout) - if err != nil { - framework.Failf("Unexpected error checking for reachability of %s:%d: %v", ip, port, err) + result := framework.PokeHTTP(ip, port, "/", &framework.HTTPPokeParams{Timeout: timeout}) + if result.Status == framework.HTTPError { + framework.Failf("Unexpected error checking for reachability of %s:%d: %v", ip, port, result.Error) } - if !unreachable { + if result.Code != 0 { framework.Failf("Was unexpectedly able to reach %s:%d", ip, port) } } diff --git a/test/e2e/network/service.go b/test/e2e/network/service.go index 3df7628386..a60fdd4138 100644 --- a/test/e2e/network/service.go +++ b/test/e2e/network/service.go @@ -2063,14 +2063,18 @@ var _ = SIGDescribe("ESIPP [Slow] [DisabledForLargeClusters]", func() { for nodeName, nodeIPs := range endpointNodeMap { By(fmt.Sprintf("checking kube-proxy health check fails on node with endpoint (%s), public IP %s", nodeName, nodeIPs[0])) var body bytes.Buffer - var result bool - var err error - if pollErr := wait.PollImmediate(framework.Poll, framework.ServiceTestTimeout, func() (bool, error) { - result, err = framework.TestReachableHTTPWithContent(nodeIPs[0], healthCheckNodePort, "/healthz", "", &body) - return !result, nil - }); pollErr != nil { - framework.Failf("Kube-proxy still exposing health check on node %v:%v, after ESIPP was turned off. Last err %v, last body %v", - nodeName, healthCheckNodePort, err, body.String()) + pollfn := func() (bool, error) { + result := framework.PokeHTTP(nodeIPs[0], healthCheckNodePort, "/healthz", nil) + if result.Code == 0 { + return true, nil + } + body.Reset() + body.Write(result.Body) + return false, nil + } + if pollErr := wait.PollImmediate(framework.Poll, framework.ServiceTestTimeout, pollfn); pollErr != nil { + framework.Failf("Kube-proxy still exposing health check on node %v:%v, after ESIPP was turned off. body %s", + nodeName, healthCheckNodePort, body.String()) } } From de25d6cb9577b00b24ca2baf0187b34f483a0d6c Mon Sep 17 00:00:00 2001 From: Tim Hockin Date: Mon, 18 Feb 2019 23:52:24 -0800 Subject: [PATCH 3/3] Kube-proxy: REJECT LB IPs with no endpoints We REJECT every other case. Close this FIXME. To get this to work in all cases, we have to process service in filter.INPUT, since LB IPS might be manged as local addresses. --- pkg/proxy/iptables/proxier.go | 23 +++++++++++--- test/e2e/framework/service_util.go | 49 ++++++++++++++++++++++++++++++ test/e2e/network/service.go | 40 ++++++++++++++++++++++-- 3 files changed, 105 insertions(+), 7 deletions(-) diff --git a/pkg/proxy/iptables/proxier.go b/pkg/proxy/iptables/proxier.go index a4f8dc8ef7..7633c6c9dc 100644 --- a/pkg/proxy/iptables/proxier.go +++ b/pkg/proxy/iptables/proxier.go @@ -369,6 +369,7 @@ var iptablesJumpChains = []iptablesJumpChain{ {utiliptables.TableFilter, kubeExternalServicesChain, utiliptables.ChainInput, "kubernetes externally-visible service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}}, {utiliptables.TableFilter, kubeServicesChain, utiliptables.ChainForward, "kubernetes service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}}, {utiliptables.TableFilter, kubeServicesChain, utiliptables.ChainOutput, "kubernetes service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}}, + {utiliptables.TableFilter, kubeServicesChain, utiliptables.ChainInput, "kubernetes service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}}, {utiliptables.TableFilter, kubeForwardChain, utiliptables.ChainForward, "kubernetes forwarding rules", nil}, {utiliptables.TableNAT, kubeServicesChain, utiliptables.ChainOutput, "kubernetes service portals", nil}, {utiliptables.TableNAT, kubeServicesChain, utiliptables.ChainPrerouting, "kubernetes service portals", nil}, @@ -847,6 +848,7 @@ func (proxier *Proxier) syncProxyRules() { } writeLine(proxier.natRules, append(args, "-j", string(svcChain))...) } else { + // No endpoints. writeLine(proxier.filterRules, "-A", string(kubeServicesChain), "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), @@ -917,6 +919,7 @@ func (proxier *Proxier) syncProxyRules() { // This covers cases like GCE load-balancers which get added to the local routing table. writeLine(proxier.natRules, append(dstLocalOnlyArgs, "-j", string(svcChain))...) } else { + // No endpoints. writeLine(proxier.filterRules, "-A", string(kubeExternalServicesChain), "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), @@ -929,10 +932,10 @@ func (proxier *Proxier) syncProxyRules() { } // Capture load-balancer ingress. - if hasEndpoints { - fwChain := svcInfo.serviceFirewallChainName - for _, ingress := range svcInfo.LoadBalancerStatus.Ingress { - if ingress.IP != "" { + fwChain := svcInfo.serviceFirewallChainName + for _, ingress := range svcInfo.LoadBalancerStatus.Ingress { + if ingress.IP != "" { + if hasEndpoints { // create service firewall chain if chain, ok := existingNATChains[fwChain]; ok { writeBytesLine(proxier.natChains, chain) @@ -993,10 +996,19 @@ func (proxier *Proxier) syncProxyRules() { // If the packet was able to reach the end of firewall chain, then it did not get DNATed. // It means the packet cannot go thru the firewall, then mark it for DROP writeLine(proxier.natRules, append(args, "-j", string(KubeMarkDropChain))...) + } else { + // No endpoints. + writeLine(proxier.filterRules, + "-A", string(kubeServicesChain), + "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), + "-m", protocol, "-p", protocol, + "-d", utilproxy.ToCIDR(net.ParseIP(ingress.IP)), + "--dport", strconv.Itoa(svcInfo.Port), + "-j", "REJECT", + ) } } } - // FIXME: do we need REJECT rules for load-balancer ingress if !hasEndpoints? // Capture nodeports. If we had more than 2 rules it might be // worthwhile to make a new per-service chain for nodeport rules, but @@ -1078,6 +1090,7 @@ func (proxier *Proxier) syncProxyRules() { writeLine(proxier.natRules, append(args, "-j", string(svcXlbChain))...) } } else { + // No endpoints. writeLine(proxier.filterRules, "-A", string(kubeExternalServicesChain), "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), diff --git a/test/e2e/framework/service_util.go b/test/e2e/framework/service_util.go index 172f52bd89..b98ddba72e 100644 --- a/test/e2e/framework/service_util.go +++ b/test/e2e/framework/service_util.go @@ -738,6 +738,28 @@ func (j *ServiceTestJig) RunOrFail(namespace string, tweak func(rc *v1.Replicati return result } +func (j *ServiceTestJig) Scale(namespace string, replicas int) { + rc := j.Name + scale, err := j.Client.CoreV1().ReplicationControllers(namespace).GetScale(rc, metav1.GetOptions{}) + if err != nil { + Failf("Failed to get scale for RC %q: %v", rc, err) + } + + scale.Spec.Replicas = int32(replicas) + _, err = j.Client.CoreV1().ReplicationControllers(namespace).UpdateScale(rc, scale) + if err != nil { + Failf("Failed to scale RC %q: %v", rc, err) + } + pods, err := j.waitForPodsCreated(namespace, replicas) + if err != nil { + Failf("Failed waiting for pods: %v", err) + } + if err := j.waitForPodsReady(namespace, pods); err != nil { + Failf("Failed waiting for pods to be running: %v", err) + } + return +} + func (j *ServiceTestJig) waitForPdbReady(namespace string) error { timeout := 2 * time.Minute for start := time.Now(); time.Since(start) < timeout; time.Sleep(2 * time.Second) { @@ -911,6 +933,20 @@ func (j *ServiceTestJig) TestNotReachableHTTP(host string, port int, timeout tim } } +func (j *ServiceTestJig) TestRejectedHTTP(host string, port int, timeout time.Duration) { + pollfn := func() (bool, error) { + result := PokeHTTP(host, port, "/", nil) + if result.Status == HTTPRefused { + return true, nil + } + return false, nil // caller can retry + } + + if err := wait.PollImmediate(Poll, timeout, pollfn); err != nil { + Failf("HTTP service %v:%v not rejected: %v", host, port, err) + } +} + func (j *ServiceTestJig) TestReachableUDP(host string, port int, timeout time.Duration) { pollfn := func() (bool, error) { result := PokeUDP(host, port, "echo hello", &UDPPokeParams{ @@ -941,6 +977,19 @@ func (j *ServiceTestJig) TestNotReachableUDP(host string, port int, timeout time } } +func (j *ServiceTestJig) TestRejectedUDP(host string, port int, timeout time.Duration) { + pollfn := func() (bool, error) { + result := PokeUDP(host, port, "echo hello", &UDPPokeParams{Timeout: 3 * time.Second}) + if result.Status == UDPRefused { + return true, nil + } + return false, nil // caller can retry + } + if err := wait.PollImmediate(Poll, timeout, pollfn); err != nil { + Failf("UDP service %v:%v not rejected: %v", host, port, err) + } +} + func (j *ServiceTestJig) GetHTTPContent(host string, port int, timeout time.Duration, url string) bytes.Buffer { var body bytes.Buffer if pollErr := wait.PollImmediate(Poll, timeout, func() (bool, error) { diff --git a/test/e2e/network/service.go b/test/e2e/network/service.go index a60fdd4138..55f743992d 100644 --- a/test/e2e/network/service.go +++ b/test/e2e/network/service.go @@ -791,11 +791,47 @@ var _ = SIGDescribe("Services", func() { jig.TestReachableUDP(nodeIP, udpNodePort, framework.KubeProxyLagTimeout) By("hitting the TCP service's LoadBalancer") - jig.TestReachableHTTP(tcpIngressIP, svcPort, loadBalancerCreateTimeout) // this may actually recreate the LB + jig.TestReachableHTTP(tcpIngressIP, svcPort, loadBalancerCreateTimeout) if loadBalancerSupportsUDP { By("hitting the UDP service's LoadBalancer") - jig.TestReachableUDP(udpIngressIP, svcPort, loadBalancerCreateTimeout) // this may actually recreate the LB) + jig.TestReachableUDP(udpIngressIP, svcPort, loadBalancerCreateTimeout) + } + + By("Scaling the pods to 0") + jig.Scale(ns1, 0) + jig.Scale(ns2, 0) + + By("looking for ICMP REJECT on the TCP service's NodePort") + jig.TestRejectedHTTP(nodeIP, tcpNodePort, framework.KubeProxyLagTimeout) + + By("looking for ICMP REJECT on the UDP service's NodePort") + jig.TestRejectedUDP(nodeIP, udpNodePort, framework.KubeProxyLagTimeout) + + By("looking for ICMP REJECT on the TCP service's LoadBalancer") + jig.TestRejectedHTTP(tcpIngressIP, svcPort, loadBalancerCreateTimeout) + + if loadBalancerSupportsUDP { + By("looking for ICMP REJECT on the UDP service's LoadBalancer") + jig.TestRejectedUDP(udpIngressIP, svcPort, loadBalancerCreateTimeout) + } + + By("Scaling the pods to 1") + jig.Scale(ns1, 1) + jig.Scale(ns2, 1) + + By("hitting the TCP service's NodePort") + jig.TestReachableHTTP(nodeIP, tcpNodePort, framework.KubeProxyLagTimeout) + + By("hitting the UDP service's NodePort") + jig.TestReachableUDP(nodeIP, udpNodePort, framework.KubeProxyLagTimeout) + + By("hitting the TCP service's LoadBalancer") + jig.TestReachableHTTP(tcpIngressIP, svcPort, loadBalancerCreateTimeout) + + if loadBalancerSupportsUDP { + By("hitting the UDP service's LoadBalancer") + jig.TestReachableUDP(udpIngressIP, svcPort, loadBalancerCreateTimeout) } // Change the services back to ClusterIP.