Add a method for encoding directly to a io.Writer and use it for HTTPx

pull/6/head
Brendan Burns 2015-09-17 17:43:05 -07:00
parent a518a27354
commit 8998219686
10 changed files with 112 additions and 49 deletions

View File

@ -28,16 +28,18 @@ cleanup() {
kube::log::status "Benchmark cleanup complete"
}
ARGS="-bench-pods 3000 -bench-tasks 100 -bench-tasks 10"
runTests() {
kube::etcd::start
kube::log::status "Running benchmarks"
KUBE_GOFLAGS="-tags 'benchmark no-docker' -bench . -benchtime 1s -cpu 4" \
KUBE_GOFLAGS="-tags 'benchmark no-docker' -bench . -benchmem -benchtime 1s -cpu 4" \
KUBE_RACE="-race" \
KUBE_TEST_API_VERSIONS="v1" \
KUBE_TIMEOUT="-timeout 10m" \
KUBE_TEST_ETCD_PREFIXES="registry"\
ETCD_CUSTOM_PREFIX="None" \
KUBE_TEST_ARGS="-bench-quiet 0 -bench-pods 30 -bench-tasks 1"\
KUBE_TEST_ARGS="${ARGS}" \
"${KUBE_ROOT}/hack/test-go.sh" test/integration
cleanup
}

View File

