kubelet: handle recreated log files

if the runtime is configured to rotate the log file, we might end up
watching the old fd where there are no more writes.

When a fsnotify event other than Write is received, reopen the log
file and recreate the watcher.

Signed-off-by: Giuseppe Scrivano <gscrivan@redhat.com>
k3s-v1.15.3
Giuseppe Scrivano 2019-01-15 19:17:43 +01:00
parent 2c30eee92f
commit 341c2c0d1f
No known key found for this signature in database
GPG Key ID: E4730F97F60286ED
1 changed files with 37 additions and 9 deletions

View File

@ -321,7 +321,26 @@ func ReadLogs(ctx context.Context, path, containerID string, opts *LogOptions, r
continue continue
} }
// Wait until the next log change. // Wait until the next log change.
if found, err := waitLogs(ctx, containerID, watcher, runtimeService); !found { found, recreated, err := waitLogs(ctx, containerID, watcher, runtimeService)
if recreated {
newF, err := os.Open(path)
if err != nil {
if os.IsNotExist(err) {
continue
}
return fmt.Errorf("failed to open log file %q: %v", path, err)
}
f.Close()
if err := watcher.Remove(f.Name()); err != nil && !os.IsNotExist(err) {
klog.Errorf("failed to remove file watch %q: %v", f.Name(), err)
}
f = newF
if err := watcher.Add(f.Name()); err != nil {
return fmt.Errorf("failed to watch file %q: %v", f.Name(), err)
}
r = bufio.NewReader(f)
}
if !found {
return err return err
} }
continue continue
@ -373,34 +392,43 @@ func isContainerRunning(id string, r internalapi.RuntimeService) (bool, error) {
return true, nil return true, nil
} }
// waitLogs wait for the next log write. It returns a boolean and an error. The boolean // waitLogs wait for the next log write. It returns two booleans and an error. The first boolean
// indicates whether a new log is found; the error is error happens during waiting new logs. // indicates whether a new log is found; the second boolean if the log file was recreated;
func waitLogs(ctx context.Context, id string, w *fsnotify.Watcher, runtimeService internalapi.RuntimeService) (bool, error) { // the error is error happens during waiting new logs.
func waitLogs(ctx context.Context, id string, w *fsnotify.Watcher, runtimeService internalapi.RuntimeService) (bool, bool, error) {
// no need to wait if the pod is not running // no need to wait if the pod is not running
if running, err := isContainerRunning(id, runtimeService); !running { if running, err := isContainerRunning(id, runtimeService); !running {
return false, err return false, false, err
} }
errRetry := 5 errRetry := 5
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return false, fmt.Errorf("context cancelled") return false, false, fmt.Errorf("context cancelled")
case e := <-w.Events: case e := <-w.Events:
switch e.Op { switch e.Op {
case fsnotify.Write: case fsnotify.Write:
return true, nil return true, false, nil
case fsnotify.Create:
fallthrough
case fsnotify.Rename:
fallthrough
case fsnotify.Remove:
fallthrough
case fsnotify.Chmod:
return true, true, nil
default: default:
klog.Errorf("Unexpected fsnotify event: %v, retrying...", e) klog.Errorf("Unexpected fsnotify event: %v, retrying...", e)
} }
case err := <-w.Errors: case err := <-w.Errors:
klog.Errorf("Fsnotify watch error: %v, %d error retries remaining", err, errRetry) klog.Errorf("Fsnotify watch error: %v, %d error retries remaining", err, errRetry)
if errRetry == 0 { if errRetry == 0 {
return false, err return false, false, err
} }
errRetry-- errRetry--
case <-time.After(stateCheckPeriod): case <-time.After(stateCheckPeriod):
if running, err := isContainerRunning(id, runtimeService); !running { if running, err := isContainerRunning(id, runtimeService); !running {
return false, err return false, false, err
} }
} }
} }