Merge pull request #45666 from ilackarms/compression

Automatic merge from submit-queue

add gzip compression to GET and LIST requests

Fixes https://github.com/kubernetes/kubernetes/issues/44164



Enable compressed response bodies for non-watch GET and LIST requests on API Objects.

**What this PR does / why we need it**: Adds compression via Accept-Encoding header, returns Content-Encoding header on responses (only supports gzip at this time). Enabled solely for GET and LIST requests which can return very large response bodies. 

**Special notes for your reviewer**:

See https://github.com/kubernetes/kubernetes/issues/44164 for discussion.

**Release note**:

```release-note-
```
pull/6/head
Kubernetes Submit Queue 2017-06-06 07:43:03 -07:00 committed by GitHub
commit cc568f6433
6 changed files with 468 additions and 0 deletions

View File

@ -84,5 +84,6 @@ go_library(
"//vendor/k8s.io/apiserver/pkg/endpoints/metrics:go_default_library", "//vendor/k8s.io/apiserver/pkg/endpoints/metrics:go_default_library",
"//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library", "//vendor/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
"//vendor/k8s.io/apiserver/pkg/registry/rest:go_default_library", "//vendor/k8s.io/apiserver/pkg/registry/rest:go_default_library",
"//vendor/k8s.io/apiserver/pkg/server/filters:go_default_library",
], ],
) )

View File

