consul/agent/checks/check.go

1240 lines
30 KiB
Go
Raw Normal View History

// Copyright (c) HashiCorp, Inc.
[COMPLIANCE] License changes (#18443) * Adding explicit MPL license for sub-package This directory and its subdirectories (packages) contain files licensed with the MPLv2 `LICENSE` file in this directory and are intentionally licensed separately from the BSL `LICENSE` file at the root of this repository. * Adding explicit MPL license for sub-package This directory and its subdirectories (packages) contain files licensed with the MPLv2 `LICENSE` file in this directory and are intentionally licensed separately from the BSL `LICENSE` file at the root of this repository. * Updating the license from MPL to Business Source License Going forward, this project will be licensed under the Business Source License v1.1. Please see our blog post for more details at <Blog URL>, FAQ at www.hashicorp.com/licensing-faq, and details of the license at www.hashicorp.com/bsl. * add missing license headers * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 * Update copyright file headers to BUSL-1.1 --------- Co-authored-by: hashicorp-copywrite[bot] <110428419+hashicorp-copywrite[bot]@users.noreply.github.com>
2023-08-11 13:12:13 +00:00
// SPDX-License-Identifier: BUSL-1.1
package checks
import (
UDP check for service stanza #12221 (#12722) * UDP check for service stanza #12221 * add pass status on timeout condition * delete useless files * Update check_test.go improve comment in test * fix test * fix requested changes and update TestRuntimeConfig_Sanitize.golden * add freeport to TestCheckUDPCritical * improve comment for CheckUDP struct * fix requested changes * fix requested changes * fix requested changes * add UDP to proto * add UDP to proto and add a changelog * add requested test on agent_endpoint_test.go * add test for given endpoints * fix failing tests * add documentation for udp healthcheck * regenerate proto using buf * Update website/content/api-docs/agent/check.mdx Co-authored-by: trujillo-adam <47586768+trujillo-adam@users.noreply.github.com> * Update website/content/api-docs/agent/check.mdx Co-authored-by: trujillo-adam <47586768+trujillo-adam@users.noreply.github.com> * Update website/content/docs/discovery/checks.mdx Co-authored-by: trujillo-adam <47586768+trujillo-adam@users.noreply.github.com> * Update website/content/docs/ecs/configuration-reference.mdx Co-authored-by: trujillo-adam <47586768+trujillo-adam@users.noreply.github.com> * Update website/content/docs/ecs/configuration-reference.mdx Co-authored-by: trujillo-adam <47586768+trujillo-adam@users.noreply.github.com> * add debug echo * add debug circle-ci * add debug circle-ci bash * use echo instead of status_stage * remove debug and status from devtools script and use echo instead * Update website/content/api-docs/agent/check.mdx Co-authored-by: Jared Kirschner <85913323+jkirschner-hashicorp@users.noreply.github.com> * fix test * replace status_stage with status * replace functions with echo Co-authored-by: Dhia Ayachi <dhia@hashicorp.com> Co-authored-by: trujillo-adam <47586768+trujillo-adam@users.noreply.github.com> Co-authored-by: Jared Kirschner <85913323+jkirschner-hashicorp@users.noreply.github.com>
2022-06-06 19:13:19 +00:00
"bufio"
"context"
"crypto/tls"
2022-06-07 17:27:14 +00:00
"errors"
"fmt"
"io"
"net"
"net/http"
"os"
osexec "os/exec"
"strings"
"sync"
"syscall"
"time"
http2 "golang.org/x/net/http2"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/go-hclog"
"github.com/armon/circbuf"
"github.com/hashicorp/consul/agent/exec"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/go-cleanhttp"
)
const (
// MinInterval is the minimal interval between
// two checks. Do not allow for a interval below this value.
// Otherwise we risk fork bombing a system.
MinInterval = time.Second
// DefaultBufSize is the maximum size of the captured
2020-01-27 13:00:33 +00:00
// check output by default. Prevents an enormous buffer
// from being captured
DefaultBufSize = 4 * 1024 // 4KB
// UserAgent is the value of the User-Agent header
// for HTTP health checks.
UserAgent = "Consul Health Check"
)
2018-06-30 01:15:48 +00:00
// RPC is an interface that an RPC client must implement. This is a helper
// interface that is implemented by the agent delegate for checks that need
// to make RPC calls.
type RPC interface {
RPC(ctx context.Context, method string, args interface{}, reply interface{}) error
2018-06-30 01:15:48 +00:00
}
// CheckNotifier interface is used by the CheckMonitor
// to notify when a check has a status update. The update
// should take care to be idempotent.
type CheckNotifier interface {
UpdateCheck(checkID structs.CheckID, status, output string)
// ServiceExists return true if the given service does exists
ServiceExists(serviceID structs.ServiceID) bool
}
// CheckMonitor is used to periodically invoke a script to
// determine the health of a given check. It is compatible with
// nagios plugins and expects the output in the same format.
// Supports failures_before_critical and success_before_passing.
type CheckMonitor struct {
Notify CheckNotifier
CheckID structs.CheckID
ServiceID structs.ServiceID
Script string
ScriptArgs []string
Interval time.Duration
Timeout time.Duration
Logger hclog.Logger
OutputMaxSize int
StatusHandler *StatusHandler
stop bool
stopCh chan struct{}
stopLock sync.Mutex
}
// Start is used to start a check monitor.
// Monitor runs until stop is called
func (c *CheckMonitor) Start() {
c.stopLock.Lock()
defer c.stopLock.Unlock()
c.stop = false
c.stopCh = make(chan struct{})
go c.run()
}
// Stop is used to stop a check monitor.
func (c *CheckMonitor) Stop() {
c.stopLock.Lock()
defer c.stopLock.Unlock()
if !c.stop {
c.stop = true
close(c.stopCh)
}
}
// run is invoked by a goroutine to run until Stop() is called
func (c *CheckMonitor) run() {
// Get the randomized initial pause time
initialPauseTime := lib.RandomStagger(c.Interval)
next := time.After(initialPauseTime)
for {
select {
case <-next:
c.check()
next = time.After(c.Interval)
case <-c.stopCh:
return
}
}
}
// check is invoked periodically to perform the script check
func (c *CheckMonitor) check() {
// Create the command
var cmd *osexec.Cmd
var err error
if len(c.ScriptArgs) > 0 {
cmd, err = exec.Subprocess(c.ScriptArgs)
} else {
cmd, err = exec.Script(c.Script)
}
if err != nil {
c.Logger.Error("Check failed to setup",
"check", c.CheckID.String(),
"error", err,
)
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, err.Error())
return
}
// Collect the output
output, _ := circbuf.NewBuffer(int64(c.OutputMaxSize))
cmd.Stdout = output
cmd.Stderr = output
exec.SetSysProcAttr(cmd)
truncateAndLogOutput := func() string {
outputStr := string(output.Bytes())
if output.TotalWritten() > output.Size() {
outputStr = fmt.Sprintf("Captured %d of %d bytes\n...\n%s",
output.Size(), output.TotalWritten(), outputStr)
}
c.Logger.Trace("Check output",
"check", c.CheckID.String(),
"output", outputStr,
)
return outputStr
}
// Start the check
if err := cmd.Start(); err != nil {
c.Logger.Error("Check failed to invoke",
"check", c.CheckID.String(),
"error", err,
)
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, err.Error())
return
}
// Wait for the check to complete
waitCh := make(chan error, 1)
go func() {
waitCh <- cmd.Wait()
}()
timeout := 30 * time.Second
if c.Timeout > 0 {
timeout = c.Timeout
}
select {
case <-time.After(timeout):
if err := exec.KillCommandSubtree(cmd); err != nil {
c.Logger.Warn("Check failed to kill after timeout",
"check", c.CheckID.String(),
"error", err,
)
}
msg := fmt.Sprintf("Timed out (%s) running check", timeout.String())
c.Logger.Warn("Timed out running check",
"check", c.CheckID.String(),
"timeout", timeout.String(),
)
outputStr := truncateAndLogOutput()
if len(outputStr) > 0 {
msg += "\n\n" + outputStr
}
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, msg)
// Now wait for the process to exit so we never start another
// instance concurrently.
<-waitCh
return
case err = <-waitCh:
// The process returned before the timeout, proceed normally
}
// Check if the check passed
outputStr := truncateAndLogOutput()
if err == nil {
c.StatusHandler.updateCheck(c.CheckID, api.HealthPassing, outputStr)
return
}
// If the exit code is 1, set check as warning
exitErr, ok := err.(*osexec.ExitError)
if ok {
if status, ok := exitErr.Sys().(syscall.WaitStatus); ok {
code := status.ExitStatus()
if code == 1 {
c.StatusHandler.updateCheck(c.CheckID, api.HealthWarning, outputStr)
return
}
}
}
// Set the health as critical
c.StatusHandler.updateCheck(c.CheckID, api.HealthCritical, outputStr)
}
// CheckTTL is used to apply a TTL to check status,
// and enables clients to set the status of a check
// but upon the TTL expiring, the check status is
// automatically set to critical.
type CheckTTL struct {
Notify CheckNotifier
CheckID structs.CheckID
ServiceID structs.ServiceID
TTL time.Duration
Logger hclog.Logger
timer *time.Timer
lastOutput string
lastOutputLock sync.RWMutex
stop bool
stopCh chan struct{}
stopLock sync.Mutex
OutputMaxSize int
}
// Start is used to start a check ttl, runs until Stop()
func (c *CheckTTL) Start() {
c.stopLock.Lock()
defer c.stopLock.Unlock()
if c.OutputMaxSize < 1 {
c.OutputMaxSize = DefaultBufSize
}
c.stop = false
c.stopCh = make(chan struct{})
c.timer = time.NewTimer(c.TTL)
go c.run()
}
// Stop is used to stop a check ttl.
func (c *CheckTTL) Stop() {
c.stopLock.Lock()
defer c.stopLock.Unlock()
if !c.stop {
c.timer.Stop()
c.stop = true
close(c.stopCh)
}
}
// run is used to handle TTL expiration and to update the check status
func (c *CheckTTL) run() {
for {
select {
case <-c.timer.C:
c.Logger.Warn("Check missed TTL, is now critical",
"check", c.CheckID.String(),
)
c.Notify.UpdateCheck(c.CheckID, api.HealthCritical, c.getExpiredOutput())
case <-c.stopCh:
return
}
}
}
// getExpiredOutput formats the output for the case when the TTL is expired.
func (c *CheckTTL) getExpiredOutput() string {
c.lastOutputLock.RLock()
defer c.lastOutputLock.RUnlock()
const prefix = "TTL expired"
if c.lastOutput == "" {
return prefix
}
return fmt.Sprintf("%s (last output before timeout follows): %s", prefix, c.lastOutput)
}
// SetStatus is used to update the status of the check,
// and to renew the TTL. If expired, TTL is restarted.
// output is returned (might be truncated)
func (c *CheckTTL) SetStatus(status, output string) string {
c.Logger.Debug("Check status updated",
"check", c.CheckID.String(),
"status", status,
)
total := len(output)
if total > c.OutputMaxSize {
output = fmt.Sprintf("%s ... (captured %d of %d bytes)",
output[:c.OutputMaxSize], c.OutputMaxSize, total)
}
c.Notify.UpdateCheck(c.CheckID, status, output)
// Store the last output so we can retain it if the TTL expires.
c.lastOutputLock.Lock()
c.lastOutput = output
c.lastOutputLock.Unlock()
c.timer.Reset(c.TTL)
return output
}
// CheckHTTP is used to periodically make an HTTP request to
// determine the health of a given check.
// The check is passing if the response code is 2XX.
// The check is warning if the response code is 429.
// The check is critical if the response code is anything else
// or if the request returns an error
// Supports failures_before_critical and success_before_passing.
type CheckHTTP struct {
CheckID structs.CheckID
ServiceID structs.ServiceID
HTTP string
Header map[string][]string
Method string
Body string
Interval time.Duration
Timeout time.Duration
Logger hclog.Logger
TLSClientConfig *tls.Config
OutputMaxSize int
StatusHandler *StatusHandler
DisableRedirects bool
httpClient *http.Client
stop bool
stopCh chan struct{}
stopLock sync.Mutex
stopWg sync.WaitGroup
// Set if checks are exposed through Connect proxies
// If set, this is the target of check()
ProxyHTTP string
}
func (c *CheckHTTP) CheckType() structs.CheckType {
return structs.CheckType{
CheckID: c.CheckID.ID,
HTTP: c.HTTP,
Method: c.Method,
Body: c.Body,
Header: c.Header,
Interval: c.Interval,
ProxyHTTP: c.ProxyHTTP,
Timeout: c.Timeout,
OutputMaxSize: c.OutputMaxSize,
}
}
// Start is used to start an HTTP check.
// The check runs until stop is called
func (c *CheckHTTP) Start() {
c.stopLock.Lock()
defer c.stopLock.Unlock()
if c.httpClient == nil {
// Create the transport. We disable HTTP Keep-Alive's to prevent
// failing checks due to the keepalive interval.
trans := cleanhttp.DefaultTransport()
trans.DisableKeepAlives = true
// Take on the supplied TLS client config.
trans.TLSClientConfig = c.TLSClientConfig
// Create the HTTP client.
c.httpClient = &http.Client{
Timeout: 10 * time.Second,
Transport: trans,
}
if c.DisableRedirects {
c.httpClient.CheckRedirect = func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
}
}
if c.Timeout > 0 {
c.httpClient.Timeout = c.Timeout
}
if c.OutputMaxSize < 1 {
c.OutputMaxSize = DefaultBufSize
}
}
c.stop = false
c.stopCh = make(chan struct{})
c.stopWg.Add(1)
go c.run()
}
// Stop is used to stop an HTTP check.
func (c *CheckHTTP) Stop() {
c.stopLock.Lock()
defer c.stopLock.Unlock()
if !c.stop {
c.stop = true
close(c.stopCh)
}
// Wait for the c.run() goroutine to complete before returning.
c.stopWg.Wait()
}
// run is invoked by a goroutine to run until Stop() is called
func (c *CheckHTTP) run() {
defer c.stopWg.Done()
// Get the randomized initial pause time
initialPauseTime := lib.RandomStagger(c.Interval)
next := time.After(initialPauseTime)
for {
select {
case <-next:
c.check()
next = time.After(c.Interval)
case <-c.stopCh:
return
}
}
}
// check is invoked periodically to perform the HTTP check
func (c *CheckHTTP) check() {
method := c.Method
if method == "" {
method = "GET"
}
target := c.HTTP
if c.ProxyHTTP != "" {
target = c.ProxyHTTP
}
bodyReader := strings.NewReader(c.Body)
req, err := http.NewRequest(method, target, bodyReader)
if err != nil {
c.StatusHandler.updateCheck(c.CheckID, api.HealthCritical, err.Error())
return
}
req.Header = http.Header(c.Header)
// this happens during testing but not in prod
if req.Header == nil {
req.Header = make(http.Header)
}
if host := req.Header.Get("Host"); host != "" {
req.Host = host
}
if req.Header.Get("User-Agent") == "" {
req.Header.Set("User-Agent", UserAgent)
}
if req.Header.Get("Accept") == "" {
req.Header.Set("Accept", "text/plain, text/*, */*")
}
resp, err := c.httpClient.Do(req)
if err != nil {
c.StatusHandler.updateCheck(c.CheckID, api.HealthCritical, err.Error())
return
}
defer resp.Body.Close()
// Read the response into a circular buffer to limit the size
output, _ := circbuf.NewBuffer(int64(c.OutputMaxSize))
if _, err := io.Copy(output, resp.Body); err != nil {
c.Logger.Warn("Check error while reading body",
"check", c.CheckID.String(),
"error", err,
)
}
// Format the response body
result := fmt.Sprintf("HTTP %s %s: %s Output: %s", method, target, resp.Status, output.String())
if resp.StatusCode >= 200 && resp.StatusCode <= 299 {
// PASSING (2xx)
c.StatusHandler.updateCheck(c.CheckID, api.HealthPassing, result)
} else if resp.StatusCode == 429 {
// WARNING
// 429 Too Many Requests (RFC 6585)
// The user has sent too many requests in a given amount of time.
c.StatusHandler.updateCheck(c.CheckID, api.HealthWarning, result)
} else {
// CRITICAL
c.StatusHandler.updateCheck(c.CheckID, api.HealthCritical, result)
}
}
type CheckH2PING struct {
CheckID structs.CheckID
ServiceID structs.ServiceID
H2PING string
Interval time.Duration
Timeout time.Duration
Logger hclog.Logger
TLSClientConfig *tls.Config
StatusHandler *StatusHandler
stop bool
stopCh chan struct{}
stopLock sync.Mutex
2021-04-11 19:12:33 +00:00
stopWg sync.WaitGroup
}
func shutdownHTTP2ClientConn(clientConn *http2.ClientConn, timeout time.Duration, checkIDString string, logger hclog.Logger) {
ctx, cancel := context.WithTimeout(context.Background(), timeout/2)
defer cancel()
err := clientConn.Shutdown(ctx)
if err != nil {
logger.Warn("Shutdown of H2Ping check client connection gave an error",
"check", checkIDString,
"error", err)
}
}
func (c *CheckH2PING) check() {
t := &http2.Transport{}
var dialFunc func(ctx context.Context, network, address string, tlscfg *tls.Config) (net.Conn, error)
if c.TLSClientConfig != nil {
t.TLSClientConfig = c.TLSClientConfig
dialFunc = func(ctx context.Context, network, address string, tlscfg *tls.Config) (net.Conn, error) {
dialer := &tls.Dialer{Config: tlscfg}
return dialer.DialContext(ctx, network, address)
}
} else {
t.AllowHTTP = true
dialFunc = func(ctx context.Context, network, address string, tlscfg *tls.Config) (net.Conn, error) {
dialer := &net.Dialer{}
return dialer.DialContext(ctx, network, address)
}
}
target := c.H2PING
ctx, cancel := context.WithTimeout(context.Background(), c.Timeout)
defer cancel()
conn, err := dialFunc(ctx, "tcp", target, c.TLSClientConfig)
if err != nil {
message := fmt.Sprintf("Failed to dial to %s: %s", target, err)
c.StatusHandler.updateCheck(c.CheckID, api.HealthCritical, message)
return
}
defer conn.Close()
clientConn, err := t.NewClientConn(conn)
if err != nil {
message := fmt.Sprintf("Failed to create client connection %s", err)
c.StatusHandler.updateCheck(c.CheckID, api.HealthCritical, message)
return
}
defer shutdownHTTP2ClientConn(clientConn, c.Timeout, c.CheckID.String(), c.Logger)
err = clientConn.Ping(ctx)
if err == nil {
c.StatusHandler.updateCheck(c.CheckID, api.HealthPassing, "HTTP2 ping was successful")
} else {
message := fmt.Sprintf("HTTP2 ping failed: %s", err)
c.StatusHandler.updateCheck(c.CheckID, api.HealthCritical, message)
}
}
// Stop is used to stop an H2PING check.
func (c *CheckH2PING) Stop() {
c.stopLock.Lock()
defer c.stopLock.Unlock()
if !c.stop {
c.stop = true
close(c.stopCh)
}
2021-04-11 19:12:33 +00:00
c.stopWg.Wait()
}
func (c *CheckH2PING) run() {
2021-04-11 19:12:33 +00:00
defer c.stopWg.Done()
// Get the randomized initial pause time
initialPauseTime := lib.RandomStagger(c.Interval)
next := time.After(initialPauseTime)
for {
select {
case <-next:
c.check()
next = time.After(c.Interval)
case <-c.stopCh:
return
}
}
}
func (c *CheckH2PING) Start() {
c.stopLock.Lock()
defer c.stopLock.Unlock()
if c.Timeout <= 0 {
c.Timeout = 10 * time.Second
}
c.stop = false
c.stopCh = make(chan struct{})
2021-04-11 19:12:33 +00:00
c.stopWg.Add(1)
go c.run()
}
// CheckTCP is used to periodically make an TCP/UDP connection to
// determine the health of a given check.
// The check is passing if the connection succeeds
// The check is critical if the connection returns an error
// Supports failures_before_critical and success_before_passing.
type CheckTCP struct {
CheckID structs.CheckID
ServiceID structs.ServiceID
TCP string
Interval time.Duration
Timeout time.Duration
Logger hclog.Logger
StatusHandler *StatusHandler
dialer *net.Dialer
stop bool
stopCh chan struct{}
stopLock sync.Mutex
}
// Start is used to start a TCP check.
// The check runs until stop is called
func (c *CheckTCP) Start() {
c.stopLock.Lock()
defer c.stopLock.Unlock()
if c.dialer == nil {
// Create the socket dialer
c.dialer = &net.Dialer{
Timeout: 10 * time.Second,
}
if c.Timeout > 0 {
c.dialer.Timeout = c.Timeout
}
}
c.stop = false
c.stopCh = make(chan struct{})
go c.run()
}
// Stop is used to stop a TCP check.
func (c *CheckTCP) Stop() {
c.stopLock.Lock()
defer c.stopLock.Unlock()
if !c.stop {
c.stop = true
close(c.stopCh)
}
}
// run is invoked by a goroutine to run until Stop() is called
func (c *CheckTCP) run() {
// Get the randomized initial pause time
initialPauseTime := lib.RandomStagger(c.Interval)
next := time.After(initialPauseTime)
for {
select {
case <-next:
c.check()
next = time.After(c.Interval)
case <-c.stopCh:
return
}
}
}
// check is invoked periodically to perform the TCP check
func (c *CheckTCP) check() {
conn, err := c.dialer.Dial(`tcp`, c.TCP)
if err != nil {
c.Logger.Warn("Check socket connection failed",
"check", c.CheckID.String(),
"error", err,
)
c.StatusHandler.updateCheck(c.CheckID, api.HealthCritical, err.Error())
return
}
conn.Close()
c.StatusHandler.updateCheck(c.CheckID, api.HealthPassing, fmt.Sprintf("TCP connect %s: Success", c.TCP))
}
UDP check for service stanza #12221 (#12722) * UDP check for service stanza #12221 * add pass status on timeout condition * delete useless files * Update check_test.go improve comment in test * fix test * fix requested changes and update TestRuntimeConfig_Sanitize.golden * add freeport to TestCheckUDPCritical * improve comment for CheckUDP struct * fix requested changes * fix requested changes * fix requested changes * add UDP to proto * add UDP to proto and add a changelog * add requested test on agent_endpoint_test.go * add test for given endpoints * fix failing tests * add documentation for udp healthcheck * regenerate proto using buf * Update website/content/api-docs/agent/check.mdx Co-authored-by: trujillo-adam <47586768+trujillo-adam@users.noreply.github.com> * Update website/content/api-docs/agent/check.mdx Co-authored-by: trujillo-adam <47586768+trujillo-adam@users.noreply.github.com> * Update website/content/docs/discovery/checks.mdx Co-authored-by: trujillo-adam <47586768+trujillo-adam@users.noreply.github.com> * Update website/content/docs/ecs/configuration-reference.mdx Co-authored-by: trujillo-adam <47586768+trujillo-adam@users.noreply.github.com> * Update website/content/docs/ecs/configuration-reference.mdx Co-authored-by: trujillo-adam <47586768+trujillo-adam@users.noreply.github.com> * add debug echo * add debug circle-ci * add debug circle-ci bash * use echo instead of status_stage * remove debug and status from devtools script and use echo instead * Update website/content/api-docs/agent/check.mdx Co-authored-by: Jared Kirschner <85913323+jkirschner-hashicorp@users.noreply.github.com> * fix test * replace status_stage with status * replace functions with echo Co-authored-by: Dhia Ayachi <dhia@hashicorp.com> Co-authored-by: trujillo-adam <47586768+trujillo-adam@users.noreply.github.com> Co-authored-by: Jared Kirschner <85913323+jkirschner-hashicorp@users.noreply.github.com>
2022-06-06 19:13:19 +00:00
// CheckUDP is used to periodically send a UDP datagram to determine the health of a given check.
// The check is passing if the connection succeeds, the response is bytes.Equal to the bytes passed
// in or if the error returned is a timeout error
// The check is critical if: the connection succeeds but the response is not equal to the bytes passed in,
// the connection succeeds but the error returned is not a timeout error or the connection fails
type CheckUDP struct {
CheckID structs.CheckID
ServiceID structs.ServiceID
UDP string
Message string
Interval time.Duration
Timeout time.Duration
Logger hclog.Logger
StatusHandler *StatusHandler
dialer *net.Dialer
stop bool
stopCh chan struct{}
stopLock sync.Mutex
}
func (c *CheckUDP) Start() {
c.stopLock.Lock()
defer c.stopLock.Unlock()
if c.dialer == nil {
// Create the socket dialer
c.dialer = &net.Dialer{
Timeout: 10 * time.Second,
}
if c.Timeout > 0 {
c.dialer.Timeout = c.Timeout
}
}
c.stop = false
c.stopCh = make(chan struct{})
go c.run()
}
func (c *CheckUDP) Stop() {
c.stopLock.Lock()
defer c.stopLock.Unlock()
if !c.stop {
c.stop = true
close(c.stopCh)
}
}
func (c *CheckUDP) run() {
// Get the randomized initial pause time
initialPauseTime := lib.RandomStagger(c.Interval)
next := time.After(initialPauseTime)
for {
select {
case <-next:
c.check()
next = time.After(c.Interval)
case <-c.stopCh:
return
}
}
}
func (c *CheckUDP) check() {
conn, err := c.dialer.Dial(`udp`, c.UDP)
if err != nil {
if e, ok := err.(net.Error); ok && e.Timeout() {
c.StatusHandler.updateCheck(c.CheckID, api.HealthPassing, fmt.Sprintf("UDP connect %s: Success", c.UDP))
return
} else {
c.Logger.Warn("Check socket connection failed",
"check", c.CheckID.String(),
"error", err,
)
c.StatusHandler.updateCheck(c.CheckID, api.HealthCritical, err.Error())
return
}
}
defer conn.Close()
n, err := fmt.Fprintf(conn, c.Message)
if err != nil {
c.Logger.Warn("Check socket write failed",
"check", c.CheckID.String(),
"error", err,
)
c.StatusHandler.updateCheck(c.CheckID, api.HealthCritical, err.Error())
return
}
if n != len(c.Message) {
c.Logger.Warn("Check socket short write",
"check", c.CheckID.String(),
"error", err,
)
c.StatusHandler.updateCheck(c.CheckID, api.HealthCritical, err.Error())
return
}
if err != nil {
c.Logger.Warn("Check socket write failed",
"check", c.CheckID.String(),
"error", err,
)
c.StatusHandler.updateCheck(c.CheckID, api.HealthCritical, err.Error())
return
}
_, err = bufio.NewReader(conn).Read(make([]byte, 1))
if err != nil {
if strings.Contains(err.Error(), "i/o timeout") {
c.StatusHandler.updateCheck(c.CheckID, api.HealthPassing, fmt.Sprintf("UDP connect %s: Success", c.UDP))
return
} else {
c.Logger.Warn("Check socket read failed",
"check", c.CheckID.String(),
"error", err,
)
c.StatusHandler.updateCheck(c.CheckID, api.HealthCritical, err.Error())
return
}
} else if err == nil {
c.StatusHandler.updateCheck(c.CheckID, api.HealthPassing, fmt.Sprintf("UDP connect %s: Success", c.UDP))
}
}
// CheckDocker is used to periodically invoke a script to
// determine the health of an application running inside a
// Docker Container. We assume that the script is compatible
// with nagios plugins and expects the output in the same format.
// Supports failures_before_critical and success_before_passing.
type CheckDocker struct {
CheckID structs.CheckID
ServiceID structs.ServiceID
Script string
ScriptArgs []string
DockerContainerID string
Shell string
Interval time.Duration
Logger hclog.Logger
Client *DockerClient
StatusHandler *StatusHandler
stop chan struct{}
}
func (c *CheckDocker) Start() {
if c.stop != nil {
panic("Docker check already started")
}
if c.Logger == nil {
c.Logger = hclog.New(&hclog.LoggerOptions{Output: io.Discard})
}
if c.Shell == "" {
c.Shell = os.Getenv("SHELL")
if c.Shell == "" {
c.Shell = "/bin/sh"
}
}
c.stop = make(chan struct{})
go c.run()
}
func (c *CheckDocker) Stop() {
if c.stop == nil {
panic("Stop called before start")
}
close(c.stop)
}
func (c *CheckDocker) run() {
2017-10-26 09:57:18 +00:00
defer c.Client.Close()
firstWait := lib.RandomStagger(c.Interval)
next := time.After(firstWait)
for {
select {
case <-next:
c.check()
next = time.After(c.Interval)
case <-c.stop:
return
}
}
}
func (c *CheckDocker) check() {
var out string
status, b, err := c.doCheck()
if err != nil {
c.Logger.Debug("Check failed",
"check", c.CheckID.String(),
"error", err,
)
out = err.Error()
} else {
// out is already limited to CheckBufSize since we're getting a
// limited buffer. So we don't need to truncate it just report
// that it was truncated.
out = string(b.Bytes())
if int(b.TotalWritten()) > len(out) {
out = fmt.Sprintf("Captured %d of %d bytes\n...\n%s", len(out), b.TotalWritten(), out)
}
c.Logger.Trace("Check output",
"check", c.CheckID.String(),
"output", out,
)
}
c.StatusHandler.updateCheck(c.CheckID, status, out)
}
func (c *CheckDocker) doCheck() (string, *circbuf.Buffer, error) {
var cmd []string
if len(c.ScriptArgs) > 0 {
cmd = c.ScriptArgs
} else {
cmd = []string{c.Shell, "-c", c.Script}
}
execID, err := c.Client.CreateExec(c.DockerContainerID, cmd)
if err != nil {
return api.HealthCritical, nil, err
}
buf, err := c.Client.StartExec(c.DockerContainerID, execID)
if err != nil {
return api.HealthCritical, nil, err
}
exitCode, err := c.Client.InspectExec(c.DockerContainerID, execID)
if err != nil {
return api.HealthCritical, nil, err
}
switch exitCode {
case 0:
return api.HealthPassing, buf, nil
case 1:
c.Logger.Debug("Check failed",
"check", c.CheckID.String(),
"exit_code", exitCode,
)
return api.HealthWarning, buf, nil
default:
c.Logger.Debug("Check failed",
"check", c.CheckID.String(),
"exit_code", exitCode,
)
return api.HealthCritical, buf, nil
}
}
2017-12-27 04:35:22 +00:00
// CheckGRPC is used to periodically send request to a gRPC server
// application that implements gRPC health-checking protocol.
// The check is passing if returned status is SERVING.
// The check is critical if connection fails or returned status is
// not SERVING.
// Supports failures_before_critical and success_before_passing.
2017-12-27 04:35:22 +00:00
type CheckGRPC struct {
CheckID structs.CheckID
ServiceID structs.ServiceID
2017-12-27 04:35:22 +00:00
GRPC string
Interval time.Duration
Timeout time.Duration
TLSClientConfig *tls.Config
Logger hclog.Logger
StatusHandler *StatusHandler
2017-12-27 04:35:22 +00:00
probe *GrpcHealthProbe
stop bool
stopCh chan struct{}
stopLock sync.Mutex
// Set if checks are exposed through Connect proxies
// If set, this is the target of check()
ProxyGRPC string
}
func (c *CheckGRPC) CheckType() structs.CheckType {
return structs.CheckType{
CheckID: c.CheckID.ID,
GRPC: c.GRPC,
ProxyGRPC: c.ProxyGRPC,
Interval: c.Interval,
Timeout: c.Timeout,
}
2017-12-27 04:35:22 +00:00
}
func (c *CheckGRPC) Start() {
c.stopLock.Lock()
defer c.stopLock.Unlock()
timeout := 10 * time.Second
if c.Timeout > 0 {
timeout = c.Timeout
}
c.probe = NewGrpcHealthProbe(c.GRPC, timeout, c.TLSClientConfig)
c.stop = false
c.stopCh = make(chan struct{})
go c.run()
}
func (c *CheckGRPC) run() {
// Get the randomized initial pause time
initialPauseTime := lib.RandomStagger(c.Interval)
next := time.After(initialPauseTime)
for {
select {
case <-next:
c.check()
next = time.After(c.Interval)
case <-c.stopCh:
return
}
}
}
func (c *CheckGRPC) check() {
target := c.GRPC
if c.ProxyGRPC != "" {
target = c.ProxyGRPC
}
err := c.probe.Check(target)
2017-12-27 04:35:22 +00:00
if err != nil {
c.StatusHandler.updateCheck(c.CheckID, api.HealthCritical, err.Error())
2017-12-27 04:35:22 +00:00
} else {
c.StatusHandler.updateCheck(c.CheckID, api.HealthPassing, fmt.Sprintf("gRPC check %s: success", target))
2017-12-27 04:35:22 +00:00
}
}
func (c *CheckGRPC) Stop() {
c.stopLock.Lock()
defer c.stopLock.Unlock()
if !c.stop {
c.stop = true
close(c.stopCh)
}
}
2022-06-07 17:27:14 +00:00
type CheckOSService struct {
CheckID structs.CheckID
ServiceID structs.ServiceID
OSService string
Interval time.Duration
Timeout time.Duration
Logger hclog.Logger
StatusHandler *StatusHandler
Client *OSServiceClient
stop bool
stopCh chan struct{}
stopLock sync.Mutex
stopWg sync.WaitGroup
}
func (c *CheckOSService) CheckType() structs.CheckType {
return structs.CheckType{
CheckID: c.CheckID.ID,
OSService: c.OSService,
Interval: c.Interval,
Timeout: c.Timeout,
}
}
func (c *CheckOSService) Start() {
c.stopLock.Lock()
defer c.stopLock.Unlock()
c.stop = false
c.stopCh = make(chan struct{})
c.stopWg.Add(1)
go c.run()
}
func (c *CheckOSService) Stop() {
c.stopLock.Lock()
defer c.stopLock.Unlock()
if !c.stop {
c.stop = true
close(c.stopCh)
}
// Wait for the c.run() goroutine to complete before returning.
c.stopWg.Wait()
}
func (c *CheckOSService) run() {
defer c.stopWg.Done()
// Get the randomized initial pause time
initialPauseTime := lib.RandomStagger(c.Interval)
next := time.After(initialPauseTime)
for {
select {
case <-next:
c.check()
next = time.After(c.Interval)
case <-c.stopCh:
return
}
}
}
func (c *CheckOSService) doCheck() (string, error) {
err := c.Client.Check(c.OSService)
2022-06-07 17:27:14 +00:00
if err == nil {
return api.HealthPassing, nil
}
if errors.Is(err, ErrOSServiceStatusCritical) {
return api.HealthCritical, err
}
return api.HealthWarning, err
}
func (c *CheckOSService) check() {
var out string
var status string
var err error
waitCh := make(chan error, 1)
go func() {
status, err = c.doCheck()
waitCh <- err
}()
timeout := 30 * time.Second
if c.Timeout > 0 {
timeout = c.Timeout
}
select {
case <-time.After(timeout):
msg := fmt.Sprintf("Timed out (%s) running check", timeout.String())
c.Logger.Warn("Timed out running check",
"check", c.CheckID.String(),
"timeout", timeout.String(),
)
c.StatusHandler.updateCheck(c.CheckID, api.HealthCritical, msg)
// Now wait for the process to exit so we never start another
// instance concurrently.
<-waitCh
return
case err = <-waitCh:
// The process returned before the timeout, proceed normally
}
out = fmt.Sprintf("Service \"%s\" is healthy", c.OSService)
2022-06-07 17:27:14 +00:00
if err != nil {
c.Logger.Debug("Check failed",
"check", c.CheckID.String(),
"error", err,
)
out = err.Error()
}
c.StatusHandler.updateCheck(c.CheckID, status, out)
}
// StatusHandler keep tracks of successive error/success counts and ensures
// that status can be set to critical/passing only once the successive number of event
// reaches the given threshold.
type StatusHandler struct {
inner CheckNotifier
logger hclog.Logger
successBeforePassing int
successCounter int
failuresBeforeWarning int
failuresBeforeCritical int
failuresCounter int
}
// NewStatusHandler set counters values to threshold in order to immediatly update status after first check.
func NewStatusHandler(inner CheckNotifier, logger hclog.Logger, successBeforePassing, failuresBeforeWarning, failuresBeforeCritical int) *StatusHandler {
return &StatusHandler{
logger: logger,
inner: inner,
successBeforePassing: successBeforePassing,
successCounter: successBeforePassing,
failuresBeforeWarning: failuresBeforeWarning,
failuresBeforeCritical: failuresBeforeCritical,
failuresCounter: failuresBeforeCritical,
}
}
func (s *StatusHandler) updateCheck(checkID structs.CheckID, status, output string) {
if status == api.HealthPassing || status == api.HealthWarning {
s.successCounter++
s.failuresCounter = 0
if s.successCounter >= s.successBeforePassing {
s.logger.Debug("Check status updated",
"check", checkID.String(),
"status", status,
)
s.inner.UpdateCheck(checkID, status, output)
return
}
s.logger.Warn("Check passed but has not reached success threshold",
"check", checkID.String(),
"status", status,
"success_count", s.successCounter,
"success_threshold", s.successBeforePassing,
)
} else {
s.failuresCounter++
s.successCounter = 0
if s.failuresCounter >= s.failuresBeforeCritical {
s.logger.Warn("Check is now critical", "check", checkID.String())
s.inner.UpdateCheck(checkID, status, output)
return
}
// Defaults to same value as failuresBeforeCritical if not set.
if s.failuresCounter >= s.failuresBeforeWarning {
s.logger.Warn("Check is now warning", "check", checkID.String())
s.inner.UpdateCheck(checkID, api.HealthWarning, output)
return
}
s.logger.Warn("Check failed but has not reached warning/failure threshold",
"check", checkID.String(),
"status", status,
"failure_count", s.failuresCounter,
"warning_threshold", s.failuresBeforeWarning,
"failure_threshold", s.failuresBeforeCritical,
)
}
}