// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package watch
import (
"context"
"fmt"
"io"
"sync"
"time"
consulapi "github.com/hashicorp/consul/api"
"github.com/hashicorp/go-hclog"
"github.com/mitchellh/mapstructure"
)
const DefaultTimeout = 10 * time . Second
// Plan is the parsed version of a watch specification. A watch provides
// the details of a query, which generates a view into the Consul data store.
// This view is watched for changes and a handler is invoked to take any
// appropriate actions.
type Plan struct {
Datacenter string
Token string
Type string
HandlerType string
Exempt map [ string ] interface { }
Watcher WatcherFunc
// Handler is kept for backward compatibility but only supports watches based
// on index param. To support hash based watches, set HybridHandler instead.
Handler HandlerFunc
HybridHandler HybridHandlerFunc
Logger hclog . Logger
// Deprecated: use Logger
LogOutput io . Writer
address string
client * consulapi . Client
lastParamVal BlockingParamVal
lastResult interface { }
stop bool
stopCh chan struct { }
stopLock sync . Mutex
cancelFunc context . CancelFunc
}
type HttpHandlerConfig struct {
Path string ` mapstructure:"path" `
Method string ` mapstructure:"method" `
Timeout time . Duration ` mapstructure:"-" `
TimeoutRaw string ` mapstructure:"timeout" `
Header map [ string ] [ ] string ` mapstructure:"header" `
TLSSkipVerify bool ` mapstructure:"tls_skip_verify" `
}
// BlockingParamVal is an interface representing the common operations needed for
// different styles of blocking. It's used to abstract the core watch plan from
// whether we are performing index-based or hash-based blocking.
type BlockingParamVal interface {
// Equal returns whether the other param value should be considered equal
// (i.e. representing no change in the watched resource). Equal must not panic
// if other is nil.
Equal ( other BlockingParamVal ) bool
// Next is called when deciding which value to use on the next blocking call.
// It assumes the BlockingParamVal value it is called on is the most recent one
// returned and passes the previous one which may be nil as context. This
// allows types to customize logic around ordering without assuming there is
// an order. For example WaitIndexVal can check that the index didn't go
// backwards and if it did then reset to 0. Most other cases should just
// return themselves (the most recent value) to be used in the next request.
Next ( previous BlockingParamVal ) BlockingParamVal
}
// WaitIndexVal is a type representing a Consul index that implements
// BlockingParamVal.
type WaitIndexVal uint64
// Equal implements BlockingParamVal
func ( idx WaitIndexVal ) Equal ( other BlockingParamVal ) bool {
if otherIdx , ok := other . ( WaitIndexVal ) ; ok {
return idx == otherIdx
}
return false
}
// Next implements BlockingParamVal
func ( idx WaitIndexVal ) Next ( previous BlockingParamVal ) BlockingParamVal {
if previous == nil {
return idx
}
prevIdx , ok := previous . ( WaitIndexVal )
if ok && prevIdx == idx {
// This value is the same as the previous index, reset
return WaitIndexVal ( 0 )
}
return idx
}
// WaitHashVal is a type representing a Consul content hash that implements
// BlockingParamVal.
type WaitHashVal string
// Equal implements BlockingParamVal
func ( h WaitHashVal ) Equal ( other BlockingParamVal ) bool {
if otherHash , ok := other . ( WaitHashVal ) ; ok {
return h == otherHash
}
return false
}
// Next implements BlockingParamVal
func ( h WaitHashVal ) Next ( previous BlockingParamVal ) BlockingParamVal {
return h
}
// WatcherFunc is used to watch for a diff.
type WatcherFunc func ( * Plan ) ( BlockingParamVal , interface { } , error )
// HandlerFunc is used to handle new data. It only works for index-based watches
// (which is almost all end points currently) and is kept for backwards
// compatibility until more places can make use of hash-based watches too.
type HandlerFunc func ( uint64 , interface { } )
// HybridHandlerFunc is used to handle new data. It can support either
// index-based or hash-based watches via the BlockingParamVal.
type HybridHandlerFunc func ( BlockingParamVal , interface { } )
// Parse takes a watch query and compiles it into a WatchPlan or an error
func Parse ( params map [ string ] interface { } ) ( * Plan , error ) {
return ParseExempt ( params , nil )
}
// ParseExempt takes a watch query and compiles it into a WatchPlan or an error
// Any exempt parameters are stored in the Exempt map
func ParseExempt ( params map [ string ] interface { } , exempt [ ] string ) ( * Plan , error ) {
plan := & Plan {
stopCh : make ( chan struct { } ) ,
Exempt : make ( map [ string ] interface { } ) ,
}
// Parse the generic parameters
if err := assignValue ( params , "datacenter" , & plan . Datacenter ) ; err != nil {
return nil , err
}
if err := assignValue ( params , "token" , & plan . Token ) ; err != nil {
return nil , err
}
if err := assignValue ( params , "type" , & plan . Type ) ; err != nil {
return nil , err
}
// Ensure there is a watch type
if plan . Type == "" {
return nil , fmt . Errorf ( "Watch type must be specified" )
}
// Get the specific handler
if err := assignValue ( params , "handler_type" , & plan . HandlerType ) ; err != nil {
return nil , err
}
switch plan . HandlerType {
case "http" :
if _ , ok := params [ "http_handler_config" ] ; ! ok {
return nil , fmt . Errorf ( "Handler type 'http' requires 'http_handler_config' to be set" )
}
config , err := parseHttpHandlerConfig ( params [ "http_handler_config" ] )
if err != nil {
return nil , fmt . Errorf ( fmt . Sprintf ( "Failed to parse 'http_handler_config': %v" , err ) )
}
plan . Exempt [ "http_handler_config" ] = config
delete ( params , "http_handler_config" )
case "script" :
// Let the caller check for configuration in exempt parameters
}
// Look for a factory function
factory := watchFuncFactory [ plan . Type ]
if factory == nil {
return nil , fmt . Errorf ( "Unsupported watch type: %s" , plan . Type )
}
// Get the watch func
fn , err := factory ( params )
if err != nil {
return nil , err
}
plan . Watcher = fn
// Remove the exempt parameters
if len ( exempt ) > 0 {
for _ , ex := range exempt {
val , ok := params [ ex ]
if ok {
plan . Exempt [ ex ] = val
delete ( params , ex )
}
}
}
// Ensure all parameters are consumed
if len ( params ) != 0 {
var bad [ ] string
for key := range params {
bad = append ( bad , key )
}
return nil , fmt . Errorf ( "Invalid parameters: %v" , bad )
}
return plan , nil
}
// assignValue is used to extract a value ensuring it is a string
func assignValue ( params map [ string ] interface { } , name string , out * string ) error {
if raw , ok := params [ name ] ; ok {
val , ok := raw . ( string )
if ! ok {
return fmt . Errorf ( "Expecting %s to be a string" , name )
}
* out = val
delete ( params , name )
}
return nil
}
// assignValueBool is used to extract a value ensuring it is a bool
func assignValueBool ( params map [ string ] interface { } , name string , out * bool ) error {
if raw , ok := params [ name ] ; ok {
val , ok := raw . ( bool )
if ! ok {
return fmt . Errorf ( "Expecting %s to be a boolean" , name )
}
* out = val
delete ( params , name )
}
return nil
}
// assignValueStringSlice is used to extract a value ensuring it is either a string or a slice of strings
func assignValueStringSlice ( params map [ string ] interface { } , name string , out * [ ] string ) error {
if raw , ok := params [ name ] ; ok {
var tmp [ ] string
switch raw . ( type ) {
case string :
tmp = make ( [ ] string , 1 , 1 )
tmp [ 0 ] = raw . ( string )
case [ ] string :
l := len ( raw . ( [ ] string ) )
tmp = make ( [ ] string , l , l )
copy ( tmp , raw . ( [ ] string ) )
case [ ] interface { } :
l := len ( raw . ( [ ] interface { } ) )
tmp = make ( [ ] string , l , l )
for i , v := range raw . ( [ ] interface { } ) {
if s , ok := v . ( string ) ; ok {
tmp [ i ] = s
} else {
return fmt . Errorf ( "Index %d of %s expected to be string" , i , name )
}
}
default :
return fmt . Errorf ( "Expecting %s to be a string or []string" , name )
}
* out = tmp
delete ( params , name )
}
return nil
}
// Parse the 'http_handler_config' parameters
func parseHttpHandlerConfig ( configParams interface { } ) ( * HttpHandlerConfig , error ) {
var config HttpHandlerConfig
if err := mapstructure . Decode ( configParams , & config ) ; err != nil {
return nil , err
}
if config . Path == "" {
return nil , fmt . Errorf ( "Requires 'path' to be set" )
}
if config . Method == "" {
config . Method = "POST"
}
if config . TimeoutRaw == "" {
config . Timeout = DefaultTimeout
} else if timeout , err := time . ParseDuration ( config . TimeoutRaw ) ; err != nil {
return nil , fmt . Errorf ( fmt . Sprintf ( "Failed to parse timeout: %v" , err ) )
} else {
config . Timeout = timeout
}
return & config , nil
}