Avoid extra allocations in watch loop

pull/6/head
Clayton Coleman 2016-05-21 00:06:11 -04:00
parent f670cc4652
commit b1a759cbbc
No known key found for this signature in database
GPG Key ID: 3D16906B4F1C5CB3
1 changed files with 27 additions and 15 deletions

View File

@ -167,18 +167,22 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
flusher.Flush() flusher.Flush()
var unknown runtime.Unknown
internalEvent := &versioned.InternalEvent{}
buf := &bytes.Buffer{} buf := &bytes.Buffer{}
ch := s.watching.ResultChan()
for { for {
select { select {
case <-cn.CloseNotify(): case <-cn.CloseNotify():
return return
case <-timeoutCh: case <-timeoutCh:
return return
case event, ok := <-s.watching.ResultChan(): case event, ok := <-ch:
if !ok { if !ok {
// End of results. // End of results.
return return
} }
obj := event.Object obj := event.Object
s.fixup(obj) s.fixup(obj)
if err := s.embeddedEncoder.EncodeToStream(obj, buf); err != nil { if err := s.embeddedEncoder.EncodeToStream(obj, buf); err != nil {
@ -186,12 +190,15 @@ func (s *WatchServer) ServeHTTP(w http.ResponseWriter, req *http.Request) {
utilruntime.HandleError(fmt.Errorf("unable to encode watch object: %v", err)) utilruntime.HandleError(fmt.Errorf("unable to encode watch object: %v", err))
return return
} }
event.Object = &runtime.Unknown{
Raw: buf.Bytes(), // ContentType is not required here because we are defaulting to the serializer
// ContentType is not required here because we are defaulting to the serializer // type
// type unknown.Raw = buf.Bytes()
} event.Object = &unknown
if err := e.Encode((*versioned.InternalEvent)(&event)); err != nil {
// the internal event will be versioned by the encoder
*internalEvent = versioned.InternalEvent(event)
if err := e.Encode(internalEvent); err != nil {
utilruntime.HandleError(fmt.Errorf("unable to encode watch object: %v (%#v)", err, e)) utilruntime.HandleError(fmt.Errorf("unable to encode watch object: %v (%#v)", err, e))
// client disconnect. // client disconnect.
return return
@ -208,14 +215,18 @@ func (s *WatchServer) HandleWS(ws *websocket.Conn) {
defer ws.Close() defer ws.Close()
done := make(chan struct{}) done := make(chan struct{})
go wsstream.IgnoreReceives(ws, 0) go wsstream.IgnoreReceives(ws, 0)
var unknown runtime.Unknown
internalEvent := &versioned.InternalEvent{}
buf := &bytes.Buffer{} buf := &bytes.Buffer{}
streamBuf := &bytes.Buffer{} streamBuf := &bytes.Buffer{}
ch := s.watching.ResultChan()
for { for {
select { select {
case <-done: case <-done:
s.watching.Stop() s.watching.Stop()
return return
case event, ok := <-s.watching.ResultChan(): case event, ok := <-ch:
if !ok { if !ok {
// End of results. // End of results.
return return
@ -227,14 +238,15 @@ func (s *WatchServer) HandleWS(ws *websocket.Conn) {
utilruntime.HandleError(fmt.Errorf("unable to encode watch object: %v", err)) utilruntime.HandleError(fmt.Errorf("unable to encode watch object: %v", err))
return return
} }
event.Object = &runtime.Unknown{
Raw: buf.Bytes(), // ContentType is not required here because we are defaulting to the serializer
// ContentType is not required here because we are defaulting to the serializer // type
// type unknown.Raw = buf.Bytes()
} event.Object = &unknown
// the internal event will be versioned by the encoder // the internal event will be versioned by the encoder
internalEvent := versioned.InternalEvent(event) *internalEvent = versioned.InternalEvent(event)
if err := s.encoder.EncodeToStream(&internalEvent, streamBuf); err != nil { if err := s.encoder.EncodeToStream(internalEvent, streamBuf); err != nil {
// encoding error // encoding error
utilruntime.HandleError(fmt.Errorf("unable to encode event: %v", err)) utilruntime.HandleError(fmt.Errorf("unable to encode event: %v", err))
s.watching.Stop() s.watching.Stop()