From 338e36cc5d92789cef52741a0f6d4de9e7b6639a Mon Sep 17 00:00:00 2001 From: Kyle Havlovitz Date: Mon, 28 Nov 2016 16:08:31 -0500 Subject: [PATCH] Add logWriter to agent Create() method --- command/agent/agent.go | 3 +- command/agent/agent_endpoint.go | 28 +++++++++++++------ command/agent/agent_endpoint_test.go | 41 ++++++++++++++++++++++++---- command/agent/agent_test.go | 17 ++++++------ command/agent/command.go | 3 +- command/agent/http_test.go | 7 +++-- command/agent/rpc_client_test.go | 2 +- command/util_test.go | 2 +- 8 files changed, 74 insertions(+), 29 deletions(-) diff --git a/command/agent/agent.go b/command/agent/agent.go index 6bee6fba07..f3c5461b84 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -125,7 +125,7 @@ type Agent struct { // Create is used to create a new Agent. Returns // the agent or potentially an error. -func Create(config *Config, logOutput io.Writer) (*Agent, error) { +func Create(config *Config, logOutput io.Writer, logWriter *logger.LogWriter) (*Agent, error) { // Ensure we have a log sink if logOutput == nil { logOutput = os.Stderr @@ -179,6 +179,7 @@ func Create(config *Config, logOutput io.Writer) (*Agent, error) { config: config, logger: log.New(logOutput, "", log.LstdFlags), logOutput: logOutput, + logWriter: logWriter, checkReapAfter: make(map[types.CheckID]time.Duration), checkMonitors: make(map[types.CheckID]*CheckMonitor), checkTTLs: make(map[types.CheckID]*CheckTTL), diff --git a/command/agent/agent_endpoint.go b/command/agent/agent_endpoint.go index fd4586b20b..cb4afbde92 100644 --- a/command/agent/agent_endpoint.go +++ b/command/agent/agent_endpoint.go @@ -403,6 +403,15 @@ func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) ( return nil, nil } + var args structs.DCSpecificRequest + args.Datacenter = s.agent.config.Datacenter + s.parseToken(req, &args.Token) + // Validate that the given token has operator permissions + var reply structs.RaftConfigurationResponse + if err := s.agent.RPC("Operator.RaftGetConfiguration", &args, &reply); err != nil { + return nil, err + } + // Get the provided loglevel logLevel := req.URL.Query().Get("loglevel") if logLevel == "" { @@ -441,6 +450,10 @@ func (s *HTTPServer) AgentMonitor(resp http.ResponseWriter, req *http.Request) ( for { select { case <-notify: + s.agent.logWriter.DeregisterHandler(handler) + if handler.droppedCount > 0 { + s.agent.logger.Printf("[WARN] agent: Dropped %d logs during monitor request", handler.droppedCount) + } return nil, nil case log := <-handler.logCh: resp.Write([]byte(log + "\n")) @@ -461,9 +474,10 @@ func (s *HTTPServer) syncChanges() { } type httpLogHandler struct { - filter *logutils.LevelFilter - logCh chan string - logger *log.Logger + filter *logutils.LevelFilter + logCh chan string + logger *log.Logger + droppedCount int } func (h *httpLogHandler) HandleLog(log string) { @@ -476,10 +490,8 @@ func (h *httpLogHandler) HandleLog(log string) { select { case h.logCh <- log: default: - // We can't log synchronously, since we are already being invoked - // from the logWriter, and a log will need to invoke Write() which - // already holds the lock. We must therefor do the log async, so - // as to not deadlock - go h.logger.Printf("[WARN] Dropping logs to monitor http endpoint") + // Just increment a counter for dropped logs to this handler; we can't log now + // because the lock is already held by the LogWriter invoking this + h.droppedCount += 1 } } diff --git a/command/agent/agent_endpoint_test.go b/command/agent/agent_endpoint_test.go index 6bc875a304..1919855e56 100644 --- a/command/agent/agent_endpoint_test.go +++ b/command/agent/agent_endpoint_test.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "io" + "io/ioutil" "net/http" "net/http/httptest" "os" @@ -1028,23 +1029,53 @@ func TestHTTPAgent_Monitor(t *testing.T) { expectedLogs := bytes.Buffer{} logger := io.MultiWriter(os.Stdout, &expectedLogs, logWriter) - dir, srv := makeHTTPServerWithConfigLog(t, nil, logger) + dir, srv := makeHTTPServerWithConfigLog(t, nil, logger, logWriter) srv.agent.logWriter = logWriter defer os.RemoveAll(dir) defer srv.Shutdown() defer srv.agent.Shutdown() - // Begin streaming logs from the monitor endpoint - req, _ := http.NewRequest("GET", "/v1/agent/monitor?loglevel=debug", nil) + // Try passing an invalid log level + req, _ := http.NewRequest("GET", "/v1/agent/monitor?loglevel=invalid", nil) resp := newClosableRecorder() + if _, err := srv.AgentMonitor(resp, req); err != nil { + t.Fatalf("err: %v", err) + } + if resp.Code != 400 { + t.Fatalf("bad: %v", resp.Code) + } + body, _ := ioutil.ReadAll(resp.Body) + if !strings.Contains(string(body), "Unknown log level") { + t.Fatalf("bad: %s", body) + } + + // Begin streaming logs from the monitor endpoint + req, _ = http.NewRequest("GET", "/v1/agent/monitor?loglevel=debug", nil) + resp = newClosableRecorder() go func() { if _, err := srv.AgentMonitor(resp, req); err != nil { t.Fatalf("err: %s", err) } }() - // Write the incoming logs to a channel for reading - logCh := make(chan string, 0) + // Write the incoming logs from http to a channel for comparison + logCh := make(chan string, 5) + + // Block until the first log entry from http + testutil.WaitForResult(func() (bool, error) { + line, err := resp.Body.ReadString('\n') + if err != nil && err != io.EOF { + return false, fmt.Errorf("err: %v", err) + } + if line == "" { + return false, fmt.Errorf("blank line") + } + logCh <- line + return true, nil + }, func(err error) { + t.Fatal(err) + }) + go func() { for { line, err := resp.Body.ReadString('\n') diff --git a/command/agent/agent_test.go b/command/agent/agent_test.go index 92740f3520..42aed2f8f5 100644 --- a/command/agent/agent_test.go +++ b/command/agent/agent_test.go @@ -16,6 +16,7 @@ import ( "github.com/hashicorp/consul/consul" "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/consul/logger" "github.com/hashicorp/consul/testutil" "github.com/hashicorp/raft" ) @@ -79,14 +80,14 @@ func nextConfig() *Config { return conf } -func makeAgentLog(t *testing.T, conf *Config, l io.Writer) (string, *Agent) { +func makeAgentLog(t *testing.T, conf *Config, l io.Writer, writer *logger.LogWriter) (string, *Agent) { dir, err := ioutil.TempDir("", "agent") if err != nil { t.Fatalf(fmt.Sprintf("err: %v", err)) } conf.DataDir = dir - agent, err := Create(conf, l) + agent, err := Create(conf, l, writer) if err != nil { os.RemoveAll(dir) t.Fatalf(fmt.Sprintf("err: %v", err)) @@ -112,7 +113,7 @@ func makeAgentKeyring(t *testing.T, conf *Config, key string) (string, *Agent) { t.Fatalf("err: %s", err) } - agent, err := Create(conf, nil) + agent, err := Create(conf, nil, nil) if err != nil { t.Fatalf("err: %s", err) } @@ -121,7 +122,7 @@ func makeAgentKeyring(t *testing.T, conf *Config, key string) (string, *Agent) { } func makeAgent(t *testing.T, conf *Config) (string, *Agent) { - return makeAgentLog(t, conf, nil) + return makeAgentLog(t, conf, nil, nil) } func externalIP() (string, error) { @@ -845,7 +846,7 @@ func TestAgent_PersistService(t *testing.T) { agent.Shutdown() // Should load it back during later start - agent2, err := Create(config, nil) + agent2, err := Create(config, nil, nil) if err != nil { t.Fatalf("err: %s", err) } @@ -979,7 +980,7 @@ func TestAgent_PurgeServiceOnDuplicate(t *testing.T) { } config.Services = []*ServiceDefinition{svc2} - agent2, err := Create(config, nil) + agent2, err := Create(config, nil, nil) if err != nil { t.Fatalf("err: %s", err) } @@ -1072,7 +1073,7 @@ func TestAgent_PersistCheck(t *testing.T) { agent.Shutdown() // Should load it back during later start - agent2, err := Create(config, nil) + agent2, err := Create(config, nil, nil) if err != nil { t.Fatalf("err: %s", err) } @@ -1165,7 +1166,7 @@ func TestAgent_PurgeCheckOnDuplicate(t *testing.T) { } config.Checks = []*CheckDefinition{check2} - agent2, err := Create(config, nil) + agent2, err := Create(config, nil, nil) if err != nil { t.Fatalf("err: %s", err) } diff --git a/command/agent/command.go b/command/agent/command.go index c4df74660d..d47537363a 100644 --- a/command/agent/command.go +++ b/command/agent/command.go @@ -466,12 +466,11 @@ func (c *Config) discoverEc2Hosts(logger *log.Logger) ([]string, error) { // setupAgent is used to start the agent and various interfaces func (c *Command) setupAgent(config *Config, logOutput io.Writer, logWriter *logger.LogWriter) error { c.Ui.Output("Starting Consul agent...") - agent, err := Create(config, logOutput) + agent, err := Create(config, logOutput, logWriter) if err != nil { c.Ui.Error(fmt.Sprintf("Error starting agent: %s", err)) return err } - agent.logWriter = logWriter c.agent = agent // Setup the RPC listener diff --git a/command/agent/http_test.go b/command/agent/http_test.go index 5953f4c320..b02b659705 100644 --- a/command/agent/http_test.go +++ b/command/agent/http_test.go @@ -19,6 +19,7 @@ import ( "time" "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/consul/logger" "github.com/hashicorp/consul/testutil" "github.com/hashicorp/go-cleanhttp" ) @@ -28,10 +29,10 @@ func makeHTTPServer(t *testing.T) (string, *HTTPServer) { } func makeHTTPServerWithConfig(t *testing.T, cb func(c *Config)) (string, *HTTPServer) { - return makeHTTPServerWithConfigLog(t, cb, nil) + return makeHTTPServerWithConfigLog(t, cb, nil, nil) } -func makeHTTPServerWithConfigLog(t *testing.T, cb func(c *Config), l io.Writer) (string, *HTTPServer) { +func makeHTTPServerWithConfigLog(t *testing.T, cb func(c *Config), l io.Writer, logWriter *logger.LogWriter) (string, *HTTPServer) { configTry := 0 RECONF: configTry += 1 @@ -40,7 +41,7 @@ RECONF: cb(conf) } - dir, agent := makeAgentLog(t, conf, l) + dir, agent := makeAgentLog(t, conf, l, logWriter) servers, err := NewHTTPServers(agent, conf, agent.logOutput) if err != nil { if configTry < 3 { diff --git a/command/agent/rpc_client_test.go b/command/agent/rpc_client_test.go index a8f473f767..a6feb12f51 100644 --- a/command/agent/rpc_client_test.go +++ b/command/agent/rpc_client_test.go @@ -61,7 +61,7 @@ RECONF: t.Fatalf("err: %s", err) } - dir, agent := makeAgentLog(t, conf, mult) + dir, agent := makeAgentLog(t, conf, mult, lw) rpc := NewAgentRPC(agent, l, mult, lw) rpcClient, err := NewRPCClient(l.Addr().String()) diff --git a/command/util_test.go b/command/util_test.go index 8260a42173..1a4a1e8927 100644 --- a/command/util_test.go +++ b/command/util_test.go @@ -74,7 +74,7 @@ func testAgentWithConfig(t *testing.T, cb func(c *agent.Config)) *agentWrapper { } conf.DataDir = dir - a, err := agent.Create(conf, lw) + a, err := agent.Create(conf, lw, nil) if err != nil { os.RemoveAll(dir) t.Fatalf(fmt.Sprintf("err: %v", err))