Merge pull request #23806 from smarterclayton/streaming_watch

Automatic merge from submit-queue

Implement a streaming serializer for watch

Changeover watch to use streaming serialization. Properly version the
watch objects. Implement simple framing for JSON and Protobuf (but not
YAML).

@wojtek-t @lavalamp
pull/6/head
k8s-merge-robot 2016-04-13 05:18:17 -07:00
commit f5e8e7453b
34 changed files with 1182 additions and 172 deletions

View File

@ -20,6 +20,7 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/runtime"
versionedwatch "k8s.io/kubernetes/pkg/watch/versioned"
)
var SchemeGroupVersion = unversioned.GroupVersion{Group: "testgroup", Version: "v1"}
@ -41,6 +42,7 @@ func addKnownTypes(scheme *runtime.Scheme) {
&v1.DeleteOptions{},
&unversioned.Status{},
&v1.ExportOptions{})
versionedwatch.AddToGroupVersion(scheme, SchemeGroupVersion)
}
func (obj *TestType) GetObjectKind() unversioned.ObjectKind { return &obj.TypeMeta }

View File

@ -63,6 +63,7 @@ func New() *Generator {
`+k8s.io/kubernetes/pkg/util/intstr`,
`+k8s.io/kubernetes/pkg/api/resource`,
`+k8s.io/kubernetes/pkg/runtime`,
`+k8s.io/kubernetes/pkg/watch/versioned`,
`k8s.io/kubernetes/pkg/api/unversioned`,
`k8s.io/kubernetes/pkg/api/v1`,
`k8s.io/kubernetes/pkg/apis/extensions/v1beta1`,

View File

@ -86,8 +86,9 @@ func Run() error {
VersionedResourcesStorageMap: map[string]map[string]rest.Storage{
groupVersion.Version: restStorageMap,
},
Scheme: api.Scheme,
NegotiatedSerializer: api.Codecs,
Scheme: api.Scheme,
NegotiatedSerializer: api.Codecs,
NegotiatedStreamSerializer: api.StreamCodecs,
}
if err := s.InstallAPIGroups([]genericapiserver.APIGroupInfo{apiGroupInfo}); err != nil {
return fmt.Errorf("Error in installing API: %v", err)

View File

@ -29,6 +29,9 @@ var Scheme = runtime.NewScheme()
// Codecs provides access to encoding and decoding for the scheme
var Codecs = serializer.NewCodecFactory(Scheme)
// StreamCodecs provides access to streaming encoding and decoding for the scheme
var StreamCodecs = serializer.NewStreamingCodecFactory(Scheme)
// GroupName is the group name use in this package
const GroupName = ""

View File

@ -38,7 +38,7 @@ import (
func init() {
codecsToTest = append(codecsToTest, func(version unversioned.GroupVersion, item runtime.Object) (runtime.Codec, error) {
s := protobuf.NewSerializer(api.Scheme, runtime.ObjectTyperToTyper(api.Scheme))
s := protobuf.NewSerializer(api.Scheme, runtime.ObjectTyperToTyper(api.Scheme), "application/arbitrary.content.type")
return api.Codecs.CodecForVersions(s, testapi.ExternalGroupVersions(), nil), nil
})
}
@ -65,7 +65,7 @@ func TestProtobufRoundTrip(t *testing.T) {
func BenchmarkEncodeCodecProtobuf(b *testing.B) {
items := benchmarkItems()
width := len(items)
s := protobuf.NewSerializer(nil, nil)
s := protobuf.NewSerializer(nil, nil, "application/arbitrary.content.type")
b.ResetTimer()
for i := 0; i < b.N; i++ {
if _, err := runtime.Encode(s, &items[i%width]); err != nil {
@ -86,7 +86,7 @@ func BenchmarkEncodeCodecFromInternalProtobuf(b *testing.B) {
b.Fatal(err)
}
}
s := protobuf.NewSerializer(nil, nil)
s := protobuf.NewSerializer(nil, nil, "application/arbitrary.content.type")
codec := api.Codecs.EncoderForVersion(s, v1.SchemeGroupVersion)
b.ResetTimer()
for i := 0; i < b.N; i++ {

View File

@ -17,12 +17,15 @@ limitations under the License.
package api_test
import (
"encoding/hex"
"encoding/json"
"math/rand"
"reflect"
"strings"
"testing"
"github.com/davecgh/go-spew/spew"
proto "github.com/golang/protobuf/proto"
flag "github.com/spf13/pflag"
"github.com/ugorji/go/codec"
@ -58,6 +61,15 @@ func fuzzInternalObject(t *testing.T, forVersion unversioned.GroupVersion, item
return item
}
func dataAsString(data []byte) string {
dataString := string(data)
if !strings.HasPrefix(dataString, "{") {
dataString = "\n" + hex.Dump(data)
proto.NewBuffer(make([]byte, 0, 1024)).DebugPrint("decoded object", data)
}
return dataString
}
func roundTrip(t *testing.T, codec runtime.Codec, item runtime.Object) {
printer := spew.ConfigState{DisableMethods: true}
@ -70,11 +82,12 @@ func roundTrip(t *testing.T, codec runtime.Codec, item runtime.Object) {
obj2, err := runtime.Decode(codec, data)
if err != nil {
t.Errorf("0: %v: %v\nCodec: %v\nData: %s\nSource: %#v", name, err, codec, string(data), printer.Sprintf("%#v", item))
t.Errorf("0: %v: %v\nCodec: %v\nData: %s\nSource: %#v", name, err, codec, dataAsString(data), printer.Sprintf("%#v", item))
panic("failed")
return
}
if !api.Semantic.DeepEqual(item, obj2) {
t.Errorf("\n1: %v: diff: %v\nCodec: %v\nSource:\n\n%#v\n\nEncoded:\n\n%s\n\nFinal:\n\n%#v", name, diff.ObjectGoPrintDiff(item, obj2), codec, printer.Sprintf("%#v", item), string(data), printer.Sprintf("%#v", obj2))
t.Errorf("\n1: %v: diff: %v\nCodec: %v\nSource:\n\n%#v\n\nEncoded:\n\n%s\n\nFinal:\n\n%#v", name, diff.ObjectGoPrintDiff(item, obj2), codec, printer.Sprintf("%#v", item), dataAsString(data), printer.Sprintf("%#v", obj2))
return
}
@ -135,7 +148,14 @@ func TestList(t *testing.T) {
roundTripSame(t, testapi.Default, item)
}
var nonRoundTrippableTypes = sets.NewString("ExportOptions")
var nonRoundTrippableTypes = sets.NewString(
"ExportOptions",
// WatchEvent does not include kind and version and can only be deserialized
// implicitly (if the caller expects the specific object). The watch call defines
// the schema by content type, rather than via kind/version included in each
// object.
"WatchEvent",
)
var nonInternalRoundTrippableTypes = sets.NewString("List", "ListOptions", "ExportOptions")
var nonRoundTrippableTypesByVersion = map[string][]string{}

View File

@ -19,6 +19,7 @@ package v1
import (
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/runtime"
versionedwatch "k8s.io/kubernetes/pkg/watch/versioned"
)
// GroupName is the group name use in this package
@ -87,6 +88,9 @@ func addKnownTypes(scheme *runtime.Scheme) {
// Add common types
scheme.AddKnownTypes(SchemeGroupVersion, &unversioned.Status{})
// Add the watch version that applies
versionedwatch.AddToGroupVersion(scheme, SchemeGroupVersion)
}
func (obj *Pod) GetObjectKind() unversioned.ObjectKind { return &obj.TypeMeta }

View File

@ -20,6 +20,7 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/runtime"
versionedwatch "k8s.io/kubernetes/pkg/watch/versioned"
)
// GroupName is the group name use in this package
@ -42,6 +43,7 @@ func addKnownTypes(scheme *runtime.Scheme) {
&Scale{},
&v1.ListOptions{},
)
versionedwatch.AddToGroupVersion(scheme, SchemeGroupVersion)
}
func (obj *HorizontalPodAutoscaler) GetObjectKind() unversioned.ObjectKind { return &obj.TypeMeta }

View File

@ -20,6 +20,7 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/runtime"
versionedwatch "k8s.io/kubernetes/pkg/watch/versioned"
)
// GroupName is the group name use in this package
@ -41,6 +42,7 @@ func addKnownTypes(scheme *runtime.Scheme) {
&JobList{},
&v1.ListOptions{},
)
versionedwatch.AddToGroupVersion(scheme, SchemeGroupVersion)
}
func (obj *Job) GetObjectKind() unversioned.ObjectKind { return &obj.TypeMeta }

View File

@ -20,6 +20,7 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/runtime"
versionedwatch "k8s.io/kubernetes/pkg/watch/versioned"
)
// GroupName is the group name use in this package
@ -61,6 +62,8 @@ func addKnownTypes(scheme *runtime.Scheme) {
&PodSecurityPolicy{},
&PodSecurityPolicyList{},
)
// Add the watch version that applies
versionedwatch.AddToGroupVersion(scheme, SchemeGroupVersion)
}
func (obj *Deployment) GetObjectKind() unversioned.ObjectKind { return &obj.TypeMeta }

View File

@ -20,6 +20,7 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/runtime"
versionedwatch "k8s.io/kubernetes/pkg/watch/versioned"
)
// GroupName is the group name use in this package
@ -40,6 +41,8 @@ func addKnownTypes(scheme *runtime.Scheme) {
&RawPod{},
&v1.DeleteOptions{},
)
// Add the watch version that applies
versionedwatch.AddToGroupVersion(scheme, SchemeGroupVersion)
}
func (obj *RawNode) GetObjectKind() unversioned.ObjectKind { return &obj.TypeMeta }

View File

@ -34,7 +34,6 @@ import (
"k8s.io/kubernetes/pkg/apiserver/metrics"
"k8s.io/kubernetes/pkg/conversion"
"k8s.io/kubernetes/pkg/runtime"
watchjson "k8s.io/kubernetes/pkg/watch/json"
"github.com/emicklei/go-restful"
)
@ -284,6 +283,14 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
isGetter = true
}
var versionedWatchEvent runtime.Object
if isWatcher {
versionedWatchEvent, err = a.group.Creater.New(a.group.GroupVersion.WithKind("WatchEvent"))
if err != nil {
return nil, err
}
}
var (
connectOptions runtime.Object
versionedConnectOptions runtime.Object
@ -450,11 +457,12 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
// test/integration/auth_test.go is currently the most comprehensive status code test
reqScope := RequestScope{
ContextFunc: ctxFn,
Serializer: a.group.Serializer,
ParameterCodec: a.group.ParameterCodec,
Creater: a.group.Creater,
Convertor: a.group.Convertor,
ContextFunc: ctxFn,
Serializer: a.group.Serializer,
StreamSerializer: a.group.StreamSerializer,
ParameterCodec: a.group.ParameterCodec,
Creater: a.group.Creater,
Convertor: a.group.Convertor,
// TODO: This seems wrong for cross-group subresources. It makes an assumption that a subresource and its parent are in the same group version. Revisit this.
Resource: a.group.GroupVersion.WithResource(resource),
@ -633,9 +641,9 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
Operation("watch"+namespaced+kind+strings.Title(subresource)).
Produces("application/json").
Returns(http.StatusOK, "OK", watchjson.WatchEvent{}).
Writes(watchjson.WatchEvent{})
Produces(a.group.StreamSerializer.SupportedMediaTypes()...).
Returns(http.StatusOK, "OK", versionedWatchEvent).
Writes(versionedWatchEvent)
if err := addObjectParams(ws, route, versionedListOptions); err != nil {
return nil, err
}
@ -652,9 +660,9 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
Doc(doc).
Param(ws.QueryParameter("pretty", "If 'true', then the output is pretty printed.")).
Operation("watch"+namespaced+kind+strings.Title(subresource)+"List").
Produces("application/json").
Returns(http.StatusOK, "OK", watchjson.WatchEvent{}).
Writes(watchjson.WatchEvent{})
Produces(a.group.StreamSerializer.SupportedMediaTypes()...).
Returns(http.StatusOK, "OK", versionedWatchEvent).
Writes(versionedWatchEvent)
if err := addObjectParams(ws, route, versionedListOptions); err != nil {
return nil, err
}

View File

@ -84,8 +84,13 @@ type APIGroupVersion struct {
Mapper meta.RESTMapper
Serializer runtime.NegotiatedSerializer
ParameterCodec runtime.ParameterCodec
// Serializer is used to determine how to convert responses from API methods into bytes to send over
// the wire.
Serializer runtime.NegotiatedSerializer
// StreamSerializer is used for sending a series of objects to the client over a single channel, where
// the underlying channel has no innate framing (such as an io.Writer)
StreamSerializer runtime.NegotiatedSerializer
ParameterCodec runtime.ParameterCodec
Typer runtime.ObjectTyper
Creater runtime.ObjectCreater

View File

@ -48,6 +48,7 @@ import (
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/pkg/version"
"k8s.io/kubernetes/pkg/watch"
"k8s.io/kubernetes/pkg/watch/versioned"
"k8s.io/kubernetes/plugin/pkg/admission/admit"
"k8s.io/kubernetes/plugin/pkg/admission/deny"
@ -160,6 +161,7 @@ func addTestTypes() {
// served in the tests.
api.Scheme.AddKnownTypes(testGroup2Version, &SimpleXGSubresource{})
api.Scheme.AddKnownTypes(testInternalGroup2Version, &SimpleXGSubresource{})
versioned.AddToGroupVersion(api.Scheme, testGroupVersion)
}
func addNewTestTypes() {
@ -174,8 +176,10 @@ func addNewTestTypes() {
}
api.Scheme.AddKnownTypes(newGroupVersion,
&apiservertesting.Simple{}, &apiservertesting.SimpleList{}, &ListOptions{},
&api.DeleteOptions{}, &apiservertesting.SimpleGetOptions{}, &apiservertesting.SimpleRoot{})
api.Scheme.AddKnownTypes(newGroupVersion, &v1.Pod{})
&api.DeleteOptions{}, &apiservertesting.SimpleGetOptions{}, &apiservertesting.SimpleRoot{},
&v1.Pod{},
)
versioned.AddToGroupVersion(api.Scheme, newGroupVersion)
}
func init() {
@ -283,6 +287,7 @@ func handleInternal(storage map[string]rest.Storage, admissionControl admission.
group.GroupVersion = grouplessGroupVersion
group.OptionsExternalVersion = &grouplessGroupVersion
group.Serializer = api.Codecs
group.StreamSerializer = api.StreamCodecs
if err := (&group).InstallREST(container); err != nil {
panic(fmt.Sprintf("unable to install container %s: %v", group.GroupVersion, err))
}
@ -295,6 +300,7 @@ func handleInternal(storage map[string]rest.Storage, admissionControl admission.
group.GroupVersion = testGroupVersion
group.OptionsExternalVersion = &testGroupVersion
group.Serializer = api.Codecs
group.StreamSerializer = api.StreamCodecs
if err := (&group).InstallREST(container); err != nil {
panic(fmt.Sprintf("unable to install container %s: %v", group.GroupVersion, err))
}
@ -307,6 +313,7 @@ func handleInternal(storage map[string]rest.Storage, admissionControl admission.
group.GroupVersion = newGroupVersion
group.OptionsExternalVersion = &newGroupVersion
group.Serializer = api.Codecs
group.StreamSerializer = api.StreamCodecs
if err := (&group).InstallREST(container); err != nil {
panic(fmt.Sprintf("unable to install container %s: %v", group.GroupVersion, err))
}
@ -2408,8 +2415,9 @@ func TestUpdateREST(t *testing.T) {
GroupVersion: newGroupVersion,
OptionsExternalVersion: &newGroupVersion,
Serializer: api.Codecs,
ParameterCodec: api.ParameterCodec,
Serializer: api.Codecs,
StreamSerializer: api.StreamCodecs,
ParameterCodec: api.ParameterCodec,
}
}
@ -2492,8 +2500,9 @@ func TestParentResourceIsRequired(t *testing.T) {
GroupVersion: newGroupVersion,
OptionsExternalVersion: &newGroupVersion,
Serializer: api.Codecs,
ParameterCodec: api.ParameterCodec,
Serializer: api.Codecs,
StreamSerializer: api.StreamCodecs,
ParameterCodec: api.ParameterCodec,
}
container := restful.NewContainer()
if err := group.InstallREST(container); err == nil {
@ -2523,8 +2532,9 @@ func TestParentResourceIsRequired(t *testing.T) {
GroupVersion: newGroupVersion,
OptionsExternalVersion: &newGroupVersion,
Serializer: api.Codecs,
ParameterCodec: api.ParameterCodec,
Serializer: api.Codecs,
StreamSerializer: api.StreamCodecs,
ParameterCodec: api.ParameterCodec,
}
container = restful.NewContainer()
if err := group.InstallREST(container); err != nil {
@ -3236,6 +3246,7 @@ func TestXGSubresource(t *testing.T) {
GroupVersion: testGroupVersion,
OptionsExternalVersion: &testGroupVersion,
Serializer: api.Codecs,
StreamSerializer: api.StreamCodecs,
SubresourceGroupVersionKind: map[string]unversioned.GroupVersionKind{
"simple/subsimple": testGroup2Version.WithKind("SimpleXGSubresource"),

View File

@ -70,8 +70,11 @@ type ScopeNamer interface {
type RequestScope struct {
Namer ScopeNamer
ContextFunc
Serializer runtime.NegotiatedSerializer
Serializer runtime.NegotiatedSerializer
StreamSerializer runtime.NegotiatedSerializer
runtime.ParameterCodec
Creater runtime.ObjectCreater
Convertor runtime.ObjectConvertor

View File

@ -17,32 +17,27 @@ limitations under the License.
package apiserver
import (
"bytes"
"fmt"
"net/http"
"reflect"
"regexp"
"strings"
"time"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/httplog"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/serializer/streaming"
utilruntime "k8s.io/kubernetes/pkg/util/runtime"
"k8s.io/kubernetes/pkg/util/wsstream"
"k8s.io/kubernetes/pkg/watch"
watchjson "k8s.io/kubernetes/pkg/watch/json"
"k8s.io/kubernetes/pkg/watch/versioned"
"github.com/emicklei/go-restful"
"github.com/golang/glog"
"golang.org/x/net/websocket"
)
var (
connectionUpgradeRegex = regexp.MustCompile("(^|.*,\\s*)upgrade($|\\s*,)")
// nothing will ever be sent down this channel
neverExitWatch <-chan time.Time = make(chan time.Time)
)
func isWebsocketRequest(req *http.Request) bool {
return connectionUpgradeRegex.MatchString(strings.ToLower(req.Header.Get("Connection"))) && strings.ToLower(req.Header.Get("Upgrade")) == "websocket"
}
// nothing will ever be sent down this channel
var neverExitWatch <-chan time.Time = make(chan time.Time)
// timeoutFactory abstracts watch timeout logic for testing
type timeoutFactory interface {
@ -64,119 +59,218 @@ func (w *realTimeoutFactory) TimeoutCh() (<-chan time.Time, func() bool) {
return t.C, t.Stop
}
type textEncodable interface {
// EncodesAsText should return true if objects should be transmitted as a WebSocket Text
// frame (otherwise, they will be sent as a Binary frame).
EncodesAsText() bool
}
// serveWatch handles serving requests to the server
// TODO: the functionality in this method and in WatchServer.Serve is not cleanly decoupled.
func serveWatch(watcher watch.Interface, scope RequestScope, req *restful.Request, res *restful.Response, timeout time.Duration) {
s, mediaType, err := negotiateOutputSerializer(req.Request, scope.Serializer)
// negotiate for the stream serializer
serializer, mediaType, err := negotiateOutputSerializer(req.Request, scope.StreamSerializer)
if err != nil {
scope.err(err, res.ResponseWriter, req.Request)
return
}
// TODO: replace with typed serialization
if mediaType != "application/json" {
writeRawJSON(http.StatusNotAcceptable, (errNotAcceptable{[]string{"application/json"}}).Status(), res.ResponseWriter)
encoder := scope.StreamSerializer.EncoderForVersion(serializer, scope.Kind.GroupVersion())
useTextFraming := false
if encodable, ok := encoder.(textEncodable); ok && encodable.EncodesAsText() {
useTextFraming = true
}
// find the embedded serializer matching the media type
embeddedSerializer, ok := scope.Serializer.SerializerForMediaType(mediaType, nil)
if !ok {
scope.err(fmt.Errorf("no serializer defined for %q available for embedded encoding", mediaType), res.ResponseWriter, req.Request)
return
}
encoder := scope.Serializer.EncoderForVersion(s, scope.Kind.GroupVersion())
watchServer := &WatchServer{watcher, encoder, func(obj runtime.Object) {
if err := setSelfLink(obj, req, scope.Namer); err != nil {
glog.V(5).Infof("Failed to set self link for object %v: %v", reflect.TypeOf(obj), err)
}
}, &realTimeoutFactory{timeout}}
if isWebsocketRequest(req.Request) {
websocket.Handler(watchServer.HandleWS).ServeHTTP(httplog.Unlogged(res.ResponseWriter), req.Request)
} else {
watchServer.ServeHTTP(res.ResponseWriter, req.Request)
embeddedEncoder := scope.Serializer.EncoderForVersion(embeddedSerializer, scope.Kind.GroupVersion())
server := &WatchServer{
watching: watcher,
scope: scope,
useTextFraming: useTextFraming,
mediaType: mediaType,
encoder: encoder,
embeddedEncoder: embeddedEncoder,
fixup: func(obj runtime.Object) {
if err := setSelfLink(obj, req, scope.Namer); err != nil {
utilruntime.HandleError(fmt.Errorf("failed to set link for object %v: %v", reflect.TypeOf(obj), err))
}
},
t: &realTimeoutFactory{timeout},
}
server.ServeHTTP(res.ResponseWriter, req.Request)
}
// WatchServer serves a watch.Interface over a websocket or vanilla HTTP.
type WatchServer struct {
watching watch.Interface
encoder runtime.Encoder
fixup func(runtime.Object)
t timeoutFactory
scope RequestScope
// true if websocket messages should use text framing (as opposed to binary framing)
useTextFraming bool
// the media type this watch is being served with
mediaType string
// used to encode the watch stream event itself
encoder runtime.Encoder
// used to encode the nested object in the watch stream
embeddedEncoder runtime.Encoder
fixup func(runtime.Object)
t timeoutFactory
}
// HandleWS implements a websocket handler.
func (w *WatchServer) HandleWS(ws *websocket.Conn) {
defer ws.Close()
done := make(chan struct{})
go func() {
var unused interface{}
// Expect this to block until the connection is closed. Client should not
// send anything.
websocket.JSON.Receive(ws, &unused)
close(done)
}()
for {
select {
case <-done:
w.watching.Stop()
return
case event, ok := <-w.watching.ResultChan():
if !ok {
// End of results.
return
}
w.fixup(event.Object)
obj, err := watchjson.Object(w.encoder, &event)
if err != nil {
// Client disconnect.
w.watching.Stop()
return
}
if err := websocket.JSON.Send(ws, obj); err != nil {
// Client disconnect.
w.watching.Stop()
return
}
}
}
}
// ServeHTTP serves a series of JSON encoded events via straight HTTP with
// Transfer-Encoding: chunked.
func (self *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
loggedW := httplog.LogOf(req, w)
// Serve serves a series of encoded events via HTTP with Transfer-Encoding: chunked
// or over a websocket connection.
func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
w = httplog.Unlogged(w)
timeoutCh, cleanup := self.t.TimeoutCh()
defer cleanup()
defer self.watching.Stop()
if wsstream.IsWebSocketRequest(req) {
w.Header().Set("Content-Type", s.mediaType)
websocket.Handler(s.HandleWS).ServeHTTP(w, req)
return
}
cn, ok := w.(http.CloseNotifier)
if !ok {
loggedW.Addf("unable to get CloseNotifier: %#v", w)
http.NotFound(w, req)
err := fmt.Errorf("unable to start watch - can't get http.CloseNotifier: %#v", w)
utilruntime.HandleError(err)
s.scope.err(errors.NewInternalError(err), w, req)
return
}
flusher, ok := w.(http.Flusher)
if !ok {
loggedW.Addf("unable to get Flusher: %#v", w)
http.NotFound(w, req)
err := fmt.Errorf("unable to start watch - can't get http.Flusher: %#v", w)
utilruntime.HandleError(err)
s.scope.err(errors.NewInternalError(err), w, req)
return
}
// get a framed encoder
f, ok := s.encoder.(streaming.Framer)
if !ok {
// programmer error
err := fmt.Errorf("no streaming support is available for media type %q", s.mediaType)
utilruntime.HandleError(err)
s.scope.err(errors.NewBadRequest(err.Error()), w, req)
return
}
framer := f.NewFrameWriter(w)
if framer == nil {
// programmer error
err := fmt.Errorf("no stream framing support is available for media type %q", s.mediaType)
utilruntime.HandleError(err)
s.scope.err(errors.NewBadRequest(err.Error()), w, req)
return
}
e := streaming.NewEncoder(framer, s.encoder)
// ensure the connection times out
timeoutCh, cleanup := s.t.TimeoutCh()
defer cleanup()
defer s.watching.Stop()
// begin the stream
w.Header().Set("Content-Type", s.mediaType)
w.Header().Set("Transfer-Encoding", "chunked")
w.WriteHeader(http.StatusOK)
flusher.Flush()
// TODO: use arbitrary serialization on watch
encoder := watchjson.NewEncoder(w, self.encoder)
buf := &bytes.Buffer{}
for {
select {
case <-cn.CloseNotify():
return
case <-timeoutCh:
return
case event, ok := <-self.watching.ResultChan():
case event, ok := <-s.watching.ResultChan():
if !ok {
// End of results.
return
}
self.fixup(event.Object)
if err := encoder.Encode(&event); err != nil {
// Client disconnect.
obj := event.Object
s.fixup(obj)
if err := s.embeddedEncoder.EncodeToStream(obj, buf); err != nil {
// unexpected error
utilruntime.HandleError(fmt.Errorf("unable to encode watch object: %v", err))
return
}
event.Object = &runtime.Unknown{
Raw: buf.Bytes(),
// ContentType is not required here because we are defaulting to the serializer
// type
}
if err := e.Encode((*versioned.InternalEvent)(&event)); err != nil {
utilruntime.HandleError(fmt.Errorf("unable to encode watch object: %v", err))
// client disconnect.
return
}
flusher.Flush()
buf.Reset()
}
}
}
// HandleWS implements a websocket handler.
func (s *WatchServer) HandleWS(ws *websocket.Conn) {
defer ws.Close()
done := make(chan struct{})
go wsstream.IgnoreReceives(ws, 0)
buf := &bytes.Buffer{}
streamBuf := &bytes.Buffer{}
for {
select {
case <-done:
s.watching.Stop()
return
case event, ok := <-s.watching.ResultChan():
if !ok {
// End of results.
return
}
obj := event.Object
s.fixup(obj)
if err := s.embeddedEncoder.EncodeToStream(obj, buf); err != nil {
// unexpected error
utilruntime.HandleError(fmt.Errorf("unable to encode watch object: %v", err))
return
}
event.Object = &runtime.Unknown{
Raw: buf.Bytes(),
// ContentType is not required here because we are defaulting to the serializer
// type
}
// the internal event will be versioned by the encoder
internalEvent := versioned.InternalEvent(event)
if err := s.encoder.EncodeToStream(&internalEvent, streamBuf); err != nil {
// encoding error
utilruntime.HandleError(fmt.Errorf("unable to encode event: %v", err))
s.watching.Stop()
return
}
if s.useTextFraming {
if err := websocket.Message.Send(ws, streamBuf.String()); err != nil {
// Client disconnect.
s.watching.Stop()
return
}
} else {
if err := websocket.Message.Send(ws, streamBuf.Bytes()); err != nil {
// Client disconnect.
s.watching.Stop()
return
}
}
buf.Reset()
streamBuf.Reset()
}
}
}

View File

@ -72,7 +72,7 @@ func TestWatchWebsocket(t *testing.T) {
ws, err := websocket.Dial(dest.String(), "", "http://localhost")
if err != nil {
t.Errorf("unexpected error: %v", err)
t.Fatalf("unexpected error: %v", err)
}
try := func(action watch.EventType, object runtime.Object) {
@ -89,7 +89,7 @@ func TestWatchWebsocket(t *testing.T) {
}
gotObj, err := runtime.Decode(codec, got.Object)
if err != nil {
t.Fatalf("Decode error: %v", err)
t.Fatalf("Decode error: %v\n%v", err, got)
}
if _, err := api.GetReference(gotObj); err != nil {
t.Errorf("Unable to construct reference: %v", err)
@ -381,10 +381,14 @@ func TestWatchHTTPTimeout(t *testing.T) {
// Setup a new watchserver
watchServer := &WatchServer{
watcher,
newCodec,
func(obj runtime.Object) {},
&fakeTimeoutFactory{timeoutCh, done},
watching: watcher,
mediaType: "testcase/json",
encoder: newCodec,
embeddedEncoder: newCodec,
fixup: func(obj runtime.Object) {},
t: &fakeTimeoutFactory{timeoutCh, done},
}
s := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
@ -526,3 +530,52 @@ func BenchmarkWatchWebsocket(b *testing.B) {
wg.Wait()
b.StopTimer()
}
// BenchmarkWatchProtobuf measures the cost of serving a watch.
func BenchmarkWatchProtobuf(b *testing.B) {
items := benchmarkItems()
simpleStorage := &SimpleRESTStorage{}
handler := handle(map[string]rest.Storage{"simples": simpleStorage})
server := httptest.NewServer(handler)
defer server.Close()
client := http.Client{}
dest, _ := url.Parse(server.URL)
dest.Path = "/" + prefix + "/" + newGroupVersion.Group + "/" + newGroupVersion.Version + "/watch/simples"
dest.RawQuery = ""
request, err := http.NewRequest("GET", dest.String(), nil)
if err != nil {
b.Fatalf("unexpected error: %v", err)
}
request.Header.Set("Accept", "application/vnd.kubernetes.protobuf")
response, err := client.Do(request)
if err != nil {
b.Fatalf("unexpected error: %v", err)
}
if response.StatusCode != http.StatusOK {
body, _ := ioutil.ReadAll(response.Body)
b.Fatalf("Unexpected response %#v\n%s", response, body)
}
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer response.Body.Close()
if _, err := io.Copy(ioutil.Discard, response.Body); err != nil {
b.Fatal(err)
}
wg.Done()
}()
actions := []watch.EventType{watch.Added, watch.Modified, watch.Deleted}
b.ResetTimer()
for i := 0; i < b.N; i++ {
simpleStorage.fakeWatch.Action(actions[i%len(actions)], &items[i%len(items)])
}
simpleStorage.fakeWatch.Stop()
wg.Wait()
b.StopTimer()
}

View File

@ -184,6 +184,8 @@ type APIGroupInfo struct {
Scheme *runtime.Scheme
// NegotiatedSerializer controls how this group encodes and decodes data
NegotiatedSerializer runtime.NegotiatedSerializer
// NegotiatedStreamSerializer controls how streaming responses are encoded and decoded.
NegotiatedStreamSerializer runtime.NegotiatedSerializer
// ParameterCodec performs conversions for query parameters passed to API calls
ParameterCodec runtime.ParameterCodec
@ -864,6 +866,7 @@ func (s *GenericAPIServer) getAPIGroupVersion(apiGroupInfo *APIGroupInfo, groupV
version.Storage = storage
version.ParameterCodec = apiGroupInfo.ParameterCodec
version.Serializer = apiGroupInfo.NegotiatedSerializer
version.StreamSerializer = apiGroupInfo.NegotiatedStreamSerializer
version.Creater = apiGroupInfo.Scheme
version.Convertor = apiGroupInfo.Scheme
version.Typer = apiGroupInfo.Scheme

View File

@ -130,6 +130,7 @@ func TestInstallAPIGroups(t *testing.T) {
IsLegacyGroup: true,
ParameterCodec: api.ParameterCodec,
NegotiatedSerializer: api.Codecs,
NegotiatedStreamSerializer: api.StreamCodecs,
},
{
// extensions group version
@ -138,6 +139,7 @@ func TestInstallAPIGroups(t *testing.T) {
OptionsExternalVersion: &apiGroupMeta.GroupVersion,
ParameterCodec: api.ParameterCodec,
NegotiatedSerializer: api.Codecs,
NegotiatedStreamSerializer: api.StreamCodecs,
},
}
s.InstallAPIGroups(apiGroupsInfo)

View File

@ -193,10 +193,11 @@ func (m *Master) InstallAPIs(c *Config) {
VersionedResourcesStorageMap: map[string]map[string]rest.Storage{
"v1": m.v1ResourcesStorage,
},
IsLegacyGroup: true,
Scheme: api.Scheme,
ParameterCodec: api.ParameterCodec,
NegotiatedSerializer: api.Codecs,
IsLegacyGroup: true,
Scheme: api.Scheme,
ParameterCodec: api.ParameterCodec,
NegotiatedSerializer: api.Codecs,
NegotiatedStreamSerializer: api.StreamCodecs,
}
if autoscalingGroupVersion := (unversioned.GroupVersion{Group: "autoscaling", Version: "v1"}); registered.IsEnabledVersion(autoscalingGroupVersion) {
apiGroupInfo.SubresourceGroupVersionKind = map[string]unversioned.GroupVersionKind{
@ -252,10 +253,11 @@ func (m *Master) InstallAPIs(c *Config) {
VersionedResourcesStorageMap: map[string]map[string]rest.Storage{
"v1beta1": extensionResources,
},
OptionsExternalVersion: &registered.GroupOrDie(api.GroupName).GroupVersion,
Scheme: api.Scheme,
ParameterCodec: api.ParameterCodec,
NegotiatedSerializer: api.Codecs,
OptionsExternalVersion: &registered.GroupOrDie(api.GroupName).GroupVersion,
Scheme: api.Scheme,
ParameterCodec: api.ParameterCodec,
NegotiatedSerializer: api.Codecs,
NegotiatedStreamSerializer: api.StreamCodecs,
}
apiGroupsInfo = append(apiGroupsInfo, apiGroupInfo)
@ -284,10 +286,11 @@ func (m *Master) InstallAPIs(c *Config) {
VersionedResourcesStorageMap: map[string]map[string]rest.Storage{
"v1": autoscalingResources,
},
OptionsExternalVersion: &registered.GroupOrDie(api.GroupName).GroupVersion,
Scheme: api.Scheme,
ParameterCodec: api.ParameterCodec,
NegotiatedSerializer: api.Codecs,
OptionsExternalVersion: &registered.GroupOrDie(api.GroupName).GroupVersion,
Scheme: api.Scheme,
ParameterCodec: api.ParameterCodec,
NegotiatedSerializer: api.Codecs,
NegotiatedStreamSerializer: api.StreamCodecs,
}
apiGroupsInfo = append(apiGroupsInfo, apiGroupInfo)
@ -316,10 +319,11 @@ func (m *Master) InstallAPIs(c *Config) {
VersionedResourcesStorageMap: map[string]map[string]rest.Storage{
"v1": batchResources,
},
OptionsExternalVersion: &registered.GroupOrDie(api.GroupName).GroupVersion,
Scheme: api.Scheme,
ParameterCodec: api.ParameterCodec,
NegotiatedSerializer: api.Codecs,
OptionsExternalVersion: &registered.GroupOrDie(api.GroupName).GroupVersion,
Scheme: api.Scheme,
ParameterCodec: api.ParameterCodec,
NegotiatedSerializer: api.Codecs,
NegotiatedStreamSerializer: api.StreamCodecs,
}
apiGroupsInfo = append(apiGroupsInfo, apiGroupInfo)
@ -660,8 +664,9 @@ func (m *Master) thirdpartyapi(group, kind, version string) *apiserver.APIGroupV
Storage: storage,
OptionsExternalVersion: &optionsExternalVersion,
Serializer: thirdpartyresourcedata.NewNegotiatedSerializer(api.Codecs, kind, externalVersion, internalVersion),
ParameterCodec: thirdpartyresourcedata.NewThirdPartyParameterCodec(api.ParameterCodec),
Serializer: thirdpartyresourcedata.NewNegotiatedSerializer(api.Codecs, kind, externalVersion, internalVersion),
StreamSerializer: thirdpartyresourcedata.NewNegotiatedSerializer(api.StreamCodecs, kind, externalVersion, internalVersion),
ParameterCodec: thirdpartyresourcedata.NewThirdPartyParameterCodec(api.ParameterCodec),
Context: m.RequestContextMapper,

View File

@ -432,7 +432,8 @@ func (t *thirdPartyResourceDataCreator) New(kind unversioned.GroupVersionKind) (
return nil, fmt.Errorf("unknown kind %v", kind)
}
return &extensions.ThirdPartyResourceDataList{}, nil
case "ListOptions":
// TODO: this list needs to be formalized higher in the chain
case "ListOptions", "WatchEvent":
if apiutil.GetGroupVersion(t.group, t.version) == kind.GroupVersion().String() {
// Translate third party group to external group.
gvk := registered.EnabledVersionsForGroup(api.GroupName)[0].WithKind(kind.Kind)

View File

@ -25,6 +25,7 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/framer"
utilyaml "k8s.io/kubernetes/pkg/util/yaml"
)
@ -192,3 +193,31 @@ func (s *Serializer) RecognizesData(peek io.Reader) (bool, error) {
}
return ok, nil
}
// NewFrameWriter implements stream framing for this serializer
func (s *Serializer) NewFrameWriter(w io.Writer) io.Writer {
if s.yaml {
// TODO: needs document framing
return nil
}
// we can write JSON objects directly to the writer, because they are self-framing
return w
}
// NewFrameReader implements stream framing for this serializer
func (s *Serializer) NewFrameReader(r io.Reader) io.Reader {
if s.yaml {
// TODO: needs document framing
return nil
}
// we need to extract the JSON chunks of data to pass to Decode()
return framer.NewJSONFramedReader(r)
}
// EncodesAsText returns true because both JSON and YAML are considered textual representations
// of data. This is used to determine whether the serialized object should be transmitted over
// a WebSocket Text or Binary frame. This must remain true for legacy compatibility with v1.1
// watch over websocket implementations.
func (s *Serializer) EncodesAsText() bool {
return true
}

View File

@ -23,12 +23,12 @@ import (
"fmt"
"io"
"reflect"
"sync"
"github.com/gogo/protobuf/proto"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/framer"
)
var (
@ -41,11 +41,6 @@ var (
//
// This encoding scheme is experimental, and is subject to change at any time.
protoEncodingPrefix = []byte{0x6b, 0x38, 0x73, 0x00}
bufferSize = uint64(16384)
availableBuffers = sync.Pool{New: func() interface{} {
return make([]byte, bufferSize)
}}
)
type errNotMarshalable struct {
@ -76,9 +71,10 @@ func NewSerializer(creater runtime.ObjectCreater, typer runtime.Typer, defaultCo
}
type Serializer struct {
prefix []byte
creater runtime.ObjectCreater
typer runtime.Typer
prefix []byte
creater runtime.ObjectCreater
typer runtime.Typer
contentType string
}
var _ runtime.Serializer = &Serializer{}
@ -168,7 +164,7 @@ func (s *Serializer) Decode(originalData []byte, gvk *unversioned.GroupVersionKi
return nil, actual, runtime.NewMissingVersionErr(fmt.Sprintf("%#v", unk.TypeMeta))
}
return unmarshalToObject(s.typer, s.creater, actual, into)
return unmarshalToObject(s.typer, s.creater, actual, into, unk.Raw)
}
// EncodeToStream serializes the provided object to the given writer. Overrides is ignored.
@ -246,6 +242,16 @@ func (s *Serializer) RecognizesData(peek io.Reader) (bool, error) {
return bytes.Equal(s.prefix, prefix), nil
}
// NewFrameWriter implements stream framing for this serializer
func (s *Serializer) NewFrameWriter(w io.Writer) io.Writer {
return framer.NewLengthDelimitedFrameWriter(w)
}
// NewFrameReader implements stream framing for this serializer
func (s *Serializer) NewFrameReader(r io.Reader) io.Reader {
return framer.NewLengthDelimitedFrameReader(r)
}
// copyKindDefaults defaults dst to the value in src if dst does not have a value set.
func copyKindDefaults(dst, src *unversioned.GroupVersionKind) {
if src == nil {
@ -374,13 +380,13 @@ func (s *RawSerializer) Decode(originalData []byte, gvk *unversioned.GroupVersio
return nil, actual, runtime.NewMissingVersionErr("<protobuf encoded body - must provide default type>")
}
return unmarshalToObject(s.typer, s.creater, actual, into)
return unmarshalToObject(s.typer, s.creater, actual, into, data)
}
// unmarshalToObject is the common code between decode in the raw and normal serializer.
func unmarshalToObject(typer runtime.Typer, creater runtime.ObjectCreater, actual *unversioned.GroupVersionKind, into runtime.Object) (runtime.Object, *unversioned.GroupVersionKind, error) {
func unmarshalToObject(typer runtime.Typer, creater runtime.ObjectCreater, actual *unversioned.GroupVersionKind, into runtime.Object, data []byte) (runtime.Object, *unversioned.GroupVersionKind, error) {
// use the target if necessary
obj, err := runtime.UseOrCreateObject(s.typer, s.creater, *actual, into)
obj, err := runtime.UseOrCreateObject(typer, creater, *actual, into)
if err != nil {
return nil, actual, err
}
@ -430,3 +436,13 @@ func (s *RawSerializer) EncodeToStream(obj runtime.Object, w io.Writer, override
func (s *RawSerializer) RecognizesData(peek io.Reader) (bool, error) {
return false, nil
}
// NewFrameWriter implements stream framing for this serializer
func (s *RawSerializer) NewFrameWriter(w io.Writer) io.Writer {
return framer.NewLengthDelimitedFrameWriter(w)
}
// NewFrameReader implements stream framing for this serializer
func (s *RawSerializer) NewFrameReader(r io.Reader) io.Reader {
return framer.NewLengthDelimitedFrameReader(r)
}

View File

@ -22,11 +22,17 @@ import (
"bytes"
"encoding/hex"
"fmt"
"reflect"
"strings"
"testing"
"k8s.io/kubernetes/pkg/api"
_ "k8s.io/kubernetes/pkg/api/install"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/v1"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/serializer/protobuf"
"k8s.io/kubernetes/pkg/util/diff"
)
type testObject struct {
@ -185,3 +191,151 @@ func TestEncode(t *testing.T) {
}
}
}
func TestDecode(t *testing.T) {
wire1 := []byte{
0x6b, 0x38, 0x73, 0x00, // prefix
0x0a, 0x04,
0x0a, 0x00, // apiversion
0x12, 0x00, // kind
0x12, 0x00, // data
0x1a, 0x00, // content-type
0x22, 0x00, // content-encoding
}
wire2 := []byte{
0x6b, 0x38, 0x73, 0x00, // prefix
0x0a, 0x15,
0x0a, 0x0d, 0x6f, 0x74, 0x68, 0x65, 0x72, 0x2f, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, // apiversion
0x12, 0x04, 0x74, 0x65, 0x73, 0x74, // kind
0x12, 0x03, 0x01, 0x02, 0x03, // data
0x1a, 0x00, // content-type
0x22, 0x00, // content-encoding
}
//err1 := fmt.Errorf("a test error")
testCases := []struct {
obj runtime.Object
data []byte
errFn func(error) bool
}{
{
obj: &runtime.Unknown{},
errFn: func(err error) bool { return err.Error() == "empty data" },
},
{
data: []byte{0x6b},
errFn: func(err error) bool { return strings.Contains(err.Error(), "does not appear to be a protobuf message") },
},
{
obj: &runtime.Unknown{
ContentType: "application/protobuf",
Raw: []byte{},
},
data: wire1,
},
{
obj: &runtime.Unknown{
TypeMeta: runtime.TypeMeta{
APIVersion: "other/version",
Kind: "test",
},
ContentType: "application/protobuf",
Raw: []byte{0x01, 0x02, 0x03},
},
data: wire2,
},
}
for i, test := range testCases {
s := protobuf.NewSerializer(nil, nil, "application/protobuf")
unk := &runtime.Unknown{}
err := runtime.DecodeInto(s, test.data, unk)
switch {
case err == nil && test.errFn != nil:
t.Errorf("%d: failed: %v", i, err)
continue
case err != nil && test.errFn == nil:
t.Errorf("%d: failed: %v", i, err)
continue
case err != nil:
if !test.errFn(err) {
t.Errorf("%d: failed: %v", i, err)
}
continue
}
if !reflect.DeepEqual(unk, test.obj) {
t.Errorf("%d: unexpected object:\n%#v", i, unk)
continue
}
}
}
func TestDecodeObjects(t *testing.T) {
obj1 := &v1.Pod{
ObjectMeta: v1.ObjectMeta{
Name: "cool",
},
Spec: v1.PodSpec{
Containers: []v1.Container{
{
Name: "test",
},
},
},
}
obj1wire, err := obj1.Marshal()
if err != nil {
t.Fatal(err)
}
wire1, err := (&runtime.Unknown{
TypeMeta: runtime.TypeMeta{Kind: "Pod", APIVersion: "v1"},
Raw: obj1wire,
}).Marshal()
if err != nil {
t.Fatal(err)
}
wire1 = append([]byte{0x6b, 0x38, 0x73, 0x00}, wire1...)
testCases := []struct {
obj runtime.Object
data []byte
errFn func(error) bool
}{
{
obj: obj1,
data: wire1,
},
}
for i, test := range testCases {
s := protobuf.NewSerializer(api.Scheme, runtime.ObjectTyperToTyper(api.Scheme), "application/protobuf")
obj, err := runtime.Decode(s, test.data)
switch {
case err == nil && test.errFn != nil:
t.Errorf("%d: failed: %v", i, err)
continue
case err != nil && test.errFn == nil:
t.Errorf("%d: failed: %v", i, err)
continue
case err != nil:
if !test.errFn(err) {
t.Errorf("%d: failed: %v", i, err)
}
if obj != nil {
t.Errorf("%d: should not have returned an object", i)
}
continue
}
if !api.Semantic.DeepEqual(obj, test.obj) {
t.Errorf("%d: unexpected object:\n%s", i, diff.ObjectGoPrintDiff(test.obj, obj))
continue
}
}
}

View File

@ -20,6 +20,7 @@ package streaming
import (
"bytes"
"fmt"
"io"
"k8s.io/kubernetes/pkg/api/unversioned"
@ -52,9 +53,11 @@ type Serializer interface {
}
type decoder struct {
reader io.Reader
decoder runtime.Decoder
buf []byte
reader io.Reader
decoder runtime.Decoder
buf []byte
maxBytes int
resetRead bool
}
// NewDecoder creates a streaming decoder that reads object chunks from r and decodes them with d.
@ -62,22 +65,50 @@ type decoder struct {
// an entire object.
func NewDecoder(r io.Reader, d runtime.Decoder) Decoder {
return &decoder{
reader: r,
decoder: d,
buf: make([]byte, 1024*1024),
reader: r,
decoder: d,
buf: make([]byte, 1024),
maxBytes: 1024 * 1024,
}
}
var ErrObjectTooLarge = fmt.Errorf("object to decode was longer than maximum allowed size")
// Decode reads the next object from the stream and decodes it.
func (d *decoder) Decode(defaults *unversioned.GroupVersionKind, into runtime.Object) (runtime.Object, *unversioned.GroupVersionKind, error) {
// TODO: instead of depending on a fixed sized buffer, we should handle ErrShortRead specially and
// grow the buffer capacity up to a maximum amount. Requires the framer to allow repeated reads to
// the stream until the frame is finished.
n, err := d.reader.Read(d.buf)
if err != nil {
return nil, nil, err
base := 0
for {
n, err := d.reader.Read(d.buf[base:])
if err == io.ErrShortBuffer {
if n == 0 {
return nil, nil, fmt.Errorf("got short buffer with n=0, base=%d, cap=%d", base, cap(d.buf))
}
if d.resetRead {
continue
}
// double the buffer size up to maxBytes
if cap(d.buf) < d.maxBytes {
base += n
d.buf = append(d.buf, make([]byte, cap(d.buf))...)
continue
}
// must read the rest of the frame (until we stop getting ErrShortBuffer)
d.resetRead = true
base = 0
return nil, nil, ErrObjectTooLarge
}
if err != nil {
return nil, nil, err
}
if d.resetRead {
// now that we have drained the large read, continue
d.resetRead = false
continue
}
base += n
break
}
return d.decoder.Decode(d.buf[:n], defaults, into)
return d.decoder.Decode(d.buf[:base], defaults, into)
}
type encoder struct {

View File

@ -0,0 +1,83 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package streaming
import (
"bytes"
"io"
"testing"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/framer"
)
type fakeDecoder struct {
got []byte
obj runtime.Object
err error
}
func (d *fakeDecoder) Decode(data []byte, gvk *unversioned.GroupVersionKind, into runtime.Object) (runtime.Object, *unversioned.GroupVersionKind, error) {
d.got = data
return d.obj, nil, d.err
}
func TestEmptyDecoder(t *testing.T) {
buf := bytes.NewBuffer([]byte{})
d := &fakeDecoder{}
_, _, err := NewDecoder(buf, d).Decode(nil, nil)
if err != io.EOF {
t.Fatal(err)
}
}
func TestDecoder(t *testing.T) {
frames := [][]byte{
make([]byte, 1025),
make([]byte, 1024*5),
make([]byte, 1024*1024*5),
make([]byte, 1025),
}
pr, pw := io.Pipe()
fw := framer.NewLengthDelimitedFrameWriter(pw)
go func() {
for i := range frames {
fw.Write(frames[i])
}
pw.Close()
}()
r := framer.NewLengthDelimitedFrameReader(pr)
d := &fakeDecoder{}
dec := NewDecoder(r, d)
if _, _, err := dec.Decode(nil, nil); err != nil || !bytes.Equal(d.got, frames[0]) {
t.Fatalf("unexpected %v %v", err, len(d.got))
}
if _, _, err := dec.Decode(nil, nil); err != nil || !bytes.Equal(d.got, frames[1]) {
t.Fatalf("unexpected %v %v", err, len(d.got))
}
if _, _, err := dec.Decode(nil, nil); err != ErrObjectTooLarge || !bytes.Equal(d.got, frames[1]) {
t.Fatalf("unexpected %v %v", err, len(d.got))
}
if _, _, err := dec.Decode(nil, nil); err != nil || !bytes.Equal(d.got, frames[3]) {
t.Fatalf("unexpected %v %v", err, len(d.got))
}
if _, _, err := dec.Decode(nil, nil); err != io.EOF {
t.Fatalf("unexpected %v %v", err, len(d.got))
}
}

View File

@ -22,6 +22,7 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/runtime/serializer/streaming"
)
// EnableCrossGroupDecoding modifies the given decoder in place, if it is a codec
@ -277,6 +278,24 @@ func (c *codec) EncodeToStream(obj runtime.Object, w io.Writer, overrides ...unv
return c.encoder.EncodeToStream(obj, w, overrides...)
}
// NewFrameWriter calls into the nested encoder to expose its framing
func (c *codec) NewFrameWriter(w io.Writer) io.Writer {
f, ok := c.encoder.(streaming.Framer)
if !ok {
return nil
}
return f.NewFrameWriter(w)
}
// NewFrameReader calls into the nested decoder to expose its framing
func (c *codec) NewFrameReader(r io.Reader) io.Reader {
f, ok := c.decoder.(streaming.Framer)
if !ok {
return nil
}
return f.NewFrameReader(r)
}
// promoteOrPrependGroupVersion finds the group version in the provided group versions that has the same group as target.
// If the group is found the returned array will have that group version in the first position - if the group is not found
// the returned array will have target in the first position.

156
pkg/util/framer/framer.go Normal file
View File

@ -0,0 +1,156 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package framer implements simple frame decoding techniques for an io.ReadCloser
package framer
import (
"encoding/binary"
"encoding/json"
"io"
)
type lengthDelimitedFrameWriter struct {
w io.Writer
}
func NewLengthDelimitedFrameWriter(w io.Writer) io.Writer {
return &lengthDelimitedFrameWriter{w: w}
}
// Write writes a single frame to the nested writer, prepending it with the length in
// in bytes of data (as a 4 byte, bigendian uint32).
func (w *lengthDelimitedFrameWriter) Write(data []byte) (int, error) {
header := [4]byte{}
binary.BigEndian.PutUint32(header[:], uint32(len(data)))
n, err := w.w.Write(header[:])
if err != nil {
return 0, err
}
if n != len(header) {
return 0, io.ErrShortWrite
}
return w.w.Write(data)
}
type lengthDelimitedFrameReader struct {
r io.Reader
remaining int
}
// NewLengthDelimitedFrameReader returns an io.Reader that will decode length-prefixed
// frames off of a stream.
//
// The protocol is:
//
// stream: message ...
// message: prefix body
// prefix: 4 byte uint32 in BigEndian order, denotes length of body
// body: bytes (0..prefix)
//
// If the buffer passed to Read is not long enough to contain an entire frame, io.ErrShortRead
// will be returned along with the number of bytes read.
func NewLengthDelimitedFrameReader(r io.Reader) io.Reader {
return &lengthDelimitedFrameReader{r: r}
}
// Read attempts to read an entire frame into data. If that is not possible, io.ErrShortBuffer
// is returned and subsequent calls will attempt to read the last frame. A frame is complete when
// err is nil.
func (r *lengthDelimitedFrameReader) Read(data []byte) (int, error) {
if r.remaining <= 0 {
header := [4]byte{}
n, err := io.ReadAtLeast(r.r, header[:4], 4)
if err != nil {
return 0, err
}
if n != 4 {
return 0, io.ErrUnexpectedEOF
}
frameLength := int(binary.BigEndian.Uint32(header[:]))
r.remaining = frameLength
}
expect := r.remaining
max := expect
if max > len(data) {
max = len(data)
}
n, err := io.ReadAtLeast(r.r, data[:max], int(max))
r.remaining -= n
if err == io.ErrShortBuffer || r.remaining > 0 {
return n, io.ErrShortBuffer
}
if err != nil {
return n, err
}
if n != expect {
return n, io.ErrUnexpectedEOF
}
return n, nil
}
type jsonFrameReader struct {
decoder *json.Decoder
remaining []byte
}
// NewJSONFramedReader returns an io.Reader that will decode individual JSON objects off
// of a wire.
//
// The boundaries between each frame are valid JSON objects. A JSON parsing error will terminate
// the read.
func NewJSONFramedReader(r io.Reader) io.Reader {
return &jsonFrameReader{
decoder: json.NewDecoder(r),
}
}
// ReadFrame decodes the next JSON object in the stream, or returns an error. The returned
// byte slice will be modified the next time ReadFrame is invoked and should not be altered.
func (r *jsonFrameReader) Read(data []byte) (int, error) {
// Return whatever remaining data exists from an in progress frame
if n := len(r.remaining); n > 0 {
if n <= cap(data) {
data = append(data[0:0], r.remaining...)
r.remaining = nil
return n, nil
}
n = cap(data)
data = append(data[0:0], r.remaining[:n]...)
r.remaining = r.remaining[n:]
return n, io.ErrShortBuffer
}
// RawMessage#Unmarshal appends to data - we reset the slice down to 0 and will either see
// data written to data, or be larger than data and a different array.
m := json.RawMessage(data[:0])
if err := r.decoder.Decode(&m); err != nil {
return 0, err
}
// If capacity of data is less than length of the message, decoder will allocate a new slice
// and set m to it, which means we need to copy the partial result back into data and preserve
// the remaining result for subsequent reads.
if n := cap(data); len(m) > n {
data = append(data[0:0], m[:n]...)
r.remaining = m[n:]
return n, io.ErrShortBuffer
}
return len(m), nil
}

View File

@ -0,0 +1,175 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package framer
import (
"bytes"
"io"
"testing"
)
func TestRead(t *testing.T) {
data := []byte{
0x00, 0x00, 0x00, 0x04,
0x01, 0x02, 0x03, 0x04,
0x00, 0x00, 0x00, 0x03,
0x05, 0x06, 0x07,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x01,
0x08,
}
b := bytes.NewBuffer(data)
r := NewLengthDelimitedFrameReader(b)
buf := make([]byte, 1)
if n, err := r.Read(buf); err != io.ErrShortBuffer && n != 1 && bytes.Equal(buf, []byte{0x01}) {
t.Fatalf("unexpected: %v %d %v", err, n, buf)
}
if n, err := r.Read(buf); err != io.ErrShortBuffer && n != 1 && bytes.Equal(buf, []byte{0x02}) {
t.Fatalf("unexpected: %v %d %v", err, n, buf)
}
// read the remaining frame
buf = make([]byte, 2)
if n, err := r.Read(buf); err != nil && n != 2 && bytes.Equal(buf, []byte{0x03, 0x04}) {
t.Fatalf("unexpected: %v %d %v", err, n, buf)
}
// read with buffer equal to frame
buf = make([]byte, 3)
if n, err := r.Read(buf); err != nil && n != 3 && bytes.Equal(buf, []byte{0x05, 0x06, 0x07}) {
t.Fatalf("unexpected: %v %d %v", err, n, buf)
}
// read empty frame
buf = make([]byte, 3)
if n, err := r.Read(buf); err != nil && n != 0 && bytes.Equal(buf, []byte{}) {
t.Fatalf("unexpected: %v %d %v", err, n, buf)
}
// read with larger buffer than frame
buf = make([]byte, 3)
if n, err := r.Read(buf); err != nil && n != 1 && bytes.Equal(buf, []byte{0x08}) {
t.Fatalf("unexpected: %v %d %v", err, n, buf)
}
// read EOF
if n, err := r.Read(buf); err != io.EOF && n != 0 {
t.Fatalf("unexpected: %v %d", err, n)
}
}
func TestReadLarge(t *testing.T) {
data := []byte{
0x00, 0x00, 0x00, 0x04,
0x01, 0x02, 0x03, 0x04,
0x00, 0x00, 0x00, 0x03,
0x05, 0x06, 0x07,
0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x01,
0x08,
}
b := bytes.NewBuffer(data)
r := NewLengthDelimitedFrameReader(b)
buf := make([]byte, 40)
if n, err := r.Read(buf); err != nil && n != 4 && bytes.Equal(buf, []byte{0x01, 0x02, 0x03, 0x04}) {
t.Fatalf("unexpected: %v %d %v", err, n, buf)
}
if n, err := r.Read(buf); err != nil && n != 3 && bytes.Equal(buf, []byte{0x05, 0x06, 0x7}) {
t.Fatalf("unexpected: %v %d %v", err, n, buf)
}
if n, err := r.Read(buf); err != nil && n != 0 && bytes.Equal(buf, []byte{}) {
t.Fatalf("unexpected: %v %d %v", err, n, buf)
}
if n, err := r.Read(buf); err != nil && n != 1 && bytes.Equal(buf, []byte{0x08}) {
t.Fatalf("unexpected: %v %d %v", err, n, buf)
}
// read EOF
if n, err := r.Read(buf); err != io.EOF && n != 0 {
t.Fatalf("unexpected: %v %d", err, n)
}
}
func TestReadInvalidFrame(t *testing.T) {
data := []byte{
0x00, 0x00, 0x00, 0x04,
0x01, 0x02,
}
b := bytes.NewBuffer(data)
r := NewLengthDelimitedFrameReader(b)
buf := make([]byte, 1)
if n, err := r.Read(buf); err != io.ErrShortBuffer && n != 1 && bytes.Equal(buf, []byte{0x01}) {
t.Fatalf("unexpected: %v %d %v", err, n, buf)
}
// read the remaining frame
buf = make([]byte, 3)
if n, err := r.Read(buf); err != io.ErrUnexpectedEOF && n != 1 && bytes.Equal(buf, []byte{0x02}) {
t.Fatalf("unexpected: %v %d %v", err, n, buf)
}
// read EOF
if n, err := r.Read(buf); err != io.EOF && n != 0 {
t.Fatalf("unexpected: %v %d", err, n)
}
}
func TestJSONFrameReader(t *testing.T) {
b := bytes.NewBufferString("{\"test\":true}\n1\n[\"a\"]")
r := NewJSONFramedReader(b)
buf := make([]byte, 20)
if n, err := r.Read(buf); err != nil || n != 13 || string(buf[:n]) != `{"test":true}` {
t.Fatalf("unexpected: %v %d %q", err, n, buf)
}
if n, err := r.Read(buf); err != nil || n != 1 || string(buf[:n]) != `1` {
t.Fatalf("unexpected: %v %d %q", err, n, buf)
}
if n, err := r.Read(buf); err != nil || n != 5 || string(buf[:n]) != `["a"]` {
t.Fatalf("unexpected: %v %d %q", err, n, buf)
}
if n, err := r.Read(buf); err != io.EOF || n != 0 {
t.Fatalf("unexpected: %v %d %q", err, n, buf)
}
}
func TestJSONFrameReaderShortBuffer(t *testing.T) {
b := bytes.NewBufferString("{\"test\":true}\n1\n[\"a\"]")
r := NewJSONFramedReader(b)
buf := make([]byte, 0, 3)
if n, err := r.Read(buf); err != io.ErrShortBuffer || n != 3 || string(buf[:n]) != `{"t` {
t.Fatalf("unexpected: %v %d %q", err, n, buf)
}
if n, err := r.Read(buf); err != io.ErrShortBuffer || n != 3 || string(buf[:n]) != `est` {
t.Fatalf("unexpected: %v %d %q", err, n, buf)
}
if n, err := r.Read(buf); err != io.ErrShortBuffer || n != 3 || string(buf[:n]) != `":t` {
t.Fatalf("unexpected: %v %d %q", err, n, buf)
}
if n, err := r.Read(buf); err != io.ErrShortBuffer || n != 3 || string(buf[:n]) != `rue` {
t.Fatalf("unexpected: %v %d %q", err, n, buf)
}
if n, err := r.Read(buf); err != nil || n != 1 || string(buf[:n]) != `}` {
t.Fatalf("unexpected: %v %d %q", err, n, buf)
}
if n, err := r.Read(buf); err != nil || n != 1 || string(buf[:n]) != `1` {
t.Fatalf("unexpected: %v %d %q", err, n, buf)
}
if n, err := r.Read(buf); err != io.ErrShortBuffer || n != 3 || string(buf[:n]) != `["a` {
t.Fatalf("unexpected: %v %d %q", err, n, buf)
}
if n, err := r.Read(buf); err != nil || n != 2 || string(buf[:n]) != `"]` {
t.Fatalf("unexpected: %v %d %q", err, n, buf)
}
if n, err := r.Read(buf); err != io.EOF || n != 0 {
t.Fatalf("unexpected: %v %d %q", err, n, buf)
}
}

View File

@ -89,9 +89,9 @@ func IsWebSocketRequest(req *http.Request) bool {
return connectionUpgradeRegex.MatchString(strings.ToLower(req.Header.Get("Connection"))) && strings.ToLower(req.Header.Get("Upgrade")) == "websocket"
}
// ignoreReceives reads from a WebSocket until it is closed, then returns. If timeout is set, the
// IgnoreReceives reads from a WebSocket until it is closed, then returns. If timeout is set, the
// read and write deadlines are pushed every time a new message is received.
func ignoreReceives(ws *websocket.Conn, timeout time.Duration) {
func IgnoreReceives(ws *websocket.Conn, timeout time.Duration) {
defer runtime.HandleCrash()
var data []byte
for {

View File

@ -82,7 +82,7 @@ func (r *Reader) handle(ws *websocket.Conn) {
encode := len(ws.Config().Protocol) > 0 && ws.Config().Protocol[0] == base64BinaryWebSocketProtocol
defer close(r.err)
defer ws.Close()
go ignoreReceives(ws, r.timeout)
go IgnoreReceives(ws, r.timeout)
r.err <- messageCopy(ws, r.r, encode, r.ping, r.timeout)
}

View File

@ -166,7 +166,7 @@ func readWebSocket(r *Reader, t *testing.T, fn func(*websocket.Conn), protocols
s, addr := newServer(func(ws *websocket.Conn) {
cfg := ws.Config()
cfg.Protocol = protocols
go ignoreReceives(ws, 0)
go IgnoreReceives(ws, 0)
go func() {
err := <-r.err
errCh <- err
@ -198,7 +198,7 @@ func expectWebSocketFrames(r *Reader, t *testing.T, fn func(*websocket.Conn), fr
s, addr := newServer(func(ws *websocket.Conn) {
cfg := ws.Config()
cfg.Protocol = protocols
go ignoreReceives(ws, 0)
go IgnoreReceives(ws, 0)
go func() {
err := <-r.err
errCh <- err

View File

@ -0,0 +1,84 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package versioned
import (
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/conversion"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/watch"
)
// WatchEventKind is name reserved for serializing watch events.
const WatchEventKind = "WatchEvent"
// AddToGroupVersion registers the watch external and internal kinds with the scheme, and ensures the proper
// conversions are in place.
func AddToGroupVersion(scheme *runtime.Scheme, groupVersion unversioned.GroupVersion) {
scheme.AddKnownTypeWithName(groupVersion.WithKind(WatchEventKind), &Event{})
scheme.AddKnownTypeWithName(
unversioned.GroupVersion{Group: groupVersion.Group, Version: runtime.APIVersionInternal}.WithKind(WatchEventKind),
&InternalEvent{},
)
scheme.AddConversionFuncs(
Convert_versioned_Event_to_watch_Event,
Convert_versioned_InternalEvent_to_versioned_Event,
Convert_watch_Event_to_versioned_Event,
Convert_versioned_Event_to_versioned_InternalEvent,
)
}
func Convert_watch_Event_to_versioned_Event(in *watch.Event, out *Event, s conversion.Scope) error {
out.Type = string(in.Type)
switch t := in.Object.(type) {
case *runtime.Unknown:
// TODO: handle other fields on Unknown and detect type
out.Object.Raw = t.Raw
case nil:
default:
out.Object.Object = in.Object
}
return nil
}
func Convert_versioned_InternalEvent_to_versioned_Event(in *InternalEvent, out *Event, s conversion.Scope) error {
return Convert_watch_Event_to_versioned_Event((*watch.Event)(in), out, s)
}
func Convert_versioned_Event_to_watch_Event(in *Event, out *watch.Event, s conversion.Scope) error {
out.Type = watch.EventType(in.Type)
if in.Object.Object != nil {
out.Object = in.Object.Object
} else if in.Object.Raw != nil {
// TODO: handle other fields on Unknown and detect type
out.Object = &runtime.Unknown{
Raw: in.Object.Raw,
ContentType: runtime.ContentTypeJSON,
}
}
return nil
}
func Convert_versioned_Event_to_versioned_InternalEvent(in *Event, out *InternalEvent, s conversion.Scope) error {
return Convert_versioned_Event_to_watch_Event(in, (*watch.Event)(out), s)
}
// InternalEvent makes watch.Event versioned
type InternalEvent watch.Event
func (e *InternalEvent) GetObjectKind() unversioned.ObjectKind { return unversioned.EmptyObjectKind }
func (e *Event) GetObjectKind() unversioned.ObjectKind { return unversioned.EmptyObjectKind }

View File

@ -0,0 +1,37 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Package versioned contains the versioned types for watch. This is the first
// serialization version unless otherwise noted.
package versioned
import (
"k8s.io/kubernetes/pkg/runtime"
)
// Event represents a single event to a watched resource.
//
// +protobuf=true
type Event struct {
Type string `json:"type" protobuf:"bytes,1,opt,name=type"`
// Object is:
// * If Type is Added or Modified: the new state of the object.
// * If Type is Deleted: the state of the object immediately before deletion.
// * If Type is Error: *api.Status is recommended; other types may make sense
// depending on context.
Object runtime.RawExtension `json:"object" protobuf:"bytes,2,opt,name=object"`
}