diff --git a/pkg/apiserver/watch.go b/pkg/apiserver/watch.go index 8094ac9c00..b62693822f 100644 --- a/pkg/apiserver/watch.go +++ b/pkg/apiserver/watch.go @@ -167,18 +167,22 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { w.WriteHeader(http.StatusOK) flusher.Flush() + var unknown runtime.Unknown + internalEvent := &versioned.InternalEvent{} buf := &bytes.Buffer{} + ch := s.watching.ResultChan() for { select { case <-cn.CloseNotify(): return case <-timeoutCh: return - case event, ok := <-s.watching.ResultChan(): + case event, ok := <-ch: if !ok { // End of results. return } + obj := event.Object s.fixup(obj) if err := s.embeddedEncoder.EncodeToStream(obj, buf); err != nil { @@ -186,12 +190,15 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) { utilruntime.HandleError(fmt.Errorf("unable to encode watch object: %v", err)) return } - event.Object = &runtime.Unknown{ - Raw: buf.Bytes(), - // ContentType is not required here because we are defaulting to the serializer - // type - } - if err := e.Encode((*versioned.InternalEvent)(&event)); err != nil { + + // ContentType is not required here because we are defaulting to the serializer + // type + unknown.Raw = buf.Bytes() + event.Object = &unknown + + // the internal event will be versioned by the encoder + *internalEvent = versioned.InternalEvent(event) + if err := e.Encode(internalEvent); err != nil { utilruntime.HandleError(fmt.Errorf("unable to encode watch object: %v (%#v)", err, e)) // client disconnect. return @@ -208,14 +215,18 @@ func (s *WatchServer) HandleWS(ws *websocket.Conn) { defer ws.Close() done := make(chan struct{}) go wsstream.IgnoreReceives(ws, 0) + + var unknown runtime.Unknown + internalEvent := &versioned.InternalEvent{} buf := &bytes.Buffer{} streamBuf := &bytes.Buffer{} + ch := s.watching.ResultChan() for { select { case <-done: s.watching.Stop() return - case event, ok := <-s.watching.ResultChan(): + case event, ok := <-ch: if !ok { // End of results. return @@ -227,14 +238,15 @@ func (s *WatchServer) HandleWS(ws *websocket.Conn) { utilruntime.HandleError(fmt.Errorf("unable to encode watch object: %v", err)) return } - event.Object = &runtime.Unknown{ - Raw: buf.Bytes(), - // ContentType is not required here because we are defaulting to the serializer - // type - } + + // ContentType is not required here because we are defaulting to the serializer + // type + unknown.Raw = buf.Bytes() + event.Object = &unknown + // the internal event will be versioned by the encoder - internalEvent := versioned.InternalEvent(event) - if err := s.encoder.EncodeToStream(&internalEvent, streamBuf); err != nil { + *internalEvent = versioned.InternalEvent(event) + if err := s.encoder.EncodeToStream(internalEvent, streamBuf); err != nil { // encoding error utilruntime.HandleError(fmt.Errorf("unable to encode event: %v", err)) s.watching.Stop()