Exhaust every request body before closing it (#5166) (#5479)

From the documentation:
> The default HTTP client's Transport may not
> reuse HTTP/1.x "keep-alive" TCP connections if the Body is
> not read to completion and closed.

This effectively enable keep-alive for the fixed requests.

Signed-off-by: Romain Baugue <romain.baugue@elwinar.com>
pull/5503/head
Romain Baugue 2019-04-18 10:50:37 +02:00 committed by Brian Brazil
parent 3da718c21a
commit f2ca36e56a
7 changed files with 37 additions and 8 deletions

View File

@ -17,6 +17,7 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io"
"io/ioutil" "io/ioutil"
"math/rand" "math/rand"
"net" "net"
@ -306,7 +307,10 @@ func fetchApps(ctx context.Context, client *http.Client, url string) (*appList,
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer resp.Body.Close() defer func() {
io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()
}()
if (resp.StatusCode < 200) || (resp.StatusCode >= 300) { if (resp.StatusCode < 200) || (resp.StatusCode >= 300) {
return nil, errors.Errorf("non 2xx status '%v' response during marathon service discovery", resp.StatusCode) return nil, errors.Errorf("non 2xx status '%v' response during marathon service discovery", resp.StatusCode)

View File

@ -17,6 +17,8 @@ import (
"context" "context"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io"
"io/ioutil"
"net" "net"
"net/http" "net/http"
"os" "os"
@ -98,7 +100,10 @@ func (d *discovery) parseServiceNodes(resp *http.Response, name string) (*target
} }
dec := json.NewDecoder(resp.Body) dec := json.NewDecoder(resp.Body)
defer resp.Body.Close() defer func() {
io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()
}()
err := dec.Decode(&nodes) err := dec.Decode(&nodes)
if err != nil { if err != nil {

View File

@ -17,6 +17,7 @@ import (
"bytes" "bytes"
"context" "context"
"encoding/json" "encoding/json"
"io"
"io/ioutil" "io/ioutil"
"math" "math"
"net/http" "net/http"
@ -114,7 +115,10 @@ func (c *Client) Write(samples model.Samples) error {
if err != nil { if err != nil {
return err return err
} }
defer resp.Body.Close() defer func() {
io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()
}()
// API returns status code 204 for successful writes. // API returns status code 204 for successful writes.
// http://opentsdb.net/docs/build/html/api_http/put.html // http://opentsdb.net/docs/build/html/api_http/put.html

View File

@ -19,6 +19,8 @@ import (
"crypto/md5" "crypto/md5"
"encoding/json" "encoding/json"
"fmt" "fmt"
"io"
"io/ioutil"
"net" "net"
"net/http" "net/http"
"net/url" "net/url"
@ -508,7 +510,10 @@ func (n *Manager) sendOne(ctx context.Context, c *http.Client, url string, b []b
if err != nil { if err != nil {
return err return err
} }
defer resp.Body.Close() defer func() {
io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()
}()
// Any HTTP status 2xx is OK. // Any HTTP status 2xx is OK.
if resp.StatusCode/100 != 2 { if resp.StatusCode/100 != 2 {

View File

@ -14,6 +14,7 @@
package notifier package notifier
import ( import (
"bytes"
"context" "context"
"crypto/md5" "crypto/md5"
"encoding/json" "encoding/json"
@ -224,7 +225,7 @@ func TestCustomDo(t *testing.T) {
testutil.Equals(t, testURL, req.URL.String()) testutil.Equals(t, testURL, req.URL.String())
return &http.Response{ return &http.Response{
Body: ioutil.NopCloser(nil), Body: ioutil.NopCloser(bytes.NewBuffer(nil)),
}, nil }, nil
}, },
}, nil) }, nil)

View File

@ -20,6 +20,7 @@ import (
"context" "context"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"math" "math"
"net/http" "net/http"
"sync" "sync"
@ -543,7 +544,10 @@ func (s *targetScraper) scrape(ctx context.Context, w io.Writer) (string, error)
if err != nil { if err != nil {
return "", err return "", err
} }
defer resp.Body.Close() defer func() {
io.Copy(ioutil.Discard, resp.Body)
resp.Body.Close()
}()
if resp.StatusCode != http.StatusOK { if resp.StatusCode != http.StatusOK {
return "", errors.Errorf("server returned HTTP status %s", resp.Status) return "", errors.Errorf("server returned HTTP status %s", resp.Status)

View File

@ -95,7 +95,10 @@ func (c *Client) Store(ctx context.Context, req []byte) error {
// recoverable. // recoverable.
return recoverableError{err} return recoverableError{err}
} }
defer httpResp.Body.Close() defer func() {
io.Copy(ioutil.Discard, httpResp.Body)
httpResp.Body.Close()
}()
if httpResp.StatusCode/100 != 2 { if httpResp.StatusCode/100 != 2 {
scanner := bufio.NewScanner(io.LimitReader(httpResp.Body, maxErrMsgLen)) scanner := bufio.NewScanner(io.LimitReader(httpResp.Body, maxErrMsgLen))
@ -148,7 +151,10 @@ func (c *Client) Read(ctx context.Context, query *prompb.Query) (*prompb.QueryRe
if err != nil { if err != nil {
return nil, errors.Wrap(err, "error sending request") return nil, errors.Wrap(err, "error sending request")
} }
defer httpResp.Body.Close() defer func() {
io.Copy(ioutil.Discard, httpResp.Body)
httpResp.Body.Close()
}()
if httpResp.StatusCode/100 != 2 { if httpResp.StatusCode/100 != 2 {
return nil, errors.Errorf("server returned HTTP status %s", httpResp.Status) return nil, errors.Errorf("server returned HTTP status %s", httpResp.Status)
} }