mirror of https://github.com/k3s-io/k3s
Merge pull request #69986 from pohly/podlogs
e2e: live streaming of pod events and stdout for CSI volume tests
pull/58/head
commit
0d37650107
|
@ -162,6 +162,7 @@ filegroup(
|
|||
"//test/e2e/framework/ginkgowrapper:all-srcs",
|
||||
"//test/e2e/framework/ingress:all-srcs",
|
||||
"//test/e2e/framework/metrics:all-srcs",
|
||||
"//test/e2e/framework/podlogs:all-srcs",
|
||||
"//test/e2e/framework/providers/aws:all-srcs",
|
||||
"//test/e2e/framework/providers/azure:all-srcs",
|
||||
"//test/e2e/framework/providers/gce:all-srcs",
|
||||
|
|
|
@ -0,0 +1,28 @@
|
|||
load("@io_bazel_rules_go//go:def.bzl", "go_library")
|
||||
|
||||
# Go library target for the podlogs package (single source file: podlogs.go).
go_library(
    name = "go_default_library",
    srcs = ["podlogs.go"],
    importpath = "k8s.io/kubernetes/test/e2e/framework/podlogs",
    visibility = ["//visibility:public"],
    # One entry per non-stdlib package imported by podlogs.go.
    deps = [
        "//staging/src/k8s.io/api/core/v1:go_default_library",
        "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
        "//staging/src/k8s.io/client-go/kubernetes:go_default_library",
        "//vendor/github.com/pkg/errors:go_default_library",
    ],
)
|
||||
|
||||
# Every file in this package, visible only within the package itself.
# The "automanaged" tag suggests this target is maintained by tooling
# rather than by hand - TODO confirm before editing manually.
filegroup(
    name = "package-srcs",
    srcs = glob(["**"]),
    tags = ["automanaged"],
    visibility = ["//visibility:private"],
)
|
||||
|
||||
# Public aggregate of this package's sources; it simply re-exports
# :package-srcs so that other packages can depend on it.
filegroup(
    name = "all-srcs",
    srcs = [":package-srcs"],
    tags = ["automanaged"],
    visibility = ["//visibility:public"],
)
|
|
@ -0,0 +1,263 @@
|
|||
/*
|
||||
Copyright 2018 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
// Package podlogs enables live capturing of all events and log
|
||||
// messages for some or all pods in a namespace as they get generated.
|
||||
// This helps debugging both a running test (what is currently going
|
||||
// on?) and the output of a CI run (events appear in chronological
|
||||
// order and output that normally isn't available like the command
|
||||
// stdout messages are available).
|
||||
package podlogs
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"github.com/pkg/errors"
|
||||
"io"
|
||||
"os"
|
||||
"path"
|
||||
"regexp"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
clientset "k8s.io/client-go/kubernetes"
|
||||
)
|
||||
|
||||
// LogsForPod starts reading the logs for a certain pod. If the pod has more than one
|
||||
// container, opts.Container must be set. Reading stops when the context is done.
|
||||
// The stream includes formatted error messages and ends with
|
||||
// rpc error: code = Unknown desc = Error: No such container: 41a...
|
||||
// when the pod gets deleted while streaming.
|
||||
func LogsForPod(ctx context.Context, cs clientset.Interface, ns, pod string, opts *v1.PodLogOptions) (io.ReadCloser, error) {
|
||||
req := cs.Core().Pods(ns).GetLogs(pod, opts)
|
||||
return req.Context(ctx).Stream()
|
||||
}
|
||||
|
||||
// LogOutput describes the destinations that CopyAllLogs writes to.
type LogOutput struct {
	// StatusWriter receives the ERROR/WARNING messages that CopyAllLogs
	// produces about the log collection itself. Nothing is reported
	// when it is nil.
	StatusWriter io.Writer

	// LogWriter, when not nil, receives every log line from every
	// container, each prefixed with "<pod>/<container>: ".
	LogWriter io.Writer

	// LogPathPrefix is only consulted when LogWriter is nil. Each
	// container then gets its own file, named
	// <LogPathPrefix><pod>-<container>.log. The prefix is prepended
	// as a plain string, so it has to end with a path separator if
	// the files are meant to land inside a directory.
	LogPathPrefix string
}
|
||||
|
||||
// expectedErrors recognizes error messages that are a normal part of
// trying to read container logs (see pkg/kubelet/kubelet_pods.go), for
// example because the container has not started yet or is already gone.
// CopyAllLogs does not warn about errors matching this pattern.
var expectedErrors = regexp.MustCompile(`container .* in pod .* is (terminated|waiting to start|not available)|the server could not find the requested resource`)
|
||||
|
||||
// CopyAllLogs follows the logs of all containers in all pods,
|
||||
// including those that get created in the future, and writes each log
|
||||
// line as configured in the output options. It does that until the
|
||||
// context is done or until an error occurs.
|
||||
//
|
||||
// Beware that there is currently no way to force log collection
|
||||
// before removing pods, which means that there is a known race
|
||||
// between "stop pod" and "collecting log entries". The alternative
|
||||
// would be a blocking function with collects logs from all currently
|
||||
// running pods, but that then would have the disadvantage that
|
||||
// already deleted pods aren't covered.
|
||||
func CopyAllLogs(ctx context.Context, cs clientset.Interface, ns string, to LogOutput) error {
|
||||
watcher, err := cs.Core().Pods(ns).Watch(meta.ListOptions{})
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "cannot create Pod event watcher")
|
||||
}
|
||||
|
||||
go func() {
|
||||
var m sync.Mutex
|
||||
logging := map[string]bool{}
|
||||
check := func() {
|
||||
m.Lock()
|
||||
defer m.Unlock()
|
||||
|
||||
pods, err := cs.Core().Pods(ns).List(meta.ListOptions{})
|
||||
if err != nil {
|
||||
if to.StatusWriter != nil {
|
||||
fmt.Fprintf(to.StatusWriter, "ERROR: get pod list in %s: %s\n", ns, err)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
for _, pod := range pods.Items {
|
||||
for _, c := range pod.Spec.Containers {
|
||||
name := pod.ObjectMeta.Name + "/" + c.Name
|
||||
if logging[name] {
|
||||
continue
|
||||
}
|
||||
readCloser, err := LogsForPod(ctx, cs, ns, pod.ObjectMeta.Name,
|
||||
&v1.PodLogOptions{
|
||||
Container: c.Name,
|
||||
Follow: true,
|
||||
})
|
||||
if err != nil {
|
||||
// We do get "normal" errors here, like trying to read too early.
|
||||
// We can ignore those.
|
||||
if to.StatusWriter != nil &&
|
||||
expectedErrors.FindStringIndex(err.Error()) == nil {
|
||||
fmt.Fprintf(to.StatusWriter, "WARNING: pod log: %s: %s\n", name, err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// Determine where we write. If this fails, we intentionally return without clearing
|
||||
// the logging[name] flag, which prevents trying over and over again to
|
||||
// create the output file.
|
||||
var out io.Writer
|
||||
var closer io.Closer
|
||||
var prefix string
|
||||
if to.LogWriter != nil {
|
||||
out = to.LogWriter
|
||||
prefix = name + ": "
|
||||
} else {
|
||||
var err error
|
||||
filename := to.LogPathPrefix + pod.ObjectMeta.Name + "-" + c.Name + ".log"
|
||||
err = os.MkdirAll(path.Dir(filename), 0755)
|
||||
if err != nil {
|
||||
if to.StatusWriter != nil {
|
||||
fmt.Fprintf(to.StatusWriter, "ERROR: pod log: create directory for %s: %s\n", filename, err)
|
||||
}
|
||||
return
|
||||
}
|
||||
// The test suite might run the same test multiple times,
|
||||
// so we have to append here.
|
||||
file, err := os.OpenFile(filename, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
|
||||
if err != nil {
|
||||
if to.StatusWriter != nil {
|
||||
fmt.Fprintf(to.StatusWriter, "ERROR: pod log: create file %s: %s\n", filename, err)
|
||||
}
|
||||
return
|
||||
}
|
||||
closer = file
|
||||
out = file
|
||||
}
|
||||
go func() {
|
||||
if closer != nil {
|
||||
defer closer.Close()
|
||||
}
|
||||
defer func() {
|
||||
m.Lock()
|
||||
logging[name] = false
|
||||
m.Unlock()
|
||||
readCloser.Close()
|
||||
}()
|
||||
scanner := bufio.NewScanner(readCloser)
|
||||
first := true
|
||||
for scanner.Scan() {
|
||||
line := scanner.Text()
|
||||
// Filter out the expected "end of stream" error message,
|
||||
// it would just confuse developers who don't know about it.
|
||||
// Same for attempts to read logs from a container that
|
||||
// isn't ready (yet?!).
|
||||
if !strings.HasPrefix(line, "rpc error: code = Unknown desc = Error: No such container:") &&
|
||||
!strings.HasPrefix(line, "Unable to retrieve container logs for ") {
|
||||
if first {
|
||||
if to.LogWriter == nil {
|
||||
// Because the same log might be written to multiple times
|
||||
// in different test instances, log an extra line to separate them.
|
||||
// Also provides some useful extra information.
|
||||
fmt.Fprintf(out, "==== start of log for container %s ====\n", name)
|
||||
}
|
||||
first = false
|
||||
}
|
||||
fmt.Fprintf(out, "%s%s\n", prefix, scanner.Text())
|
||||
}
|
||||
}
|
||||
}()
|
||||
logging[name] = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Watch events to see whether we can start logging
|
||||
// and log interesting ones.
|
||||
check()
|
||||
for {
|
||||
select {
|
||||
case <-watcher.ResultChan():
|
||||
check()
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// WatchPods prints pod status events for a certain namespace or all namespaces
|
||||
// when namespace name is empty.
|
||||
func WatchPods(ctx context.Context, cs clientset.Interface, ns string, to io.Writer) error {
|
||||
watcher, err := cs.Core().Pods(ns).Watch(meta.ListOptions{})
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "cannot create Pod event watcher")
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer watcher.Stop()
|
||||
for {
|
||||
select {
|
||||
case e := <-watcher.ResultChan():
|
||||
if e.Object == nil {
|
||||
continue
|
||||
}
|
||||
|
||||
pod, ok := e.Object.(*v1.Pod)
|
||||
if !ok {
|
||||
continue
|
||||
}
|
||||
buffer := new(bytes.Buffer)
|
||||
fmt.Fprintf(buffer,
|
||||
"pod event: %s: %s/%s %s: %s %s\n",
|
||||
e.Type,
|
||||
pod.Namespace,
|
||||
pod.Name,
|
||||
pod.Status.Phase,
|
||||
pod.Status.Reason,
|
||||
pod.Status.Conditions,
|
||||
)
|
||||
for _, cst := range pod.Status.ContainerStatuses {
|
||||
fmt.Fprintf(buffer, " %s: ", cst.Name)
|
||||
if cst.State.Waiting != nil {
|
||||
fmt.Fprintf(buffer, "WAITING: %s - %s",
|
||||
cst.State.Waiting.Reason,
|
||||
cst.State.Waiting.Message,
|
||||
)
|
||||
} else if cst.State.Running != nil {
|
||||
fmt.Fprintf(buffer, "RUNNING")
|
||||
} else if cst.State.Waiting != nil {
|
||||
fmt.Fprintf(buffer, "TERMINATED: %s - %s",
|
||||
cst.State.Waiting.Reason,
|
||||
cst.State.Waiting.Message,
|
||||
)
|
||||
}
|
||||
fmt.Fprintf(buffer, "\n")
|
||||
}
|
||||
to.Write(buffer.Bytes())
|
||||
case <-ctx.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return nil
|
||||
}
|
|
@ -70,6 +70,7 @@ go_library(
|
|||
"//staging/src/k8s.io/csi-api/pkg/crd:go_default_library",
|
||||
"//test/e2e/framework:go_default_library",
|
||||
"//test/e2e/framework/metrics:go_default_library",
|
||||
"//test/e2e/framework/podlogs:go_default_library",
|
||||
"//test/e2e/framework/providers/gce:go_default_library",
|
||||
"//test/e2e/framework/testfiles:go_default_library",
|
||||
"//test/e2e/storage/drivers:go_default_library",
|
||||
|
|
|
@ -17,8 +17,10 @@ limitations under the License.
|
|||
package storage
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"regexp"
|
||||
"time"
|
||||
|
||||
"k8s.io/api/core/v1"
|
||||
|
@ -30,6 +32,7 @@ import (
|
|||
csiv1alpha1 "k8s.io/csi-api/pkg/apis/csi/v1alpha1"
|
||||
csiclient "k8s.io/csi-api/pkg/client/clientset/versioned"
|
||||
"k8s.io/kubernetes/test/e2e/framework"
|
||||
"k8s.io/kubernetes/test/e2e/framework/podlogs"
|
||||
"k8s.io/kubernetes/test/e2e/storage/testsuites"
|
||||
"k8s.io/kubernetes/test/e2e/storage/utils"
|
||||
imageutils "k8s.io/kubernetes/test/utils/image"
|
||||
|
@ -57,6 +60,7 @@ var _ = utils.SIGDescribe("CSI Volumes", func() {
|
|||
f := framework.NewDefaultFramework("csi-volumes")
|
||||
|
||||
var (
|
||||
cancel context.CancelFunc
|
||||
cs clientset.Interface
|
||||
crdclient apiextensionsclient.Interface
|
||||
csics csiclient.Interface
|
||||
|
@ -66,11 +70,40 @@ var _ = utils.SIGDescribe("CSI Volumes", func() {
|
|||
)
|
||||
|
||||
BeforeEach(func() {
|
||||
ctx, c := context.WithCancel(context.Background())
|
||||
cancel = c
|
||||
|
||||
cs = f.ClientSet
|
||||
crdclient = f.APIExtensionsClientSet
|
||||
csics = f.CSIClientSet
|
||||
ns = f.Namespace
|
||||
|
||||
// Debugging of the following tests heavily depends on the log output
|
||||
// of the different containers. Therefore include all of that in log
|
||||
// files (when using --report-dir, as in the CI) or the output stream
|
||||
// (otherwise).
|
||||
to := podlogs.LogOutput{
|
||||
StatusWriter: GinkgoWriter,
|
||||
}
|
||||
if framework.TestContext.ReportDir == "" {
|
||||
to.LogWriter = GinkgoWriter
|
||||
} else {
|
||||
test := CurrentGinkgoTestDescription()
|
||||
reg := regexp.MustCompile("[^a-zA-Z0-9_-]+")
|
||||
// We end the prefix with a slash to ensure that all logs
|
||||
// end up in a directory named after the current test.
|
||||
to.LogPathPrefix = framework.TestContext.ReportDir + "/" +
|
||||
reg.ReplaceAllString(test.FullTestText, "_") + "/"
|
||||
}
|
||||
podlogs.CopyAllLogs(ctx, cs, ns.Name, to)
|
||||
|
||||
// pod events are something that the framework already collects itself
|
||||
// after a failed test. Logging them live is only useful for interactive
|
||||
// debugging, not when we collect reports.
|
||||
if framework.TestContext.ReportDir == "" {
|
||||
podlogs.WatchPods(ctx, cs, ns.Name, GinkgoWriter)
|
||||
}
|
||||
|
||||
nodes := framework.GetReadySchedulableNodesOrDie(f.ClientSet)
|
||||
node = nodes.Items[rand.Intn(len(nodes.Items))]
|
||||
config = framework.VolumeTestConfig{
|
||||
|
@ -84,6 +117,10 @@ var _ = utils.SIGDescribe("CSI Volumes", func() {
|
|||
createCSICRDs(crdclient)
|
||||
})
|
||||
|
||||
// Cancel the context created in BeforeEach; it is the one handed to
// podlogs.CopyAllLogs and podlogs.WatchPods, so this ends their
// background streaming for the finished test.
AfterEach(func() {
	cancel()
})
|
||||
|
||||
for driverName, initCSIDriver := range csiTestDrivers {
|
||||
curDriverName := driverName
|
||||
curInitCSIDriver := initCSIDriver
|
||||
|
|
Loading…
Reference in New Issue