mirror of https://github.com/hashicorp/consul
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
293 lines
8.5 KiB
293 lines
8.5 KiB
package watch |
|
|
|
import (
	"context"
	"fmt"
	"io"
	"sort"
	"sync"
	"time"

	consulapi "github.com/hashicorp/consul/api"
	"github.com/hashicorp/go-hclog"
	"github.com/mitchellh/mapstructure"
)
|
|
|
// DefaultTimeout is the timeout given to an HTTP handler configuration when
// its "timeout" setting is left empty.
const DefaultTimeout time.Duration = 10 * time.Second
|
|
|
// Plan is the parsed version of a watch specification. A watch provides |
|
// the details of a query, which generates a view into the Consul data store. |
|
// This view is watched for changes and a handler is invoked to take any |
|
// appropriate actions. |
|
type Plan struct { |
|
Datacenter string |
|
Token string |
|
Type string |
|
HandlerType string |
|
Exempt map[string]interface{} |
|
|
|
Watcher WatcherFunc |
|
// Handler is kept for backward compatibility but only supports watches based |
|
// on index param. To support hash based watches, set HybridHandler instead. |
|
Handler HandlerFunc |
|
HybridHandler HybridHandlerFunc |
|
|
|
Logger hclog.Logger |
|
// Deprecated: use Logger |
|
LogOutput io.Writer |
|
|
|
address string |
|
client *consulapi.Client |
|
lastParamVal BlockingParamVal |
|
lastResult interface{} |
|
|
|
stop bool |
|
stopCh chan struct{} |
|
stopLock sync.Mutex |
|
cancelFunc context.CancelFunc |
|
} |
|
|
|
// HttpHandlerConfig is the decoded form of a watch's "http_handler_config"
// parameter. Field names in the specification are given by the mapstructure
// tags.
type HttpHandlerConfig struct {
	// Path is mandatory; parseHttpHandlerConfig rejects an empty value.
	Path string `mapstructure:"path"`

	// Method defaults to "POST" when left empty.
	Method string `mapstructure:"method"`

	// Timeout is never decoded directly (tag "-"); it is derived from
	// TimeoutRaw, or set to DefaultTimeout when TimeoutRaw is empty.
	Timeout time.Duration `mapstructure:"-"`

	// TimeoutRaw is the textual duration supplied under the "timeout" key.
	TimeoutRaw string `mapstructure:"timeout"`

	Header map[string][]string `mapstructure:"header"`

	TLSSkipVerify bool `mapstructure:"tls_skip_verify"`
}
|
|
|
// BlockingParamVal captures the operations the watch plan needs from a
// blocking-query parameter, so that the core plan logic does not care whether
// blocking is index-based or hash-based.
type BlockingParamVal interface {
	// Equal reports whether other should be treated as the same value, i.e. as
	// representing no change in the watched resource. Implementations must not
	// panic when other is nil.
	Equal(other BlockingParamVal) bool

	// Next picks the value to send with the next blocking call. The receiver is
	// taken to be the most recently returned value; previous is the one before
	// it and may be nil. This lets a type apply its own ordering rules without
	// the plan assuming an order exists. For example WaitIndexVal can check
	// that the index didn't go backwards and if it did then reset to 0. Most
	// other implementations should simply return the receiver.
	Next(previous BlockingParamVal) BlockingParamVal
}
|
|
|
// WaitIndexVal is a type representing a Consul index that implements |
|
// BlockingParamVal. |
|
type WaitIndexVal uint64 |
|
|
|
// Equal implements BlockingParamVal |
|
func (idx WaitIndexVal) Equal(other BlockingParamVal) bool { |
|
if otherIdx, ok := other.(WaitIndexVal); ok { |
|
return idx == otherIdx |
|
} |
|
return false |
|
} |
|
|
|
// Next implements BlockingParamVal |
|
func (idx WaitIndexVal) Next(previous BlockingParamVal) BlockingParamVal { |
|
if previous == nil { |
|
return idx |
|
} |
|
prevIdx, ok := previous.(WaitIndexVal) |
|
if ok && prevIdx == idx { |
|
// This value is the same as the previous index, reset |
|
return WaitIndexVal(0) |
|
} |
|
return idx |
|
} |
|
|
|
// WaitHashVal is a type representing a Consul content hash that implements |
|
// BlockingParamVal. |
|
type WaitHashVal string |
|
|
|
// Equal implements BlockingParamVal |
|
func (h WaitHashVal) Equal(other BlockingParamVal) bool { |
|
if otherHash, ok := other.(WaitHashVal); ok { |
|
return h == otherHash |
|
} |
|
return false |
|
} |
|
|
|
// Next implements BlockingParamVal |
|
func (h WaitHashVal) Next(previous BlockingParamVal) BlockingParamVal { |
|
return h |
|
} |
|
|
|
// WatcherFunc is used to watch for a diff. |
|
type WatcherFunc func(*Plan) (BlockingParamVal, interface{}, error) |
|
|
|
// HandlerFunc is called with new data for index-based watches only, receiving
// the index and the result. Index-based watches cover almost all endpoints
// today; the type is retained for backwards compatibility until more places
// can make use of hash-based watches too.
type HandlerFunc func(uint64, interface{})
|
|
|
// HybridHandlerFunc is used to handle new data. It can support either |
|
// index-based or hash-based watches via the BlockingParamVal. |
|
type HybridHandlerFunc func(BlockingParamVal, interface{}) |
|
|
|
// Parse takes a watch query and compiles it into a WatchPlan or an error |
|
func Parse(params map[string]interface{}) (*Plan, error) { |
|
return ParseExempt(params, nil) |
|
} |
|
|
|
// ParseExempt takes a watch query and compiles it into a WatchPlan or an error |
|
// Any exempt parameters are stored in the Exempt map |
|
func ParseExempt(params map[string]interface{}, exempt []string) (*Plan, error) { |
|
plan := &Plan{ |
|
stopCh: make(chan struct{}), |
|
Exempt: make(map[string]interface{}), |
|
} |
|
|
|
// Parse the generic parameters |
|
if err := assignValue(params, "datacenter", &plan.Datacenter); err != nil { |
|
return nil, err |
|
} |
|
if err := assignValue(params, "token", &plan.Token); err != nil { |
|
return nil, err |
|
} |
|
if err := assignValue(params, "type", &plan.Type); err != nil { |
|
return nil, err |
|
} |
|
// Ensure there is a watch type |
|
if plan.Type == "" { |
|
return nil, fmt.Errorf("Watch type must be specified") |
|
} |
|
|
|
// Get the specific handler |
|
if err := assignValue(params, "handler_type", &plan.HandlerType); err != nil { |
|
return nil, err |
|
} |
|
switch plan.HandlerType { |
|
case "http": |
|
if _, ok := params["http_handler_config"]; !ok { |
|
return nil, fmt.Errorf("Handler type 'http' requires 'http_handler_config' to be set") |
|
} |
|
config, err := parseHttpHandlerConfig(params["http_handler_config"]) |
|
if err != nil { |
|
return nil, fmt.Errorf(fmt.Sprintf("Failed to parse 'http_handler_config': %v", err)) |
|
} |
|
plan.Exempt["http_handler_config"] = config |
|
delete(params, "http_handler_config") |
|
|
|
case "script": |
|
// Let the caller check for configuration in exempt parameters |
|
} |
|
|
|
// Look for a factory function |
|
factory := watchFuncFactory[plan.Type] |
|
if factory == nil { |
|
return nil, fmt.Errorf("Unsupported watch type: %s", plan.Type) |
|
} |
|
|
|
// Get the watch func |
|
fn, err := factory(params) |
|
if err != nil { |
|
return nil, err |
|
} |
|
plan.Watcher = fn |
|
|
|
// Remove the exempt parameters |
|
if len(exempt) > 0 { |
|
for _, ex := range exempt { |
|
val, ok := params[ex] |
|
if ok { |
|
plan.Exempt[ex] = val |
|
delete(params, ex) |
|
} |
|
} |
|
} |
|
|
|
// Ensure all parameters are consumed |
|
if len(params) != 0 { |
|
var bad []string |
|
for key := range params { |
|
bad = append(bad, key) |
|
} |
|
return nil, fmt.Errorf("Invalid parameters: %v", bad) |
|
} |
|
return plan, nil |
|
} |
|
|
|
// assignValue moves the string stored under name in params into out.
//
// If name is absent nothing happens and nil is returned. If the value is not a
// string an error is returned and both params and out are left untouched.
// Otherwise the string is written to out and the key is deleted from params.
func assignValue(params map[string]interface{}, name string, out *string) error {
	raw, present := params[name]
	if !present {
		return nil
	}

	str, isString := raw.(string)
	if !isString {
		return fmt.Errorf("Expecting %s to be a string", name)
	}

	*out = str
	delete(params, name)
	return nil
}
|
|
|
// assignValueBool moves the bool stored under name in params into out.
//
// If name is absent nothing happens and nil is returned. If the value is not a
// bool an error is returned and both params and out are left untouched.
// Otherwise the bool is written to out and the key is deleted from params.
func assignValueBool(params map[string]interface{}, name string, out *bool) error {
	raw, present := params[name]
	if !present {
		return nil
	}

	flag, isBool := raw.(bool)
	if !isBool {
		return fmt.Errorf("Expecting %s to be a boolean", name)
	}

	*out = flag
	delete(params, name)
	return nil
}
|
|
|
// assignValueStringSlice moves the value stored under name in params into out
// as a []string.
//
// Accepted value shapes are a single string (stored as a one-element slice), a
// []string (copied, so out does not alias the caller's slice), or a
// []interface{} whose elements are all strings. If name is absent nothing
// happens and nil is returned. For any other shape, or a non-string element,
// an error is returned and params and out are left untouched. On success the
// key is deleted from params.
func assignValueStringSlice(params map[string]interface{}, name string, out *[]string) error {
	raw, ok := params[name]
	if !ok {
		return nil
	}

	var tmp []string
	switch v := raw.(type) {
	case string:
		tmp = []string{v}
	case []string:
		tmp = make([]string, len(v))
		copy(tmp, v)
	case []interface{}:
		tmp = make([]string, len(v))
		for i, elem := range v {
			s, ok := elem.(string)
			if !ok {
				return fmt.Errorf("Index %d of %s expected to be string", i, name)
			}
			tmp[i] = s
		}
	default:
		return fmt.Errorf("Expecting %s to be a string or []string", name)
	}

	*out = tmp
	delete(params, name)
	return nil
}
|
|
|
// Parse the 'http_handler_config' parameters |
|
func parseHttpHandlerConfig(configParams interface{}) (*HttpHandlerConfig, error) { |
|
var config HttpHandlerConfig |
|
if err := mapstructure.Decode(configParams, &config); err != nil { |
|
return nil, err |
|
} |
|
|
|
if config.Path == "" { |
|
return nil, fmt.Errorf("Requires 'path' to be set") |
|
} |
|
if config.Method == "" { |
|
config.Method = "POST" |
|
} |
|
if config.TimeoutRaw == "" { |
|
config.Timeout = DefaultTimeout |
|
} else if timeout, err := time.ParseDuration(config.TimeoutRaw); err != nil { |
|
return nil, fmt.Errorf(fmt.Sprintf("Failed to parse timeout: %v", err)) |
|
} else { |
|
config.Timeout = timeout |
|
} |
|
|
|
return &config, nil |
|
}
|
|
|