Merge pull request #38592 from krousey/client-context

Automatic merge from submit-queue (batch tested with PRs 38592, 39949, 39946, 39882)

Add optional per-request context to restclient

**What this PR does / why we need it**: It adds per-request contexts to restclient's API, and uses them to add timeouts to all proxy calls in the e2e tests. An entire e2e shouldn't hang for hours on a single API call.

**Which issue this PR fixes**: #38305

**Special notes for your reviewer**:

This adds a feature to the low-level rest client request feature that is entirely optional. It doesn't affect any requests that don't use it. The api of the generated clients does not change, and they currently don't take advantage of this.

I intend to patch this in to 1.5 as a mostly test only change since it's not going to affect any controller, generated client, or user of the generated client.


cc @kubernetes/sig-api-machinery 
cc @saad-ali
pull/6/head
Kubernetes Submit Queue 2017-01-16 10:37:38 -08:00 committed by GitHub
commit 4811ad0231
15 changed files with 208 additions and 12 deletions

View File

@ -18,6 +18,7 @@ package restclient
import (
"bytes"
"context"
"encoding/hex"
"fmt"
"io"
@ -105,16 +106,14 @@ type Request struct {
resource string
resourceName string
subresource string
selector labels.Selector
timeout time.Duration
// output
err error
body io.Reader
// The constructed request and the response
req *http.Request
resp *http.Response
// This is only used for per-request timeouts, deadlines, and cancellations.
ctx context.Context
backoffMgr BackoffManager
throttle flowcontrol.RateLimiter
@ -566,6 +565,13 @@ func (r *Request) Body(obj interface{}) *Request {
return r
}
// Context adds a context to the request. Contexts are only used for
// timeouts, deadlines, and cancellations.
func (r *Request) Context(ctx context.Context) *Request {
r.ctx = ctx
return r
}
// URL returns the current working URL.
func (r *Request) URL() *url.URL {
p := r.pathPrefix
@ -651,6 +657,9 @@ func (r *Request) Watch() (watch.Interface, error) {
if err != nil {
return nil, err
}
if r.ctx != nil {
req = req.WithContext(r.ctx)
}
req.Header = r.headers
client := r.client
if client == nil {
@ -720,6 +729,9 @@ func (r *Request) Stream() (io.ReadCloser, error) {
if err != nil {
return nil, err
}
if r.ctx != nil {
req = req.WithContext(r.ctx)
}
req.Header = r.headers
client := r.client
if client == nil {
@ -794,6 +806,9 @@ func (r *Request) request(fn func(*http.Request, *http.Response)) error {
if err != nil {
return err
}
if r.ctx != nil {
req = req.WithContext(r.ctx)
}
req.Header = r.headers
r.backoffMgr.Sleep(r.backoffMgr.CalculateBackoff(r.URL()))

View File

@ -18,6 +18,7 @@ package restclient
import (
"bytes"
"context"
"errors"
"fmt"
"io"
@ -1651,3 +1652,32 @@ func testRESTClient(t testing.TB, srv *httptest.Server) *RESTClient {
}
return client
}
func TestDoContext(t *testing.T) {
receivedCh := make(chan struct{})
block := make(chan struct{})
testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
close(receivedCh)
<-block
w.WriteHeader(http.StatusOK)
}))
defer testServer.Close()
defer close(block)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
<-receivedCh
cancel()
}()
c := testRESTClient(t, testServer)
_, err := c.Verb("GET").
Context(ctx).
Prefix("foo").
DoRaw()
if err == nil {
t.Fatal("Expected context cancellation error")
}
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package e2e
import (
"context"
"fmt"
"strconv"
"time"
@ -203,7 +204,12 @@ func (rc *ResourceConsumer) makeConsumeCustomMetric() {
func (rc *ResourceConsumer) sendConsumeCPURequest(millicores int) {
proxyRequest, err := framework.GetServicesProxyRequest(rc.framework.ClientSet, rc.framework.ClientSet.Core().RESTClient().Post())
framework.ExpectNoError(err)
ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout)
defer cancel()
req := proxyRequest.Namespace(rc.framework.Namespace.Name).
Context(ctx).
Name(rc.controllerName).
Suffix("ConsumeCPU").
Param("millicores", strconv.Itoa(millicores)).
@ -218,7 +224,12 @@ func (rc *ResourceConsumer) sendConsumeCPURequest(millicores int) {
func (rc *ResourceConsumer) sendConsumeMemRequest(megabytes int) {
proxyRequest, err := framework.GetServicesProxyRequest(rc.framework.ClientSet, rc.framework.ClientSet.Core().RESTClient().Post())
framework.ExpectNoError(err)
ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout)
defer cancel()
req := proxyRequest.Namespace(rc.framework.Namespace.Name).
Context(ctx).
Name(rc.controllerName).
Suffix("ConsumeMem").
Param("megabytes", strconv.Itoa(megabytes)).
@ -233,7 +244,12 @@ func (rc *ResourceConsumer) sendConsumeMemRequest(megabytes int) {
func (rc *ResourceConsumer) sendConsumeCustomMetric(delta int) {
proxyRequest, err := framework.GetServicesProxyRequest(rc.framework.ClientSet, rc.framework.ClientSet.Core().RESTClient().Post())
framework.ExpectNoError(err)
ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout)
defer cancel()
req := proxyRequest.Namespace(rc.framework.Namespace.Name).
Context(ctx).
Name(rc.controllerName).
Suffix("BumpMetric").
Param("metric", customMetricName).

View File

@ -17,6 +17,7 @@ limitations under the License.
package e2e
import (
"context"
"encoding/json"
"fmt"
"strconv"
@ -122,14 +123,23 @@ func checkElasticsearchReadiness(f *framework.Framework) error {
framework.Logf("After %v failed to get services proxy request: %v", time.Since(start), errProxy)
continue
}
ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout)
defer cancel()
// Query against the root URL for Elasticsearch.
response := proxyRequest.Namespace(api.NamespaceSystem).
Context(ctx).
Name("elasticsearch-logging").
Do()
err = response.Error()
response.StatusCode(&statusCode)
if err != nil {
if ctx.Err() != nil {
framework.Failf("After %v proxy call to elasticsearch-loigging failed: %v", time.Since(start), err)
continue
}
framework.Logf("After %v proxy call to elasticsearch-loigging failed: %v", time.Since(start), err)
continue
}
@ -154,12 +164,20 @@ func checkElasticsearchReadiness(f *framework.Framework) error {
framework.Logf("After %v failed to get services proxy request: %v", time.Since(start), errProxy)
continue
}
ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout)
defer cancel()
body, err = proxyRequest.Namespace(api.NamespaceSystem).
Context(ctx).
Name("elasticsearch-logging").
Suffix("_cluster/health").
Param("level", "indices").
DoRaw()
if err != nil {
if ctx.Err() != nil {
framework.Failf("Failed to get cluster health from elasticsearch: %v", err)
}
continue
}
health := make(map[string]interface{})
@ -196,9 +214,13 @@ func getMissingLinesCountElasticsearch(f *framework.Framework, expectedCount int
return 0, fmt.Errorf("Failed to get services proxy request: %v", errProxy)
}
ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout)
defer cancel()
// Ask Elasticsearch to return all the log lines that were tagged with the
// pod name. Ask for ten times as many log lines because duplication is possible.
body, err := proxyRequest.Namespace(api.NamespaceSystem).
Context(ctx).
Name("elasticsearch-logging").
Suffix("_search").
// TODO: Change filter to only match records from current test run
@ -208,6 +230,9 @@ func getMissingLinesCountElasticsearch(f *framework.Framework, expectedCount int
Param("size", strconv.Itoa(expectedCount*10)).
DoRaw()
if err != nil {
if ctx.Err() != nil {
framework.Failf("Failed to make proxy call to elasticsearch-logging: %v", err)
}
return 0, fmt.Errorf("Failed to make proxy call to elasticsearch-logging: %v", err)
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package e2e
import (
"context"
"net/http"
"time"
@ -58,17 +59,26 @@ var _ = framework.KubeDescribe("Kubernetes Dashboard", func() {
if errProxy != nil {
framework.Logf("Get services proxy request failed: %v", errProxy)
}
ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout)
defer cancel()
// Query against the proxy URL for the kube-ui service.
err := proxyRequest.Namespace(uiNamespace).
Context(ctx).
Name(uiServiceName).
Timeout(framework.SingleCallTimeout).
Do().
StatusCode(&status).
Error()
if status != http.StatusOK {
framework.Logf("Unexpected status from kubernetes-dashboard: %v", status)
} else if err != nil {
if err != nil {
if ctx.Err() != nil {
framework.Failf("Request to kube-ui failed: %v", err)
return true, err
}
framework.Logf("Request to kube-ui failed: %v", err)
} else if status != http.StatusOK {
framework.Logf("Unexpected status from kubernetes-dashboard: %v", status)
}
// Don't return err here as it aborts polling.
return status == http.StatusOK, nil

View File

@ -17,6 +17,7 @@ limitations under the License.
package e2e
import (
"context"
"fmt"
"strings"
"time"
@ -184,10 +185,15 @@ func assertFilesContain(fileNames []string, fileDir string, pod *v1.Pod, client
if err != nil {
return false, err
}
ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout)
defer cancel()
var contents []byte
for _, fileName := range fileNames {
if subResourceProxyAvailable {
contents, err = client.Core().RESTClient().Get().
Context(ctx).
Namespace(pod.Namespace).
Resource("pods").
SubResource("proxy").
@ -196,6 +202,7 @@ func assertFilesContain(fileNames []string, fileDir string, pod *v1.Pod, client
Do().Raw()
} else {
contents, err = client.Core().RESTClient().Get().
Context(ctx).
Prefix("proxy").
Resource("pods").
Namespace(pod.Namespace).
@ -204,7 +211,11 @@ func assertFilesContain(fileNames []string, fileDir string, pod *v1.Pod, client
Do().Raw()
}
if err != nil {
framework.Logf("Unable to read %s from pod %s: %v", fileName, pod.Name, err)
if ctx.Err() != nil {
framework.Failf("Unable to read %s from pod %s: %v", fileName, pod.Name, err)
} else {
framework.Logf("Unable to read %s from pod %s: %v", fileName, pod.Name, err)
}
failed = append(failed, fileName)
} else if check && strings.TrimSpace(string(contents)) != expected {
framework.Logf("File %s from pod %s contains '%s' instead of '%s'", fileName, pod.Name, string(contents), expected)

View File

@ -17,6 +17,7 @@ limitations under the License.
package e2e
import (
"context"
"fmt"
"os"
"os/exec"
@ -52,16 +53,25 @@ func readTransactions(c clientset.Interface, ns string) (error, int) {
if errProxy != nil {
return errProxy, -1
}
ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout)
defer cancel()
body, err := proxyRequest.Namespace(ns).
Context(ctx).
Name("frontend").
Suffix("llen").
DoRaw()
if err != nil {
if ctx.Err() != nil {
framework.Failf("Failed to read petstore transactions: %v", err)
}
return err, -1
} else {
totalTrans, err := strconv.Atoi(string(body))
return err, totalTrans
}
totalTrans, err := strconv.Atoi(string(body))
return err, totalTrans
}
// runK8petstore runs the k8petstore application, bound to external nodeport, and
@ -150,7 +160,7 @@ T:
// We should have exceeded the finalTransactionsExpected num of transactions.
// If this fails, but there are transactions being created, we may need to recalibrate
// the finalTransactionsExpected value - or else - your cluster is broken/slow !
Ω(totalTransactions).Should(BeNumerically(">", finalTransactionsExpected))
Expect(totalTransactions).To(BeNumerically(">", finalTransactionsExpected))
}
var _ = framework.KubeDescribe("Pet Store [Feature:Example]", func() {

View File

@ -17,6 +17,7 @@ limitations under the License.
package e2e
import (
"context"
"fmt"
"io/ioutil"
"os"
@ -580,7 +581,12 @@ func makeHttpRequestToService(c clientset.Interface, ns, service, path string, t
if errProxy != nil {
break
}
ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout)
defer cancel()
result, err = proxyRequest.Namespace(ns).
Context(ctx).
Name(service).
Suffix(path).
Do().

View File

@ -18,6 +18,7 @@ package framework
import (
"bytes"
"context"
"encoding/json"
"fmt"
"sort"
@ -288,9 +289,13 @@ func getContainerInfo(c clientset.Interface, nodeName string, req *kubeletstats.
return nil, err
}
ctx, cancel := context.WithTimeout(context.Background(), SingleCallTimeout)
defer cancel()
var data []byte
if subResourceProxyAvailable {
data, err = c.Core().RESTClient().Post().
Context(ctx).
Resource("nodes").
SubResource("proxy").
Name(fmt.Sprintf("%v:%v", nodeName, ports.KubeletPort)).
@ -301,6 +306,7 @@ func getContainerInfo(c clientset.Interface, nodeName string, req *kubeletstats.
} else {
data, err = c.Core().RESTClient().Post().
Context(ctx).
Prefix("proxy").
Resource("nodes").
Name(fmt.Sprintf("%v:%v", nodeName, ports.KubeletPort)).

View File

@ -18,6 +18,7 @@ package framework
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
@ -335,7 +336,11 @@ func getSchedulingLatency(c clientset.Interface) (SchedulingLatency, error) {
}
}
if masterRegistered {
ctx, cancel := context.WithTimeout(context.Background(), SingleCallTimeout)
defer cancel()
rawData, err := c.Core().RESTClient().Get().
Context(ctx).
Prefix("proxy").
Namespace(api.NamespaceSystem).
Resource("pods").

View File

@ -18,6 +18,7 @@ package framework
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
@ -1555,9 +1556,14 @@ func (r podProxyResponseChecker) CheckAllResponses() (done bool, err error) {
if err != nil {
return false, err
}
ctx, cancel := context.WithTimeout(context.Background(), SingleCallTimeout)
defer cancel()
var body []byte
if subResourceProxyAvailable {
body, err = r.c.Core().RESTClient().Get().
Context(ctx).
Namespace(r.ns).
Resource("pods").
SubResource("proxy").
@ -1566,6 +1572,7 @@ func (r podProxyResponseChecker) CheckAllResponses() (done bool, err error) {
Raw()
} else {
body, err = r.c.Core().RESTClient().Get().
Context(ctx).
Prefix("proxy").
Namespace(r.ns).
Resource("pods").
@ -1574,6 +1581,10 @@ func (r podProxyResponseChecker) CheckAllResponses() (done bool, err error) {
Raw()
}
if err != nil {
if ctx.Err() != nil {
Failf("Controller %s: Failed to Get from replica %d [%s]: %v\n pod status: %#v", r.controllerName, i+1, pod.Name, err, pod.Status)
return false, err
}
Logf("Controller %s: Failed to GET from replica %d [%s]: %v\npod status: %#v", r.controllerName, i+1, pod.Name, err, pod.Status)
continue
}
@ -1743,11 +1754,20 @@ func ServiceResponding(c clientset.Interface, ns, name string) error {
Logf("Failed to get services proxy request: %v:", errProxy)
return false, nil
}
ctx, cancel := context.WithTimeout(context.Background(), SingleCallTimeout)
defer cancel()
body, err := proxyRequest.Namespace(ns).
Context(ctx).
Name(name).
Do().
Raw()
if err != nil {
if ctx.Err() != nil {
Failf("Failed to GET from service %s: %v", name, err)
return true, err
}
Logf("Failed to GET from service %s: %v:", name, err)
return false, nil
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package e2e
import (
"context"
"time"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -88,11 +89,20 @@ func ClusterLevelLoggingWithKibana(f *framework.Framework) {
err = errProxy
continue
}
ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout)
defer cancel()
// Query against the root URL for Kibana.
_, err = proxyRequest.Namespace(api.NamespaceSystem).
Context(ctx).
Name("kibana-logging").
DoRaw()
if err != nil {
if ctx.Err() != nil {
framework.Failf("After %v proxy call to kibana-logging failed: %v", time.Since(start), err)
break
}
framework.Logf("After %v proxy call to kibana-logging failed: %v", time.Since(start), err)
continue
}

View File

@ -18,6 +18,7 @@ package e2e
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
@ -1671,7 +1672,12 @@ func makeRequestToGuestbook(c clientset.Interface, cmd, value string, ns string)
if errProxy != nil {
return "", errProxy
}
ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout)
defer cancel()
result, err := proxyRequest.Namespace(ns).
Context(ctx).
Name("frontend").
Suffix("/guestbook.php").
Param("cmd", cmd).
@ -1771,6 +1777,10 @@ func getUDData(jpgExpected string, ns string) func(clientset.Interface, string)
if err != nil {
return err
}
ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout)
defer cancel()
var body []byte
if subResourceProxyAvailable {
body, err = c.Core().RESTClient().Get().
@ -1792,6 +1802,9 @@ func getUDData(jpgExpected string, ns string) func(clientset.Interface, string)
Raw()
}
if err != nil {
if ctx.Err() != nil {
framework.Failf("Failed to retrieve data from container: %v", err)
}
return err
}
framework.Logf("got data: %s", body)

View File

@ -18,6 +18,7 @@ package e2e
import (
"bytes"
"context"
"encoding/json"
"fmt"
"time"
@ -63,6 +64,10 @@ var (
// Query sends a command to the server and returns the Response
func Query(c clientset.Interface, query string) (*influxdb.Response, error) {
ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout)
defer cancel()
result, err := c.Core().RESTClient().Get().
Prefix("proxy").
Namespace("kube-system").
@ -76,6 +81,9 @@ func Query(c clientset.Interface, query string) (*influxdb.Response, error) {
Raw()
if err != nil {
if ctx.Err() != nil {
framework.Failf("Failed to query influx db: %v", err)
}
return nil, err
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package e2e
import (
"context"
"encoding/json"
"fmt"
"time"
@ -126,9 +127,14 @@ func testPreStop(c clientset.Interface, ns string) {
if err != nil {
return false, err
}
ctx, cancel := context.WithTimeout(context.Background(), framework.SingleCallTimeout)
defer cancel()
var body []byte
if subResourceProxyAvailable {
body, err = c.Core().RESTClient().Get().
Context(ctx).
Namespace(ns).
Resource("pods").
SubResource("proxy").
@ -137,6 +143,7 @@ func testPreStop(c clientset.Interface, ns string) {
DoRaw()
} else {
body, err = c.Core().RESTClient().Get().
Context(ctx).
Prefix("proxy").
Namespace(ns).
Resource("pods").
@ -145,6 +152,10 @@ func testPreStop(c clientset.Interface, ns string) {
DoRaw()
}
if err != nil {
if ctx.Err() != nil {
framework.Failf("Error validating prestop: %v", err)
return true, err
}
By(fmt.Sprintf("Error validating prestop: %v", err))
} else {
framework.Logf("Saw: %s", string(body))