Exponential backoff for request client, rebased. Updated license to

2015, cleaned more //[a-z] comments. Added in support for Environment
variable gaurds over the backcoff w/ default NoBackoff. Rebased.
pull/6/head
Jay Vyas 2015-11-19 14:23:11 -05:00
parent 929ab32865
commit 76e6281168
7 changed files with 332 additions and 11 deletions

View File

@ -50,23 +50,23 @@ type RESTClient struct {
} }
func (c *RESTClient) Get() *unversioned.Request { func (c *RESTClient) Get() *unversioned.Request {
return unversioned.NewRequest(c, "GET", &url.URL{Host: "localhost"}, *testapi.Default.GroupVersion(), c.Codec) return unversioned.NewRequest(c, "GET", &url.URL{Host: "localhost"}, *testapi.Default.GroupVersion(), c.Codec, nil)
} }
func (c *RESTClient) Put() *unversioned.Request { func (c *RESTClient) Put() *unversioned.Request {
return unversioned.NewRequest(c, "PUT", &url.URL{Host: "localhost"}, *testapi.Default.GroupVersion(), c.Codec) return unversioned.NewRequest(c, "PUT", &url.URL{Host: "localhost"}, *testapi.Default.GroupVersion(), c.Codec, nil)
} }
func (c *RESTClient) Patch(_ api.PatchType) *unversioned.Request { func (c *RESTClient) Patch(_ api.PatchType) *unversioned.Request {
return unversioned.NewRequest(c, "PATCH", &url.URL{Host: "localhost"}, *testapi.Default.GroupVersion(), c.Codec) return unversioned.NewRequest(c, "PATCH", &url.URL{Host: "localhost"}, *testapi.Default.GroupVersion(), c.Codec, nil)
} }
func (c *RESTClient) Post() *unversioned.Request { func (c *RESTClient) Post() *unversioned.Request {
return unversioned.NewRequest(c, "POST", &url.URL{Host: "localhost"}, *testapi.Default.GroupVersion(), c.Codec) return unversioned.NewRequest(c, "POST", &url.URL{Host: "localhost"}, *testapi.Default.GroupVersion(), c.Codec, nil)
} }
func (c *RESTClient) Delete() *unversioned.Request { func (c *RESTClient) Delete() *unversioned.Request {
return unversioned.NewRequest(c, "DELETE", &url.URL{Host: "localhost"}, *testapi.Default.GroupVersion(), c.Codec) return unversioned.NewRequest(c, "DELETE", &url.URL{Host: "localhost"}, *testapi.Default.GroupVersion(), c.Codec, nil)
} }
func (c *RESTClient) Do(req *http.Request) (*http.Response, error) { func (c *RESTClient) Do(req *http.Request) (*http.Response, error) {

View File

@ -109,10 +109,17 @@ type Request struct {
// The constructed request and the response // The constructed request and the response
req *http.Request req *http.Request
resp *http.Response resp *http.Response
backoffMgr BackoffManager
} }
// NewRequest creates a new request helper object for accessing runtime.Objects on a server. // NewRequest creates a new request helper object for accessing runtime.Objects on a server.
func NewRequest(client HTTPClient, verb string, baseURL *url.URL, groupVersion unversioned.GroupVersion, codec runtime.Codec) *Request { func NewRequest(client HTTPClient, verb string, baseURL *url.URL, groupVersion unversioned.GroupVersion, codec runtime.Codec, backoff BackoffManager) *Request {
if backoff == nil {
glog.V(2).Infof("Not implementing request backoff strategy.")
backoff = &NoBackoff{}
}
metrics.Register()
return &Request{ return &Request{
client: client, client: client,
verb: verb, verb: verb,
@ -120,6 +127,7 @@ func NewRequest(client HTTPClient, verb string, baseURL *url.URL, groupVersion u
path: baseURL.Path, path: baseURL.Path,
groupVersion: groupVersion, groupVersion: groupVersion,
codec: codec, codec: codec,
backoffMgr: backoff,
} }
} }
@ -610,8 +618,16 @@ func (r *Request) Watch() (watch.Interface, error) {
if client == nil { if client == nil {
client = http.DefaultClient client = http.DefaultClient
} }
time.Sleep(r.backoffMgr.CalculateBackoff(r.URL()))
resp, err := client.Do(req) resp, err := client.Do(req)
updateURLMetrics(r, resp, err) updateURLMetrics(r, resp, err)
if r.baseURL != nil {
if err != nil {
r.backoffMgr.UpdateBackoff(r.baseURL, err, 0)
} else {
r.backoffMgr.UpdateBackoff(r.baseURL, err, resp.StatusCode)
}
}
if err != nil { if err != nil {
// The watch stream mechanism handles many common partial data errors, so closed // The watch stream mechanism handles many common partial data errors, so closed
// connections can be retried in many cases. // connections can be retried in many cases.
@ -663,8 +679,16 @@ func (r *Request) Stream() (io.ReadCloser, error) {
if client == nil { if client == nil {
client = http.DefaultClient client = http.DefaultClient
} }
time.Sleep(r.backoffMgr.CalculateBackoff(r.URL()))
resp, err := client.Do(req) resp, err := client.Do(req)
updateURLMetrics(r, resp, err) updateURLMetrics(r, resp, err)
if r.baseURL != nil {
if err != nil {
r.backoffMgr.UpdateBackoff(r.URL(), err, 0)
} else {
r.backoffMgr.UpdateBackoff(r.URL(), err, resp.StatusCode)
}
}
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -708,6 +732,7 @@ func (r *Request) request(fn func(*http.Request, *http.Response)) error {
}() }()
if r.err != nil { if r.err != nil {
glog.V(4).Infof("Error in request: %v", r.err)
return r.err return r.err
} }
@ -736,8 +761,14 @@ func (r *Request) request(fn func(*http.Request, *http.Response)) error {
} }
req.Header = r.headers req.Header = r.headers
time.Sleep(r.backoffMgr.CalculateBackoff(r.URL()))
resp, err := client.Do(req) resp, err := client.Do(req)
updateURLMetrics(r, resp, err) updateURLMetrics(r, resp, err)
if err != nil {
r.backoffMgr.UpdateBackoff(r.URL(), err, 0)
} else {
r.backoffMgr.UpdateBackoff(r.URL(), err, resp.StatusCode)
}
if err != nil { if err != nil {
return err return err
} }

View File

@ -262,7 +262,7 @@ func TestResultIntoWithErrReturnsErr(t *testing.T) {
func TestURLTemplate(t *testing.T) { func TestURLTemplate(t *testing.T) {
uri, _ := url.Parse("http://localhost") uri, _ := url.Parse("http://localhost")
r := NewRequest(nil, "POST", uri, unversioned.GroupVersion{Group: "test"}, nil) r := NewRequest(nil, "POST", uri, unversioned.GroupVersion{Group: "test"}, nil, nil)
r.Prefix("pre1").Resource("r1").Namespace("ns").Name("nm").Param("p0", "v0") r.Prefix("pre1").Resource("r1").Namespace("ns").Name("nm").Param("p0", "v0")
full := r.URL() full := r.URL()
if full.String() != "http://localhost/pre1/namespaces/ns/r1/nm?p0=v0" { if full.String() != "http://localhost/pre1/namespaces/ns/r1/nm?p0=v0" {
@ -323,7 +323,7 @@ func TestTransformResponse(t *testing.T) {
{Response: &http.Response{StatusCode: 200, Body: ioutil.NopCloser(bytes.NewReader(invalid))}, Data: invalid}, {Response: &http.Response{StatusCode: 200, Body: ioutil.NopCloser(bytes.NewReader(invalid))}, Data: invalid},
} }
for i, test := range testCases { for i, test := range testCases {
r := NewRequest(nil, "", uri, *testapi.Default.GroupVersion(), testapi.Default.Codec()) r := NewRequest(nil, "", uri, *testapi.Default.GroupVersion(), testapi.Default.Codec(), nil)
if test.Response.Body == nil { if test.Response.Body == nil {
test.Response.Body = ioutil.NopCloser(bytes.NewReader([]byte{})) test.Response.Body = ioutil.NopCloser(bytes.NewReader([]byte{}))
} }
@ -542,6 +542,8 @@ func TestRequestWatch(t *testing.T) {
}, },
} }
for i, testCase := range testCases { for i, testCase := range testCases {
t.Logf("testcase %v", testCase.Request)
testCase.Request.backoffMgr = &NoBackoff{}
watch, err := testCase.Request.Watch() watch, err := testCase.Request.Watch()
hasErr := err != nil hasErr := err != nil
if hasErr != testCase.Err { if hasErr != testCase.Err {
@ -604,6 +606,7 @@ func TestRequestStream(t *testing.T) {
}, },
} }
for i, testCase := range testCases { for i, testCase := range testCases {
testCase.Request.backoffMgr = &NoBackoff{}
body, err := testCase.Request.Stream() body, err := testCase.Request.Stream()
hasErr := err != nil hasErr := err != nil
if hasErr != testCase.Err { if hasErr != testCase.Err {
@ -673,6 +676,7 @@ func TestRequestDo(t *testing.T) {
}, },
} }
for i, testCase := range testCases { for i, testCase := range testCases {
testCase.Request.backoffMgr = &NoBackoff{}
body, err := testCase.Request.Do().Raw() body, err := testCase.Request.Do().Raw()
hasErr := err != nil hasErr := err != nil
if hasErr != testCase.Err { if hasErr != testCase.Err {
@ -720,6 +724,42 @@ func TestDoRequestNewWay(t *testing.T) {
fakeHandler.ValidateRequest(t, requestURL, "POST", &reqBody) fakeHandler.ValidateRequest(t, requestURL, "POST", &reqBody)
} }
// This test assumes that the client implementation backs off exponentially, for an individual request.
func TestBackoffLifecycle(t *testing.T) {
count := 0
testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
count++
t.Logf("Attempt %d", count)
if count == 5 || count == 9 {
w.WriteHeader(http.StatusOK)
return
} else {
w.WriteHeader(http.StatusGatewayTimeout)
return
}
}))
defer testServer.Close()
c := testRESTClient(t, testServer)
// Test backoff recovery and increase. This correlates to the constants
// which are used in the server implementation returning StatusOK above.
seconds := []int{0, 1, 2, 4, 8, 0, 1, 2, 4, 0}
request := c.Verb("POST").Prefix("backofftest").Suffix("abc")
request.backoffMgr = &URLBackoff{
Backoff: util.NewBackOff(
time.Duration(1)*time.Second,
time.Duration(200)*time.Second)}
for _, sec := range seconds {
start := time.Now()
request.DoRaw()
finish := time.Since(start)
t.Logf("%v finished in %v", sec, finish)
if finish < time.Duration(sec)*time.Second || finish >= time.Duration(sec+5)*time.Second {
t.Fatalf("%v not in range %v", finish, sec)
}
}
}
func TestCheckRetryClosesBody(t *testing.T) { func TestCheckRetryClosesBody(t *testing.T) {
count := 0 count := 0
ch := make(chan struct{}) ch := make(chan struct{})
@ -1030,7 +1070,7 @@ func TestUintParam(t *testing.T) {
for _, item := range table { for _, item := range table {
u, _ := url.Parse("http://localhost") u, _ := url.Parse("http://localhost")
r := NewRequest(nil, "GET", u, unversioned.GroupVersion{Group: "test"}, nil).AbsPath("").UintParam(item.name, item.testVal) r := NewRequest(nil, "GET", u, unversioned.GroupVersion{Group: "test"}, nil, nil).AbsPath("").UintParam(item.name, item.testVal)
if e, a := item.expectStr, r.URL().String(); e != a { if e, a := item.expectStr, r.URL().String(); e != a {
t.Errorf("expected %v, got %v", e, a) t.Errorf("expected %v, got %v", e, a)
} }

View File

@ -19,14 +19,25 @@ package unversioned
import ( import (
"net/http" "net/http"
"net/url" "net/url"
"os"
"strconv"
"strings" "strings"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util" "k8s.io/kubernetes/pkg/util"
) )
const (
// Environment variables: Note that the duration should be long enough that the backoff
// persists for some reasonable time (i.e. 120 seconds). The typical base might be "1".
envBackoffBase = "KUBE_CLIENT_BACKOFF_BASE"
envBackoffDuration = "KUBE_CLIENT_BACKOFF_DURATION"
)
// RESTClient imposes common Kubernetes API conventions on a set of resource paths. // RESTClient imposes common Kubernetes API conventions on a set of resource paths.
// The baseURL is expected to point to an HTTP or HTTPS path that is the parent // The baseURL is expected to point to an HTTP or HTTPS path that is the parent
// of one or more resources. The server should return a decodable API resource // of one or more resources. The server should return a decodable API resource
@ -74,6 +85,28 @@ func NewRESTClient(baseURL *url.URL, groupVersion unversioned.GroupVersion, c ru
} }
} }
// readExpBackoffConfig handles the internal logic of determining what the
// backoff policy is. By default if no information is available, NoBackoff.
// TODO Generalize this see #17727 .
func readExpBackoffConfig() BackoffManager {
backoffBase := os.Getenv(envBackoffBase)
backoffDuration := os.Getenv(envBackoffDuration)
backoffBaseInt, errBase := strconv.ParseInt(backoffBase, 10, 64)
backoffDurationInt, errDuration := strconv.ParseInt(backoffDuration, 10, 64)
if errBase != nil || errDuration != nil {
glog.V(2).Infof("Configuring no exponential backoff.")
return &NoBackoff{}
} else {
glog.V(2).Infof("Configuring exponential backoff as %v, %v", backoffBaseInt, backoffDurationInt)
return &URLBackoff{
Backoff: util.NewBackOff(
time.Duration(backoffBaseInt)*time.Second,
time.Duration(backoffDurationInt)*time.Second)}
}
}
// Verb begins a request with a verb (GET, POST, PUT, DELETE). // Verb begins a request with a verb (GET, POST, PUT, DELETE).
// //
// Example usage of RESTClient's request building interface: // Example usage of RESTClient's request building interface:
@ -90,10 +123,13 @@ func (c *RESTClient) Verb(verb string) *Request {
if c.Throttle != nil { if c.Throttle != nil {
c.Throttle.Accept() c.Throttle.Accept()
} }
backoff := readExpBackoffConfig()
if c.Client == nil { if c.Client == nil {
return NewRequest(nil, verb, c.baseURL, c.groupVersion, c.Codec) return NewRequest(nil, verb, c.baseURL, c.groupVersion, c.Codec, backoff)
} }
return NewRequest(c.Client, verb, c.baseURL, c.groupVersion, c.Codec) return NewRequest(c.Client, verb, c.baseURL, c.groupVersion, c.Codec, backoff)
} }
// Post begins a POST request. Short for c.Verb("POST"). // Post begins a POST request. Short for c.Verb("POST").

View File

@ -19,8 +19,11 @@ package unversioned
import ( import (
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"net/url"
"os"
"reflect" "reflect"
"testing" "testing"
"time"
"k8s.io/kubernetes/pkg/api/testapi" "k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/unversioned"
@ -139,3 +142,39 @@ func TestDoRequestCreated(t *testing.T) {
} }
fakeHandler.ValidateRequest(t, "/"+testapi.Default.Version()+"/test", "GET", nil) fakeHandler.ValidateRequest(t, "/"+testapi.Default.Version()+"/test", "GET", nil)
} }
func TestCreateBackoffManager(t *testing.T) {
theUrl, _ := url.Parse("http://localhost")
// 1 second base backoff + duration of 2 seconds -> exponential backoff for requests.
os.Setenv(envBackoffBase, "1")
os.Setenv(envBackoffDuration, "2")
backoff := readExpBackoffConfig()
backoff.UpdateBackoff(theUrl, nil, 500)
backoff.UpdateBackoff(theUrl, nil, 500)
if backoff.CalculateBackoff(theUrl)/time.Second != 2 {
t.Errorf("Backoff env not working.")
}
// 0 duration -> no backoff.
os.Setenv(envBackoffBase, "1")
os.Setenv(envBackoffDuration, "0")
backoff.UpdateBackoff(theUrl, nil, 500)
backoff.UpdateBackoff(theUrl, nil, 500)
backoff = readExpBackoffConfig()
if backoff.CalculateBackoff(theUrl)/time.Second != 0 {
t.Errorf("Zero backoff duration, but backoff still occuring.")
}
// No env -> No backoff.
os.Setenv(envBackoffBase, "")
os.Setenv(envBackoffDuration, "")
backoff = readExpBackoffConfig()
backoff.UpdateBackoff(theUrl, nil, 500)
backoff.UpdateBackoff(theUrl, nil, 500)
if backoff.CalculateBackoff(theUrl)/time.Second != 0 {
t.Errorf("Backoff should have been 0.")
}
}

View File

@ -0,0 +1,97 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package unversioned
import (
"net/url"
"time"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/pkg/util/sets"
)
// Set of resp. Codes that we backoff for.
// In general these should be errors that indicate a server is overloaded.
// These shouldn't be configured by any user, we set them based on conventions
// described in
var serverIsOverloadedSet = sets.NewInt(429)
var maxResponseCode = 499
type BackoffManager interface {
UpdateBackoff(actualUrl *url.URL, err error, responseCode int)
CalculateBackoff(actualUrl *url.URL) time.Duration
}
// URLBackoff struct implements the semantics on top of Backoff which
// we need for URL specific exponential backoff.
type URLBackoff struct {
// Uses backoff as underlying implementation.
Backoff *util.Backoff
}
// NoBackoff is a stub implementation, can be used for mocking or else as a default.
type NoBackoff struct {
}
func (n *NoBackoff) UpdateBackoff(actualUrl *url.URL, err error, responseCode int) {
// do nothing.
}
func (n *NoBackoff) CalculateBackoff(actualUrl *url.URL) time.Duration {
return 0 * time.Second
}
// Disable makes the backoff trivial, i.e., sets it to zero. This might be used
// by tests which want to run 1000s of mock requests without slowing down.
func (b *URLBackoff) Disable() {
glog.V(4).Infof("Disabling backoff strategy")
b.Backoff = util.NewBackOff(0*time.Second, 0*time.Second)
}
// baseUrlKey returns the key which urls will be mapped to.
// For example, 127.0.0.1:8080/api/v2/abcde -> 127.0.0.1:8080.
func (b *URLBackoff) baseUrlKey(rawurl *url.URL) string {
// Simple implementation for now, just the host.
// We may backoff specific paths (i.e. "pods") differentially
// in the future.
host, err := url.Parse(rawurl.String())
if err != nil {
glog.V(4).Infof("Error extracting url: %v", rawurl)
panic("bad url!")
}
return host.Host
}
// UpdateBackoff updates backoff metadata
func (b *URLBackoff) UpdateBackoff(actualUrl *url.URL, err error, responseCode int) {
// range for retry counts that we store is [0,13]
if responseCode > maxResponseCode || serverIsOverloadedSet.Has(responseCode) {
b.Backoff.Next(b.baseUrlKey(actualUrl), time.Now())
return
} else if responseCode >= 300 || err != nil {
glog.V(4).Infof("Client is returning errors: code %v, error %v", responseCode, err)
}
//If we got this far, there is no backoff required for this URL anymore.
b.Backoff.Reset(b.baseUrlKey(actualUrl))
}
// CalculateBackoff takes a url and back's off exponentially,
// based on its knowledge of existing failures.
func (b *URLBackoff) CalculateBackoff(actualUrl *url.URL) time.Duration {
return b.Backoff.Get(b.baseUrlKey(actualUrl))
}

View File

@ -0,0 +1,78 @@
/*
Copyright 2014 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package unversioned
import (
"k8s.io/kubernetes/pkg/util"
"net/url"
"testing"
"time"
)
func parse(raw string) *url.URL {
theUrl, _ := url.Parse(raw)
return theUrl
}
func TestURLBackoffFunctionalityCollisions(t *testing.T) {
myBackoff := &URLBackoff{
Backoff: util.NewBackOff(1*time.Second, 60*time.Second),
}
// Add some noise and make sure backoff for a clean URL is zero.
myBackoff.UpdateBackoff(parse("http://100.200.300.400:8080"), nil, 500)
myBackoff.UpdateBackoff(parse("http://1.2.3.4:8080"), nil, 500)
if myBackoff.CalculateBackoff(parse("http://1.2.3.4:100")) > 0 {
t.Errorf("URLs are colliding in the backoff map!")
}
}
// TestURLBackoffFunctionality generally tests the URLBackoff wrapper. We avoid duplicating tests from backoff and request.
func TestURLBackoffFunctionality(t *testing.T) {
myBackoff := &URLBackoff{
Backoff: util.NewBackOff(1*time.Second, 60*time.Second),
}
// Now test that backoff increases, then recovers.
// 200 and 300 should both result in clearing the backoff.
// all others like 429 should result in increased backoff.
seconds := []int{0,
1, 2, 4, 8, 0,
1, 2}
returnCodes := []int{
429, 500, 501, 502, 300,
500, 501, 502,
}
if len(seconds) != len(returnCodes) {
t.Fatalf("responseCode to backoff arrays should be the same length... sanity check failed.")
}
for i, sec := range seconds {
backoffSec := myBackoff.CalculateBackoff(parse("http://1.2.3.4:100"))
if backoffSec < time.Duration(sec)*time.Second || backoffSec > time.Duration(sec+5)*time.Second {
t.Errorf("Backoff out of range %v: %v %v", i, sec, backoffSec)
}
myBackoff.UpdateBackoff(parse("http://1.2.3.4:100/responseCodeForFuncTest"), nil, returnCodes[i])
}
if myBackoff.CalculateBackoff(parse("http://1.2.3.4:100")) == 0 {
t.Errorf("The final return code %v should have resulted in a backoff ! ", returnCodes[7])
}
}