@ -18,6 +18,7 @@ package endpoints
import ( import (
"bytes" "bytes"
"compress/gzip"
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
@ -1203,6 +1204,102 @@ func TestRequestsWithInvalidQuery(t *testing.T) {
} }
} }
func TestListCompresion(t *testing.T) {
testCases := []struct {
url string
namespace string
selfLink string
legacy bool
label string
field string
acceptEncoding string
}{
// list items in a namespace in the path
{
url: "/" + grouplessPrefix + "/" + grouplessGroupVersion.Version + "/namespaces/default/simple",
namespace: "default",
selfLink: "/" + grouplessPrefix + "/" + grouplessGroupVersion.Version + "/namespaces/default/simple",
acceptEncoding: "",
},
{
url: "/" + grouplessPrefix + "/" + grouplessGroupVersion.Version + "/namespaces/default/simple",
namespace: "default",
selfLink: "/" + grouplessPrefix + "/" + grouplessGroupVersion.Version + "/namespaces/default/simple",
acceptEncoding: "gzip",
},
}
for i, testCase := range testCases {
storage := map[string]rest.Storage{}
simpleStorage := SimpleRESTStorage{expectedResourceNamespace: testCase.namespace}
storage["simple"] = &simpleStorage
selfLinker := &setTestSelfLinker{
t: t,
namespace: testCase.namespace,
expectedSet: testCase.selfLink,
}
var handler = handleInternal(storage, admissionControl, selfLinker, nil)
server := httptest.NewServer(handler)
defer server.Close()
req, err := http.NewRequest("GET", server.URL+testCase.url, nil)
if err != nil {
t.Errorf("%d: unexpected error: %v", i, err)
continue
}
// It's necessary to manually set Accept-Encoding here
// to prevent http.DefaultClient from automatically
// decoding responses
req.Header.Set("Accept-Encoding", testCase.acceptEncoding)
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Errorf("%d: unexpected error: %v", i, err)
continue
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
t.Errorf("%d: unexpected status: %d from url %s, Expected: %d, %#v", i, resp.StatusCode, testCase.url, http.StatusOK, resp)
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Errorf("%d: unexpected error: %v", i, err)
continue
}
t.Logf("%d: body: %s", i, string(body))
continue
}
// TODO: future, restore get links
if !selfLinker.called {
t.Errorf("%d: never set self link", i)
}
if !simpleStorage.namespacePresent {
t.Errorf("%d: namespace not set", i)
} else if simpleStorage.actualNamespace != testCase.namespace {
t.Errorf("%d: %q unexpected resource namespace: %s", i, testCase.url, simpleStorage.actualNamespace)
}
if simpleStorage.requestedLabelSelector == nil || simpleStorage.requestedLabelSelector.String() != testCase.label {
t.Errorf("%d: unexpected label selector: %v", i, simpleStorage.requestedLabelSelector)
}
if simpleStorage.requestedFieldSelector == nil || simpleStorage.requestedFieldSelector.String() != testCase.field {
t.Errorf("%d: unexpected field selector: %v", i, simpleStorage.requestedFieldSelector)
}
var decoder *json.Decoder
if testCase.acceptEncoding == "gzip" {
gzipReader, err := gzip.NewReader(resp.Body)
if err != nil {
t.Fatalf("unexpected error creating gzip reader: %v", err)
}
decoder = json.NewDecoder(gzipReader)
} else {
decoder = json.NewDecoder(resp.Body)
}
var itemOut genericapitesting.SimpleList
err = decoder.Decode(&itemOut)
if err != nil {
t.Errorf("failed to read response body as SimpleList: %v", err)
}
}
}
func TestLogs(t *testing.T) { func TestLogs(t *testing.T) {
handler := handle(map[string]rest.Storage{}) handler := handle(map[string]rest.Storage{})
server := httptest.NewServer(handler) server := httptest.NewServer(handler)
@ -1518,6 +1615,77 @@ func TestGet(t *testing.T) {
} }
} }
func TestGetCompression(t *testing.T) {
storage := map[string]rest.Storage{}
simpleStorage := SimpleRESTStorage{
item: genericapitesting.Simple{
Other: "foo",
},
}
selfLinker := &setTestSelfLinker{
t: t,
expectedSet: "/" + prefix + "/" + testGroupVersion.Group + "/" + testGroupVersion.Version + "/namespaces/default/simple/id",
name: "id",
namespace: "default",
}
storage["simple"] = &simpleStorage
handler := handleLinker(storage, selfLinker)
server := httptest.NewServer(handler)
defer server.Close()
tests := []struct {
acceptEncoding string
}{
{acceptEncoding: ""},
{acceptEncoding: "gzip"},
}
for _, test := range tests {
req, err := http.NewRequest("GET", server.URL+"/"+prefix+"/"+testGroupVersion.Group+"/"+testGroupVersion.Version+"/namespaces/default/simple/id", nil)
if err != nil {
t.Fatalf("unexpected error cretaing request: %v", err)
}
// It's necessary to manually set Accept-Encoding here
// to prevent http.DefaultClient from automatically
// decoding responses
req.Header.Set("Accept-Encoding", test.acceptEncoding)
resp, err := http.DefaultClient.Do(req)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if resp.StatusCode != http.StatusOK {
t.Fatalf("unexpected response: %#v", resp)
}
var decoder *json.Decoder
if test.acceptEncoding == "gzip" {
gzipReader, err := gzip.NewReader(resp.Body)
if err != nil {
t.Fatalf("unexpected error creating gzip reader: %v", err)
}
decoder = json.NewDecoder(gzipReader)
} else {
decoder = json.NewDecoder(resp.Body)
}
var itemOut genericapitesting.Simple
err = decoder.Decode(&itemOut)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
t.Errorf("unexpected error reading body: %v", err)
}
if itemOut.Name != simpleStorage.item.Name {
t.Errorf("Unexpected data: %#v, expected %#v (%s)", itemOut, simpleStorage.item, string(body))
}
if !selfLinker.called {
t.Errorf("Never set self link")
}
}
}
func TestGetUninitialized(t *testing.T) { func TestGetUninitialized(t *testing.T) {
storage := map[string]rest.Storage{} storage := map[string]rest.Storage{}
simpleStorage := SimpleRESTStorage{ simpleStorage := SimpleRESTStorage{

View File

@ -40,6 +40,7 @@ import (
"k8s.io/apiserver/pkg/endpoints/metrics" "k8s.io/apiserver/pkg/endpoints/metrics"
"k8s.io/apiserver/pkg/endpoints/request" "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/apiserver/pkg/registry/rest" "k8s.io/apiserver/pkg/registry/rest"
genericfilters "k8s.io/apiserver/pkg/server/filters"
) )
const ( const (
@ -587,6 +588,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
handler = restfulGetResource(getter, exporter, reqScope) handler = restfulGetResource(getter, exporter, reqScope)
} }
handler = metrics.InstrumentRouteFunc(action.Verb, resource, subresource, handler) handler = metrics.InstrumentRouteFunc(action.Verb, resource, subresource, handler)
handler = genericfilters.RestfulWithCompression(handler, a.group.Context)
doc := "read the specified " + kind doc := "read the specified " + kind
if hasSubresource { if hasSubresource {
doc = "read " + subresource + " of the specified " + kind doc = "read " + subresource + " of the specified " + kind
@ -616,6 +618,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
doc = "list " + subresource + " of objects of kind " + kind doc = "list " + subresource + " of objects of kind " + kind
} }
handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, restfulListResource(lister, watcher, reqScope, false, a.minRequestTimeout)) handler := metrics.InstrumentRouteFunc(action.Verb, resource, subresource, restfulListResource(lister, watcher, reqScope, false, a.minRequestTimeout))
handler = genericfilters.RestfulWithCompression(handler, a.group.Context)
route := ws.GET(action.Path).To(handler). route := ws.GET(action.Path).To(handler).
Doc(doc). Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")). Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).

View File

@ -11,6 +11,7 @@ load(
go_test( go_test(
name = "go_default_test", name = "go_default_test",
srcs = [ srcs = [
"compression_test.go",
"cors_test.go", "cors_test.go",
"maxinflight_test.go", "maxinflight_test.go",
"timeout_test.go", "timeout_test.go",
@ -31,6 +32,7 @@ go_test(
go_library( go_library(
name = "go_default_library", name = "go_default_library",
srcs = [ srcs = [
"compression.go",
"cors.go", "cors.go",
"doc.go", "doc.go",
"longrunning.go", "longrunning.go",
@ -40,6 +42,7 @@ go_library(
], ],
tags = ["automanaged"], tags = ["automanaged"],
deps = [ deps = [
"//vendor/github.com/emicklei/go-restful:go_default_library",
"//vendor/github.com/golang/glog:go_default_library", "//vendor/github.com/golang/glog:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library",
"//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",

View File

@ -0,0 +1,183 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package filters
import (
"compress/gzip"
"compress/zlib"
"errors"
"fmt"
"io"
"net/http"
"strings"
"github.com/emicklei/go-restful"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apiserver/pkg/endpoints/request"
)
// Compressor is an interface to compression writers
type Compressor interface {
io.WriteCloser
Flush() error
}
const (
headerAcceptEncoding = "Accept-Encoding"
headerContentEncoding = "Content-Encoding"
encodingGzip = "gzip"
encodingDeflate = "deflate"
)
// WithCompression wraps an http.Handler with the Compression Handler
func WithCompression(handler http.Handler, ctxMapper request.RequestContextMapper) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
wantsCompression, encoding := wantsCompressedResponse(req, ctxMapper)
w.Header().Set("Vary", "Accept-Encoding")
if wantsCompression {
compressionWriter, err := NewCompressionResponseWriter(w, encoding)
if err != nil {
handleError(w, req, err)
runtime.HandleError(fmt.Errorf("failed to compress HTTP response: %v", err))
return
}
compressionWriter.Header().Set("Content-Encoding", encoding)
handler.ServeHTTP(compressionWriter, req)
compressionWriter.(*compressionResponseWriter).Close()
} else {
handler.ServeHTTP(w, req)
}
})
}
// wantsCompressedResponse reads the Accept-Encoding header to see if and which encoding is requested.
func wantsCompressedResponse(req *http.Request, ctxMapper request.RequestContextMapper) (bool, string) {
// don't compress watches
ctx, ok := ctxMapper.Get(req)
if !ok {
return false, ""
}
info, ok := request.RequestInfoFrom(ctx)
if !ok {
return false, ""
}
if !info.IsResourceRequest {
return false, ""
}
if info.Verb == "watch" {
return false, ""
}
header := req.Header.Get(headerAcceptEncoding)
gi := strings.Index(header, encodingGzip)
zi := strings.Index(header, encodingDeflate)
// use in order of appearance
switch {
case gi == -1:
return zi != -1, encodingDeflate
case zi == -1:
return gi != -1, encodingGzip
case gi < zi:
return true, encodingGzip
default:
return true, encodingDeflate
}
}
type compressionResponseWriter struct {
writer http.ResponseWriter
compressor Compressor
encoding string
}
// NewCompressionResponseWriter returns wraps w with a compression ResponseWriter, using the given encoding
func NewCompressionResponseWriter(w http.ResponseWriter, encoding string) (http.ResponseWriter, error) {
var compressor Compressor
switch encoding {
case encodingGzip:
compressor = gzip.NewWriter(w)
case encodingDeflate:
compressor = zlib.NewWriter(w)
default:
return nil, fmt.Errorf("%s is not a supported encoding type", encoding)
}
return &compressionResponseWriter{
writer: w,
compressor: compressor,
encoding: encoding,
}, nil
}
// compressionResponseWriter implements http.Responsewriter Interface
var _ http.ResponseWriter = &compressionResponseWriter{}
func (c *compressionResponseWriter) Header() http.Header {
return c.writer.Header()
}
// compress data according to compression method
func (c *compressionResponseWriter) Write(p []byte) (int, error) {
if c.compressorClosed() {
return -1, errors.New("compressing error: tried to write data using closed compressor")
}
c.Header().Set(headerContentEncoding, c.encoding)
return c.compressor.Write(p)
}
func (c *compressionResponseWriter) WriteHeader(status int) {
c.writer.WriteHeader(status)
}
// CloseNotify is part of http.CloseNotifier interface
func (c *compressionResponseWriter) CloseNotify() <-chan bool {
return c.writer.(http.CloseNotifier).CloseNotify()
}
// Close the underlying compressor
func (c *compressionResponseWriter) Close() error {
if c.compressorClosed() {
return errors.New("Compressing error: tried to close already closed compressor")
}
c.compressor.Close()
c.compressor = nil
return nil
}
func (c *compressionResponseWriter) Flush() {
if c.compressorClosed() {
return
}
c.compressor.Flush()
}
func (c *compressionResponseWriter) compressorClosed() bool {
return nil == c.compressor
}
// RestfulWithCompression wraps WithCompression to be compatible with go-restful
func RestfulWithCompression(function restful.RouteFunction, ctxMapper request.RequestContextMapper) restful.RouteFunction {
return restful.RouteFunction(func(request *restful.Request, response *restful.Response) {
handler := WithCompression(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
response.ResponseWriter = w
request.Request = req
function(request, response)
}), ctxMapper)
handler.ServeHTTP(response.ResponseWriter, request.Request)
})
}

View File

@ -0,0 +1,110 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package filters
import (
"bytes"
"compress/gzip"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
"testing"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/endpoints/filters"
"k8s.io/apiserver/pkg/endpoints/request"
)
func TestCompression(t *testing.T) {
tests := []struct {
encoding string
watch bool
}{
{"", false},
{"gzip", true},
{"gzip", false},
}
responseData := []byte("1234")
requestContextMapper := request.NewRequestContextMapper()
for _, test := range tests {
handler := WithCompression(
http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
w.Write(responseData)
}),
requestContextMapper,
)
handler = filters.WithRequestInfo(handler, newTestRequestInfoResolver(), requestContextMapper)
handler = request.WithRequestContext(handler, requestContextMapper)
server := httptest.NewServer(handler)
defer server.Close()
client := http.Client{
Transport: &http.Transport{
DisableCompression: true,
},
}
url := server.URL + "/api/v1/pods"
if test.watch {
url = url + "?watch=1"
}
request, err := http.NewRequest("GET", url, nil)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
request.Header.Set("Accept-Encoding", test.encoding)
response, err := client.Do(request)
if err != nil {
t.Errorf("unexpected error: %v", err)
}
var reader io.Reader
if test.encoding == "gzip" && !test.watch {
if response.Header.Get("Content-Encoding") != "gzip" {
t.Fatal("expected response header Content-Encoding to be set to \"gzip\"")
}
if response.Header.Get("Vary") != "Accept-Encoding" {
t.Fatal("expected response header Vary to be set to \"Accept-Encoding\"")
}
reader, err = gzip.NewReader(response.Body)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
} else {
if response.Header.Get("Content-Encoding") == "gzip" {
t.Fatal("expected response header Content-Encoding not to be set")
}
reader = response.Body
}
body, err := ioutil.ReadAll(reader)
if err != nil {
t.Fatal("unexpected error: %v", err)
}
if !bytes.Equal(body, responseData) {
t.Fatalf("Expected response body %s to equal %s", body, responseData)
}
}
}
func newTestRequestInfoResolver() *request.RequestInfoFactory {
return &request.RequestInfoFactory{
APIPrefixes: sets.NewString("api", "apis"),
GrouplessAPIPrefixes: sets.NewString("api"),
}
}