You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
consul/logging/monitor/monitor.go

137 lines
3.0 KiB

// Copyright (c) HashiCorp, Inc.
[COMPLIANCE] License changes (#18443) * Adding explicit MPL license for sub-package This directory and its subdirectories (packages) contain files licensed with the MPLv2 `LICENSE` file in this directory and are intentionally licensed separately from the BSL `LICENSE` file at the root of this repository. * Adding explicit MPL license for sub-package This directory and its subdirectories (packages) contain files licensed with the MPLv2 `LICENSE` file in this directory and are intentionally licensed separately from the BSL `LICENSE` file at the root of this repository. * Updating the license from MPL to Business Source License Going forward, this project will be licensed under the Business Source License v1.1. Please see our blog post for more details at <Blog URL>, FAQ at www.hashicorp.com/licensing-faq, and details of the license at www.hashicorp.com/bsl. * add missing license headers * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 --------- Co-authored-by: hashicorp-copywrite[bot] <110428419+hashicorp-copywrite[bot]@users.noreply.github.com>
1 year ago
// SPDX-License-Identifier: BUSL-1.1
package monitor
import (
"errors"
"sync"
log "github.com/hashicorp/go-hclog"
)
// Monitor provides a mechanism to stream logs using go-hclog
// InterceptLogger and SinkAdapter. It allows streaming of logs
// at a different log level than what is set on the logger.
type Monitor interface {
// Start returns a channel of log messages which are sent
// ever time a log message occurs
Start() <-chan []byte
// Stop deregisters the sink from the InterceptLogger and closes the log
// channels. This returns a count of the number of log messages that were
// dropped during streaming.
Stop() int
}
// monitor implements the Monitor interface
type monitor struct {
sink log.SinkAdapter
// logger is the logger we will be monitoring
logger log.InterceptLogger
// logCh is a buffered chan where we send logs when streaming
logCh chan []byte
// droppedCount is the current count of messages
// that were dropped from the logCh buffer.
droppedCount int
// doneCh coordinates the shutdown of logCh
doneCh chan struct{}
// Defaults to 512.
bufSize int
}
type Config struct {
BufferSize int
Logger log.InterceptLogger
LoggerOptions *log.LoggerOptions
}
// New creates a new Monitor. Start must be called in order to actually start
// streaming logs
func New(cfg Config) Monitor {
bufSize := cfg.BufferSize
if bufSize == 0 {
bufSize = 512
}
sw := &monitor{
logger: cfg.Logger,
logCh: make(chan []byte, bufSize),
doneCh: make(chan struct{}, 1),
bufSize: bufSize,
}
cfg.LoggerOptions.Output = sw
sink := log.NewSinkAdapter(cfg.LoggerOptions)
sw.sink = sink
return sw
}
// Stop deregisters the sink and stops the monitoring process
func (d *monitor) Stop() int {
d.logger.DeregisterSink(d.sink)
close(d.doneCh)
return d.droppedCount
}
// Start registers a sink on the monitor's logger and starts sending
// received log messages over the returned channel.
func (d *monitor) Start() <-chan []byte {
// register our sink with the logger
streamCh := make(chan []byte, d.bufSize)
// run a go routine that listens for streamed
// log messages and sends them to streamCh
wg := new(sync.WaitGroup)
wg.Add(1)
go func() {
defer close(streamCh)
wg.Done()
for {
select {
case logLine := <-d.logCh:
select {
case <-d.doneCh:
return
case streamCh <- logLine:
}
case <-d.doneCh:
return
}
}
}()
//wait for the consumer loop to start before registering the sink to avoid filling the log channel
wg.Wait()
d.logger.RegisterSink(d.sink)
return streamCh
}
// Write attempts to send latest log to logCh
// it drops the log if channel is unavailable to receive
func (d *monitor) Write(p []byte) (int, error) {
// Check whether we have been stopped
select {
case <-d.doneCh:
return 0, errors.New("monitor stopped")
default:
}
bytes := make([]byte, len(p))
copy(bytes, p)
select {
case d.logCh <- bytes:
default:
d.droppedCount++
}
return len(p), nil
}