Merge pull request #67573 from m1kola/52218_watching_selectors

Allows to combine the `-f` and `-l` flags in kubectl logs
pull/564/head
Kubernetes Prow Robot 2019-02-25 18:47:45 -08:00 committed by GitHub
commit 1ddfd8ff73
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 367 additions and 55 deletions

View File

@ -17,10 +17,12 @@ limitations under the License.
package logs
import (
"bufio"
"errors"
"fmt"
"io"
"os"
"sync"
"time"
"github.com/spf13/cobra"
@ -59,6 +61,9 @@ var (
# Begin streaming the logs of the ruby container in pod web-1
kubectl logs -f -c ruby web-1
# Begin streaming the logs from all containers in pods defined by label app=nginx
kubectl logs -f -lapp=nginx --all-containers=true
# Display only the most recent 20 lines of output in pod nginx
kubectl logs --tail=20 nginx
@ -86,7 +91,7 @@ type LogsOptions struct {
Options runtime.Object
Resources []string
ConsumeRequestFn func(*rest.Request, io.Writer) error
ConsumeRequestFn func(rest.ResponseWrapper, io.Writer) error
// PodLogOptions
SinceTime string
@ -101,6 +106,7 @@ type LogsOptions struct {
// whether or not a container name was given via --container
ContainerNameSpecified bool
Selector string
MaxFollowConcurency int
Object runtime.Object
GetPodTimeout time.Duration
@ -115,6 +121,7 @@ func NewLogsOptions(streams genericclioptions.IOStreams, allContainers bool) *Lo
IOStreams: streams,
AllContainers: allContainers,
Tail: -1,
MaxFollowConcurency: 5,
}
}
@ -151,6 +158,7 @@ func NewCmdLogs(f cmdutil.Factory, streams genericclioptions.IOStreams) *cobra.C
cmd.Flags().StringVarP(&o.Container, "container", "c", o.Container, "Print the logs of this container")
cmdutil.AddPodRunningTimeoutFlag(cmd, defaultPodLogsTimeout)
cmd.Flags().StringVarP(&o.Selector, "selector", "l", o.Selector, "Selector (label query) to filter on.")
cmd.Flags().IntVar(&o.MaxFollowConcurency, "max-log-requests", o.MaxFollowConcurency, "Specify maximum number of concurrent logs to follow when using by a selector. Defaults to 5.")
return cmd
}
@ -256,10 +264,6 @@ func (o *LogsOptions) Complete(f cmdutil.Factory, cmd *cobra.Command, args []str
}
func (o LogsOptions) Validate() error {
if o.Follow && len(o.Selector) > 0 {
return fmt.Errorf("only one of follow (-f) or selector (-l) is allowed")
}
if len(o.SinceTime) > 0 && o.SinceSeconds != 0 {
return fmt.Errorf("at most one of `sinceTime` or `sinceSeconds` may be specified")
}
@ -298,6 +302,47 @@ func (o LogsOptions) RunLogs() error {
return err
}
if o.Follow && len(requests) > 1 {
if len(requests) > o.MaxFollowConcurency {
return fmt.Errorf(
"you are attempting to follow %d log streams, but maximum allowed concurency is %d, use --max-log-requests to increase the limit",
len(requests), o.MaxFollowConcurency,
)
}
return o.parallelConsumeRequest(requests)
}
return o.sequentialConsumeRequest(requests)
}
func (o LogsOptions) parallelConsumeRequest(requests []rest.ResponseWrapper) error {
reader, writer := io.Pipe()
wg := &sync.WaitGroup{}
wg.Add(len(requests))
for _, request := range requests {
go func(request rest.ResponseWrapper) {
if err := o.ConsumeRequestFn(request, writer); err != nil {
writer.CloseWithError(err)
// It's important to return here to propagate the error via the pipe
return
}
wg.Done()
}(request)
}
go func() {
wg.Wait()
writer.Close()
}()
_, err := io.Copy(o.Out, reader)
return err
}
func (o LogsOptions) sequentialConsumeRequest(requests []rest.ResponseWrapper) error {
for _, request := range requests {
if err := o.ConsumeRequestFn(request, o.Out); err != nil {
return err
@ -307,13 +352,33 @@ func (o LogsOptions) RunLogs() error {
return nil
}
func DefaultConsumeRequest(request *rest.Request, out io.Writer) error {
// DefaultConsumeRequest reads the data from request and writes into
// the out writer. It buffers data from requests until the newline or io.EOF
// occurs in the data, so it doesn't interleave logs sub-line
// when running concurrently.
//
// A successful read returns err == nil, not err == io.EOF.
// Because the function is defined to read from request until io.EOF, it does
// not treat an io.EOF as an error to be reported.
func DefaultConsumeRequest(request rest.ResponseWrapper, out io.Writer) error {
readCloser, err := request.Stream()
if err != nil {
return err
}
defer readCloser.Close()
_, err = io.Copy(out, readCloser)
r := bufio.NewReader(readCloser)
for {
bytes, err := r.ReadBytes('\n')
if _, err := out.Write(bytes); err != nil {
return err
}
if err != nil {
if err != io.EOF {
return err
}
return nil
}
}
}

View File

@ -17,11 +17,15 @@ limitations under the License.
package logs
import (
"bytes"
"errors"
"fmt"
"io"
"io/ioutil"
"strings"
"sync"
"testing"
"testing/iotest"
"time"
corev1 "k8s.io/api/core/v1"
@ -34,36 +38,198 @@ import (
func TestLog(t *testing.T) {
tests := []struct {
name, version, podPath, logPath string
pod *corev1.Pod
name string
opts func(genericclioptions.IOStreams) *LogsOptions
expectedErr string
expectedOutSubstrings []string
}{
{
name: "v1 - pod log",
pod: testPod(),
opts: func(streams genericclioptions.IOStreams) *LogsOptions {
mock := &logTestMock{
logsForObjectRequests: []restclient.ResponseWrapper{
&responseWrapperMock{data: strings.NewReader("test log content\n")},
},
}
o := NewLogsOptions(streams, false)
o.LogsForObject = mock.mockLogsForObject
o.ConsumeRequestFn = mock.mockConsumeRequest
return o
},
expectedOutSubstrings: []string{"test log content\n"},
},
{
name: "get logs from multiple requests sequentially",
opts: func(streams genericclioptions.IOStreams) *LogsOptions {
mock := &logTestMock{
logsForObjectRequests: []restclient.ResponseWrapper{
&responseWrapperMock{data: strings.NewReader("test log content from source 1\n")},
&responseWrapperMock{data: strings.NewReader("test log content from source 2\n")},
&responseWrapperMock{data: strings.NewReader("test log content from source 3\n")},
},
}
o := NewLogsOptions(streams, false)
o.LogsForObject = mock.mockLogsForObject
o.ConsumeRequestFn = mock.mockConsumeRequest
return o
},
expectedOutSubstrings: []string{
// Order in this case muse always by the same, because we read requests sequentially
"test log content from source 1\ntest log content from source 2\ntest log content from source 3\n",
},
},
{
name: "follow logs from multiple requests concurrently",
opts: func(streams genericclioptions.IOStreams) *LogsOptions {
wg := &sync.WaitGroup{}
mock := &logTestMock{
logsForObjectRequests: []restclient.ResponseWrapper{
&responseWrapperMock{data: strings.NewReader("test log content from source 1\n")},
&responseWrapperMock{data: strings.NewReader("test log content from source 2\n")},
&responseWrapperMock{data: strings.NewReader("test log content from source 3\n")},
},
wg: wg,
}
wg.Add(3)
o := NewLogsOptions(streams, false)
o.LogsForObject = mock.mockLogsForObject
o.ConsumeRequestFn = mock.mockConsumeRequest
o.Follow = true
return o
},
expectedOutSubstrings: []string{
"test log content from source 1\n",
"test log content from source 2\n",
"test log content from source 3\n",
},
},
{
name: "fail to follow logs from multiple requests when there are more logs sources then MaxFollowConcurency allows",
opts: func(streams genericclioptions.IOStreams) *LogsOptions {
wg := &sync.WaitGroup{}
mock := &logTestMock{
logsForObjectRequests: []restclient.ResponseWrapper{
&responseWrapperMock{data: strings.NewReader("test log content\n")},
&responseWrapperMock{data: strings.NewReader("test log content\n")},
&responseWrapperMock{data: strings.NewReader("test log content\n")},
},
wg: wg,
}
wg.Add(3)
o := NewLogsOptions(streams, false)
o.LogsForObject = mock.mockLogsForObject
o.ConsumeRequestFn = mock.mockConsumeRequest
o.MaxFollowConcurency = 2
o.Follow = true
return o
},
expectedErr: "you are attempting to follow 3 log streams, but maximum allowed concurency is 2, use --max-log-requests to increase the limit",
},
{
name: "fail if LogsForObject fails",
opts: func(streams genericclioptions.IOStreams) *LogsOptions {
o := NewLogsOptions(streams, false)
o.LogsForObject = func(restClientGetter genericclioptions.RESTClientGetter, object, options runtime.Object, timeout time.Duration, allContainers bool) ([]restclient.ResponseWrapper, error) {
return nil, errors.New("Error from the LogsForObject")
}
return o
},
expectedErr: "Error from the LogsForObject",
},
{
name: "fail to get logs, if ConsumeRequestFn fails",
opts: func(streams genericclioptions.IOStreams) *LogsOptions {
mock := &logTestMock{
logsForObjectRequests: []restclient.ResponseWrapper{
&responseWrapperMock{},
&responseWrapperMock{},
},
}
o := NewLogsOptions(streams, false)
o.LogsForObject = mock.mockLogsForObject
o.ConsumeRequestFn = func(req restclient.ResponseWrapper, out io.Writer) error {
return errors.New("Error from the ConsumeRequestFn")
}
return o
},
expectedErr: "Error from the ConsumeRequestFn",
},
{
name: "fail to follow logs from multiple requests, if ConsumeRequestFn fails",
opts: func(streams genericclioptions.IOStreams) *LogsOptions {
wg := &sync.WaitGroup{}
mock := &logTestMock{
logsForObjectRequests: []restclient.ResponseWrapper{
&responseWrapperMock{},
&responseWrapperMock{},
&responseWrapperMock{},
},
wg: wg,
}
wg.Add(3)
o := NewLogsOptions(streams, false)
o.LogsForObject = mock.mockLogsForObject
o.ConsumeRequestFn = func(req restclient.ResponseWrapper, out io.Writer) error {
return errors.New("Error from the ConsumeRequestFn")
}
o.Follow = true
return o
},
expectedErr: "Error from the ConsumeRequestFn",
},
{
name: "fail to follow logs, if ConsumeRequestFn fails",
opts: func(streams genericclioptions.IOStreams) *LogsOptions {
mock := &logTestMock{
logsForObjectRequests: []restclient.ResponseWrapper{&responseWrapperMock{}},
}
o := NewLogsOptions(streams, false)
o.LogsForObject = mock.mockLogsForObject
o.ConsumeRequestFn = func(req restclient.ResponseWrapper, out io.Writer) error {
return errors.New("Error from the ConsumeRequestFn")
}
o.Follow = true
return o
},
expectedErr: "Error from the ConsumeRequestFn",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
logContent := "test log content"
tf := cmdtesting.NewTestFactory().WithNamespace("test")
defer tf.Cleanup()
streams, _, buf, _ := genericclioptions.NewTestIOStreams()
mock := &logTestMock{
logsContent: logContent,
opts := test.opts(streams)
opts.Namespace = "test"
opts.Object = testPod()
opts.Options = &corev1.PodLogOptions{}
err := opts.RunLogs()
if err == nil && len(test.expectedErr) > 0 {
t.Fatalf("expected error %q, got none", test.expectedErr)
}
opts := NewLogsOptions(streams, false)
opts.Namespace = "test"
opts.Object = test.pod
opts.Options = &corev1.PodLogOptions{}
opts.LogsForObject = mock.mockLogsForObject
opts.ConsumeRequestFn = mock.mockConsumeRequest
opts.RunLogs()
if err != nil && !strings.Contains(err.Error(), test.expectedErr) {
t.Errorf("%s: expected to find:\n\t%s\nfound:\n\t%s\n", test.name, test.expectedErr, err.Error())
}
if buf.String() != logContent {
t.Errorf("%s: did not get expected log content. Got: %s", test.name, buf.String())
bufStr := buf.String()
if test.expectedOutSubstrings != nil {
for _, substr := range test.expectedOutSubstrings {
if !strings.Contains(bufStr, substr) {
t.Errorf("%s: expected to contain %#v. Output: %#v", test.name, substr, bufStr)
}
}
}
})
}
@ -199,23 +365,6 @@ func TestValidateLogOptions(t *testing.T) {
args: []string{"my-pod", "my-container"},
expected: "only one of -c or an inline",
},
{
name: "follow and selector conflict",
opts: func(streams genericclioptions.IOStreams) *LogsOptions {
o := NewLogsOptions(streams, false)
o.Selector = "foo"
o.Follow = true
var err error
o.Options, err = o.ToLogOptions()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
return o
},
expected: "only one of follow (-f) or selector (-l) is allowed",
},
}
for _, test := range tests {
streams := genericclioptions.NewTestIOStreamsDiscard()
@ -274,16 +423,114 @@ func TestLogComplete(t *testing.T) {
}
}
func TestDefaultConsumeRequest(t *testing.T) {
tests := []struct {
name string
request restclient.ResponseWrapper
expectedErr string
expectedOut string
}{
{
name: "error from request stream",
request: &responseWrapperMock{
err: errors.New("err from the stream"),
},
expectedErr: "err from the stream",
},
{
name: "error while reading",
request: &responseWrapperMock{
data: iotest.TimeoutReader(strings.NewReader("Some data")),
},
expectedErr: iotest.ErrTimeout.Error(),
expectedOut: "Some data",
},
{
name: "read with empty string",
request: &responseWrapperMock{
data: strings.NewReader(""),
},
expectedOut: "",
},
{
name: "read without new lines",
request: &responseWrapperMock{
data: strings.NewReader("some string without a new line"),
},
expectedOut: "some string without a new line",
},
{
name: "read with newlines in the middle",
request: &responseWrapperMock{
data: strings.NewReader("foo\nbar"),
},
expectedOut: "foo\nbar",
},
{
name: "read with newline at the end",
request: &responseWrapperMock{
data: strings.NewReader("foo\n"),
},
expectedOut: "foo\n",
},
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
buf := &bytes.Buffer{}
err := DefaultConsumeRequest(test.request, buf)
if err != nil && !strings.Contains(err.Error(), test.expectedErr) {
t.Errorf("%s: expected to find:\n\t%s\nfound:\n\t%s\n", test.name, test.expectedErr, err.Error())
}
if buf.String() != test.expectedOut {
t.Errorf("%s: did not get expected log content. Got: %s", test.name, buf.String())
}
})
}
}
type responseWrapperMock struct {
data io.Reader
err error
}
func (r *responseWrapperMock) DoRaw() ([]byte, error) {
data, _ := ioutil.ReadAll(r.data)
return data, r.err
}
func (r *responseWrapperMock) Stream() (io.ReadCloser, error) {
return ioutil.NopCloser(r.data), r.err
}
type logTestMock struct {
logsContent string
logsForObjectRequests []restclient.ResponseWrapper
// We need a WaitGroup in some test cases to make sure that we fetch logs concurrently.
// These test cases will finish successfully without the WaitGroup, but the WaitGroup
// will help us to identify regression when someone accidentally changes
// concurrent fetching to sequential
wg *sync.WaitGroup
}
func (l *logTestMock) mockConsumeRequest(req *restclient.Request, out io.Writer) error {
fmt.Fprintf(out, l.logsContent)
return nil
func (l *logTestMock) mockConsumeRequest(request restclient.ResponseWrapper, out io.Writer) error {
readCloser, err := request.Stream()
if err != nil {
return err
}
defer readCloser.Close()
// Just copy everything for a test sake
_, err = io.Copy(out, readCloser)
if l.wg != nil {
l.wg.Done()
l.wg.Wait()
}
return err
}
func (l *logTestMock) mockLogsForObject(restClientGetter genericclioptions.RESTClientGetter, object, options runtime.Object, timeout time.Duration, allContainers bool) ([]*restclient.Request, error) {
func (l *logTestMock) mockLogsForObject(restClientGetter genericclioptions.RESTClientGetter, object, options runtime.Object, timeout time.Duration, allContainers bool) ([]restclient.ResponseWrapper, error) {
switch object.(type) {
case *corev1.Pod:
_, ok := options.(*corev1.PodLogOptions)
@ -291,7 +538,7 @@ func (l *logTestMock) mockLogsForObject(restClientGetter genericclioptions.RESTC
return nil, errors.New("provided options object is not a PodLogOptions")
}
return []*restclient.Request{{}}, nil
return l.logsForObjectRequests, nil
default:
return nil, fmt.Errorf("cannot get the logs from %T", object)
}

View File

@ -29,7 +29,7 @@ import (
)
// LogsForObjectFunc is a function type that can tell you how to get logs for a runtime.object
type LogsForObjectFunc func(restClientGetter genericclioptions.RESTClientGetter, object, options runtime.Object, timeout time.Duration, allContainers bool) ([]*rest.Request, error)
type LogsForObjectFunc func(restClientGetter genericclioptions.RESTClientGetter, object, options runtime.Object, timeout time.Duration, allContainers bool) ([]rest.ResponseWrapper, error)
// LogsForObjectFn gives a way to easily override the function for unit testing if needed.
var LogsForObjectFn LogsForObjectFunc = logsForObject

View File

@ -32,7 +32,7 @@ import (
"k8s.io/kubernetes/pkg/kubectl/util/podutils"
)
func logsForObject(restClientGetter genericclioptions.RESTClientGetter, object, options runtime.Object, timeout time.Duration, allContainers bool) ([]*rest.Request, error) {
func logsForObject(restClientGetter genericclioptions.RESTClientGetter, object, options runtime.Object, timeout time.Duration, allContainers bool) ([]rest.ResponseWrapper, error) {
clientConfig, err := restClientGetter.ToRESTConfig()
if err != nil {
return nil, err
@ -47,7 +47,7 @@ func logsForObject(restClientGetter genericclioptions.RESTClientGetter, object,
// TODO: remove internal clientset once all callers use external versions
// this is split for easy test-ability
func logsForObjectWithClient(clientset corev1client.CoreV1Interface, object, options runtime.Object, timeout time.Duration, allContainers bool) ([]*rest.Request, error) {
func logsForObjectWithClient(clientset corev1client.CoreV1Interface, object, options runtime.Object, timeout time.Duration, allContainers bool) ([]rest.ResponseWrapper, error) {
opts, ok := options.(*corev1.PodLogOptions)
if !ok {
return nil, errors.New("provided options object is not a PodLogOptions")
@ -55,7 +55,7 @@ func logsForObjectWithClient(clientset corev1client.CoreV1Interface, object, opt
switch t := object.(type) {
case *corev1.PodList:
ret := []*rest.Request{}
ret := []rest.ResponseWrapper{}
for i := range t.Items {
currRet, err := logsForObjectWithClient(clientset, &t.Items[i], options, timeout, allContainers)
if err != nil {
@ -68,10 +68,10 @@ func logsForObjectWithClient(clientset corev1client.CoreV1Interface, object, opt
case *corev1.Pod:
// if allContainers is true, then we're going to locate all containers and then iterate through them. At that point, "allContainers" is false
if !allContainers {
return []*rest.Request{clientset.Pods(t.Namespace).GetLogs(t.Name, opts)}, nil
return []rest.ResponseWrapper{clientset.Pods(t.Namespace).GetLogs(t.Name, opts)}, nil
}
ret := []*rest.Request{}
ret := []rest.ResponseWrapper{}
for _, c := range t.Spec.InitContainers {
currOpts := opts.DeepCopy()
currOpts.Container = c.Name