Merge pull request #74394 from thockin/proxy-reject-lb-no-endpoints

Kube-proxy: ICMP reject via LBs when no endpoints
pull/564/head
Kubernetes Prow Robot 2019-03-12 13:18:40 -07:00 committed by GitHub
commit f33e5e8f7a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 397 additions and 196 deletions

View File

@ -369,6 +369,7 @@ var iptablesJumpChains = []iptablesJumpChain{
{utiliptables.TableFilter, kubeExternalServicesChain, utiliptables.ChainInput, "kubernetes externally-visible service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}}, {utiliptables.TableFilter, kubeExternalServicesChain, utiliptables.ChainInput, "kubernetes externally-visible service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}},
{utiliptables.TableFilter, kubeServicesChain, utiliptables.ChainForward, "kubernetes service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}}, {utiliptables.TableFilter, kubeServicesChain, utiliptables.ChainForward, "kubernetes service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}},
{utiliptables.TableFilter, kubeServicesChain, utiliptables.ChainOutput, "kubernetes service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}}, {utiliptables.TableFilter, kubeServicesChain, utiliptables.ChainOutput, "kubernetes service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}},
{utiliptables.TableFilter, kubeServicesChain, utiliptables.ChainInput, "kubernetes service portals", []string{"-m", "conntrack", "--ctstate", "NEW"}},
{utiliptables.TableFilter, kubeForwardChain, utiliptables.ChainForward, "kubernetes forwarding rules", nil}, {utiliptables.TableFilter, kubeForwardChain, utiliptables.ChainForward, "kubernetes forwarding rules", nil},
{utiliptables.TableNAT, kubeServicesChain, utiliptables.ChainOutput, "kubernetes service portals", nil}, {utiliptables.TableNAT, kubeServicesChain, utiliptables.ChainOutput, "kubernetes service portals", nil},
{utiliptables.TableNAT, kubeServicesChain, utiliptables.ChainPrerouting, "kubernetes service portals", nil}, {utiliptables.TableNAT, kubeServicesChain, utiliptables.ChainPrerouting, "kubernetes service portals", nil},
@ -847,6 +848,7 @@ func (proxier *Proxier) syncProxyRules() {
} }
writeLine(proxier.natRules, append(args, "-j", string(svcChain))...) writeLine(proxier.natRules, append(args, "-j", string(svcChain))...)
} else { } else {
// No endpoints.
writeLine(proxier.filterRules, writeLine(proxier.filterRules,
"-A", string(kubeServicesChain), "-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
@ -917,6 +919,7 @@ func (proxier *Proxier) syncProxyRules() {
// This covers cases like GCE load-balancers which get added to the local routing table. // This covers cases like GCE load-balancers which get added to the local routing table.
writeLine(proxier.natRules, append(dstLocalOnlyArgs, "-j", string(svcChain))...) writeLine(proxier.natRules, append(dstLocalOnlyArgs, "-j", string(svcChain))...)
} else { } else {
// No endpoints.
writeLine(proxier.filterRules, writeLine(proxier.filterRules,
"-A", string(kubeExternalServicesChain), "-A", string(kubeExternalServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
@ -929,10 +932,10 @@ func (proxier *Proxier) syncProxyRules() {
} }
// Capture load-balancer ingress. // Capture load-balancer ingress.
if hasEndpoints { fwChain := svcInfo.serviceFirewallChainName
fwChain := svcInfo.serviceFirewallChainName for _, ingress := range svcInfo.LoadBalancerStatus.Ingress {
for _, ingress := range svcInfo.LoadBalancerStatus.Ingress { if ingress.IP != "" {
if ingress.IP != "" { if hasEndpoints {
// create service firewall chain // create service firewall chain
if chain, ok := existingNATChains[fwChain]; ok { if chain, ok := existingNATChains[fwChain]; ok {
writeBytesLine(proxier.natChains, chain) writeBytesLine(proxier.natChains, chain)
@ -993,10 +996,19 @@ func (proxier *Proxier) syncProxyRules() {
// If the packet was able to reach the end of firewall chain, then it did not get DNATed. // If the packet was able to reach the end of firewall chain, then it did not get DNATed.
// It means the packet cannot go thru the firewall, then mark it for DROP // It means the packet cannot go thru the firewall, then mark it for DROP
writeLine(proxier.natRules, append(args, "-j", string(KubeMarkDropChain))...) writeLine(proxier.natRules, append(args, "-j", string(KubeMarkDropChain))...)
} else {
// No endpoints.
writeLine(proxier.filterRules,
"-A", string(kubeServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),
"-m", protocol, "-p", protocol,
"-d", utilproxy.ToCIDR(net.ParseIP(ingress.IP)),
"--dport", strconv.Itoa(svcInfo.Port),
"-j", "REJECT",
)
} }
} }
} }
// FIXME: do we need REJECT rules for load-balancer ingress if !hasEndpoints?
// Capture nodeports. If we had more than 2 rules it might be // Capture nodeports. If we had more than 2 rules it might be
// worthwhile to make a new per-service chain for nodeport rules, but // worthwhile to make a new per-service chain for nodeport rules, but
@ -1078,6 +1090,7 @@ func (proxier *Proxier) syncProxyRules() {
writeLine(proxier.natRules, append(args, "-j", string(svcXlbChain))...) writeLine(proxier.natRules, append(args, "-j", string(svcXlbChain))...)
} }
} else { } else {
// No endpoints.
writeLine(proxier.filterRules, writeLine(proxier.filterRules,
"-A", string(kubeExternalServicesChain), "-A", string(kubeExternalServicesChain),
"-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString), "-m", "comment", "--comment", fmt.Sprintf(`"%s has no endpoints"`, svcNameString),

View File

@ -17,7 +17,6 @@ limitations under the License.
package framework package framework
import ( import (
"bytes"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
@ -708,13 +707,131 @@ func CheckReachabilityFromPod(expectToBeReachable bool, timeout time.Duration, n
ExpectNoError(err) ExpectNoError(err)
} }
// Does an HTTP GET, but does not reuse TCP connections type HTTPPokeParams struct {
// This masks problems where the iptables rule has changed, but we don't see it Timeout time.Duration
// This is intended for relatively quick requests (status checks), so we set a short (5 seconds) timeout ExpectCode int // default = 200
func httpGetNoConnectionPool(url string) (*http.Response, error) { BodyContains string
return httpGetNoConnectionPoolTimeout(url, 5*time.Second) RetriableCodes []int
} }
type HTTPPokeResult struct {
Status HTTPPokeStatus
Code int // HTTP code: 0 if the connection was not made
Error error // if there was any error
Body []byte // if code != 0
}
type HTTPPokeStatus string
const (
HTTPSuccess HTTPPokeStatus = "Success"
HTTPError HTTPPokeStatus = "UnknownError"
// Any time we add new errors, we should audit all callers of this.
HTTPTimeout HTTPPokeStatus = "TimedOut"
HTTPRefused HTTPPokeStatus = "ConnectionRefused"
HTTPRetryCode HTTPPokeStatus = "RetryCode"
HTTPWrongCode HTTPPokeStatus = "WrongCode"
HTTPBadResponse HTTPPokeStatus = "BadResponse"
)
// PokeHTTP tries to connect to a host on a port for a given URL path. Callers
// can specify additional success parameters, if desired.
//
// The result status will be characterized as precisely as possible, given the
// known users of this.
//
// The result code will be zero in case of any failure to connect, or non-zero
// if the HTTP transaction completed (even if the other test params make this a
// failure).
//
// The result error will be populated for any status other than Success.
//
// The result body will be populated if the HTTP transaction was completed, even
// if the other test params make this a failure).
func PokeHTTP(host string, port int, path string, params *HTTPPokeParams) HTTPPokeResult {
hostPort := net.JoinHostPort(host, strconv.Itoa(port))
url := fmt.Sprintf("http://%s%s", hostPort, path)
ret := HTTPPokeResult{}
// Sanity check inputs, because it has happened. These are the only things
// that should hard fail the test - they are basically ASSERT()s.
if host == "" {
Failf("Got empty host for HTTP poke (%s)", url)
return ret
}
if port == 0 {
Failf("Got port==0 for HTTP poke (%s)", url)
return ret
}
// Set default params.
if params == nil {
params = &HTTPPokeParams{}
}
if params.ExpectCode == 0 {
params.ExpectCode = http.StatusOK
}
Logf("Poking %q", url)
resp, err := httpGetNoConnectionPoolTimeout(url, params.Timeout)
if err != nil {
ret.Error = err
neterr, ok := err.(net.Error)
if ok && neterr.Timeout() {
ret.Status = HTTPTimeout
} else if strings.Contains(err.Error(), "connection refused") {
ret.Status = HTTPRefused
} else {
ret.Status = HTTPError
}
Logf("Poke(%q): %v", url, err)
return ret
}
ret.Code = resp.StatusCode
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
ret.Status = HTTPError
ret.Error = fmt.Errorf("error reading HTTP body: %v", err)
Logf("Poke(%q): %v", url, ret.Error)
return ret
}
ret.Body = make([]byte, len(body))
copy(ret.Body, body)
if resp.StatusCode != params.ExpectCode {
for _, code := range params.RetriableCodes {
if resp.StatusCode == code {
ret.Error = fmt.Errorf("retriable status code: %d", resp.StatusCode)
ret.Status = HTTPRetryCode
Logf("Poke(%q): %v", url, ret.Error)
return ret
}
}
ret.Status = HTTPWrongCode
ret.Error = fmt.Errorf("bad status code: %d", resp.StatusCode)
Logf("Poke(%q): %v", url, ret.Error)
return ret
}
if params.BodyContains != "" && !strings.Contains(string(body), params.BodyContains) {
ret.Status = HTTPBadResponse
ret.Error = fmt.Errorf("response does not contain expected substring: %q", string(body))
Logf("Poke(%q): %v", url, ret.Error)
return ret
}
ret.Status = HTTPSuccess
Logf("Poke(%q): success", url)
return ret
}
// Does an HTTP GET, but does not reuse TCP connections
// This masks problems where the iptables rule has changed, but we don't see it
func httpGetNoConnectionPoolTimeout(url string, timeout time.Duration) (*http.Response, error) { func httpGetNoConnectionPoolTimeout(url string, timeout time.Duration) (*http.Response, error) {
tr := utilnet.SetTransportDefaults(&http.Transport{ tr := utilnet.SetTransportDefaults(&http.Transport{
DisableKeepAlives: true, DisableKeepAlives: true,
@ -727,178 +844,126 @@ func httpGetNoConnectionPoolTimeout(url string, timeout time.Duration) (*http.Re
return client.Get(url) return client.Get(url)
} }
func TestReachableHTTP(ip string, port int, request string, expect string) (bool, error) { type UDPPokeParams struct {
return TestReachableHTTPWithContent(ip, port, request, expect, nil) Timeout time.Duration
Response string
} }
func TestReachableHTTPWithRetriableErrorCodes(ip string, port int, request string, expect string, retriableErrCodes []int) (bool, error) { type UDPPokeResult struct {
return TestReachableHTTPWithContentTimeoutWithRetriableErrorCodes(ip, port, request, expect, nil, retriableErrCodes, time.Second*5) Status UDPPokeStatus
Error error // if there was any error
Response []byte // if code != 0
} }
func TestReachableHTTPWithContent(ip string, port int, request string, expect string, content *bytes.Buffer) (bool, error) { type UDPPokeStatus string
return TestReachableHTTPWithContentTimeout(ip, port, request, expect, content, 5*time.Second)
}
func TestReachableHTTPWithContentTimeout(ip string, port int, request string, expect string, content *bytes.Buffer, timeout time.Duration) (bool, error) { const (
return TestReachableHTTPWithContentTimeoutWithRetriableErrorCodes(ip, port, request, expect, content, []int{}, timeout) UDPSuccess UDPPokeStatus = "Success"
} UDPError UDPPokeStatus = "UnknownError"
// Any time we add new errors, we should audit all callers of this.
UDPTimeout UDPPokeStatus = "TimedOut"
UDPRefused UDPPokeStatus = "ConnectionRefused"
UDPBadResponse UDPPokeStatus = "BadResponse"
)
func TestReachableHTTPWithContentTimeoutWithRetriableErrorCodes(ip string, port int, request string, expect string, content *bytes.Buffer, retriableErrCodes []int, timeout time.Duration) (bool, error) { // PokeUDP tries to connect to a host on a port and send the given request. Callers
// can specify additional success parameters, if desired.
//
// The result status will be characterized as precisely as possible, given the
// known users of this.
//
// The result error will be populated for any status other than Success.
//
// The result response will be populated if the UDP transaction was completed, even
// if the other test params make this a failure).
func PokeUDP(host string, port int, request string, params *UDPPokeParams) UDPPokeResult {
hostPort := net.JoinHostPort(host, strconv.Itoa(port))
url := fmt.Sprintf("udp://%s", hostPort)
ipPort := net.JoinHostPort(ip, strconv.Itoa(port)) ret := UDPPokeResult{}
url := fmt.Sprintf("http://%s%s", ipPort, request)
if ip == "" { // Sanity check inputs, because it has happened. These are the only things
Failf("Got empty IP for reachability check (%s)", url) // that should hard fail the test - they are basically ASSERT()s.
return false, nil if host == "" {
Failf("Got empty host for UDP poke (%s)", url)
return ret
} }
if port == 0 { if port == 0 {
Failf("Got port==0 for reachability check (%s)", url) Failf("Got port==0 for UDP poke (%s)", url)
return false, nil return ret
} }
Logf("Testing HTTP reachability of %v", url) // Set default params.
if params == nil {
params = &UDPPokeParams{}
}
resp, err := httpGetNoConnectionPoolTimeout(url, timeout) Logf("Poking %v", url)
con, err := net.Dial("udp", hostPort)
if err != nil { if err != nil {
Logf("Got error testing for reachability of %s: %v", url, err) ret.Status = UDPError
return false, nil ret.Error = err
Logf("Poke(%q): %v", url, err)
return ret
} }
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body) _, err = con.Write([]byte(fmt.Sprintf("%s\n", request)))
if err != nil { if err != nil {
Logf("Got error reading response from %s: %v", url, err) ret.Error = err
return false, nil neterr, ok := err.(net.Error)
} if ok && neterr.Timeout() {
if resp.StatusCode != 200 { ret.Status = UDPTimeout
for _, code := range retriableErrCodes { } else if strings.Contains(err.Error(), "connection refused") {
if resp.StatusCode == code { ret.Status = UDPRefused
Logf("Got non-success status %q when trying to access %s, but the error code is retriable", resp.Status, url) } else {
return false, nil ret.Status = UDPError
}
} }
return false, fmt.Errorf("received non-success return status %q trying to access %s; got body: %s", Logf("Poke(%q): %v", url, err)
resp.Status, url, string(body)) return ret
}
if !strings.Contains(string(body), expect) {
return false, fmt.Errorf("received response body without expected substring %q: %s", expect, string(body))
}
if content != nil {
content.Write(body)
}
return true, nil
}
func TestNotReachableHTTP(ip string, port int) (bool, error) {
return TestNotReachableHTTPTimeout(ip, port, 5*time.Second)
}
func TestNotReachableHTTPTimeout(ip string, port int, timeout time.Duration) (bool, error) {
ipPort := net.JoinHostPort(ip, strconv.Itoa(port))
url := fmt.Sprintf("http://%s", ipPort)
if ip == "" {
Failf("Got empty IP for non-reachability check (%s)", url)
return false, nil
}
if port == 0 {
Failf("Got port==0 for non-reachability check (%s)", url)
return false, nil
} }
Logf("Testing HTTP non-reachability of %v", url) if params.Timeout != 0 {
err = con.SetDeadline(time.Now().Add(params.Timeout))
if err != nil {
ret.Status = UDPError
ret.Error = err
Logf("Poke(%q): %v", url, err)
return ret
}
}
resp, err := httpGetNoConnectionPoolTimeout(url, timeout) bufsize := len(params.Response) + 1
if bufsize == 0 {
bufsize = 4096
}
var buf []byte = make([]byte, bufsize)
n, err := con.Read(buf)
if err != nil { if err != nil {
Logf("Confirmed that %s is not reachable", url) ret.Error = err
return true, nil neterr, ok := err.(net.Error)
if ok && neterr.Timeout() {
ret.Status = UDPTimeout
} else if strings.Contains(err.Error(), "connection refused") {
ret.Status = UDPRefused
} else {
ret.Status = UDPError
}
Logf("Poke(%q): %v", url, err)
return ret
} }
resp.Body.Close() ret.Response = buf[0:n]
return false, nil
}
func TestReachableUDP(ip string, port int, request string, expect string) (bool, error) { if params.Response != "" && string(ret.Response) != params.Response {
ipPort := net.JoinHostPort(ip, strconv.Itoa(port)) ret.Status = UDPBadResponse
uri := fmt.Sprintf("udp://%s", ipPort) ret.Error = fmt.Errorf("response does not match expected string: %q", string(ret.Response))
if ip == "" { Logf("Poke(%q): %v", url, ret.Error)
Failf("Got empty IP for reachability check (%s)", uri) return ret
return false, nil
}
if port == 0 {
Failf("Got port==0 for reachability check (%s)", uri)
return false, nil
} }
Logf("Testing UDP reachability of %v", uri) ret.Status = UDPSuccess
Logf("Poke(%q): success", url)
con, err := net.Dial("udp", ipPort) return ret
if err != nil {
return false, fmt.Errorf("Failed to dial %s: %v", ipPort, err)
}
_, err = con.Write([]byte(fmt.Sprintf("%s\n", request)))
if err != nil {
return false, fmt.Errorf("Failed to send request: %v", err)
}
var buf []byte = make([]byte, len(expect)+1)
err = con.SetDeadline(time.Now().Add(3 * time.Second))
if err != nil {
return false, fmt.Errorf("Failed to set deadline: %v", err)
}
_, err = con.Read(buf)
if err != nil {
return false, nil
}
if !strings.Contains(string(buf), expect) {
return false, fmt.Errorf("Failed to retrieve %q, got %q", expect, string(buf))
}
Logf("Successfully reached %v", uri)
return true, nil
}
func TestNotReachableUDP(ip string, port int, request string) (bool, error) {
ipPort := net.JoinHostPort(ip, strconv.Itoa(port))
uri := fmt.Sprintf("udp://%s", ipPort)
if ip == "" {
Failf("Got empty IP for reachability check (%s)", uri)
return false, nil
}
if port == 0 {
Failf("Got port==0 for reachability check (%s)", uri)
return false, nil
}
Logf("Testing UDP non-reachability of %v", uri)
con, err := net.Dial("udp", ipPort)
if err != nil {
Logf("Confirmed that %s is not reachable", uri)
return true, nil
}
_, err = con.Write([]byte(fmt.Sprintf("%s\n", request)))
if err != nil {
Logf("Confirmed that %s is not reachable", uri)
return true, nil
}
var buf []byte = make([]byte, 1)
err = con.SetDeadline(time.Now().Add(3 * time.Second))
if err != nil {
return false, fmt.Errorf("Failed to set deadline: %v", err)
}
_, err = con.Read(buf)
if err != nil {
Logf("Confirmed that %s is not reachable", uri)
return true, nil
}
return false, nil
} }
func TestHitNodesFromOutside(externalIP string, httpPort int32, timeout time.Duration, expectedHosts sets.String) error { func TestHitNodesFromOutside(externalIP string, httpPort int32, timeout time.Duration, expectedHosts sets.String) error {
@ -911,13 +976,12 @@ func TestHitNodesFromOutsideWithCount(externalIP string, httpPort int32, timeout
hittedHosts := sets.NewString() hittedHosts := sets.NewString()
count := 0 count := 0
condition := func() (bool, error) { condition := func() (bool, error) {
var respBody bytes.Buffer result := PokeHTTP(externalIP, int(httpPort), "/hostname", &HTTPPokeParams{Timeout: 1 * time.Second})
reached, err := TestReachableHTTPWithContentTimeout(externalIP, int(httpPort), "/hostname", "", &respBody, if result.Status != HTTPSuccess {
1*time.Second)
if err != nil || !reached {
return false, nil return false, nil
} }
hittedHost := strings.TrimSpace(respBody.String())
hittedHost := strings.TrimSpace(string(result.Body))
if !expectedHosts.Has(hittedHost) { if !expectedHosts.Has(hittedHost) {
Logf("Error hitting unexpected host: %v, reset counter: %v", hittedHost, count) Logf("Error hitting unexpected host: %v, reset counter: %v", hittedHost, count)
count = 0 count = 0

View File

@ -623,6 +623,7 @@ func (j *ServiceTestJig) waitForConditionOrFail(namespace, name string, timeout
// name as the jig and runs the "netexec" container. // name as the jig and runs the "netexec" container.
func (j *ServiceTestJig) newRCTemplate(namespace string) *v1.ReplicationController { func (j *ServiceTestJig) newRCTemplate(namespace string) *v1.ReplicationController {
var replicas int32 = 1 var replicas int32 = 1
var grace int64 = 3 // so we don't race with kube-proxy when scaling up/down
rc := &v1.ReplicationController{ rc := &v1.ReplicationController{
ObjectMeta: metav1.ObjectMeta{ ObjectMeta: metav1.ObjectMeta{
@ -654,7 +655,7 @@ func (j *ServiceTestJig) newRCTemplate(namespace string) *v1.ReplicationControll
}, },
}, },
}, },
TerminationGracePeriodSeconds: new(int64), TerminationGracePeriodSeconds: &grace,
}, },
}, },
}, },
@ -737,6 +738,28 @@ func (j *ServiceTestJig) RunOrFail(namespace string, tweak func(rc *v1.Replicati
return result return result
} }
func (j *ServiceTestJig) Scale(namespace string, replicas int) {
rc := j.Name
scale, err := j.Client.CoreV1().ReplicationControllers(namespace).GetScale(rc, metav1.GetOptions{})
if err != nil {
Failf("Failed to get scale for RC %q: %v", rc, err)
}
scale.Spec.Replicas = int32(replicas)
_, err = j.Client.CoreV1().ReplicationControllers(namespace).UpdateScale(rc, scale)
if err != nil {
Failf("Failed to scale RC %q: %v", rc, err)
}
pods, err := j.waitForPodsCreated(namespace, replicas)
if err != nil {
Failf("Failed waiting for pods: %v", err)
}
if err := j.waitForPodsReady(namespace, pods); err != nil {
Failf("Failed waiting for pods to be running: %v", err)
}
return
}
func (j *ServiceTestJig) waitForPdbReady(namespace string) error { func (j *ServiceTestJig) waitForPdbReady(namespace string) error {
timeout := 2 * time.Minute timeout := 2 * time.Minute
for start := time.Now(); time.Since(start) < timeout; time.Sleep(2 * time.Second) { for start := time.Now(); time.Since(start) < timeout; time.Sleep(2 * time.Second) {
@ -875,9 +898,19 @@ func (j *ServiceTestJig) TestReachableHTTP(host string, port int, timeout time.D
} }
func (j *ServiceTestJig) TestReachableHTTPWithRetriableErrorCodes(host string, port int, retriableErrCodes []int, timeout time.Duration) { func (j *ServiceTestJig) TestReachableHTTPWithRetriableErrorCodes(host string, port int, retriableErrCodes []int, timeout time.Duration) {
if err := wait.PollImmediate(Poll, timeout, func() (bool, error) { pollfn := func() (bool, error) {
return TestReachableHTTPWithRetriableErrorCodes(host, port, "/echo?msg=hello", "hello", retriableErrCodes) result := PokeHTTP(host, port, "/echo?msg=hello",
}); err != nil { &HTTPPokeParams{
BodyContains: "hello",
RetriableCodes: retriableErrCodes,
})
if result.Status == HTTPSuccess {
return true, nil
}
return false, nil // caller can retry
}
if err := wait.PollImmediate(Poll, timeout, pollfn); err != nil {
if err == wait.ErrWaitTimeout { if err == wait.ErrWaitTimeout {
Failf("Could not reach HTTP service through %v:%v after %v", host, port, timeout) Failf("Could not reach HTTP service through %v:%v after %v", host, port, timeout)
} else { } else {
@ -887,36 +920,87 @@ func (j *ServiceTestJig) TestReachableHTTPWithRetriableErrorCodes(host string, p
} }
func (j *ServiceTestJig) TestNotReachableHTTP(host string, port int, timeout time.Duration) { func (j *ServiceTestJig) TestNotReachableHTTP(host string, port int, timeout time.Duration) {
if err := wait.PollImmediate(Poll, timeout, func() (bool, error) { return TestNotReachableHTTP(host, port) }); err != nil { pollfn := func() (bool, error) {
Failf("Could still reach HTTP service through %v:%v after %v: %v", host, port, timeout, err) result := PokeHTTP(host, port, "/", nil)
if result.Code == 0 {
return true, nil
}
return false, nil // caller can retry
}
if err := wait.PollImmediate(Poll, timeout, pollfn); err != nil {
Failf("HTTP service %v:%v reachable after %v: %v", host, port, timeout, err)
}
}
func (j *ServiceTestJig) TestRejectedHTTP(host string, port int, timeout time.Duration) {
pollfn := func() (bool, error) {
result := PokeHTTP(host, port, "/", nil)
if result.Status == HTTPRefused {
return true, nil
}
return false, nil // caller can retry
}
if err := wait.PollImmediate(Poll, timeout, pollfn); err != nil {
Failf("HTTP service %v:%v not rejected: %v", host, port, err)
} }
} }
func (j *ServiceTestJig) TestReachableUDP(host string, port int, timeout time.Duration) { func (j *ServiceTestJig) TestReachableUDP(host string, port int, timeout time.Duration) {
if err := wait.PollImmediate(Poll, timeout, func() (bool, error) { return TestReachableUDP(host, port, "echo hello", "hello") }); err != nil { pollfn := func() (bool, error) {
result := PokeUDP(host, port, "echo hello", &UDPPokeParams{
Timeout: 3 * time.Second,
Response: "hello",
})
if result.Status == UDPSuccess {
return true, nil
}
return false, nil // caller can retry
}
if err := wait.PollImmediate(Poll, timeout, pollfn); err != nil {
Failf("Could not reach UDP service through %v:%v after %v: %v", host, port, timeout, err) Failf("Could not reach UDP service through %v:%v after %v: %v", host, port, timeout, err)
} }
} }
func (j *ServiceTestJig) TestNotReachableUDP(host string, port int, timeout time.Duration) { func (j *ServiceTestJig) TestNotReachableUDP(host string, port int, timeout time.Duration) {
if err := wait.PollImmediate(Poll, timeout, func() (bool, error) { return TestNotReachableUDP(host, port, "echo hello") }); err != nil { pollfn := func() (bool, error) {
Failf("Could still reach UDP service through %v:%v after %v: %v", host, port, timeout, err) result := PokeUDP(host, port, "echo hello", &UDPPokeParams{Timeout: 3 * time.Second})
if result.Status != UDPSuccess && result.Status != UDPError {
return true, nil
}
return false, nil // caller can retry
}
if err := wait.PollImmediate(Poll, timeout, pollfn); err != nil {
Failf("UDP service %v:%v reachable after %v: %v", host, port, timeout, err)
}
}
func (j *ServiceTestJig) TestRejectedUDP(host string, port int, timeout time.Duration) {
pollfn := func() (bool, error) {
result := PokeUDP(host, port, "echo hello", &UDPPokeParams{Timeout: 3 * time.Second})
if result.Status == UDPRefused {
return true, nil
}
return false, nil // caller can retry
}
if err := wait.PollImmediate(Poll, timeout, pollfn); err != nil {
Failf("UDP service %v:%v not rejected: %v", host, port, err)
} }
} }
func (j *ServiceTestJig) GetHTTPContent(host string, port int, timeout time.Duration, url string) bytes.Buffer { func (j *ServiceTestJig) GetHTTPContent(host string, port int, timeout time.Duration, url string) bytes.Buffer {
var body bytes.Buffer var body bytes.Buffer
var err error
if pollErr := wait.PollImmediate(Poll, timeout, func() (bool, error) { if pollErr := wait.PollImmediate(Poll, timeout, func() (bool, error) {
var result bool result := PokeHTTP(host, port, url, nil)
result, err = TestReachableHTTPWithContent(host, port, url, "", &body) if result.Status == HTTPSuccess {
if err != nil { body.Write(result.Body)
Logf("Error hitting %v:%v%v, retrying: %v", host, port, url, err) return true, nil
return false, nil
} }
return result, nil return false, nil
}); pollErr != nil { }); pollErr != nil {
Failf("Could not reach HTTP service through %v:%v%v after %v: %v", host, port, url, timeout, err) Failf("Could not reach HTTP service through %v:%v%v after %v: %v", host, port, url, timeout, pollErr)
} }
return body return body
} }
@ -929,7 +1013,7 @@ func testHTTPHealthCheckNodePort(ip string, port int, request string) (bool, err
return false, fmt.Errorf("Invalid input ip or port") return false, fmt.Errorf("Invalid input ip or port")
} }
Logf("Testing HTTP health check on %v", url) Logf("Testing HTTP health check on %v", url)
resp, err := httpGetNoConnectionPool(url) resp, err := httpGetNoConnectionPoolTimeout(url, 5*time.Second)
if err != nil { if err != nil {
Logf("Got error testing for reachability of %s: %v", url, err) Logf("Got error testing for reachability of %s: %v", url, err)
return false, err return false, err

View File

@ -188,11 +188,11 @@ var _ = SIGDescribe("Firewall rule", func() {
}) })
func assertNotReachableHTTPTimeout(ip string, port int, timeout time.Duration) { func assertNotReachableHTTPTimeout(ip string, port int, timeout time.Duration) {
unreachable, err := framework.TestNotReachableHTTPTimeout(ip, port, timeout) result := framework.PokeHTTP(ip, port, "/", &framework.HTTPPokeParams{Timeout: timeout})
if err != nil { if result.Status == framework.HTTPError {
framework.Failf("Unexpected error checking for reachability of %s:%d: %v", ip, port, err) framework.Failf("Unexpected error checking for reachability of %s:%d: %v", ip, port, result.Error)
} }
if !unreachable { if result.Code != 0 {
framework.Failf("Was unexpectedly able to reach %s:%d", ip, port) framework.Failf("Was unexpectedly able to reach %s:%d", ip, port)
} }
} }

View File

@ -791,11 +791,47 @@ var _ = SIGDescribe("Services", func() {
jig.TestReachableUDP(nodeIP, udpNodePort, framework.KubeProxyLagTimeout) jig.TestReachableUDP(nodeIP, udpNodePort, framework.KubeProxyLagTimeout)
By("hitting the TCP service's LoadBalancer") By("hitting the TCP service's LoadBalancer")
jig.TestReachableHTTP(tcpIngressIP, svcPort, loadBalancerCreateTimeout) // this may actually recreate the LB jig.TestReachableHTTP(tcpIngressIP, svcPort, loadBalancerCreateTimeout)
if loadBalancerSupportsUDP { if loadBalancerSupportsUDP {
By("hitting the UDP service's LoadBalancer") By("hitting the UDP service's LoadBalancer")
jig.TestReachableUDP(udpIngressIP, svcPort, loadBalancerCreateTimeout) // this may actually recreate the LB) jig.TestReachableUDP(udpIngressIP, svcPort, loadBalancerCreateTimeout)
}
By("Scaling the pods to 0")
jig.Scale(ns1, 0)
jig.Scale(ns2, 0)
By("looking for ICMP REJECT on the TCP service's NodePort")
jig.TestRejectedHTTP(nodeIP, tcpNodePort, framework.KubeProxyLagTimeout)
By("looking for ICMP REJECT on the UDP service's NodePort")
jig.TestRejectedUDP(nodeIP, udpNodePort, framework.KubeProxyLagTimeout)
By("looking for ICMP REJECT on the TCP service's LoadBalancer")
jig.TestRejectedHTTP(tcpIngressIP, svcPort, loadBalancerCreateTimeout)
if loadBalancerSupportsUDP {
By("looking for ICMP REJECT on the UDP service's LoadBalancer")
jig.TestRejectedUDP(udpIngressIP, svcPort, loadBalancerCreateTimeout)
}
By("Scaling the pods to 1")
jig.Scale(ns1, 1)
jig.Scale(ns2, 1)
By("hitting the TCP service's NodePort")
jig.TestReachableHTTP(nodeIP, tcpNodePort, framework.KubeProxyLagTimeout)
By("hitting the UDP service's NodePort")
jig.TestReachableUDP(nodeIP, udpNodePort, framework.KubeProxyLagTimeout)
By("hitting the TCP service's LoadBalancer")
jig.TestReachableHTTP(tcpIngressIP, svcPort, loadBalancerCreateTimeout)
if loadBalancerSupportsUDP {
By("hitting the UDP service's LoadBalancer")
jig.TestReachableUDP(udpIngressIP, svcPort, loadBalancerCreateTimeout)
} }
// Change the services back to ClusterIP. // Change the services back to ClusterIP.
@ -2063,14 +2099,18 @@ var _ = SIGDescribe("ESIPP [Slow] [DisabledForLargeClusters]", func() {
for nodeName, nodeIPs := range endpointNodeMap { for nodeName, nodeIPs := range endpointNodeMap {
By(fmt.Sprintf("checking kube-proxy health check fails on node with endpoint (%s), public IP %s", nodeName, nodeIPs[0])) By(fmt.Sprintf("checking kube-proxy health check fails on node with endpoint (%s), public IP %s", nodeName, nodeIPs[0]))
var body bytes.Buffer var body bytes.Buffer
var result bool pollfn := func() (bool, error) {
var err error result := framework.PokeHTTP(nodeIPs[0], healthCheckNodePort, "/healthz", nil)
if pollErr := wait.PollImmediate(framework.Poll, framework.ServiceTestTimeout, func() (bool, error) { if result.Code == 0 {
result, err = framework.TestReachableHTTPWithContent(nodeIPs[0], healthCheckNodePort, "/healthz", "", &body) return true, nil
return !result, nil }
}); pollErr != nil { body.Reset()
framework.Failf("Kube-proxy still exposing health check on node %v:%v, after ESIPP was turned off. Last err %v, last body %v", body.Write(result.Body)
nodeName, healthCheckNodePort, err, body.String()) return false, nil
}
if pollErr := wait.PollImmediate(framework.Poll, framework.ServiceTestTimeout, pollfn); pollErr != nil {
framework.Failf("Kube-proxy still exposing health check on node %v:%v, after ESIPP was turned off. body %s",
nodeName, healthCheckNodePort, body.String())
} }
} }