mirror of https://github.com/k3s-io/k3s
201 lines
5.4 KiB
Go
201 lines
5.4 KiB
Go
|
/*
|
||
|
Copyright 2016 Euan Kemp
|
||
|
|
||
|
Licensed under the Apache License, Version 2.0 (the "License");
|
||
|
you may not use this file except in compliance with the License.
|
||
|
You may obtain a copy of the License at
|
||
|
|
||
|
http://www.apache.org/licenses/LICENSE-2.0
|
||
|
|
||
|
Unless required by applicable law or agreed to in writing, software
|
||
|
distributed under the License is distributed on an "AS IS" BASIS,
|
||
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
|
See the License for the specific language governing permissions and
|
||
|
limitations under the License.
|
||
|
*/
|
||
|
|
||
|
// Package kmsgparser implements a parser for the Linux `/dev/kmsg` format.
|
||
|
// More information about this format may be found here:
|
||
|
// https://www.kernel.org/doc/Documentation/ABI/testing/dev-kmsg
|
||
|
// Some parts of it are slightly inspired by rsyslog's contrib module:
|
||
|
// https://github.com/rsyslog/rsyslog/blob/v8.22.0/contrib/imkmsg/kmsg.c
|
||
|
package kmsgparser
|
||
|
|
||
|
import (
|
||
|
"fmt"
|
||
|
"io"
|
||
|
"os"
|
||
|
"strconv"
|
||
|
"strings"
|
||
|
"syscall"
|
||
|
"time"
|
||
|
)
|
||
|
|
||
|
// Parser is a parser for the kernel ring buffer found at /dev/kmsg
|
||
|
type Parser interface {
|
||
|
// SeekEnd moves the parser to the end of the kmsg queue.
|
||
|
SeekEnd() error
|
||
|
// Parse provides a channel of messages read from the kernel ring buffer.
|
||
|
// When first called, it will read the existing ringbuffer, after which it will emit new messages as they occur.
|
||
|
Parse() <-chan Message
|
||
|
// SetLogger sets the logger that will be used to report malformed kernel
|
||
|
// ringbuffer lines or unexpected kmsg read errors.
|
||
|
SetLogger(Logger)
|
||
|
// Close closes the underlying kmsg reader for this parser
|
||
|
Close() error
|
||
|
}
|
||
|
|
||
|
// Message represents a given kmsg logline, including its timestamp (as
|
||
|
// calculated based on offset from boot time), its possibly multi-line body,
|
||
|
// and so on. More information about these mssages may be found here:
|
||
|
// https://www.kernel.org/doc/Documentation/ABI/testing/dev-kmsg
|
||
|
type Message struct {
|
||
|
Priority int
|
||
|
SequenceNumber int
|
||
|
Timestamp time.Time
|
||
|
Message string
|
||
|
}
|
||
|
|
||
|
func NewParser() (Parser, error) {
|
||
|
f, err := os.Open("/dev/kmsg")
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
bootTime, err := getBootTime()
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
return &parser{
|
||
|
log: &StandardLogger{nil},
|
||
|
kmsgReader: f,
|
||
|
bootTime: bootTime,
|
||
|
}, nil
|
||
|
}
|
||
|
|
||
|
type ReadSeekCloser interface {
|
||
|
io.ReadCloser
|
||
|
io.Seeker
|
||
|
}
|
||
|
|
||
|
type parser struct {
|
||
|
log Logger
|
||
|
kmsgReader ReadSeekCloser
|
||
|
bootTime time.Time
|
||
|
}
|
||
|
|
||
|
func getBootTime() (time.Time, error) {
|
||
|
var sysinfo syscall.Sysinfo_t
|
||
|
err := syscall.Sysinfo(&sysinfo)
|
||
|
if err != nil {
|
||
|
return time.Time{}, fmt.Errorf("could not get boot time: %v", err)
|
||
|
}
|
||
|
// sysinfo only has seconds
|
||
|
return time.Now().Add(-1 * (time.Duration(sysinfo.Uptime) * time.Second)), nil
|
||
|
}
|
||
|
|
||
|
func (p *parser) SetLogger(log Logger) {
|
||
|
p.log = log
|
||
|
}
|
||
|
|
||
|
func (p *parser) Close() error {
|
||
|
return p.kmsgReader.Close()
|
||
|
}
|
||
|
|
||
|
func (p *parser) SeekEnd() error {
|
||
|
_, err := p.kmsgReader.Seek(0, os.SEEK_END)
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
// Parse will read from the provided reader and provide a channel of messages
|
||
|
// parsed.
|
||
|
// If the provided reader *is not* a proper Linux kmsg device, Parse might not
|
||
|
// behave correctly since it relies on specific behavior of `/dev/kmsg`
|
||
|
//
|
||
|
// A goroutine is created to process the provided reader. The goroutine will
|
||
|
// exit when the given reader is closed.
|
||
|
// Closing the passed in reader will cause the goroutine to exit.
|
||
|
func (p *parser) Parse() <-chan Message {
|
||
|
|
||
|
output := make(chan Message, 1)
|
||
|
|
||
|
go func() {
|
||
|
defer close(output)
|
||
|
msg := make([]byte, 8192)
|
||
|
for {
|
||
|
// Each read call gives us one full message.
|
||
|
// https://www.kernel.org/doc/Documentation/ABI/testing/dev-kmsg
|
||
|
n, err := p.kmsgReader.Read(msg)
|
||
|
if err != nil {
|
||
|
if err == syscall.EPIPE {
|
||
|
p.log.Warningf("short read from kmsg; skipping")
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
if err == io.EOF {
|
||
|
p.log.Infof("kmsg reader closed, shutting down")
|
||
|
return
|
||
|
}
|
||
|
|
||
|
p.log.Errorf("error reading /dev/kmsg: %v", err)
|
||
|
return
|
||
|
}
|
||
|
|
||
|
msgStr := string(msg[:n])
|
||
|
|
||
|
message, err := p.parseMessage(msgStr)
|
||
|
if err != nil {
|
||
|
p.log.Warningf("unable to parse kmsg message %q: %v", msgStr, err)
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
output <- message
|
||
|
}
|
||
|
}()
|
||
|
|
||
|
return output
|
||
|
}
|
||
|
|
||
|
func (p *parser) parseMessage(input string) (Message, error) {
|
||
|
// Format:
|
||
|
// PRIORITY,SEQUENCE_NUM,TIMESTAMP,-;MESSAGE
|
||
|
parts := strings.SplitN(input, ";", 2)
|
||
|
if len(parts) != 2 {
|
||
|
return Message{}, fmt.Errorf("invalid kmsg; must contain a ';'")
|
||
|
}
|
||
|
|
||
|
metadata, message := parts[0], parts[1]
|
||
|
|
||
|
metadataParts := strings.Split(metadata, ",")
|
||
|
if len(metadataParts) < 3 {
|
||
|
return Message{}, fmt.Errorf("invalid kmsg: must contain at least 3 ',' separated pieces at the start")
|
||
|
}
|
||
|
|
||
|
priority, sequence, timestamp := metadataParts[0], metadataParts[1], metadataParts[2]
|
||
|
|
||
|
prioNum, err := strconv.Atoi(priority)
|
||
|
if err != nil {
|
||
|
return Message{}, fmt.Errorf("could not parse %q as priority: %v", priority, err)
|
||
|
}
|
||
|
|
||
|
sequenceNum, err := strconv.Atoi(sequence)
|
||
|
if err != nil {
|
||
|
return Message{}, fmt.Errorf("could not parse %q as sequence number: %v", priority, err)
|
||
|
}
|
||
|
|
||
|
timestampUsFromBoot, err := strconv.ParseInt(timestamp, 10, 64)
|
||
|
if err != nil {
|
||
|
return Message{}, fmt.Errorf("could not parse %q as timestamp: %v", priority, err)
|
||
|
}
|
||
|
// timestamp is offset in microsecond from boottime.
|
||
|
msgTime := p.bootTime.Add(time.Duration(timestampUsFromBoot) * time.Microsecond)
|
||
|
|
||
|
return Message{
|
||
|
Priority: prioNum,
|
||
|
SequenceNumber: sequenceNum,
|
||
|
Timestamp: msgTime,
|
||
|
Message: message,
|
||
|
}, nil
|
||
|
}
|