Merge pull request #31446 from liggitt/log-streaming

Automatic merge from submit-queue

Fix hang/websocket timeout when streaming container log with no content

When streaming and following a container log, no response headers are sent from the kubelet `containerLogs` endpoint until the first byte of content is written to the log. This propagates back to the API server, which also will not send response headers until it gets response headers from the kubelet. That includes upgrade headers, which means a websocket connection upgrade is not performed and can time out.

To recreate, create a busybox pod that runs `/bin/sh -c 'sleep 30 && echo foo && sleep 10'`

As soon as the pod starts, query the kubelet API:
```
curl -N -k -v 'https://<node>:10250/containerLogs/<ns>/<pod>/<container>?follow=true&limitBytes=100'
```

or the master API:
```
curl -N -k -v 'http://<master>:8080/api/v1/<ns>/pods/<pod>/log?follow=true&limitBytes=100'
```

In both cases, notice that the response headers are not sent until the first byte of log content is available.

This PR:
* does a 0-byte write prior to handing off to the container runtime stream copy. That commits the response header, even if the subsequent copy blocks waiting for the first byte of content from the log.
* fixes a bug with the "ping" frame sent to websocket streams, which was not respecting the requested protocol (it was sending a binary frame to a websocket that requested a base64 text protocol)
* fixes a bug in the limitwriter, which was not propagating 0-length writes, even before the writer's limit was reached
pull/6/head
Kubernetes Submit Queue 2016-08-26 06:09:43 -07:00 committed by GitHub
commit eeac23282d
4 changed files with 110 additions and 3 deletions

View File

@ -2605,6 +2605,14 @@ func (kl *Kubelet) GetKubeletContainerLogs(podFullName, containerName string, lo
if err != nil {
return err
}
// Do a zero-byte write to stdout before handing off to the container runtime.
// This ensures at least one Write call is made to the writer when copying starts,
// even if we then block waiting for log output from the container.
if _, err := stdout.Write([]byte{}); err != nil {
return err
}
return kl.containerRuntime.GetContainerLogs(pod, containerID, logOptions, stdout, stderr)
}

View File

@ -42,7 +42,7 @@ func (w *limitWriter) Write(p []byte) (n int, err error) {
if int64(len(p)) > w.n {
p = p[:w.n]
}
if len(p) > 0 {
if w.n > 0 {
n, err = w.w.Write(p)
w.n -= int64(n)
}

View File

@ -0,0 +1,93 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package limitwriter
import (
"reflect"
"testing"
)
type recordingWriter struct {
Wrote [][]byte
}
func (r *recordingWriter) Write(data []byte) (int, error) {
r.Wrote = append(r.Wrote, data)
return len(data), nil
}
func TestLimitWriter(t *testing.T) {
testcases := map[string]struct {
Limit int64
Writes [][]byte
ExpectedRecordedWrites [][]byte
ExpectedReportedWrites []int
ExpectedReportedErrors []error
}{
"empty": {},
"empty write": {
Limit: 1000,
Writes: [][]byte{{}},
ExpectedRecordedWrites: [][]byte{{}},
ExpectedReportedWrites: []int{0},
ExpectedReportedErrors: []error{nil},
},
"unlimited write": {
Limit: 1000,
Writes: [][]byte{[]byte(`foo`)},
ExpectedRecordedWrites: [][]byte{[]byte(`foo`)},
ExpectedReportedWrites: []int{3},
ExpectedReportedErrors: []error{nil},
},
"limited write": {
Limit: 5,
Writes: [][]byte{[]byte(``), []byte(`1`), []byte(`23`), []byte(`456789`), []byte(`10`), []byte(``)},
ExpectedRecordedWrites: [][]byte{[]byte(``), []byte(`1`), []byte(`23`), []byte(`45`)},
ExpectedReportedWrites: []int{0, 1, 2, 2, 0, 0},
ExpectedReportedErrors: []error{nil, nil, nil, ErrMaximumWrite, ErrMaximumWrite, ErrMaximumWrite},
},
}
for k, tc := range testcases {
var reportedWrites []int
var reportedErrors []error
recordingWriter := &recordingWriter{}
limitwriter := New(recordingWriter, tc.Limit)
for _, w := range tc.Writes {
n, err := limitwriter.Write(w)
reportedWrites = append(reportedWrites, n)
reportedErrors = append(reportedErrors, err)
}
if !reflect.DeepEqual(recordingWriter.Wrote, tc.ExpectedRecordedWrites) {
t.Errorf("%s: expected recorded writes %v, got %v", k, tc.ExpectedRecordedWrites, recordingWriter.Wrote)
}
if !reflect.DeepEqual(reportedWrites, tc.ExpectedReportedWrites) {
t.Errorf("%s: expected reported writes %v, got %v", k, tc.ExpectedReportedWrites, reportedWrites)
}
if !reflect.DeepEqual(reportedErrors, tc.ExpectedReportedErrors) {
t.Errorf("%s: expected reported errors %v, got %v", k, tc.ExpectedReportedErrors, reportedErrors)
}
}
}

View File

@ -143,8 +143,14 @@ func messageCopy(ws *websocket.Conn, r io.Reader, base64Encode, ping bool, timeo
buf := make([]byte, 2048)
if ping {
resetTimeout(ws, timeout)
if err := websocket.Message.Send(ws, []byte{}); err != nil {
return err
if base64Encode {
if err := websocket.Message.Send(ws, ""); err != nil {
return err
}
} else {
if err := websocket.Message.Send(ws, []byte{}); err != nil {
return err
}
}
}
for {