Add logWriter to agent Create() method

pull/2538/head
Kyle Havlovitz 2016-11-28 16:08:31 -05:00
parent 124f907063
commit 338e36cc5d
8 changed files with 74 additions and 29 deletions

View File

@ -125,7 +125,7 @@ type Agent struct {
// Create is used to create a new Agent. Returns // Create is used to create a new Agent. Returns
// the agent or potentially an error. // the agent or potentially an error.
func Create(config *Config, logOutput io.Writer) (*Agent, error) { func Create(config *Config, logOutput io.Writer, logWriter *logger.LogWriter) (*Agent, error) {
// Ensure we have a log sink // Ensure we have a log sink
if logOutput == nil { if logOutput == nil {
logOutput = os.Stderr logOutput = os.Stderr
@ -179,6 +179,7 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) {
config: config, config: config,
logger: log.New(logOutput, "", log.LstdFlags), logger: log.New(logOutput, "", log.LstdFlags),
logOutput: logOutput, logOutput: logOutput,
logWriter: logWriter,
checkReapAfter: make(map[types.CheckID]time.Duration), checkReapAfter: make(map[types.CheckID]time.Duration),
checkMonitors: make(map[types.CheckID]*CheckMonitor), checkMonitors: make(map[types.CheckID]*CheckMonitor),
checkTTLs: make(map[types.CheckID]*CheckTTL), checkTTLs: make(map[types.CheckID]*CheckTTL),

View File

@ -403,6 +403,15 @@ func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) (
return nil, nil return nil, nil
} }
var args structs.DCSpecificRequest
args.Datacenter = s.agent.config.Datacenter
s.parseToken(req, &args.Token)
// Validate that the given token has operator permissions
var reply structs.RaftConfigurationResponse
if err := s.agent.RPC("Operator.RaftGetConfiguration", &args, &reply); err != nil {
return nil, err
}
// Get the provided loglevel // Get the provided loglevel
logLevel := req.URL.Query().Get("loglevel") logLevel := req.URL.Query().Get("loglevel")
if logLevel == "" { if logLevel == "" {
@ -441,6 +450,10 @@ func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) (
for { for {
select { select {
case <-notify: case <-notify:
s.agent.logWriter.DeregisterHandler(handler)
if handler.droppedCount > 0 {
s.agent.logger.Printf("[WARN] agent: Dropped %d logs during monitor request", handler.droppedCount)
}
return nil, nil return nil, nil
case log := <-handler.logCh: case log := <-handler.logCh:
resp.Write([]byte(log + "\n")) resp.Write([]byte(log + "\n"))
@ -464,6 +477,7 @@ type httpLogHandler struct {
filter *logutils.LevelFilter filter *logutils.LevelFilter
logCh chan string logCh chan string
logger *log.Logger logger *log.Logger
droppedCount int
} }
func (h *httpLogHandler) HandleLog(log string) { func (h *httpLogHandler) HandleLog(log string) {
@ -476,10 +490,8 @@ func (h *httpLogHandler) HandleLog(log string) {
select { select {
case h.logCh <- log: case h.logCh <- log:
default: default:
// We can't log synchronously, since we are already being invoked // Just increment a counter for dropped logs to this handler; we can't log now
// from the logWriter, and a log will need to invoke Write() which // because the lock is already held by the LogWriter invoking this
// already holds the lock. We must therefor do the log async, so h.droppedCount += 1
// as to not deadlock
go h.logger.Printf("[WARN] Dropping logs to monitor http endpoint")
} }
} }

View File

@ -5,6 +5,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"io" "io"
"io/ioutil"
"net/http" "net/http"
"net/http/httptest" "net/http/httptest"
"os" "os"
@ -1028,23 +1029,53 @@ func TestHTTPAgent_Monitor(t *testing.T) {
expectedLogs := bytes.Buffer{} expectedLogs := bytes.Buffer{}
logger := io.MultiWriter(os.Stdout, &expectedLogs, logWriter) logger := io.MultiWriter(os.Stdout, &expectedLogs, logWriter)
dir, srv := makeHTTPServerWithConfigLog(t, nil, logger) dir, srv := makeHTTPServerWithConfigLog(t, nil, logger, logWriter)
srv.agent.logWriter = logWriter srv.agent.logWriter = logWriter
defer os.RemoveAll(dir) defer os.RemoveAll(dir)
defer srv.Shutdown() defer srv.Shutdown()
defer srv.agent.Shutdown() defer srv.agent.Shutdown()
// Begin streaming logs from the monitor endpoint // Try passing an invalid log level
req, _ := http.NewRequest("GET", "/v1/agent/monitor?loglevel=debug", nil) req, _ := http.NewRequest("GET", "/v1/agent/monitor?loglevel=invalid", nil)
resp := newClosableRecorder() resp := newClosableRecorder()
if _, err := srv.AgentMonitor(resp, req); err != nil {
t.Fatalf("err: %v", err)
}
if resp.Code != 400 {
t.Fatalf("bad: %v", resp.Code)
}
body, _ := ioutil.ReadAll(resp.Body)
if !strings.Contains(string(body), "Unknown log level") {
t.Fatalf("bad: %s", body)
}
// Begin streaming logs from the monitor endpoint
req, _ = http.NewRequest("GET", "/v1/agent/monitor?loglevel=debug", nil)
resp = newClosableRecorder()
go func() { go func() {
if _, err := srv.AgentMonitor(resp, req); err != nil { if _, err := srv.AgentMonitor(resp, req); err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
}() }()
// Write the incoming logs to a channel for reading // Write the incoming logs from http to a channel for comparison
logCh := make(chan string, 0) logCh := make(chan string, 5)
// Block until the first log entry from http
testutil.WaitForResult(func() (bool, error) {
line, err := resp.Body.ReadString('\n')
if err != nil && err != io.EOF {
return false, fmt.Errorf("err: %v", err)
}
if line == "" {
return false, fmt.Errorf("blank line")
}
logCh <- line
return true, nil
}, func(err error) {
t.Fatal(err)
})
go func() { go func() {
for { for {
line, err := resp.Body.ReadString('\n') line, err := resp.Body.ReadString('\n')

View File

@ -16,6 +16,7 @@ import (
"github.com/hashicorp/consul/consul" "github.com/hashicorp/consul/consul"
"github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/logger"
"github.com/hashicorp/consul/testutil" "github.com/hashicorp/consul/testutil"
"github.com/hashicorp/raft" "github.com/hashicorp/raft"
) )
@ -79,14 +80,14 @@ func nextConfig() *Config {
return conf return conf
} }
func makeAgentLog(t *testing.T, conf *Config, l io.Writer) (string, *Agent) { func makeAgentLog(t *testing.T, conf *Config, l io.Writer, writer *logger.LogWriter) (string, *Agent) {
dir, err := ioutil.TempDir("", "agent") dir, err := ioutil.TempDir("", "agent")
if err != nil { if err != nil {
t.Fatalf(fmt.Sprintf("err: %v", err)) t.Fatalf(fmt.Sprintf("err: %v", err))
} }
conf.DataDir = dir conf.DataDir = dir
agent, err := Create(conf, l) agent, err := Create(conf, l, writer)
if err != nil { if err != nil {
os.RemoveAll(dir) os.RemoveAll(dir)
t.Fatalf(fmt.Sprintf("err: %v", err)) t.Fatalf(fmt.Sprintf("err: %v", err))
@ -112,7 +113,7 @@ func makeAgentKeyring(t *testing.T, conf *Config, key string) (string, *Agent) {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
agent, err := Create(conf, nil) agent, err := Create(conf, nil, nil)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -121,7 +122,7 @@ func makeAgentKeyring(t *testing.T, conf *Config, key string) (string, *Agent) {
} }
func makeAgent(t *testing.T, conf *Config) (string, *Agent) { func makeAgent(t *testing.T, conf *Config) (string, *Agent) {
return makeAgentLog(t, conf, nil) return makeAgentLog(t, conf, nil, nil)
} }
func externalIP() (string, error) { func externalIP() (string, error) {
@ -845,7 +846,7 @@ func TestAgent_PersistService(t *testing.T) {
agent.Shutdown() agent.Shutdown()
// Should load it back during later start // Should load it back during later start
agent2, err := Create(config, nil) agent2, err := Create(config, nil, nil)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -979,7 +980,7 @@ func TestAgent_PurgeServiceOnDuplicate(t *testing.T) {
} }
config.Services = []*ServiceDefinition{svc2} config.Services = []*ServiceDefinition{svc2}
agent2, err := Create(config, nil) agent2, err := Create(config, nil, nil)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -1072,7 +1073,7 @@ func TestAgent_PersistCheck(t *testing.T) {
agent.Shutdown() agent.Shutdown()
// Should load it back during later start // Should load it back during later start
agent2, err := Create(config, nil) agent2, err := Create(config, nil, nil)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
@ -1165,7 +1166,7 @@ func TestAgent_PurgeCheckOnDuplicate(t *testing.T) {
} }
config.Checks = []*CheckDefinition{check2} config.Checks = []*CheckDefinition{check2}
agent2, err := Create(config, nil) agent2, err := Create(config, nil, nil)
if err != nil { if err != nil {
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }

View File

@ -466,12 +466,11 @@ func (c *Config) discoverEc2Hosts(logger *log.Logger) ([]string, error) {
// setupAgent is used to start the agent and various interfaces // setupAgent is used to start the agent and various interfaces
func (c *Command) setupAgent(config *Config, logOutput io.Writer, logWriter *logger.LogWriter) error { func (c *Command) setupAgent(config *Config, logOutput io.Writer, logWriter *logger.LogWriter) error {
c.Ui.Output("Starting Consul agent...") c.Ui.Output("Starting Consul agent...")
agent, err := Create(config, logOutput) agent, err := Create(config, logOutput, logWriter)
if err != nil { if err != nil {
c.Ui.Error(fmt.Sprintf("Error starting agent: %s", err)) c.Ui.Error(fmt.Sprintf("Error starting agent: %s", err))
return err return err
} }
agent.logWriter = logWriter
c.agent = agent c.agent = agent
// Setup the RPC listener // Setup the RPC listener

View File

@ -19,6 +19,7 @@ import (
"time" "time"
"github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/consul/logger"
"github.com/hashicorp/consul/testutil" "github.com/hashicorp/consul/testutil"
"github.com/hashicorp/go-cleanhttp" "github.com/hashicorp/go-cleanhttp"
) )
@ -28,10 +29,10 @@ func makeHTTPServer(t *testing.T) (string, *HTTPServer) {
} }
func makeHTTPServerWithConfig(t *testing.T, cb func(c *Config)) (string, *HTTPServer) { func makeHTTPServerWithConfig(t *testing.T, cb func(c *Config)) (string, *HTTPServer) {
return makeHTTPServerWithConfigLog(t, cb, nil) return makeHTTPServerWithConfigLog(t, cb, nil, nil)
} }
func makeHTTPServerWithConfigLog(t *testing.T, cb func(c *Config), l io.Writer) (string, *HTTPServer) { func makeHTTPServerWithConfigLog(t *testing.T, cb func(c *Config), l io.Writer, logWriter *logger.LogWriter) (string, *HTTPServer) {
configTry := 0 configTry := 0
RECONF: RECONF:
configTry += 1 configTry += 1
@ -40,7 +41,7 @@ RECONF:
cb(conf) cb(conf)
} }
dir, agent := makeAgentLog(t, conf, l) dir, agent := makeAgentLog(t, conf, l, logWriter)
servers, err := NewHTTPServers(agent, conf, agent.logOutput) servers, err := NewHTTPServers(agent, conf, agent.logOutput)
if err != nil { if err != nil {
if configTry < 3 { if configTry < 3 {

View File

@ -61,7 +61,7 @@ RECONF:
t.Fatalf("err: %s", err) t.Fatalf("err: %s", err)
} }
dir, agent := makeAgentLog(t, conf, mult) dir, agent := makeAgentLog(t, conf, mult, lw)
rpc := NewAgentRPC(agent, l, mult, lw) rpc := NewAgentRPC(agent, l, mult, lw)
rpcClient, err := NewRPCClient(l.Addr().String()) rpcClient, err := NewRPCClient(l.Addr().String())

View File

@ -74,7 +74,7 @@ func testAgentWithConfig(t *testing.T, cb func(c *agent.Config)) *agentWrapper {
} }
conf.DataDir = dir conf.DataDir = dir
a, err := agent.Create(conf, lw) a, err := agent.Create(conf, lw, nil)
if err != nil { if err != nil {
os.RemoveAll(dir) os.RemoveAll(dir)
t.Fatalf(fmt.Sprintf("err: %v", err)) t.Fatalf(fmt.Sprintf("err: %v", err))