Merge pull request #67285 from yujuhong/add-ctx-logs

Automatic merge from submit-queue (batch tested with PRs 67274, 67285). If you want to cherry-pick this change to another branch, please follow the instructions [here](https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md).

kubelet: plumb context for log requests

This allows the kubelet to stop the necessary work when the context has
been canceled (e.g., the connection was closed), instead of leaking a
goroutine and an inotify watcher that wait indefinitely.
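
To illustrate the pattern this change relies on, here is a minimal, self-contained sketch (not kubelet code; all names are illustrative): the wait selects on `ctx.Done()` alongside the event source, so the goroutine returns as soon as the caller cancels.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// waitForEvent blocks until an event arrives or the caller's context is
// canceled, mirroring the select-on-ctx.Done() case added to waitLogs below.
func waitForEvent(ctx context.Context, events <-chan string) (string, error) {
	select {
	case <-ctx.Done():
		// The connection was closed (or the caller gave up): stop waiting
		// instead of leaking this goroutine.
		return "", ctx.Err()
	case e := <-events:
		return e, nil
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	events := make(chan string) // never receives anything in this sketch

	go func() {
		time.Sleep(100 * time.Millisecond)
		cancel() // simulate the client closing the connection
	}()

	if _, err := waitForEvent(ctx, events); err != nil {
		fmt.Println("stopped waiting:", err) // prints "context canceled"
	}
}
```

Before this PR, the equivalent wait had no `ctx.Done()` case, so a closed connection left the goroutine and its watcher blocked forever.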



**What this PR does / why we need it**:

**Which issue(s) this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close the issue(s) when PR gets merged)*:
Partially fixes #64315

**Special notes for your reviewer**:

**Release note**:

```release-note
Fix kubelet to not leak goroutines/inotify watchers when an inactive connection is closed
```
Kubernetes Submit Queue 2018-08-10 23:19:05 -07:00 committed by GitHub
commit 29232e3edc
10 changed files with 36 additions and 23 deletions


@ -17,6 +17,7 @@ limitations under the License.
package container
import (
"context"
"fmt"
"io"
"net/url"
@ -113,7 +114,7 @@ type Runtime interface {
// default, it returns a snapshot of the container log. Set 'follow' to true to
// stream the log. Set 'follow' to false and specify the number of lines (e.g.
// "100" or "all") to tail the log.
GetContainerLogs(pod *v1.Pod, containerID ContainerID, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) (err error)
GetContainerLogs(ctx context.Context, pod *v1.Pod, containerID ContainerID, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) (err error)
// Delete a container. If the container is still running, an error is returned.
DeleteContainer(containerID ContainerID) error
// ImageService provides methods to image-related methods.


@ -17,6 +17,7 @@ limitations under the License.
package testing
import (
"context"
"fmt"
"io"
"net/url"
@ -289,7 +290,7 @@ func (f *FakeRuntime) GetPodStatus(uid types.UID, name, namespace string) (*PodS
return &status, f.Err
}
func (f *FakeRuntime) GetContainerLogs(pod *v1.Pod, containerID ContainerID, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) (err error) {
func (f *FakeRuntime) GetContainerLogs(_ context.Context, pod *v1.Pod, containerID ContainerID, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) (err error) {
f.Lock()
defer f.Unlock()


@ -17,6 +17,7 @@ limitations under the License.
package testing
import (
"context"
"io"
"time"
@ -100,7 +101,7 @@ func (r *Mock) AttachContainer(containerID ContainerID, stdin io.Reader, stdout,
return args.Error(0)
}
func (r *Mock) GetContainerLogs(pod *v1.Pod, containerID ContainerID, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) (err error) {
func (r *Mock) GetContainerLogs(_ context.Context, pod *v1.Pod, containerID ContainerID, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) (err error) {
args := r.Called(pod, containerID, logOptions, stdout, stderr)
return args.Error(0)
}

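The same mechanical change repeats across the interface, the fake, and the mock above: `ctx` becomes the first parameter, and implementations that never block simply ignore it (`_ context.Context`). A minimal, self-contained sketch of that shape, with illustrative names that are not from the PR:

```go
package main

import (
	"context"
	"fmt"
	"io"
	"strings"
)

// LogReader mirrors the shape of the updated interfaces: context first,
// destination writer last.
type LogReader interface {
	ReadLogs(ctx context.Context, containerID string, out io.Writer) error
}

// fakeReader never blocks, so it ignores the context, just as the fake and
// mock runtimes in this PR do with `_ context.Context`.
type fakeReader struct{ logs string }

func (f *fakeReader) ReadLogs(_ context.Context, _ string, out io.Writer) error {
	_, err := io.Copy(out, strings.NewReader(f.logs))
	return err
}

func main() {
	var r LogReader = &fakeReader{logs: "line 1\nline 2\n"}
	var sb strings.Builder
	if err := r.ReadLogs(context.Background(), "abc123", &sb); err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Print(sb.String())
}
```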

@ -17,6 +17,7 @@ limitations under the License.
package dockershim
import (
"context"
"fmt"
"io"
"strconv"
@ -39,7 +40,7 @@ import (
// more functions.
type DockerLegacyService interface {
// GetContainerLogs gets logs for a specific container.
GetContainerLogs(*v1.Pod, kubecontainer.ContainerID, *v1.PodLogOptions, io.Writer, io.Writer) error
GetContainerLogs(context.Context, *v1.Pod, kubecontainer.ContainerID, *v1.PodLogOptions, io.Writer, io.Writer) error
// IsCRISupportedLogDriver checks whether the logging driver used by docker is
// supported by native CRI integration.
@ -50,7 +51,7 @@ type DockerLegacyService interface {
}
// GetContainerLogs get container logs directly from docker daemon.
func (d *dockerService) GetContainerLogs(pod *v1.Pod, containerID kubecontainer.ContainerID, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error {
func (d *dockerService) GetContainerLogs(_ context.Context, pod *v1.Pod, containerID kubecontainer.ContainerID, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error {
container, err := d.client.InspectContainer(containerID.ID)
if err != nil {
return err
@ -97,7 +98,7 @@ func (d *dockerService) GetContainerLogTail(uid kubetypes.UID, name, namespace s
Namespace: namespace,
},
}
err := d.GetContainerLogs(pod, containerId, &v1.PodLogOptions{TailLines: &value}, buf, buf)
err := d.GetContainerLogs(context.Background(), pod, containerId, &v1.PodLogOptions{TailLines: &value}, buf, buf)
if err != nil {
return "", err
}


@ -18,6 +18,7 @@ package kubelet
import (
"bytes"
"context"
"fmt"
"io"
"io/ioutil"
@ -1159,7 +1160,7 @@ func (kl *Kubelet) validateContainerLogStatus(podName string, podStatus *v1.PodS
// GetKubeletContainerLogs returns logs from the container
// TODO: this method is returning logs of random container attempts, when it should be returning the most recent attempt
// or all of them.
func (kl *Kubelet) GetKubeletContainerLogs(podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error {
func (kl *Kubelet) GetKubeletContainerLogs(ctx context.Context, podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error {
// Pod workers periodically write status to statusManager. If status is not
// cached there, something is wrong (or kubelet just restarted and hasn't
// caught up yet). Just assume the pod is not ready yet.
@ -1205,9 +1206,9 @@ func (kl *Kubelet) GetKubeletContainerLogs(podFullName, containerName string, lo
// dockerLegacyService should only be non-nil when we actually need it, so
// inject it into the runtimeService.
// TODO(random-liu): Remove this hack after deprecating unsupported log driver.
return kl.dockerLegacyService.GetContainerLogs(pod, containerID, logOptions, stdout, stderr)
return kl.dockerLegacyService.GetContainerLogs(ctx, pod, containerID, logOptions, stdout, stderr)
}
return kl.containerRuntime.GetContainerLogs(pod, containerID, logOptions, stdout, stderr)
return kl.containerRuntime.GetContainerLogs(ctx, pod, containerID, logOptions, stdout, stderr)
}
// getPhase returns the phase of a pod given its container info.


@ -17,6 +17,7 @@ limitations under the License.
package kuberuntime
import (
"context"
"errors"
"fmt"
"io"
@ -366,7 +367,7 @@ func getTerminationMessage(status *runtimeapi.ContainerStatus, terminationMessag
func (m *kubeGenericRuntimeManager) readLastStringFromContainerLogs(path string) string {
value := int64(kubecontainer.MaxContainerTerminationMessageLogLines)
buf, _ := circbuf.NewBuffer(kubecontainer.MaxContainerTerminationMessageLogLength)
if err := m.ReadLogs(path, "", &v1.PodLogOptions{TailLines: &value}, buf, buf); err != nil {
if err := m.ReadLogs(context.Background(), path, "", &v1.PodLogOptions{TailLines: &value}, buf, buf); err != nil {
return fmt.Sprintf("Error on reading termination message from logs: %v", err)
}
return buf.String()
@ -730,13 +731,13 @@ func findNextInitContainerToRun(pod *v1.Pod, podStatus *kubecontainer.PodStatus)
}
// GetContainerLogs returns logs of a specific container.
func (m *kubeGenericRuntimeManager) GetContainerLogs(pod *v1.Pod, containerID kubecontainer.ContainerID, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) (err error) {
func (m *kubeGenericRuntimeManager) GetContainerLogs(ctx context.Context, pod *v1.Pod, containerID kubecontainer.ContainerID, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) (err error) {
status, err := m.runtimeService.ContainerStatus(containerID.ID)
if err != nil {
glog.V(4).Infof("failed to get container status for %v: %v", containerID.String(), err)
return fmt.Errorf("Unable to retrieve container logs for %v", containerID.String())
}
return m.ReadLogs(status.GetLogPath(), containerID.ID, logOptions, stdout, stderr)
return m.ReadLogs(ctx, status.GetLogPath(), containerID.ID, logOptions, stdout, stderr)
}
// GetExec gets the endpoint the runtime will serve the exec request from.

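Call sites without a request-scoped context, such as dockershim's `GetContainerLogTail` and `readLastStringFromContainerLogs` above, pass `context.Background()`. A small sketch of that fallback with a hypothetical reader function; bounding such calls with a timeout is mentioned only as an option, not something this PR does:

```go
package main

import (
	"context"
	"fmt"
)

// readTail is a hypothetical stand-in for a context-aware log reader.
func readTail(ctx context.Context, path string) error {
	if err := ctx.Err(); err != nil {
		return err // already canceled or past its deadline
	}
	fmt.Println("reading tail of", path)
	return nil
}

func main() {
	// No incoming request to derive a context from, so fall back to
	// context.Background(), exactly as the call sites above do.
	if err := readTail(context.Background(), "/var/log/containers/app.log"); err != nil {
		fmt.Println("read failed:", err)
	}
}
```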

@ -17,6 +17,7 @@ limitations under the License.
package kuberuntime
import (
"context"
"io"
"time"
@ -27,9 +28,9 @@ import (
// ReadLogs read the container log and redirect into stdout and stderr.
// Note that containerID is only needed when following the log, or else
// just pass in empty string "".
func (m *kubeGenericRuntimeManager) ReadLogs(path, containerID string, apiOpts *v1.PodLogOptions, stdout, stderr io.Writer) error {
func (m *kubeGenericRuntimeManager) ReadLogs(ctx context.Context, path, containerID string, apiOpts *v1.PodLogOptions, stdout, stderr io.Writer) error {
// Convert v1.PodLogOptions into internal log options.
opts := logs.NewLogOptions(apiOpts, time.Now())
return logs.ReadLogs(path, containerID, opts, m.runtimeService, stdout, stderr)
return logs.ReadLogs(ctx, path, containerID, opts, m.runtimeService, stdout, stderr)
}


@ -19,6 +19,7 @@ package logs
import (
"bufio"
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
@ -266,7 +267,7 @@ func (w *logWriter) write(msg *logMessage) error {
// ReadLogs read the container log and redirect into stdout and stderr.
// Note that containerID is only needed when following the log, or else
// just pass in empty string "".
func ReadLogs(path, containerID string, opts *LogOptions, runtimeService internalapi.RuntimeService, stdout, stderr io.Writer) error {
func ReadLogs(ctx context.Context, path, containerID string, opts *LogOptions, runtimeService internalapi.RuntimeService, stdout, stderr io.Writer) error {
f, err := os.Open(path)
if err != nil {
return fmt.Errorf("failed to open log file %q: %v", path, err)
@ -317,7 +318,7 @@ func ReadLogs(path, containerID string, opts *LogOptions, runtimeService interna
}
}
// Wait until the next log change.
if found, err := waitLogs(containerID, watcher, runtimeService); !found {
if found, err := waitLogs(ctx, containerID, watcher, runtimeService); !found {
return err
}
continue
@ -371,7 +372,7 @@ func isContainerRunning(id string, r internalapi.RuntimeService) (bool, error) {
// waitLogs wait for the next log write. It returns a boolean and an error. The boolean
// indicates whether a new log is found; the error is error happens during waiting new logs.
func waitLogs(id string, w *fsnotify.Watcher, runtimeService internalapi.RuntimeService) (bool, error) {
func waitLogs(ctx context.Context, id string, w *fsnotify.Watcher, runtimeService internalapi.RuntimeService) (bool, error) {
// no need to wait if the pod is not running
if running, err := isContainerRunning(id, runtimeService); !running {
return false, err
@ -379,6 +380,8 @@ func waitLogs(id string, w *fsnotify.Watcher, runtimeService internalapi.Runtime
errRetry := 5
for {
select {
case <-ctx.Done():
return false, fmt.Errorf("context cancelled")
case e := <-w.Events:
switch e.Op {
case fsnotify.Write:

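Outside the kubelet internals, the wait loop above can be sketched against the public fsnotify API as follows (a hedged approximation; the real `waitLogs` also checks whether the container is still running and retries watcher errors, which this omits):

```go
package main

import (
	"context"
	"fmt"

	"github.com/fsnotify/fsnotify"
)

// waitForWrite blocks until the watched file is written to, the watcher
// reports an error, or ctx is canceled (the case this PR adds).
func waitForWrite(ctx context.Context, w *fsnotify.Watcher) (bool, error) {
	for {
		select {
		case <-ctx.Done():
			return false, fmt.Errorf("context cancelled")
		case e := <-w.Events:
			if e.Op&fsnotify.Write == fsnotify.Write {
				return true, nil
			}
			// Other events (Create, Chmod, ...) are ignored; keep waiting.
		case err := <-w.Errors:
			return false, err
		}
	}
}

func main() {
	watcher, err := fsnotify.NewWatcher()
	if err != nil {
		fmt.Println("watcher error:", err)
		return
	}
	defer watcher.Close()
	_ = watcher.Add("/var/log/containers") // path is illustrative

	ctx, cancel := context.WithCancel(context.Background())
	cancel() // cancel immediately so the sketch terminates on its own

	found, err := waitForWrite(ctx, watcher)
	fmt.Println(found, err) // false, "context cancelled"
}
```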

@ -17,6 +17,7 @@ limitations under the License.
package server
import (
"context"
"crypto/tls"
"fmt"
"io"
@ -172,7 +173,7 @@ type HostInterface interface {
GetCachedMachineInfo() (*cadvisorapi.MachineInfo, error)
GetRunningPods() ([]*v1.Pod, error)
RunInContainer(name string, uid types.UID, container string, cmd []string) ([]byte, error)
GetKubeletContainerLogs(podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error
GetKubeletContainerLogs(ctx context.Context, podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error
ServeLogs(w http.ResponseWriter, req *http.Request)
ResyncInterval() time.Duration
GetHostname() string
@ -457,6 +458,7 @@ func (s *Server) getContainerLogs(request *restful.Request, response *restful.Re
podNamespace := request.PathParameter("podNamespace")
podID := request.PathParameter("podID")
containerName := request.PathParameter("containerName")
ctx := request.Request.Context()
if len(podID) == 0 {
// TODO: Why return JSON when the rest return plaintext errors?
@ -528,7 +530,7 @@ func (s *Server) getContainerLogs(request *restful.Request, response *restful.Re
}
fw := flushwriter.Wrap(response.ResponseWriter)
response.Header().Set("Transfer-Encoding", "chunked")
if err := s.host.GetKubeletContainerLogs(kubecontainer.GetPodFullName(pod), containerName, logOptions, fw, fw); err != nil {
if err := s.host.GetKubeletContainerLogs(ctx, kubecontainer.GetPodFullName(pod), containerName, logOptions, fw, fw); err != nil {
response.WriteError(http.StatusBadRequest, err)
return
}

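The handler change above depends on `request.Request.Context()` being canceled when the client goes away. A self-contained sketch of that behavior using only the standard library (no kubelet code involved):

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
	"time"
)

func main() {
	done := make(chan struct{})
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// Same call as request.Request.Context() in the kubelet server.
		<-r.Context().Done()
		fmt.Println("handler saw cancellation:", r.Context().Err())
		close(done)
	}))
	defer srv.Close()

	// Issue a request, then abandon it before the handler responds.
	ctx, cancel := context.WithCancel(context.Background())
	req, _ := http.NewRequest("GET", srv.URL, nil)
	req = req.WithContext(ctx)

	go func() {
		time.Sleep(50 * time.Millisecond)
		cancel() // simulates the client closing the connection
	}()

	if _, err := http.DefaultClient.Do(req); err != nil {
		fmt.Println("client side:", err) // context canceled
	}

	select {
	case <-done:
	case <-time.After(2 * time.Second):
		fmt.Println("timed out waiting for the handler")
	}
}
```

With the request context plumbed through `GetKubeletContainerLogs` down to `waitLogs`, the same cancellation now reaches the inotify wait instead of being dropped at the HTTP layer.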

@ -18,6 +18,7 @@ package server
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
@ -80,7 +81,7 @@ type fakeKubelet struct {
getAttachCheck func(string, types.UID, string, remotecommandserver.Options)
getPortForwardCheck func(string, string, types.UID, portforward.V4Options)
containerLogsFunc func(podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error
containerLogsFunc func(ctx context.Context, podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error
hostnameFunc func() string
resyncInterval time.Duration
loopEntryTime time.Time
@ -128,8 +129,8 @@ func (fk *fakeKubelet) ServeLogs(w http.ResponseWriter, req *http.Request) {
fk.logFunc(w, req)
}
func (fk *fakeKubelet) GetKubeletContainerLogs(podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error {
return fk.containerLogsFunc(podFullName, containerName, logOptions, stdout, stderr)
func (fk *fakeKubelet) GetKubeletContainerLogs(ctx context.Context, podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error {
return fk.containerLogsFunc(ctx, podFullName, containerName, logOptions, stdout, stderr)
}
func (fk *fakeKubelet) GetHostname() string {
@ -983,7 +984,7 @@ func setPodByNameFunc(fw *serverTestFramework, namespace, pod, container string)
}
func setGetContainerLogsFunc(fw *serverTestFramework, t *testing.T, expectedPodName, expectedContainerName string, expectedLogOptions *v1.PodLogOptions, output string) {
fw.fakeKubelet.containerLogsFunc = func(podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error {
fw.fakeKubelet.containerLogsFunc = func(_ context.Context, podFullName, containerName string, logOptions *v1.PodLogOptions, stdout, stderr io.Writer) error {
if podFullName != expectedPodName {
t.Errorf("expected %s, got %s", expectedPodName, podFullName)
}