mirror of https://github.com/hashicorp/consul
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
1236 lines
30 KiB
1236 lines
30 KiB
package checks |
|
|
|
import ( |
|
"bufio" |
|
"context" |
|
"crypto/tls" |
|
"errors" |
|
"fmt" |
|
"io" |
|
"net" |
|
"net/http" |
|
"os" |
|
osexec "os/exec" |
|
"strings" |
|
"sync" |
|
"syscall" |
|
"time" |
|
|
|
http2 "golang.org/x/net/http2" |
|
|
|
"github.com/hashicorp/consul/agent/structs" |
|
"github.com/hashicorp/go-hclog" |
|
|
|
"github.com/armon/circbuf" |
|
"github.com/hashicorp/consul/agent/exec" |
|
"github.com/hashicorp/consul/api" |
|
"github.com/hashicorp/consul/lib" |
|
"github.com/hashicorp/go-cleanhttp" |
|
) |
|
|
|
const ( |
|
// MinInterval is the minimal interval between |
|
// two checks. Do not allow for a interval below this value. |
|
// Otherwise we risk fork bombing a system. |
|
MinInterval = time.Second |
|
|
|
// DefaultBufSize is the maximum size of the captured |
|
// check output by default. Prevents an enormous buffer |
|
// from being captured |
|
DefaultBufSize = 4 * 1024 // 4KB |
|
|
|
// UserAgent is the value of the User-Agent header |
|
// for HTTP health checks. |
|
UserAgent = "Consul Health Check" |
|
) |
|
|
|
// RPC is an interface that an RPC client must implement. This is a helper |
|
// interface that is implemented by the agent delegate for checks that need |
|
// to make RPC calls. |
|
type RPC interface { |
|
RPC(method string, args interface{}, reply interface{}) error |
|
} |
|
|
|
// CheckNotifier interface is used by the CheckMonitor |
|
// to notify when a check has a status update. The update |
|
// should take care to be idempotent. |
|
type CheckNotifier interface { |
|
UpdateCheck(checkID structs.CheckID, status, output string) |
|
// ServiceExists return true if the given service does exists |
|
ServiceExists(serviceID structs.ServiceID) bool |
|
} |
|
|
|
// CheckMonitor is used to periodically invoke a script to |
|
// determine the health of a given check. It is compatible with |
|
// nagios plugins and expects the output in the same format. |
|
// Supports failures_before_critical and success_before_passing. |
|
type CheckMonitor struct { |
|
Notify CheckNotifier |
|
CheckID structs.CheckID |
|
ServiceID structs.ServiceID |
|
Script string |
|
ScriptArgs []string |
|
Interval time.Duration |
|
Timeout time.Duration |
|
Logger hclog.Logger |
|
OutputMaxSize int |
|
StatusHandler *StatusHandler |
|
|
|
stop bool |
|
stopCh chan struct{} |
|
stopLock sync.Mutex |
|
} |
|
|
|
// Start is used to start a check monitor. |
|
// Monitor runs until stop is called |
|
func (c *CheckMonitor) Start() { |
|
c.stopLock.Lock() |
|
defer c.stopLock.Unlock() |
|
c.stop = false |
|
c.stopCh = make(chan struct{}) |
|
go c.run() |
|
} |
|
|
|
// Stop is used to stop a check monitor. |
|
func (c *CheckMonitor) Stop() { |
|
c.stopLock.Lock() |
|
defer c.stopLock.Unlock() |
|
if !c.stop { |
|
c.stop = true |
|
close(c.stopCh) |
|
} |
|
} |
|
|
|
// run is invoked by a goroutine to run until Stop() is called |
|
func (c *CheckMonitor) run() { |
|
// Get the randomized initial pause time |
|
initialPauseTime := lib.RandomStagger(c.Interval) |
|
next := time.After(initialPauseTime) |
|
for { |
|
select { |
|
case <-next: |
|
c.check() |
|
next = time.After(c.Interval) |
|
case <-c.stopCh: |
|
return |
|
} |
|
} |
|
} |
|
|
|
// check is invoked periodically to perform the script check |
|
func (c *CheckMonitor) check() { |
|
// Create the command |
|
var cmd *osexec.Cmd |
|
var err error |
|
if len(c.ScriptArgs) > 0 { |
|
cmd, err = exec.Subprocess(c.ScriptArgs) |
|
} else { |
|
cmd, err = exec.Script(c.Script) |
|
} |
|
if err != nil { |
|
c.Logger.Error("Check failed to setup", |
|
"check", c.CheckID.String(), |
|
"error", err, |
|
) |
|
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, err.Error()) |
|
return |
|
} |
|
|
|
// Collect the output |
|
output, _ := circbuf.NewBuffer(int64(c.OutputMaxSize)) |
|
cmd.Stdout = output |
|
cmd.Stderr = output |
|
exec.SetSysProcAttr(cmd) |
|
|
|
truncateAndLogOutput := func() string { |
|
outputStr := string(output.Bytes()) |
|
if output.TotalWritten() > output.Size() { |
|
outputStr = fmt.Sprintf("Captured %d of %d bytes\n...\n%s", |
|
output.Size(), output.TotalWritten(), outputStr) |
|
} |
|
c.Logger.Trace("Check output", |
|
"check", c.CheckID.String(), |
|
"output", outputStr, |
|
) |
|
return outputStr |
|
} |
|
|
|
// Start the check |
|
if err := cmd.Start(); err != nil { |
|
c.Logger.Error("Check failed to invoke", |
|
"check", c.CheckID.String(), |
|
"error", err, |
|
) |
|
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, err.Error()) |
|
return |
|
} |
|
|
|
// Wait for the check to complete |
|
waitCh := make(chan error, 1) |
|
go func() { |
|
waitCh <- cmd.Wait() |
|
}() |
|
|
|
timeout := 30 * time.Second |
|
if c.Timeout > 0 { |
|
timeout = c.Timeout |
|
} |
|
select { |
|
case <-time.After(timeout): |
|
if err := exec.KillCommandSubtree(cmd); err != nil { |
|
c.Logger.Warn("Check failed to kill after timeout", |
|
"check", c.CheckID.String(), |
|
"error", err, |
|
) |
|
} |
|
|
|
msg := fmt.Sprintf("Timed out (%s) running check", timeout.String()) |
|
c.Logger.Warn("Timed out running check", |
|
"check", c.CheckID.String(), |
|
"timeout", timeout.String(), |
|
) |
|
|
|
outputStr := truncateAndLogOutput() |
|
if len(outputStr) > 0 { |
|
msg += "\n\n" + outputStr |
|
} |
|
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, msg) |
|
|
|
// Now wait for the process to exit so we never start another |
|
// instance concurrently. |
|
<-waitCh |
|
return |
|
|
|
case err = <-waitCh: |
|
// The process returned before the timeout, proceed normally |
|
} |
|
|
|
// Check if the check passed |
|
outputStr := truncateAndLogOutput() |
|
if err == nil { |
|
c.StatusHandler.updateCheck(c.CheckID, api.HealthPassing, outputStr) |
|
return |
|
} |
|
|
|
// If the exit code is 1, set check as warning |
|
exitErr, ok := err.(*osexec.ExitError) |
|
if ok { |
|
if status, ok := exitErr.Sys().(syscall.WaitStatus); ok { |
|
code := status.ExitStatus() |
|
if code == 1 { |
|
c.StatusHandler.updateCheck(c.CheckID, api.HealthWarning, outputStr) |
|
return |
|
} |
|
} |
|
} |
|
|
|
// Set the health as critical |
|
c.StatusHandler.updateCheck(c.CheckID, api.HealthCritical, outputStr) |
|
} |
|
|
|
// CheckTTL is used to apply a TTL to check status, |
|
// and enables clients to set the status of a check |
|
// but upon the TTL expiring, the check status is |
|
// automatically set to critical. |
|
type CheckTTL struct { |
|
Notify CheckNotifier |
|
CheckID structs.CheckID |
|
ServiceID structs.ServiceID |
|
TTL time.Duration |
|
Logger hclog.Logger |
|
|
|
timer *time.Timer |
|
|
|
lastOutput string |
|
lastOutputLock sync.RWMutex |
|
|
|
stop bool |
|
stopCh chan struct{} |
|
stopLock sync.Mutex |
|
|
|
OutputMaxSize int |
|
} |
|
|
|
// Start is used to start a check ttl, runs until Stop() |
|
func (c *CheckTTL) Start() { |
|
c.stopLock.Lock() |
|
defer c.stopLock.Unlock() |
|
if c.OutputMaxSize < 1 { |
|
c.OutputMaxSize = DefaultBufSize |
|
} |
|
c.stop = false |
|
c.stopCh = make(chan struct{}) |
|
c.timer = time.NewTimer(c.TTL) |
|
go c.run() |
|
} |
|
|
|
// Stop is used to stop a check ttl. |
|
func (c *CheckTTL) Stop() { |
|
c.stopLock.Lock() |
|
defer c.stopLock.Unlock() |
|
if !c.stop { |
|
c.timer.Stop() |
|
c.stop = true |
|
close(c.stopCh) |
|
} |
|
} |
|
|
|
// run is used to handle TTL expiration and to update the check status |
|
func (c *CheckTTL) run() { |
|
for { |
|
select { |
|
case <-c.timer.C: |
|
c.Logger.Warn("Check missed TTL, is now critical", |
|
"check", c.CheckID.String(), |
|
) |
|
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, c.getExpiredOutput()) |
|
|
|
case <-c.stopCh: |
|
return |
|
} |
|
} |
|
} |
|
|
|
// getExpiredOutput formats the output for the case when the TTL is expired. |
|
func (c *CheckTTL) getExpiredOutput() string { |
|
c.lastOutputLock.RLock() |
|
defer c.lastOutputLock.RUnlock() |
|
|
|
const prefix = "TTL expired" |
|
if c.lastOutput == "" { |
|
return prefix |
|
} |
|
|
|
return fmt.Sprintf("%s (last output before timeout follows): %s", prefix, c.lastOutput) |
|
} |
|
|
|
// SetStatus is used to update the status of the check, |
|
// and to renew the TTL. If expired, TTL is restarted. |
|
// output is returned (might be truncated) |
|
func (c *CheckTTL) SetStatus(status, output string) string { |
|
c.Logger.Debug("Check status updated", |
|
"check", c.CheckID.String(), |
|
"status", status, |
|
) |
|
total := len(output) |
|
if total > c.OutputMaxSize { |
|
output = fmt.Sprintf("%s ... (captured %d of %d bytes)", |
|
output[:c.OutputMaxSize], c.OutputMaxSize, total) |
|
} |
|
c.Notify.UpdateCheck(c.CheckID, status, output) |
|
// Store the last output so we can retain it if the TTL expires. |
|
c.lastOutputLock.Lock() |
|
c.lastOutput = output |
|
c.lastOutputLock.Unlock() |
|
|
|
c.timer.Reset(c.TTL) |
|
return output |
|
} |
|
|
|
// CheckHTTP is used to periodically make an HTTP request to |
|
// determine the health of a given check. |
|
// The check is passing if the response code is 2XX. |
|
// The check is warning if the response code is 429. |
|
// The check is critical if the response code is anything else |
|
// or if the request returns an error |
|
// Supports failures_before_critical and success_before_passing. |
|
type CheckHTTP struct { |
|
CheckID structs.CheckID |
|
ServiceID structs.ServiceID |
|
HTTP string |
|
Header map[string][]string |
|
Method string |
|
Body string |
|
Interval time.Duration |
|
Timeout time.Duration |
|
Logger hclog.Logger |
|
TLSClientConfig *tls.Config |
|
OutputMaxSize int |
|
StatusHandler *StatusHandler |
|
DisableRedirects bool |
|
|
|
httpClient *http.Client |
|
stop bool |
|
stopCh chan struct{} |
|
stopLock sync.Mutex |
|
stopWg sync.WaitGroup |
|
|
|
// Set if checks are exposed through Connect proxies |
|
// If set, this is the target of check() |
|
ProxyHTTP string |
|
} |
|
|
|
func (c *CheckHTTP) CheckType() structs.CheckType { |
|
return structs.CheckType{ |
|
CheckID: c.CheckID.ID, |
|
HTTP: c.HTTP, |
|
Method: c.Method, |
|
Body: c.Body, |
|
Header: c.Header, |
|
Interval: c.Interval, |
|
ProxyHTTP: c.ProxyHTTP, |
|
Timeout: c.Timeout, |
|
OutputMaxSize: c.OutputMaxSize, |
|
} |
|
} |
|
|
|
// Start is used to start an HTTP check. |
|
// The check runs until stop is called |
|
func (c *CheckHTTP) Start() { |
|
c.stopLock.Lock() |
|
defer c.stopLock.Unlock() |
|
|
|
if c.httpClient == nil { |
|
// Create the transport. We disable HTTP Keep-Alive's to prevent |
|
// failing checks due to the keepalive interval. |
|
trans := cleanhttp.DefaultTransport() |
|
trans.DisableKeepAlives = true |
|
|
|
// Take on the supplied TLS client config. |
|
trans.TLSClientConfig = c.TLSClientConfig |
|
|
|
// Create the HTTP client. |
|
c.httpClient = &http.Client{ |
|
Timeout: 10 * time.Second, |
|
Transport: trans, |
|
} |
|
if c.DisableRedirects { |
|
c.httpClient.CheckRedirect = func(req *http.Request, via []*http.Request) error { |
|
return http.ErrUseLastResponse |
|
} |
|
} |
|
if c.Timeout > 0 { |
|
c.httpClient.Timeout = c.Timeout |
|
} |
|
|
|
if c.OutputMaxSize < 1 { |
|
c.OutputMaxSize = DefaultBufSize |
|
} |
|
} |
|
|
|
c.stop = false |
|
c.stopCh = make(chan struct{}) |
|
c.stopWg.Add(1) |
|
go c.run() |
|
} |
|
|
|
// Stop is used to stop an HTTP check. |
|
func (c *CheckHTTP) Stop() { |
|
c.stopLock.Lock() |
|
defer c.stopLock.Unlock() |
|
if !c.stop { |
|
c.stop = true |
|
close(c.stopCh) |
|
} |
|
|
|
// Wait for the c.run() goroutine to complete before returning. |
|
c.stopWg.Wait() |
|
} |
|
|
|
// run is invoked by a goroutine to run until Stop() is called |
|
func (c *CheckHTTP) run() { |
|
defer c.stopWg.Done() |
|
// Get the randomized initial pause time |
|
initialPauseTime := lib.RandomStagger(c.Interval) |
|
next := time.After(initialPauseTime) |
|
for { |
|
select { |
|
case <-next: |
|
c.check() |
|
next = time.After(c.Interval) |
|
case <-c.stopCh: |
|
return |
|
} |
|
} |
|
} |
|
|
|
// check is invoked periodically to perform the HTTP check |
|
func (c *CheckHTTP) check() { |
|
method := c.Method |
|
if method == "" { |
|
method = "GET" |
|
} |
|
|
|
target := c.HTTP |
|
if c.ProxyHTTP != "" { |
|
target = c.ProxyHTTP |
|
} |
|
|
|
bodyReader := strings.NewReader(c.Body) |
|
req, err := http.NewRequest(method, target, bodyReader) |
|
if err != nil { |
|
c.StatusHandler.updateCheck(c.CheckID, api.HealthCritical, err.Error()) |
|
return |
|
} |
|
|
|
req.Header = http.Header(c.Header) |
|
|
|
// this happens during testing but not in prod |
|
if req.Header == nil { |
|
req.Header = make(http.Header) |
|
} |
|
|
|
if host := req.Header.Get("Host"); host != "" { |
|
req.Host = host |
|
} |
|
|
|
if req.Header.Get("User-Agent") == "" { |
|
req.Header.Set("User-Agent", UserAgent) |
|
} |
|
if req.Header.Get("Accept") == "" { |
|
req.Header.Set("Accept", "text/plain, text/*, */*") |
|
} |
|
|
|
resp, err := c.httpClient.Do(req) |
|
if err != nil { |
|
c.StatusHandler.updateCheck(c.CheckID, api.HealthCritical, err.Error()) |
|
return |
|
} |
|
defer resp.Body.Close() |
|
|
|
// Read the response into a circular buffer to limit the size |
|
output, _ := circbuf.NewBuffer(int64(c.OutputMaxSize)) |
|
if _, err := io.Copy(output, resp.Body); err != nil { |
|
c.Logger.Warn("Check error while reading body", |
|
"check", c.CheckID.String(), |
|
"error", err, |
|
) |
|
} |
|
|
|
// Format the response body |
|
result := fmt.Sprintf("HTTP %s %s: %s Output: %s", method, target, resp.Status, output.String()) |
|
|
|
if resp.StatusCode >= 200 && resp.StatusCode <= 299 { |
|
// PASSING (2xx) |
|
c.StatusHandler.updateCheck(c.CheckID, api.HealthPassing, result) |
|
} else if resp.StatusCode == 429 { |
|
// WARNING |
|
// 429 Too Many Requests (RFC 6585) |
|
// The user has sent too many requests in a given amount of time. |
|
c.StatusHandler.updateCheck(c.CheckID, api.HealthWarning, result) |
|
} else { |
|
// CRITICAL |
|
c.StatusHandler.updateCheck(c.CheckID, api.HealthCritical, result) |
|
} |
|
} |
|
|
|
type CheckH2PING struct { |
|
CheckID structs.CheckID |
|
ServiceID structs.ServiceID |
|
H2PING string |
|
Interval time.Duration |
|
Timeout time.Duration |
|
Logger hclog.Logger |
|
TLSClientConfig *tls.Config |
|
StatusHandler *StatusHandler |
|
|
|
stop bool |
|
stopCh chan struct{} |
|
stopLock sync.Mutex |
|
stopWg sync.WaitGroup |
|
} |
|
|
|
func shutdownHTTP2ClientConn(clientConn *http2.ClientConn, timeout time.Duration, checkIDString string, logger hclog.Logger) { |
|
ctx, cancel := context.WithTimeout(context.Background(), timeout/2) |
|
defer cancel() |
|
err := clientConn.Shutdown(ctx) |
|
if err != nil { |
|
logger.Warn("Shutdown of H2Ping check client connection gave an error", |
|
"check", checkIDString, |
|
"error", err) |
|
} |
|
} |
|
|
|
func (c *CheckH2PING) check() { |
|
t := &http2.Transport{} |
|
var dialFunc func(ctx context.Context, network, address string, tlscfg *tls.Config) (net.Conn, error) |
|
if c.TLSClientConfig != nil { |
|
t.TLSClientConfig = c.TLSClientConfig |
|
dialFunc = func(ctx context.Context, network, address string, tlscfg *tls.Config) (net.Conn, error) { |
|
dialer := &tls.Dialer{Config: tlscfg} |
|
return dialer.DialContext(ctx, network, address) |
|
} |
|
} else { |
|
t.AllowHTTP = true |
|
dialFunc = func(ctx context.Context, network, address string, tlscfg *tls.Config) (net.Conn, error) { |
|
dialer := &net.Dialer{} |
|
return dialer.DialContext(ctx, network, address) |
|
} |
|
} |
|
target := c.H2PING |
|
ctx, cancel := context.WithTimeout(context.Background(), c.Timeout) |
|
defer cancel() |
|
conn, err := dialFunc(ctx, "tcp", target, c.TLSClientConfig) |
|
if err != nil { |
|
message := fmt.Sprintf("Failed to dial to %s: %s", target, err) |
|
c.StatusHandler.updateCheck(c.CheckID, api.HealthCritical, message) |
|
return |
|
} |
|
defer conn.Close() |
|
clientConn, err := t.NewClientConn(conn) |
|
if err != nil { |
|
message := fmt.Sprintf("Failed to create client connection %s", err) |
|
c.StatusHandler.updateCheck(c.CheckID, api.HealthCritical, message) |
|
return |
|
} |
|
defer shutdownHTTP2ClientConn(clientConn, c.Timeout, c.CheckID.String(), c.Logger) |
|
err = clientConn.Ping(ctx) |
|
if err == nil { |
|
c.StatusHandler.updateCheck(c.CheckID, api.HealthPassing, "HTTP2 ping was successful") |
|
} else { |
|
message := fmt.Sprintf("HTTP2 ping failed: %s", err) |
|
c.StatusHandler.updateCheck(c.CheckID, api.HealthCritical, message) |
|
} |
|
} |
|
|
|
// Stop is used to stop an H2PING check. |
|
func (c *CheckH2PING) Stop() { |
|
c.stopLock.Lock() |
|
defer c.stopLock.Unlock() |
|
if !c.stop { |
|
c.stop = true |
|
close(c.stopCh) |
|
} |
|
c.stopWg.Wait() |
|
} |
|
|
|
func (c *CheckH2PING) run() { |
|
defer c.stopWg.Done() |
|
// Get the randomized initial pause time |
|
initialPauseTime := lib.RandomStagger(c.Interval) |
|
next := time.After(initialPauseTime) |
|
for { |
|
select { |
|
case <-next: |
|
c.check() |
|
next = time.After(c.Interval) |
|
case <-c.stopCh: |
|
return |
|
} |
|
} |
|
} |
|
|
|
func (c *CheckH2PING) Start() { |
|
c.stopLock.Lock() |
|
defer c.stopLock.Unlock() |
|
if c.Timeout <= 0 { |
|
c.Timeout = 10 * time.Second |
|
} |
|
c.stop = false |
|
c.stopCh = make(chan struct{}) |
|
c.stopWg.Add(1) |
|
go c.run() |
|
} |
|
|
|
// CheckTCP is used to periodically make an TCP/UDP connection to |
|
// determine the health of a given check. |
|
// The check is passing if the connection succeeds |
|
// The check is critical if the connection returns an error |
|
// Supports failures_before_critical and success_before_passing. |
|
type CheckTCP struct { |
|
CheckID structs.CheckID |
|
ServiceID structs.ServiceID |
|
TCP string |
|
Interval time.Duration |
|
Timeout time.Duration |
|
Logger hclog.Logger |
|
StatusHandler *StatusHandler |
|
|
|
dialer *net.Dialer |
|
stop bool |
|
stopCh chan struct{} |
|
stopLock sync.Mutex |
|
} |
|
|
|
// Start is used to start a TCP check. |
|
// The check runs until stop is called |
|
func (c *CheckTCP) Start() { |
|
c.stopLock.Lock() |
|
defer c.stopLock.Unlock() |
|
|
|
if c.dialer == nil { |
|
// Create the socket dialer |
|
c.dialer = &net.Dialer{ |
|
Timeout: 10 * time.Second, |
|
} |
|
if c.Timeout > 0 { |
|
c.dialer.Timeout = c.Timeout |
|
} |
|
} |
|
|
|
c.stop = false |
|
c.stopCh = make(chan struct{}) |
|
go c.run() |
|
} |
|
|
|
// Stop is used to stop a TCP check. |
|
func (c *CheckTCP) Stop() { |
|
c.stopLock.Lock() |
|
defer c.stopLock.Unlock() |
|
if !c.stop { |
|
c.stop = true |
|
close(c.stopCh) |
|
} |
|
} |
|
|
|
// run is invoked by a goroutine to run until Stop() is called |
|
func (c *CheckTCP) run() { |
|
// Get the randomized initial pause time |
|
initialPauseTime := lib.RandomStagger(c.Interval) |
|
next := time.After(initialPauseTime) |
|
for { |
|
select { |
|
case <-next: |
|
c.check() |
|
next = time.After(c.Interval) |
|
case <-c.stopCh: |
|
return |
|
} |
|
} |
|
} |
|
|
|
// check is invoked periodically to perform the TCP check |
|
func (c *CheckTCP) check() { |
|
conn, err := c.dialer.Dial(`tcp`, c.TCP) |
|
if err != nil { |
|
c.Logger.Warn("Check socket connection failed", |
|
"check", c.CheckID.String(), |
|
"error", err, |
|
) |
|
c.StatusHandler.updateCheck(c.CheckID, api.HealthCritical, err.Error()) |
|
return |
|
} |
|
conn.Close() |
|
c.StatusHandler.updateCheck(c.CheckID, api.HealthPassing, fmt.Sprintf("TCP connect %s: Success", c.TCP)) |
|
} |
|
|
|
// CheckUDP is used to periodically send a UDP datagram to determine the health of a given check. |
|
// The check is passing if the connection succeeds, the response is bytes.Equal to the bytes passed |
|
// in or if the error returned is a timeout error |
|
// The check is critical if: the connection succeeds but the response is not equal to the bytes passed in, |
|
// the connection succeeds but the error returned is not a timeout error or the connection fails |
|
type CheckUDP struct { |
|
CheckID structs.CheckID |
|
ServiceID structs.ServiceID |
|
UDP string |
|
Message string |
|
Interval time.Duration |
|
Timeout time.Duration |
|
Logger hclog.Logger |
|
StatusHandler *StatusHandler |
|
|
|
dialer *net.Dialer |
|
stop bool |
|
stopCh chan struct{} |
|
stopLock sync.Mutex |
|
} |
|
|
|
func (c *CheckUDP) Start() { |
|
c.stopLock.Lock() |
|
defer c.stopLock.Unlock() |
|
|
|
if c.dialer == nil { |
|
// Create the socket dialer |
|
c.dialer = &net.Dialer{ |
|
Timeout: 10 * time.Second, |
|
} |
|
if c.Timeout > 0 { |
|
c.dialer.Timeout = c.Timeout |
|
} |
|
} |
|
|
|
c.stop = false |
|
c.stopCh = make(chan struct{}) |
|
go c.run() |
|
} |
|
|
|
func (c *CheckUDP) Stop() { |
|
c.stopLock.Lock() |
|
defer c.stopLock.Unlock() |
|
if !c.stop { |
|
c.stop = true |
|
close(c.stopCh) |
|
} |
|
} |
|
|
|
func (c *CheckUDP) run() { |
|
// Get the randomized initial pause time |
|
initialPauseTime := lib.RandomStagger(c.Interval) |
|
next := time.After(initialPauseTime) |
|
for { |
|
select { |
|
case <-next: |
|
c.check() |
|
next = time.After(c.Interval) |
|
case <-c.stopCh: |
|
return |
|
} |
|
} |
|
|
|
} |
|
|
|
func (c *CheckUDP) check() { |
|
|
|
conn, err := c.dialer.Dial(`udp`, c.UDP) |
|
|
|
if err != nil { |
|
if e, ok := err.(net.Error); ok && e.Timeout() { |
|
c.StatusHandler.updateCheck(c.CheckID, api.HealthPassing, fmt.Sprintf("UDP connect %s: Success", c.UDP)) |
|
return |
|
} else { |
|
c.Logger.Warn("Check socket connection failed", |
|
"check", c.CheckID.String(), |
|
"error", err, |
|
) |
|
c.StatusHandler.updateCheck(c.CheckID, api.HealthCritical, err.Error()) |
|
return |
|
} |
|
} |
|
defer conn.Close() |
|
|
|
n, err := fmt.Fprintf(conn, c.Message) |
|
if err != nil { |
|
c.Logger.Warn("Check socket write failed", |
|
"check", c.CheckID.String(), |
|
"error", err, |
|
) |
|
c.StatusHandler.updateCheck(c.CheckID, api.HealthCritical, err.Error()) |
|
return |
|
} |
|
|
|
if n != len(c.Message) { |
|
c.Logger.Warn("Check socket short write", |
|
"check", c.CheckID.String(), |
|
"error", err, |
|
) |
|
c.StatusHandler.updateCheck(c.CheckID, api.HealthCritical, err.Error()) |
|
return |
|
} |
|
|
|
if err != nil { |
|
c.Logger.Warn("Check socket write failed", |
|
"check", c.CheckID.String(), |
|
"error", err, |
|
) |
|
c.StatusHandler.updateCheck(c.CheckID, api.HealthCritical, err.Error()) |
|
return |
|
} |
|
_, err = bufio.NewReader(conn).Read(make([]byte, 1)) |
|
if err != nil { |
|
if strings.Contains(err.Error(), "i/o timeout") { |
|
c.StatusHandler.updateCheck(c.CheckID, api.HealthPassing, fmt.Sprintf("UDP connect %s: Success", c.UDP)) |
|
return |
|
} else { |
|
c.Logger.Warn("Check socket read failed", |
|
"check", c.CheckID.String(), |
|
"error", err, |
|
) |
|
c.StatusHandler.updateCheck(c.CheckID, api.HealthCritical, err.Error()) |
|
return |
|
} |
|
} else if err == nil { |
|
c.StatusHandler.updateCheck(c.CheckID, api.HealthPassing, fmt.Sprintf("UDP connect %s: Success", c.UDP)) |
|
} |
|
} |
|
|
|
// CheckDocker is used to periodically invoke a script to |
|
// determine the health of an application running inside a |
|
// Docker Container. We assume that the script is compatible |
|
// with nagios plugins and expects the output in the same format. |
|
// Supports failures_before_critical and success_before_passing. |
|
type CheckDocker struct { |
|
CheckID structs.CheckID |
|
ServiceID structs.ServiceID |
|
Script string |
|
ScriptArgs []string |
|
DockerContainerID string |
|
Shell string |
|
Interval time.Duration |
|
Logger hclog.Logger |
|
Client *DockerClient |
|
StatusHandler *StatusHandler |
|
|
|
stop chan struct{} |
|
} |
|
|
|
func (c *CheckDocker) Start() { |
|
if c.stop != nil { |
|
panic("Docker check already started") |
|
} |
|
|
|
if c.Logger == nil { |
|
c.Logger = hclog.New(&hclog.LoggerOptions{Output: io.Discard}) |
|
} |
|
|
|
if c.Shell == "" { |
|
c.Shell = os.Getenv("SHELL") |
|
if c.Shell == "" { |
|
c.Shell = "/bin/sh" |
|
} |
|
} |
|
c.stop = make(chan struct{}) |
|
go c.run() |
|
} |
|
|
|
func (c *CheckDocker) Stop() { |
|
if c.stop == nil { |
|
panic("Stop called before start") |
|
} |
|
close(c.stop) |
|
} |
|
|
|
func (c *CheckDocker) run() { |
|
defer c.Client.Close() |
|
firstWait := lib.RandomStagger(c.Interval) |
|
next := time.After(firstWait) |
|
for { |
|
select { |
|
case <-next: |
|
c.check() |
|
next = time.After(c.Interval) |
|
case <-c.stop: |
|
return |
|
} |
|
} |
|
} |
|
|
|
func (c *CheckDocker) check() { |
|
var out string |
|
status, b, err := c.doCheck() |
|
if err != nil { |
|
c.Logger.Debug("Check failed", |
|
"check", c.CheckID.String(), |
|
"error", err, |
|
) |
|
out = err.Error() |
|
} else { |
|
// out is already limited to CheckBufSize since we're getting a |
|
// limited buffer. So we don't need to truncate it just report |
|
// that it was truncated. |
|
out = string(b.Bytes()) |
|
if int(b.TotalWritten()) > len(out) { |
|
out = fmt.Sprintf("Captured %d of %d bytes\n...\n%s", len(out), b.TotalWritten(), out) |
|
} |
|
c.Logger.Trace("Check output", |
|
"check", c.CheckID.String(), |
|
"output", out, |
|
) |
|
} |
|
c.StatusHandler.updateCheck(c.CheckID, status, out) |
|
} |
|
|
|
func (c *CheckDocker) doCheck() (string, *circbuf.Buffer, error) { |
|
var cmd []string |
|
if len(c.ScriptArgs) > 0 { |
|
cmd = c.ScriptArgs |
|
} else { |
|
cmd = []string{c.Shell, "-c", c.Script} |
|
} |
|
|
|
execID, err := c.Client.CreateExec(c.DockerContainerID, cmd) |
|
if err != nil { |
|
return api.HealthCritical, nil, err |
|
} |
|
|
|
buf, err := c.Client.StartExec(c.DockerContainerID, execID) |
|
if err != nil { |
|
return api.HealthCritical, nil, err |
|
} |
|
|
|
exitCode, err := c.Client.InspectExec(c.DockerContainerID, execID) |
|
if err != nil { |
|
return api.HealthCritical, nil, err |
|
} |
|
|
|
switch exitCode { |
|
case 0: |
|
return api.HealthPassing, buf, nil |
|
case 1: |
|
c.Logger.Debug("Check failed", |
|
"check", c.CheckID.String(), |
|
"exit_code", exitCode, |
|
) |
|
return api.HealthWarning, buf, nil |
|
default: |
|
c.Logger.Debug("Check failed", |
|
"check", c.CheckID.String(), |
|
"exit_code", exitCode, |
|
) |
|
return api.HealthCritical, buf, nil |
|
} |
|
} |
|
|
|
// CheckGRPC is used to periodically send request to a gRPC server |
|
// application that implements gRPC health-checking protocol. |
|
// The check is passing if returned status is SERVING. |
|
// The check is critical if connection fails or returned status is |
|
// not SERVING. |
|
// Supports failures_before_critical and success_before_passing. |
|
type CheckGRPC struct { |
|
CheckID structs.CheckID |
|
ServiceID structs.ServiceID |
|
GRPC string |
|
Interval time.Duration |
|
Timeout time.Duration |
|
TLSClientConfig *tls.Config |
|
Logger hclog.Logger |
|
StatusHandler *StatusHandler |
|
|
|
probe *GrpcHealthProbe |
|
stop bool |
|
stopCh chan struct{} |
|
stopLock sync.Mutex |
|
|
|
// Set if checks are exposed through Connect proxies |
|
// If set, this is the target of check() |
|
ProxyGRPC string |
|
} |
|
|
|
func (c *CheckGRPC) CheckType() structs.CheckType { |
|
return structs.CheckType{ |
|
CheckID: c.CheckID.ID, |
|
GRPC: c.GRPC, |
|
ProxyGRPC: c.ProxyGRPC, |
|
Interval: c.Interval, |
|
Timeout: c.Timeout, |
|
} |
|
} |
|
|
|
func (c *CheckGRPC) Start() { |
|
c.stopLock.Lock() |
|
defer c.stopLock.Unlock() |
|
timeout := 10 * time.Second |
|
if c.Timeout > 0 { |
|
timeout = c.Timeout |
|
} |
|
c.probe = NewGrpcHealthProbe(c.GRPC, timeout, c.TLSClientConfig) |
|
c.stop = false |
|
c.stopCh = make(chan struct{}) |
|
go c.run() |
|
} |
|
|
|
func (c *CheckGRPC) run() { |
|
// Get the randomized initial pause time |
|
initialPauseTime := lib.RandomStagger(c.Interval) |
|
next := time.After(initialPauseTime) |
|
for { |
|
select { |
|
case <-next: |
|
c.check() |
|
next = time.After(c.Interval) |
|
case <-c.stopCh: |
|
return |
|
} |
|
} |
|
} |
|
|
|
func (c *CheckGRPC) check() { |
|
target := c.GRPC |
|
if c.ProxyGRPC != "" { |
|
target = c.ProxyGRPC |
|
} |
|
|
|
err := c.probe.Check(target) |
|
if err != nil { |
|
c.StatusHandler.updateCheck(c.CheckID, api.HealthCritical, err.Error()) |
|
} else { |
|
c.StatusHandler.updateCheck(c.CheckID, api.HealthPassing, fmt.Sprintf("gRPC check %s: success", target)) |
|
} |
|
} |
|
|
|
func (c *CheckGRPC) Stop() { |
|
c.stopLock.Lock() |
|
defer c.stopLock.Unlock() |
|
if !c.stop { |
|
c.stop = true |
|
close(c.stopCh) |
|
} |
|
} |
|
|
|
type CheckOSService struct { |
|
CheckID structs.CheckID |
|
ServiceID structs.ServiceID |
|
OSService string |
|
Interval time.Duration |
|
Timeout time.Duration |
|
Logger hclog.Logger |
|
StatusHandler *StatusHandler |
|
Client *OSServiceClient |
|
|
|
stop bool |
|
stopCh chan struct{} |
|
stopLock sync.Mutex |
|
stopWg sync.WaitGroup |
|
} |
|
|
|
func (c *CheckOSService) CheckType() structs.CheckType { |
|
return structs.CheckType{ |
|
CheckID: c.CheckID.ID, |
|
OSService: c.OSService, |
|
Interval: c.Interval, |
|
Timeout: c.Timeout, |
|
} |
|
} |
|
|
|
func (c *CheckOSService) Start() { |
|
c.stopLock.Lock() |
|
defer c.stopLock.Unlock() |
|
c.stop = false |
|
c.stopCh = make(chan struct{}) |
|
c.stopWg.Add(1) |
|
go c.run() |
|
} |
|
|
|
func (c *CheckOSService) Stop() { |
|
c.stopLock.Lock() |
|
defer c.stopLock.Unlock() |
|
if !c.stop { |
|
c.stop = true |
|
close(c.stopCh) |
|
} |
|
|
|
// Wait for the c.run() goroutine to complete before returning. |
|
c.stopWg.Wait() |
|
} |
|
|
|
func (c *CheckOSService) run() { |
|
defer c.stopWg.Done() |
|
// Get the randomized initial pause time |
|
initialPauseTime := lib.RandomStagger(c.Interval) |
|
next := time.After(initialPauseTime) |
|
for { |
|
select { |
|
case <-next: |
|
c.check() |
|
next = time.After(c.Interval) |
|
case <-c.stopCh: |
|
return |
|
} |
|
} |
|
} |
|
|
|
func (c *CheckOSService) doCheck() (string, error) { |
|
err := c.Client.Check(c.OSService) |
|
if err == nil { |
|
return api.HealthPassing, nil |
|
} |
|
if errors.Is(err, ErrOSServiceStatusCritical) { |
|
return api.HealthCritical, err |
|
} |
|
|
|
return api.HealthWarning, err |
|
} |
|
|
|
func (c *CheckOSService) check() { |
|
var out string |
|
var status string |
|
var err error |
|
|
|
waitCh := make(chan error, 1) |
|
go func() { |
|
status, err = c.doCheck() |
|
waitCh <- err |
|
}() |
|
|
|
timeout := 30 * time.Second |
|
if c.Timeout > 0 { |
|
timeout = c.Timeout |
|
} |
|
select { |
|
case <-time.After(timeout): |
|
msg := fmt.Sprintf("Timed out (%s) running check", timeout.String()) |
|
c.Logger.Warn("Timed out running check", |
|
"check", c.CheckID.String(), |
|
"timeout", timeout.String(), |
|
) |
|
|
|
c.StatusHandler.updateCheck(c.CheckID, api.HealthCritical, msg) |
|
|
|
// Now wait for the process to exit so we never start another |
|
// instance concurrently. |
|
<-waitCh |
|
return |
|
|
|
case err = <-waitCh: |
|
// The process returned before the timeout, proceed normally |
|
} |
|
|
|
out = fmt.Sprintf("Service \"%s\" is healthy", c.OSService) |
|
if err != nil { |
|
c.Logger.Debug("Check failed", |
|
"check", c.CheckID.String(), |
|
"error", err, |
|
) |
|
out = err.Error() |
|
} |
|
c.StatusHandler.updateCheck(c.CheckID, status, out) |
|
} |
|
|
|
// StatusHandler keep tracks of successive error/success counts and ensures |
|
// that status can be set to critical/passing only once the successive number of event |
|
// reaches the given threshold. |
|
type StatusHandler struct { |
|
inner CheckNotifier |
|
logger hclog.Logger |
|
successBeforePassing int |
|
successCounter int |
|
failuresBeforeWarning int |
|
failuresBeforeCritical int |
|
failuresCounter int |
|
} |
|
|
|
// NewStatusHandler set counters values to threshold in order to immediatly update status after first check. |
|
func NewStatusHandler(inner CheckNotifier, logger hclog.Logger, successBeforePassing, failuresBeforeWarning, failuresBeforeCritical int) *StatusHandler { |
|
return &StatusHandler{ |
|
logger: logger, |
|
inner: inner, |
|
successBeforePassing: successBeforePassing, |
|
successCounter: successBeforePassing, |
|
failuresBeforeWarning: failuresBeforeWarning, |
|
failuresBeforeCritical: failuresBeforeCritical, |
|
failuresCounter: failuresBeforeCritical, |
|
} |
|
} |
|
|
|
func (s *StatusHandler) updateCheck(checkID structs.CheckID, status, output string) { |
|
|
|
if status == api.HealthPassing || status == api.HealthWarning { |
|
s.successCounter++ |
|
s.failuresCounter = 0 |
|
if s.successCounter >= s.successBeforePassing { |
|
s.logger.Debug("Check status updated", |
|
"check", checkID.String(), |
|
"status", status, |
|
) |
|
s.inner.UpdateCheck(checkID, status, output) |
|
return |
|
} |
|
s.logger.Warn("Check passed but has not reached success threshold", |
|
"check", checkID.String(), |
|
"status", status, |
|
"success_count", s.successCounter, |
|
"success_threshold", s.successBeforePassing, |
|
) |
|
} else { |
|
s.failuresCounter++ |
|
s.successCounter = 0 |
|
if s.failuresCounter >= s.failuresBeforeCritical { |
|
s.logger.Warn("Check is now critical", "check", checkID.String()) |
|
s.inner.UpdateCheck(checkID, status, output) |
|
return |
|
} |
|
// Defaults to same value as failuresBeforeCritical if not set. |
|
if s.failuresCounter >= s.failuresBeforeWarning { |
|
s.logger.Warn("Check is now warning", "check", checkID.String()) |
|
s.inner.UpdateCheck(checkID, api.HealthWarning, output) |
|
return |
|
} |
|
s.logger.Warn("Check failed but has not reached warning/failure threshold", |
|
"check", checkID.String(), |
|
"status", status, |
|
"failure_count", s.failuresCounter, |
|
"warning_threshold", s.failuresBeforeWarning, |
|
"failure_threshold", s.failuresBeforeCritical, |
|
) |
|
} |
|
}
|
|
|