@ -18,6 +18,7 @@ package meta
import (
"errors"
"io"
"testing"
"k8s.io/kubernetes/pkg/runtime"
@ -29,6 +30,10 @@ func (fakeCodec) Encode(runtime.Object) ([]byte, error) {
return []byte{}, nil
}
func (fakeCodec) EncodeToStream(runtime.Object, io.Writer) error {
return nil
}
func (fakeCodec) Decode([]byte) (runtime.Object, error) {
return nil, nil
}

View File

@ -380,24 +380,32 @@ func isPrettyPrint(req *http.Request) bool {
// writeJSON renders an object as JSON to the response.
func writeJSON(statusCode int, codec runtime.Codec, object runtime.Object, w http.ResponseWriter, pretty bool) {
w.Header().Set("Content-Type", "application/json")
// We send the status code before we encode the object, so if we error, the status code stays but there will
// still be an error object. This seems ok, the alternative is to validate the object before
// encoding, but this really should never happen, so it's wasted compute for every API request.
w.WriteHeader(statusCode)
if pretty {
prettyJSON(codec, object, w)
return
}
err := codec.EncodeToStream(object, w)
if err != nil {
errorJSONFatal(err, codec, w)
}
}
func prettyJSON(codec runtime.Codec, object runtime.Object, w http.ResponseWriter) {
formatted := &bytes.Buffer{}
output, err := codec.Encode(object)
if err != nil {
errorJSONFatal(err, codec, w)
}
if err := json.Indent(formatted, output, "", " "); err != nil {
errorJSONFatal(err, codec, w)
return
}
if pretty {
// PR #2243: Pretty-print JSON by default.
formatted := &bytes.Buffer{}
err = json.Indent(formatted, output, "", " ")
if err != nil {
errorJSONFatal(err, codec, w)
return
}
output = formatted.Bytes()
}
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(statusCode)
w.Write(output)
w.Write(formatted.Bytes())
}
// errorJSON renders an error to the response. Returns the HTTP status code of the error.

View File

@ -2432,9 +2432,8 @@ func expectApiStatus(t *testing.T, method, url string, data []byte, code int) *u
return nil
}
var status unversioned.Status
_, err = extractBody(response, &status)
if err != nil {
t.Fatalf("unexpected error on %s %s: %v", method, url, err)
if body, err := extractBody(response, &status); err != nil {
t.Fatalf("unexpected error on %s %s: %v\nbody:\n%s", method, url, err, body)
return nil
}
if code != response.StatusCode {
@ -2470,7 +2469,10 @@ func TestWriteJSONDecodeError(t *testing.T) {
writeJSON(http.StatusOK, codec, &UnregisteredAPIObject{"Undecodable"}, w, false)
}))
defer server.Close()
status := expectApiStatus(t, "GET", server.URL, nil, http.StatusInternalServerError)
// We send a 200 status code before we encode the object, so we expect OK, but there will
// still be an error object. This seems ok, the alternative is to validate the object before
// encoding, but this really should never happen, so it's wasted compute for every API request.
status := expectApiStatus(t, "GET", server.URL, nil, http.StatusOK)
if status.Reason != unversioned.StatusReasonUnknown {
t.Errorf("unexpected reason %#v", status)
}

View File

@ -17,8 +17,10 @@ limitations under the License.
package conversion
import (
"bytes"
"encoding/json"
"fmt"
"io"
"path"
)
@ -51,6 +53,14 @@ import (
// config files.
//
func (s *Scheme) EncodeToVersion(obj interface{}, destVersion string) (data []byte, err error) {
buff := &bytes.Buffer{}
if err := s.EncodeToVersionStream(obj, destVersion, buff); err != nil {
return nil, err
}
return buff.Bytes(), nil
}
func (s *Scheme) EncodeToVersionStream(obj interface{}, destVersion string, stream io.Writer) error {
obj = maybeCopy(obj)
v, _ := EnforcePtr(obj) // maybeCopy guarantees a pointer
@ -58,28 +68,34 @@ func (s *Scheme) EncodeToVersion(obj interface{}, destVersion string) (data []by
// destVersion is v1, encode it to v1 for backward compatibility.
pkg := path.Base(v.Type().PkgPath())
if pkg == "unversioned" && destVersion != "v1" {
return s.encodeUnversionedObject(obj)
// TODO: convert this to streaming too
data, err := s.encodeUnversionedObject(obj)
if err != nil {
return err
}
_, err = stream.Write(data)
return err
}
if _, registered := s.typeToVersion[v.Type()]; !registered {
return nil, fmt.Errorf("type %v is not registered for %q and it will be impossible to Decode it, therefore Encode will refuse to encode it.", v.Type(), destVersion)
return fmt.Errorf("type %v is not registered for %q and it will be impossible to Decode it, therefore Encode will refuse to encode it.", v.Type(), destVersion)
}
objVersion, objKind, err := s.ObjectVersionAndKind(obj)
if err != nil {
return nil, err
return err
}
// Perform a conversion if necessary.
if objVersion != destVersion {
objOut, err := s.NewObject(destVersion, objKind)
if err != nil {
return nil, err
return err
}
flags, meta := s.generateConvertMeta(objVersion, destVersion, obj)
err = s.converter.Convert(obj, objOut, flags, meta)
if err != nil {
return nil, err
return err
}
obj = objOut
}
@ -87,29 +103,29 @@ func (s *Scheme) EncodeToVersion(obj interface{}, destVersion string) (data []by
// ensure the output object name comes from the destination type
_, objKind, err = s.ObjectVersionAndKind(obj)
if err != nil {
return nil, err
return err
}
// Version and Kind should be set on the wire.
err = s.SetVersionAndKind(destVersion, objKind, obj)
if err != nil {
return nil, err
return err
}
// To add metadata, do some simple surgery on the JSON.
data, err = json.Marshal(obj)
if err != nil {
return nil, err
encoder := json.NewEncoder(stream)
if err := encoder.Encode(obj); err != nil {
return err
}
// Version and Kind should be blank in memory. Reset them, since it's
// possible that we modified a user object and not a copy above.
err = s.SetVersionAndKind("", "", obj)
if err != nil {
return nil, err
return err
}
return data, nil
return nil
}
func (s *Scheme) encodeUnversionedObject(obj interface{}) (data []byte, err error) {

View File

@ -20,6 +20,7 @@ import (
"bytes"
"encoding/json"
"fmt"
"io"
"strings"
"k8s.io/kubernetes/pkg/api/latest"
@ -219,40 +220,49 @@ const template = `{
"items": [ %s ]
}`
func encodeToJSON(obj *experimental.ThirdPartyResourceData) ([]byte, error) {
func encodeToJSON(obj *experimental.ThirdPartyResourceData, stream io.Writer) error {
var objOut interface{}
if err := json.Unmarshal(obj.Data, &objOut); err != nil {
return nil, err
return err
}
objMap, ok := objOut.(map[string]interface{})
if !ok {
return nil, fmt.Errorf("unexpected type: %v", objOut)
return fmt.Errorf("unexpected type: %v", objOut)
}
objMap["metadata"] = obj.ObjectMeta
return json.Marshal(objMap)
encoder := json.NewEncoder(stream)
return encoder.Encode(objMap)
}
func (t *thirdPartyResourceDataCodec) Encode(obj runtime.Object) (data []byte, err error) {
func (t *thirdPartyResourceDataCodec) Encode(obj runtime.Object) ([]byte, error) {
buff := &bytes.Buffer{}
if err := t.EncodeToStream(obj, buff); err != nil {
return nil, err
}
return buff.Bytes(), nil
}
func (t *thirdPartyResourceDataCodec) EncodeToStream(obj runtime.Object, stream io.Writer) (err error) {
switch obj := obj.(type) {
case *experimental.ThirdPartyResourceData:
return encodeToJSON(obj)
return encodeToJSON(obj, stream)
case *experimental.ThirdPartyResourceDataList:
// TODO: There must be a better way to do this...
buff := &bytes.Buffer{}
dataStrings := make([]string, len(obj.Items))
for ix := range obj.Items {
data, err := encodeToJSON(&obj.Items[ix])
buff := &bytes.Buffer{}
err := encodeToJSON(&obj.Items[ix], buff)
if err != nil {
return nil, err
return err
}
dataStrings[ix] = string(data)
dataStrings[ix] = buff.String()
}
fmt.Fprintf(buff, template, t.kind+"List", strings.Join(dataStrings, ","))
return buff.Bytes(), nil
fmt.Fprintf(stream, template, t.kind+"List", strings.Join(dataStrings, ","))
return nil
case *unversioned.Status:
return t.delegate.Encode(obj)
return t.delegate.EncodeToStream(obj, stream)
default:
return nil, fmt.Errorf("unexpected object to encode: %#v", obj)
return fmt.Errorf("unexpected object to encode: %#v", obj)
}
}

View File

@ -17,6 +17,8 @@ limitations under the License.
package runtime
import (
"io"
"k8s.io/kubernetes/pkg/util/yaml"
)
@ -78,6 +80,10 @@ func (c *codecWrapper) Encode(obj Object) ([]byte, error) {
return c.EncodeToVersion(obj, c.version)
}
func (c *codecWrapper) EncodeToStream(obj Object, stream io.Writer) error {
return c.EncodeToVersionStream(obj, c.version, stream)
}
// TODO: Make this behaviour default when we move everyone away from
// the unversioned types.
//

View File

@ -16,6 +16,10 @@ limitations under the License.
package runtime
import (
"io"
)
// ObjectScheme represents common conversions between formal external API versions
// and the internal Go structs. ObjectScheme is typically used with ObjectCodec to
// transform internal Go structs into serialized versions. There may be many valid
@ -45,6 +49,7 @@ type Decoder interface {
// Encoder defines methods for serializing API objects into bytes
type Encoder interface {
Encode(obj Object) (data []byte, err error)
EncodeToStream(obj Object, stream io.Writer) error
}
// Codec defines methods for serializing and deserializing API objects.
@ -67,6 +72,7 @@ type ObjectEncoder interface {
// to a specified output version. An error is returned if the object
// cannot be converted for any reason.
EncodeToVersion(obj Object, outVersion string) ([]byte, error)
EncodeToVersionStream(obj Object, outVersion string, stream io.Writer) error
}
// ObjectConvertor converts an object to a different version.

View File

@ -19,6 +19,7 @@ package runtime
import (
"encoding/json"
"fmt"
"io"
"net/url"
"reflect"
@ -434,6 +435,10 @@ func (s *Scheme) EncodeToVersion(obj Object, destVersion string) (data []byte, e
return s.raw.EncodeToVersion(obj, destVersion)
}
func (s *Scheme) EncodeToVersionStream(obj Object, destVersion string, stream io.Writer) error {
return s.raw.EncodeToVersionStream(obj, destVersion, stream)
}
// Decode converts a YAML or JSON string back into a pointer to an api object.
// Deduces the type based upon the APIVersion and Kind fields, which are set
// by Encode. Only versioned objects (APIVersion != "") are accepted. The object

View File

@ -246,13 +246,16 @@ func TestExtensionMapping(t *testing.T) {
}{
{
&InternalExtensionType{Extension: runtime.EmbeddedObject{Object: &ExtensionA{TestString: "foo"}}},
`{"kind":"ExtensionType","apiVersion":"testExternal","extension":{"kind":"A","testString":"foo"}}`,
`{"kind":"ExtensionType","apiVersion":"testExternal","extension":{"kind":"A","testString":"foo"}}
`,
}, {
&InternalExtensionType{Extension: runtime.EmbeddedObject{Object: &ExtensionB{TestString: "bar"}}},
`{"kind":"ExtensionType","apiVersion":"testExternal","extension":{"kind":"B","testString":"bar"}}`,
`{"kind":"ExtensionType","apiVersion":"testExternal","extension":{"kind":"B","testString":"bar"}}
`,
}, {
&InternalExtensionType{Extension: runtime.EmbeddedObject{Object: nil}},
`{"kind":"ExtensionType","apiVersion":"testExternal","extension":null}`,
`{"kind":"ExtensionType","apiVersion":"testExternal","extension":null}
`,
},
}
@ -261,7 +264,7 @@ func TestExtensionMapping(t *testing.T) {
if err != nil {
t.Errorf("unexpected error '%v' (%#v)", err, item.obj)
} else if e, a := item.encoded, string(gotEncoded); e != a {
t.Errorf("expected %v, got %v", e, a)
t.Errorf("expected\n%#v\ngot\n%#v\n", e, a)
}
gotDecoded, err := scheme.Decode([]byte(item.encoded))