mirror of https://github.com/hashicorp/consul
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
239 lines
6.1 KiB
239 lines
6.1 KiB
package watch |
|
|
|
import ( |
|
"context" |
|
"fmt" |
|
|
|
consulapi "github.com/hashicorp/consul/api" |
|
) |
|
|
|
// watchFactory is a function that can create a new WatchFunc |
|
// from a parameter configuration |
|
type watchFactory func(params map[string]interface{}) (WatcherFunc, error) |
|
|
|
// watchFuncFactory maps each type to a factory function |
|
var watchFuncFactory map[string]watchFactory |
|
|
|
func init() { |
|
watchFuncFactory = map[string]watchFactory{ |
|
"key": keyWatch, |
|
"keyprefix": keyPrefixWatch, |
|
"services": servicesWatch, |
|
"nodes": nodesWatch, |
|
"service": serviceWatch, |
|
"checks": checksWatch, |
|
"event": eventWatch, |
|
} |
|
} |
|
|
|
// keyWatch is used to return a key watching function |
|
func keyWatch(params map[string]interface{}) (WatcherFunc, error) { |
|
stale := false |
|
if err := assignValueBool(params, "stale", &stale); err != nil { |
|
return nil, err |
|
} |
|
|
|
var key string |
|
if err := assignValue(params, "key", &key); err != nil { |
|
return nil, err |
|
} |
|
if key == "" { |
|
return nil, fmt.Errorf("Must specify a single key to watch") |
|
} |
|
fn := func(p *Plan) (uint64, interface{}, error) { |
|
kv := p.client.KV() |
|
opts := makeQueryOptionsWithContext(p, stale) |
|
defer p.cancelFunc() |
|
pair, meta, err := kv.Get(key, &opts) |
|
if err != nil { |
|
return 0, nil, err |
|
} |
|
if pair == nil { |
|
return meta.LastIndex, nil, err |
|
} |
|
return meta.LastIndex, pair, err |
|
} |
|
return fn, nil |
|
} |
|
|
|
// keyPrefixWatch is used to return a key prefix watching function |
|
func keyPrefixWatch(params map[string]interface{}) (WatcherFunc, error) { |
|
stale := false |
|
if err := assignValueBool(params, "stale", &stale); err != nil { |
|
return nil, err |
|
} |
|
|
|
var prefix string |
|
if err := assignValue(params, "prefix", &prefix); err != nil { |
|
return nil, err |
|
} |
|
if prefix == "" { |
|
return nil, fmt.Errorf("Must specify a single prefix to watch") |
|
} |
|
fn := func(p *Plan) (uint64, interface{}, error) { |
|
kv := p.client.KV() |
|
opts := makeQueryOptionsWithContext(p, stale) |
|
defer p.cancelFunc() |
|
pairs, meta, err := kv.List(prefix, &opts) |
|
if err != nil { |
|
return 0, nil, err |
|
} |
|
return meta.LastIndex, pairs, err |
|
} |
|
return fn, nil |
|
} |
|
|
|
// servicesWatch is used to watch the list of available services |
|
func servicesWatch(params map[string]interface{}) (WatcherFunc, error) { |
|
stale := false |
|
if err := assignValueBool(params, "stale", &stale); err != nil { |
|
return nil, err |
|
} |
|
|
|
fn := func(p *Plan) (uint64, interface{}, error) { |
|
catalog := p.client.Catalog() |
|
opts := makeQueryOptionsWithContext(p, stale) |
|
defer p.cancelFunc() |
|
services, meta, err := catalog.Services(&opts) |
|
if err != nil { |
|
return 0, nil, err |
|
} |
|
return meta.LastIndex, services, err |
|
} |
|
return fn, nil |
|
} |
|
|
|
// nodesWatch is used to watch the list of available nodes |
|
func nodesWatch(params map[string]interface{}) (WatcherFunc, error) { |
|
stale := false |
|
if err := assignValueBool(params, "stale", &stale); err != nil { |
|
return nil, err |
|
} |
|
|
|
fn := func(p *Plan) (uint64, interface{}, error) { |
|
catalog := p.client.Catalog() |
|
opts := makeQueryOptionsWithContext(p, stale) |
|
defer p.cancelFunc() |
|
nodes, meta, err := catalog.Nodes(&opts) |
|
if err != nil { |
|
return 0, nil, err |
|
} |
|
return meta.LastIndex, nodes, err |
|
} |
|
return fn, nil |
|
} |
|
|
|
// serviceWatch is used to watch a specific service for changes |
|
func serviceWatch(params map[string]interface{}) (WatcherFunc, error) { |
|
stale := false |
|
if err := assignValueBool(params, "stale", &stale); err != nil { |
|
return nil, err |
|
} |
|
|
|
var service, tag string |
|
if err := assignValue(params, "service", &service); err != nil { |
|
return nil, err |
|
} |
|
if service == "" { |
|
return nil, fmt.Errorf("Must specify a single service to watch") |
|
} |
|
|
|
if err := assignValue(params, "tag", &tag); err != nil { |
|
return nil, err |
|
} |
|
|
|
passingOnly := false |
|
if err := assignValueBool(params, "passingonly", &passingOnly); err != nil { |
|
return nil, err |
|
} |
|
|
|
fn := func(p *Plan) (uint64, interface{}, error) { |
|
health := p.client.Health() |
|
opts := makeQueryOptionsWithContext(p, stale) |
|
defer p.cancelFunc() |
|
nodes, meta, err := health.Service(service, tag, passingOnly, &opts) |
|
if err != nil { |
|
return 0, nil, err |
|
} |
|
return meta.LastIndex, nodes, err |
|
} |
|
return fn, nil |
|
} |
|
|
|
// checksWatch is used to watch a specific checks in a given state |
|
func checksWatch(params map[string]interface{}) (WatcherFunc, error) { |
|
stale := false |
|
if err := assignValueBool(params, "stale", &stale); err != nil { |
|
return nil, err |
|
} |
|
|
|
var service, state string |
|
if err := assignValue(params, "service", &service); err != nil { |
|
return nil, err |
|
} |
|
if err := assignValue(params, "state", &state); err != nil { |
|
return nil, err |
|
} |
|
if service != "" && state != "" { |
|
return nil, fmt.Errorf("Cannot specify service and state") |
|
} |
|
if service == "" && state == "" { |
|
state = "any" |
|
} |
|
|
|
fn := func(p *Plan) (uint64, interface{}, error) { |
|
health := p.client.Health() |
|
opts := makeQueryOptionsWithContext(p, stale) |
|
defer p.cancelFunc() |
|
var checks []*consulapi.HealthCheck |
|
var meta *consulapi.QueryMeta |
|
var err error |
|
if state != "" { |
|
checks, meta, err = health.State(state, &opts) |
|
} else { |
|
checks, meta, err = health.Checks(service, &opts) |
|
} |
|
if err != nil { |
|
return 0, nil, err |
|
} |
|
return meta.LastIndex, checks, err |
|
} |
|
return fn, nil |
|
} |
|
|
|
// eventWatch is used to watch for events, optionally filtering on name |
|
func eventWatch(params map[string]interface{}) (WatcherFunc, error) { |
|
// The stale setting doesn't apply to events. |
|
|
|
var name string |
|
if err := assignValue(params, "name", &name); err != nil { |
|
return nil, err |
|
} |
|
|
|
fn := func(p *Plan) (uint64, interface{}, error) { |
|
event := p.client.Event() |
|
opts := makeQueryOptionsWithContext(p, false) |
|
defer p.cancelFunc() |
|
events, meta, err := event.List(name, &opts) |
|
if err != nil { |
|
return 0, nil, err |
|
} |
|
|
|
// Prune to only the new events |
|
for i := 0; i < len(events); i++ { |
|
if event.IDToIndex(events[i].ID) == p.lastIndex { |
|
events = events[i+1:] |
|
break |
|
} |
|
} |
|
return meta.LastIndex, events, err |
|
} |
|
return fn, nil |
|
} |
|
|
|
func makeQueryOptionsWithContext(p *Plan, stale bool) consulapi.QueryOptions { |
|
ctx, cancel := context.WithCancel(context.Background()) |
|
p.cancelFunc = cancel |
|
opts := consulapi.QueryOptions{AllowStale: stale, WaitIndex: p.lastIndex} |
|
return *opts.WithContext(ctx) |
|
}
|
|
|