From 5ee737b8d48aa8bd59e4284f1dc69195f21d5cc7 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Tue, 19 Aug 2014 14:19:31 -0700 Subject: [PATCH 01/20] agent: Adding watches config --- command/agent/config.go | 8 ++++++++ command/agent/config_test.go | 15 +++++++++++++++ 2 files changed, 23 insertions(+) diff --git a/command/agent/config.go b/command/agent/config.go index 87584e1a2b..e40f4b568b 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -229,6 +229,11 @@ type Config struct { // this acts like deny. ACLDownPolicy string `mapstructure:"acl_down_policy"` + // Watches are used to monitor various endpoints and to invoke a + // handler to act appropriately. These are managed entirely in the + // agent layer using the standard APIs. + Watches []string `mapstructure:"watches"` + // AEInterval controls the anti-entropy interval. This is how often // the agent attempts to reconcile it's local state with the server' // representation of our state. Defaults to every 60s. @@ -648,6 +653,9 @@ func MergeConfig(a, b *Config) *Config { if b.ACLDefaultPolicy != "" { result.ACLDefaultPolicy = b.ACLDefaultPolicy } + if len(b.Watches) != 0 { + result.Watches = append(result.Watches, b.Watches...) + } // Copy the start join addresses result.StartJoin = make([]string, 0, len(a.StartJoin)+len(b.StartJoin)) diff --git a/command/agent/config_test.go b/command/agent/config_test.go index 9bc67c69c0..b57037a51e 100644 --- a/command/agent/config_test.go +++ b/command/agent/config_test.go @@ -384,6 +384,20 @@ func TestDecodeConfig(t *testing.T) { if config.ACLDefaultPolicy != "deny" { t.Fatalf("bad: %#v", config) } + + // Watches + input = `{"watches": ["type:keyprefix prefix:foo/ handler:foobar"]}` + config, err = DecodeConfig(bytes.NewReader([]byte(input))) + if err != nil { + t.Fatalf("err: %s", err) + } + + if len(config.Watches) != 1 { + t.Fatalf("bad: %#v", config) + } + if config.Watches[0] != "type:keyprefix prefix:foo/ handler:foobar" { + t.Fatalf("bad: %#v", config) + } } func TestDecodeConfig_Service(t *testing.T) { @@ -538,6 +552,7 @@ func TestMergeConfig(t *testing.T) { ACLTTLRaw: "15s", ACLDownPolicy: "deny", ACLDefaultPolicy: "deny", + Watches: []string{"type:keyprefix prefix:foobar/ handler:foo"}, } c := MergeConfig(a, b) From 2b07355b946769c1204fbb9e78ef923f46923b92 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Wed, 20 Aug 2014 11:19:43 -0700 Subject: [PATCH 02/20] watch: First pass at query parsing --- watch/watch.go | 160 ++++++++++++++++++++++++++++++++++++++++++++ watch/watch_test.go | 106 +++++++++++++++++++++++++++++ 2 files changed, 266 insertions(+) create mode 100644 watch/watch.go create mode 100644 watch/watch_test.go diff --git a/watch/watch.go b/watch/watch.go new file mode 100644 index 0000000000..c2785bfe1f --- /dev/null +++ b/watch/watch.go @@ -0,0 +1,160 @@ +package watch + +import ( + "fmt" + "strings" +) + +// WatchPlan is the parsed version of a watch specification. A watch provides +// the details of a query, which generates a view into the Consul data store. +// This view is watched for changes and a handler is invoked to take any +// appropriate actions. +type WatchPlan struct { + Datacenter string + Token string + Type string +} + +// Parse takes a watch query and compiles it into a WatchPlan or an error +func Parse(query string) (*WatchPlan, error) { + tokens, err := tokenize(query) + if err != nil { + return nil, fmt.Errorf("Failed to parse: %v", err) + } + params := collapse(tokens) + plan := &WatchPlan{} + + if err := assignValue(params, "type", &plan.Type); err != nil { + return nil, err + } + if plan.Type == "" { + return nil, fmt.Errorf("Watch type must be specified") + } + if err := assignValue(params, "datacenter", &plan.Datacenter); err != nil { + return nil, err + } + if err := assignValue(params, "token", &plan.Token); err != nil { + return nil, err + } + + return plan, nil +} + +// assignValue is used to extract a value ensuring it is only +// defined once +func assignValue(params map[string][]string, name string, out *string) error { + if vals, ok := params[name]; ok { + if len(vals) != 1 { + return fmt.Errorf("Multiple definitions of %s", name) + } + *out = vals[0] + delete(params, name) + } + return nil +} + +// token is used to represent a "datacenter:foobar" pair, where +// datacenter is the param and foobar is the value +type token struct { + param string + val string +} + +func (t *token) GoString() string { + return fmt.Sprintf("%#v", *t) +} + +// tokenize splits a query string into a slice of tokens +func tokenize(query string) ([]*token, error) { + var tokens []*token + for i := 0; i < len(query); i++ { + char := query[i] + + // Ignore whitespace + if char == ' ' || char == '\t' || char == '\n' { + continue + } + + // Read the next token + next, offset, err := readToken(query[i:]) + if err != nil { + return nil, err + } + + // Store the token + tokens = append(tokens, next) + + // Increment the offset + i += offset + } + return tokens, nil +} + +// readToken is used to read a single token +func readToken(query string) (*token, int, error) { + // Get the token + param, offset, err := readParameter(query) + if err != nil { + return nil, 0, err + } + + // Get the value + query = query[offset:] + val, offset2, err := readValue(query) + if err != nil { + return nil, 0, err + } + + // Return the new token + token := &token{ + param: param, + val: val, + } + return token, offset + offset2, nil +} + +// readParameter scans for the next parameter +func readParameter(query string) (string, int, error) { + for i := 0; i < len(query); i++ { + char := query[i] + if char == ':' { + if i == 0 { + return "", 0, fmt.Errorf("Missing parameter name") + } else { + return query[:i], i + 1, nil + } + } + } + return "", 0, fmt.Errorf("Parameter delimiter not found") +} + +// readValue is used to scan for the next value +func readValue(query string) (string, int, error) { + // Handle quoted values + if query[0] == '\'' || query[0] == '"' { + quoteChar := query[0:1] + endChar := strings.Index(query[1:], quoteChar) + if endChar == -1 { + return "", 0, fmt.Errorf("Missing end of quotation") + } + return query[1 : endChar+1], endChar + 2, nil + } + + // Look for white space + endChar := strings.IndexAny(query, " \t\n") + if endChar == -1 { + return query, len(query), nil + } + return query[:endChar], endChar, nil +} + +// collapse is used to collapse a token stream into a map +// of parameter name to list of values. +func collapse(tokens []*token) map[string][]string { + out := make(map[string][]string) + for _, t := range tokens { + existing := out[t.param] + out[t.param] = append(existing, t.val) + } + return out +} diff --git a/watch/watch_test.go b/watch/watch_test.go new file mode 100644 index 0000000000..35dc6151eb --- /dev/null +++ b/watch/watch_test.go @@ -0,0 +1,106 @@ +package watch + +import ( + "fmt" + "reflect" + "testing" +) + +func TestTokenize(t *testing.T) { + type tcase struct { + in string + out []*token + err error + } + cases := []tcase{ + tcase{ + "", + nil, + nil, + }, + tcase{ + "foo:bar bar:baz zip:zap", + []*token{ + &token{"foo", "bar"}, + &token{"bar", "baz"}, + &token{"zip", "zap"}, + }, + nil, + }, + tcase{ + "foo:\"long input here\" after:this", + []*token{ + &token{"foo", "long input here"}, + &token{"after", "this"}, + }, + nil, + }, + tcase{ + "foo:'long input here' after:this", + []*token{ + &token{"foo", "long input here"}, + &token{"after", "this"}, + }, + nil, + }, + tcase{ + "foo:'long input here after:this", + nil, + fmt.Errorf("Missing end of quotation"), + }, + tcase{ + "foo", + nil, + fmt.Errorf("Parameter delimiter not found"), + }, + tcase{ + ":val", + nil, + fmt.Errorf("Missing parameter name"), + }, + } + + for _, tc := range cases { + tokens, err := tokenize(tc.in) + if err != nil && tc.err == nil { + t.Fatalf("%s: err: %v", tc.in, err) + } else if tc.err != nil && (err == nil || err.Error() != tc.err.Error()) { + t.Fatalf("%s: bad err: %v", tc.in, err) + } + if !reflect.DeepEqual(tokens, tc.out) { + t.Fatalf("%s: bad: %#v %#v", tc.in, tokens, tc.out) + } + } +} + +func TestCollapse(t *testing.T) { + inp := "type:key key:foo key:bar" + tokens, err := tokenize(inp) + if err != nil { + t.Fatalf("err: %v", err) + } + out := collapse(tokens) + expect := map[string][]string{ + "type": []string{"key"}, + "key": []string{"foo", "bar"}, + } + if !reflect.DeepEqual(out, expect) { + t.Fatalf("bad: %#v", out) + } +} + +func TestParseBasic(t *testing.T) { + p, err := Parse("type:key datacenter:dc2 token:12345") + if err != nil { + t.Fatalf("err: %v", err) + } + if p.Datacenter != "dc2" { + t.Fatalf("Bad: %#v", p) + } + if p.Token != "12345" { + t.Fatalf("Bad: %#v", p) + } + if p.Type != "key" { + t.Fatalf("Bad: %#v", p) + } +} From 66edf0075af575fb143dec8ac5313835b712f513 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Wed, 20 Aug 2014 13:45:34 -0700 Subject: [PATCH 03/20] watch: Testing plan execution --- watch/funcs.go | 41 +++++++++++++++++ watch/plan.go | 104 ++++++++++++++++++++++++++++++++++++++++++++ watch/plan_test.go | 48 ++++++++++++++++++++ watch/watch.go | 62 +++++++++++++++++++++++--- watch/watch_test.go | 2 +- 5 files changed, 249 insertions(+), 8 deletions(-) create mode 100644 watch/funcs.go create mode 100644 watch/plan.go create mode 100644 watch/plan_test.go diff --git a/watch/funcs.go b/watch/funcs.go new file mode 100644 index 0000000000..7593a468fb --- /dev/null +++ b/watch/funcs.go @@ -0,0 +1,41 @@ +package watch + +import ( + "fmt" + + "github.com/armon/consul-api" +) + +// watchFactory is a function that can create a new WatchFunc +// from a parameter configuration +type watchFactory func(params map[string][]string) (WatchFunc, error) + +// watchFuncFactory maps each type to a factory function +var watchFuncFactory map[string]watchFactory + +func init() { + watchFuncFactory = map[string]watchFactory{ + "key": keyWatch, + } +} + +// keyWatch is used to return a key watching function +func keyWatch(params map[string][]string) (WatchFunc, error) { + keys := params["key"] + delete(params, "key") + if len(keys) != 1 { + return nil, fmt.Errorf("Must specify a single key to watch") + } + key := keys[0] + + fn := func(p *WatchPlan) (uint64, interface{}, error) { + kv := p.client.KV() + opts := consulapi.QueryOptions{WaitIndex: p.lastIndex} + pair, meta, err := kv.Get(key, &opts) + if err != nil { + return 0, nil, err + } + return meta.LastIndex, pair, err + } + return fn, nil +} diff --git a/watch/plan.go b/watch/plan.go new file mode 100644 index 0000000000..a43849fc30 --- /dev/null +++ b/watch/plan.go @@ -0,0 +1,104 @@ +package watch + +import ( + "fmt" + "log" + "reflect" + "time" + + "github.com/armon/consul-api" +) + +const ( + // retryInterval is the base retry value + retryInterval = 5 * time.Second + + // maximum back off time, this is to prevent + // exponential runaway + maxBackoffTime = 180 * time.Second +) + +// Run is used to run a watch plan +func (p *WatchPlan) Run(address string) error { + // Setup the client + p.address = address + conf := consulapi.DefaultConfig() + conf.Address = address + conf.Datacenter = p.Datacenter + // TODO: conf.Token = p.Token + client, err := consulapi.NewClient(conf) + if err != nil { + return fmt.Errorf("Failed to connect to agent: %v", err) + } + p.client = client + + // Loop until we are canceled + failures := 0 + for !p.shouldStop() { + // Invoke the handler + index, result, err := p.Func(p) + + // Check if we should terminate since the function + // could have blocked for a while + if p.shouldStop() { + break + } + + // Handle an error in the watch function + if err != nil { + log.Printf("consul.watch: Watch '%s' errored: %v", p.Query, err) + + // Perform an exponential backoff + failures++ + retry := retryInterval * time.Duration(failures*failures) + if retry > maxBackoffTime { + retry = maxBackoffTime + } + select { + case <-time.After(retry): + continue + case <-p.stopCh: + return nil + } + } + + // Clear the failures + failures = 0 + + // If the index is unchanged do nothing + if index == p.lastIndex { + continue + } + + // Update the index, look for change + p.lastIndex = index + if reflect.DeepEqual(p.lastResult, result) { + continue + } + + // Handle the updated result + p.lastResult = result + p.Handler(index, result) + } + return nil +} + +// Stop is used to stop running the watch plan +func (p *WatchPlan) Stop() { + p.stopLock.Lock() + defer p.stopLock.Unlock() + if p.stop { + return + } + p.stop = true + close(p.stopCh) +} + +func (p *WatchPlan) shouldStop() bool { + select { + case <-p.stopCh: + return true + default: + return false + } +} diff --git a/watch/plan_test.go b/watch/plan_test.go new file mode 100644 index 0000000000..99cae3df53 --- /dev/null +++ b/watch/plan_test.go @@ -0,0 +1,48 @@ +package watch + +import ( + "testing" + "time" +) + +func init() { + watchFuncFactory["noop"] = noopWatch +} + +func noopWatch(params map[string][]string) (WatchFunc, error) { + fn := func(p *WatchPlan) (uint64, interface{}, error) { + idx := p.lastIndex + 1 + return idx, idx, nil + } + return fn, nil +} + +func TestRun_Stop(t *testing.T) { + plan, err := Parse("type:noop") + if err != nil { + t.Fatalf("err: %v", err) + } + var expect uint64 = 1 + plan.Handler = func(idx uint64, val interface{}) { + if idx != expect { + t.Fatalf("Bad: %d %d", expect, idx) + } + if val != expect { + t.Fatalf("Bad: %d %d", expect, val) + } + expect++ + } + + time.AfterFunc(10*time.Millisecond, func() { + plan.Stop() + }) + + err = plan.Run("127.0.0.1:8500") + if err != nil { + t.Fatalf("err: %v", err) + } + + if expect == 1 { + t.Fatalf("Bad: %d", expect) + } +} diff --git a/watch/watch.go b/watch/watch.go index c2785bfe1f..9d88f9a35d 100644 --- a/watch/watch.go +++ b/watch/watch.go @@ -3,6 +3,9 @@ package watch import ( "fmt" "strings" + "sync" + + "github.com/armon/consul-api" ) // WatchPlan is the parsed version of a watch specification. A watch provides @@ -10,11 +13,29 @@ import ( // This view is watched for changes and a handler is invoked to take any // appropriate actions. type WatchPlan struct { + Query string Datacenter string Token string Type string + Func WatchFunc + Handler HandlerFunc + + address string + client *consulapi.Client + lastIndex uint64 + lastResult interface{} + + stop bool + stopCh chan struct{} + stopLock sync.Mutex } +// WatchFunc is used to watch for a diff +type WatchFunc func(*WatchPlan) (uint64, interface{}, error) + +// HandlerFunc is used to handle new data +type HandlerFunc func(uint64, interface{}) + // Parse takes a watch query and compiles it into a WatchPlan or an error func Parse(query string) (*WatchPlan, error) { tokens, err := tokenize(query) @@ -22,21 +43,48 @@ func Parse(query string) (*WatchPlan, error) { return nil, fmt.Errorf("Failed to parse: %v", err) } params := collapse(tokens) - plan := &WatchPlan{} + plan := &WatchPlan{ + Query: query, + stopCh: make(chan struct{}), + } - if err := assignValue(params, "type", &plan.Type); err != nil { - return nil, err - } - if plan.Type == "" { - return nil, fmt.Errorf("Watch type must be specified") - } + // Parse the generic parameters if err := assignValue(params, "datacenter", &plan.Datacenter); err != nil { return nil, err } if err := assignValue(params, "token", &plan.Token); err != nil { return nil, err } + if err := assignValue(params, "type", &plan.Type); err != nil { + return nil, err + } + // Ensure there is a watch type + if plan.Type == "" { + return nil, fmt.Errorf("Watch type must be specified") + } + + // Look for a factory function + factory := watchFuncFactory[plan.Type] + if factory == nil { + return nil, fmt.Errorf("Unsupported watch type: %s", plan.Type) + } + + // Get the watch func + fn, err := factory(params) + if err != nil { + return nil, err + } + plan.Func = fn + + // Ensure all parameters are consumed + if len(params) != 0 { + var bad []string + for key := range params { + bad = append(bad, key) + } + return nil, fmt.Errorf("Invalid parameters: %v", bad) + } return plan, nil } diff --git a/watch/watch_test.go b/watch/watch_test.go index 35dc6151eb..121da7378c 100644 --- a/watch/watch_test.go +++ b/watch/watch_test.go @@ -90,7 +90,7 @@ func TestCollapse(t *testing.T) { } func TestParseBasic(t *testing.T) { - p, err := Parse("type:key datacenter:dc2 token:12345") + p, err := Parse("type:key datacenter:dc2 token:12345 key:foo") if err != nil { t.Fatalf("err: %v", err) } From 68a829119ea457e1324ee2ec91106a36804b9a57 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Wed, 20 Aug 2014 15:18:08 -0700 Subject: [PATCH 04/20] watch: test key watch --- watch/funcs.go | 3 +++ watch/plan.go | 7 +++++-- watch/plan_test.go | 11 ++++++++--- 3 files changed, 16 insertions(+), 5 deletions(-) diff --git a/watch/funcs.go b/watch/funcs.go index 7593a468fb..5b39320f71 100644 --- a/watch/funcs.go +++ b/watch/funcs.go @@ -35,6 +35,9 @@ func keyWatch(params map[string][]string) (WatchFunc, error) { if err != nil { return 0, nil, err } + if pair == nil { + return meta.LastIndex, nil, err + } return meta.LastIndex, pair, err } return fn, nil diff --git a/watch/plan.go b/watch/plan.go index a43849fc30..2faea27d85 100644 --- a/watch/plan.go +++ b/watch/plan.go @@ -34,6 +34,7 @@ func (p *WatchPlan) Run(address string) error { // Loop until we are canceled failures := 0 +OUTER: for !p.shouldStop() { // Invoke the handler index, result, err := p.Func(p) @@ -56,7 +57,7 @@ func (p *WatchPlan) Run(address string) error { } select { case <-time.After(retry): - continue + continue OUTER case <-p.stopCh: return nil } @@ -78,7 +79,9 @@ func (p *WatchPlan) Run(address string) error { // Handle the updated result p.lastResult = result - p.Handler(index, result) + if p.Handler != nil { + p.Handler(index, result) + } } return nil } diff --git a/watch/plan_test.go b/watch/plan_test.go index 99cae3df53..d898495005 100644 --- a/watch/plan_test.go +++ b/watch/plan_test.go @@ -17,11 +17,16 @@ func noopWatch(params map[string][]string) (WatchFunc, error) { return fn, nil } -func TestRun_Stop(t *testing.T) { - plan, err := Parse("type:noop") +func mustParse(t *testing.T, q string) *WatchPlan { + plan, err := Parse(q) if err != nil { t.Fatalf("err: %v", err) } + return plan +} + +func TestRun_Stop(t *testing.T) { + plan := mustParse(t, "type:noop") var expect uint64 = 1 plan.Handler = func(idx uint64, val interface{}) { if idx != expect { @@ -37,7 +42,7 @@ func TestRun_Stop(t *testing.T) { plan.Stop() }) - err = plan.Run("127.0.0.1:8500") + err := plan.Run("127.0.0.1:8500") if err != nil { t.Fatalf("err: %v", err) } From 970c606f1a3a1d26520ece0ecf5e393e65270e93 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Wed, 20 Aug 2014 15:22:22 -0700 Subject: [PATCH 05/20] watch: Support for key prefix --- watch/funcs.go | 24 ++++++++- watch/funcs_test.go | 122 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 145 insertions(+), 1 deletion(-) create mode 100644 watch/funcs_test.go diff --git a/watch/funcs.go b/watch/funcs.go index 5b39320f71..8b1e67624e 100644 --- a/watch/funcs.go +++ b/watch/funcs.go @@ -15,7 +15,8 @@ var watchFuncFactory map[string]watchFactory func init() { watchFuncFactory = map[string]watchFactory{ - "key": keyWatch, + "key": keyWatch, + "keyprefix": keyPrefixWatch, } } @@ -42,3 +43,24 @@ func keyWatch(params map[string][]string) (WatchFunc, error) { } return fn, nil } + +// keyPrefixWatch is used to return a key prefix watching function +func keyPrefixWatch(params map[string][]string) (WatchFunc, error) { + list := params["prefix"] + delete(params, "prefix") + if len(list) != 1 { + return nil, fmt.Errorf("Must specify a single prefix to watch") + } + prefix := list[0] + + fn := func(p *WatchPlan) (uint64, interface{}, error) { + kv := p.client.KV() + opts := consulapi.QueryOptions{WaitIndex: p.lastIndex} + pairs, meta, err := kv.List(prefix, &opts) + if err != nil { + return 0, nil, err + } + return meta.LastIndex, pairs, err + } + return fn, nil +} diff --git a/watch/funcs_test.go b/watch/funcs_test.go new file mode 100644 index 0000000000..bf52a13fa8 --- /dev/null +++ b/watch/funcs_test.go @@ -0,0 +1,122 @@ +package watch + +import ( + "os" + "testing" + "time" + + "github.com/armon/consul-api" +) + +var consulAddr string + +func init() { + consulAddr = os.Getenv("CONSUL_ADDR") +} + +func TestKeyWatch(t *testing.T) { + if consulAddr == "" { + t.Skip() + } + plan := mustParse(t, "type:key key:foo/bar/baz") + invoke := 0 + plan.Handler = func(idx uint64, raw interface{}) { + if invoke == 0 { + if raw == nil { + return + } + v, ok := raw.(*consulapi.KVPair) + if !ok || v == nil || string(v.Value) != "test" { + t.Fatalf("Bad: %#v", raw) + } + invoke++ + } + } + + go func() { + defer plan.Stop() + time.Sleep(20 * time.Millisecond) + + kv := plan.client.KV() + pair := &consulapi.KVPair{ + Key: "foo/bar/baz", + Value: []byte("test"), + } + _, err := kv.Put(pair, nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Wait for the query to run + time.Sleep(20 * time.Millisecond) + plan.Stop() + + // Delete the key + _, err = kv.Delete("foo/bar/baz", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + }() + + err := plan.Run(consulAddr) + if err != nil { + t.Fatalf("err: %v", err) + } + + if invoke == 0 { + t.Fatalf("bad: %v", invoke) + } +} + +func TestKeyPrefixWatch(t *testing.T) { + if consulAddr == "" { + t.Skip() + } + plan := mustParse(t, "type:keyprefix prefix:foo/") + invoke := 0 + plan.Handler = func(idx uint64, raw interface{}) { + if invoke == 0 { + if raw == nil { + return + } + v, ok := raw.(consulapi.KVPairs) + if !ok || v == nil || string(v[0].Key) != "foo/bar" { + t.Fatalf("Bad: %#v", raw) + } + invoke++ + } + } + + go func() { + defer plan.Stop() + time.Sleep(20 * time.Millisecond) + + kv := plan.client.KV() + pair := &consulapi.KVPair{ + Key: "foo/bar", + } + _, err := kv.Put(pair, nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + // Wait for the query to run + time.Sleep(20 * time.Millisecond) + plan.Stop() + + // Delete the key + _, err = kv.Delete("foo/bar", nil) + if err != nil { + t.Fatalf("err: %v", err) + } + }() + + err := plan.Run(consulAddr) + if err != nil { + t.Fatalf("err: %v", err) + } + + if invoke == 0 { + t.Fatalf("bad: %v", invoke) + } +} From 00358baa7f448bbc1f0f834a4fd8fa02369aedeb Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Wed, 20 Aug 2014 15:29:31 -0700 Subject: [PATCH 06/20] watch: service watcher --- watch/funcs.go | 18 ++++++++++++++++++ watch/funcs_test.go | 46 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+) diff --git a/watch/funcs.go b/watch/funcs.go index 8b1e67624e..d031d94a76 100644 --- a/watch/funcs.go +++ b/watch/funcs.go @@ -17,6 +17,10 @@ func init() { watchFuncFactory = map[string]watchFactory{ "key": keyWatch, "keyprefix": keyPrefixWatch, + "services": servicesWatch, + "nodes": nil, + "service": nil, + "checks": nil, } } @@ -64,3 +68,17 @@ func keyPrefixWatch(params map[string][]string) (WatchFunc, error) { } return fn, nil } + +// servicesWatch is used to watch the list of available services +func servicesWatch(params map[string][]string) (WatchFunc, error) { + fn := func(p *WatchPlan) (uint64, interface{}, error) { + catalog := p.client.Catalog() + opts := consulapi.QueryOptions{WaitIndex: p.lastIndex} + services, meta, err := catalog.Services(&opts) + if err != nil { + return 0, nil, err + } + return meta.LastIndex, services, err + } + return fn, nil +} diff --git a/watch/funcs_test.go b/watch/funcs_test.go index bf52a13fa8..1cfc06c6db 100644 --- a/watch/funcs_test.go +++ b/watch/funcs_test.go @@ -80,6 +80,9 @@ func TestKeyPrefixWatch(t *testing.T) { return } v, ok := raw.(consulapi.KVPairs) + if ok && v == nil { + return + } if !ok || v == nil || string(v[0].Key) != "foo/bar" { t.Fatalf("Bad: %#v", raw) } @@ -120,3 +123,46 @@ func TestKeyPrefixWatch(t *testing.T) { t.Fatalf("bad: %v", invoke) } } + +func TestServicesWatch(t *testing.T) { + if consulAddr == "" { + t.Skip() + } + plan := mustParse(t, "type:services") + invoke := 0 + plan.Handler = func(idx uint64, raw interface{}) { + if invoke == 0 { + if raw == nil { + return + } + v, ok := raw.(map[string][]string) + if !ok || v["consul"] == nil { + t.Fatalf("Bad: %#v", raw) + } + invoke++ + } + } + + go func() { + time.Sleep(20 * time.Millisecond) + plan.Stop() + + agent := plan.client.Agent() + reg := &consulapi.AgentServiceRegistration{ + ID: "foo", + Name: "foo", + } + agent.ServiceRegister(reg) + time.Sleep(20 * time.Millisecond) + agent.ServiceDeregister("foo") + }() + + err := plan.Run(consulAddr) + if err != nil { + t.Fatalf("err: %v", err) + } + + if invoke == 0 { + t.Fatalf("bad: %v", invoke) + } +} From 8d70128761f7c4b2e4520926c74c9d84edb66821 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Wed, 20 Aug 2014 15:33:13 -0700 Subject: [PATCH 07/20] watch: node watcher --- watch/funcs.go | 16 ++++++++++++++- watch/funcs_test.go | 49 +++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 64 insertions(+), 1 deletion(-) diff --git a/watch/funcs.go b/watch/funcs.go index d031d94a76..a63527af24 100644 --- a/watch/funcs.go +++ b/watch/funcs.go @@ -18,7 +18,7 @@ func init() { "key": keyWatch, "keyprefix": keyPrefixWatch, "services": servicesWatch, - "nodes": nil, + "nodes": nodesWatch, "service": nil, "checks": nil, } @@ -82,3 +82,17 @@ func servicesWatch(params map[string][]string) (WatchFunc, error) { } return fn, nil } + +// nodesWatch is used to watch the list of available nodes +func nodesWatch(params map[string][]string) (WatchFunc, error) { + fn := func(p *WatchPlan) (uint64, interface{}, error) { + catalog := p.client.Catalog() + opts := consulapi.QueryOptions{WaitIndex: p.lastIndex} + nodes, meta, err := catalog.Nodes(&opts) + if err != nil { + return 0, nil, err + } + return meta.LastIndex, nodes, err + } + return fn, nil +} diff --git a/watch/funcs_test.go b/watch/funcs_test.go index 1cfc06c6db..2b06d2a71a 100644 --- a/watch/funcs_test.go +++ b/watch/funcs_test.go @@ -166,3 +166,52 @@ func TestServicesWatch(t *testing.T) { t.Fatalf("bad: %v", invoke) } } + +func TestNodesWatch(t *testing.T) { + if consulAddr == "" { + t.Skip() + } + plan := mustParse(t, "type:nodes") + invoke := 0 + plan.Handler = func(idx uint64, raw interface{}) { + if invoke == 0 { + if raw == nil { + return + } + v, ok := raw.([]*consulapi.Node) + if !ok || len(v) == 0 { + t.Fatalf("Bad: %#v", raw) + } + invoke++ + } + } + + go func() { + time.Sleep(20 * time.Millisecond) + plan.Stop() + + catalog := plan.client.Catalog() + reg := &consulapi.CatalogRegistration{ + Node: "foobar", + Address: "1.1.1.1", + Datacenter: "dc1", + } + catalog.Register(reg, nil) + time.Sleep(20 * time.Millisecond) + dereg := &consulapi.CatalogDeregistration{ + Node: "foobar", + Address: "1.1.1.1", + Datacenter: "dc1", + } + catalog.Deregister(dereg, nil) + }() + + err := plan.Run(consulAddr) + if err != nil { + t.Fatalf("err: %v", err) + } + + if invoke == 0 { + t.Fatalf("bad: %v", invoke) + } +} From f3c8873009dcb4312214c185aa891d4f66659428 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Wed, 20 Aug 2014 15:50:32 -0700 Subject: [PATCH 08/20] watch: supporting service watch --- watch/funcs.go | 59 ++++++++++++++++++++++++++++++++++++++------- watch/funcs_test.go | 49 +++++++++++++++++++++++++++++++++++++ 2 files changed, 99 insertions(+), 9 deletions(-) diff --git a/watch/funcs.go b/watch/funcs.go index a63527af24..a3203b3d6e 100644 --- a/watch/funcs.go +++ b/watch/funcs.go @@ -2,6 +2,7 @@ package watch import ( "fmt" + "strconv" "github.com/armon/consul-api" ) @@ -19,19 +20,20 @@ func init() { "keyprefix": keyPrefixWatch, "services": servicesWatch, "nodes": nodesWatch, - "service": nil, + "service": serviceWatch, "checks": nil, } } // keyWatch is used to return a key watching function func keyWatch(params map[string][]string) (WatchFunc, error) { - keys := params["key"] - delete(params, "key") - if len(keys) != 1 { + var key string + if err := assignValue(params, "key", &key); err != nil { + return nil, err + } + if key == "" { return nil, fmt.Errorf("Must specify a single key to watch") } - key := keys[0] fn := func(p *WatchPlan) (uint64, interface{}, error) { kv := p.client.KV() @@ -50,12 +52,13 @@ func keyWatch(params map[string][]string) (WatchFunc, error) { // keyPrefixWatch is used to return a key prefix watching function func keyPrefixWatch(params map[string][]string) (WatchFunc, error) { - list := params["prefix"] - delete(params, "prefix") - if len(list) != 1 { + var prefix string + if err := assignValue(params, "prefix", &prefix); err != nil { + return nil, err + } + if prefix == "" { return nil, fmt.Errorf("Must specify a single prefix to watch") } - prefix := list[0] fn := func(p *WatchPlan) (uint64, interface{}, error) { kv := p.client.KV() @@ -96,3 +99,41 @@ func nodesWatch(params map[string][]string) (WatchFunc, error) { } return fn, nil } + +// serviceWatch is used to watch a specific service for changes +func serviceWatch(params map[string][]string) (WatchFunc, error) { + var service, tag, passingRaw string + if err := assignValue(params, "service", &service); err != nil { + return nil, err + } + if service == "" { + return nil, fmt.Errorf("Must specify a single service to watch") + } + + if err := assignValue(params, "tag", &tag); err != nil { + return nil, err + } + + if err := assignValue(params, "passingonly", &passingRaw); err != nil { + return nil, err + } + passingOnly := false + if passingRaw != "" { + b, err := strconv.ParseBool(passingRaw) + if err != nil { + return nil, fmt.Errorf("Failed to parse passingonly value: %v", err) + } + passingOnly = b + } + + fn := func(p *WatchPlan) (uint64, interface{}, error) { + health := p.client.Health() + opts := consulapi.QueryOptions{WaitIndex: p.lastIndex} + nodes, meta, err := health.Service(service, tag, passingOnly, &opts) + if err != nil { + return 0, nil, err + } + return meta.LastIndex, nodes, err + } + return fn, nil +} diff --git a/watch/funcs_test.go b/watch/funcs_test.go index 2b06d2a71a..8b13f27a30 100644 --- a/watch/funcs_test.go +++ b/watch/funcs_test.go @@ -215,3 +215,52 @@ func TestNodesWatch(t *testing.T) { t.Fatalf("bad: %v", invoke) } } + +func TestServiceWatch(t *testing.T) { + if consulAddr == "" { + t.Skip() + } + plan := mustParse(t, "type:service service:foo tag:bar passingonly:true") + invoke := 0 + plan.Handler = func(idx uint64, raw interface{}) { + if invoke == 0 { + if raw == nil { + return + } + v, ok := raw.([]*consulapi.ServiceEntry) + if ok && len(v) == 0 { + return + } + if !ok || v[0].Service.ID != "foo" { + t.Fatalf("Bad: %#v", raw) + } + invoke++ + } + } + + go func() { + time.Sleep(20 * time.Millisecond) + + agent := plan.client.Agent() + reg := &consulapi.AgentServiceRegistration{ + ID: "foo", + Name: "foo", + Tags: []string{"bar"}, + } + agent.ServiceRegister(reg) + + time.Sleep(20 * time.Millisecond) + plan.Stop() + + agent.ServiceDeregister("foo") + }() + + err := plan.Run(consulAddr) + if err != nil { + t.Fatalf("err: %v", err) + } + + if invoke == 0 { + t.Fatalf("bad: %v", invoke) + } +} From 5aefdf01a08a81d7646c64370f3c8c65c23ec9ec Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Wed, 20 Aug 2014 16:32:12 -0700 Subject: [PATCH 09/20] watch: support checks watch --- watch/funcs.go | 37 ++++++++++++- watch/funcs_test.go | 128 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 164 insertions(+), 1 deletion(-) diff --git a/watch/funcs.go b/watch/funcs.go index a3203b3d6e..43d3742d0c 100644 --- a/watch/funcs.go +++ b/watch/funcs.go @@ -21,7 +21,7 @@ func init() { "services": servicesWatch, "nodes": nodesWatch, "service": serviceWatch, - "checks": nil, + "checks": checksWatch, } } @@ -137,3 +137,38 @@ func serviceWatch(params map[string][]string) (WatchFunc, error) { } return fn, nil } + +// checksWatch is used to watch a specific checks in a given state +func checksWatch(params map[string][]string) (WatchFunc, error) { + var service, state string + if err := assignValue(params, "service", &service); err != nil { + return nil, err + } + if err := assignValue(params, "state", &state); err != nil { + return nil, err + } + if service != "" && state != "" { + return nil, fmt.Errorf("Cannot specify service and state") + } + if service == "" && state == "" { + state = "any" + } + + fn := func(p *WatchPlan) (uint64, interface{}, error) { + health := p.client.Health() + opts := consulapi.QueryOptions{WaitIndex: p.lastIndex} + var checks []*consulapi.HealthCheck + var meta *consulapi.QueryMeta + var err error + if state != "" { + checks, meta, err = health.State(state, &opts) + } else { + checks, meta, err = health.Checks(service, &opts) + } + if err != nil { + return 0, nil, err + } + return meta.LastIndex, checks, err + } + return fn, nil +} diff --git a/watch/funcs_test.go b/watch/funcs_test.go index 8b13f27a30..fddf9af429 100644 --- a/watch/funcs_test.go +++ b/watch/funcs_test.go @@ -264,3 +264,131 @@ func TestServiceWatch(t *testing.T) { t.Fatalf("bad: %v", invoke) } } + +func TestChecksWatch_State(t *testing.T) { + if consulAddr == "" { + t.Skip() + } + plan := mustParse(t, "type:checks state:warning") + invoke := 0 + plan.Handler = func(idx uint64, raw interface{}) { + if invoke == 0 { + if raw == nil { + return + } + v, ok := raw.([]*consulapi.HealthCheck) + if len(v) == 0 { + return + } + if !ok || v[0].CheckID != "foobar" { + t.Fatalf("Bad: %#v", raw) + } + invoke++ + } + } + + go func() { + time.Sleep(20 * time.Millisecond) + + catalog := plan.client.Catalog() + reg := &consulapi.CatalogRegistration{ + Node: "foobar", + Address: "1.1.1.1", + Datacenter: "dc1", + Check: &consulapi.AgentCheck{ + Node: "foobar", + CheckID: "foobar", + Name: "foobar", + Status: "warning", + }, + } + catalog.Register(reg, nil) + + time.Sleep(20 * time.Millisecond) + plan.Stop() + + dereg := &consulapi.CatalogDeregistration{ + Node: "foobar", + Address: "1.1.1.1", + Datacenter: "dc1", + } + catalog.Deregister(dereg, nil) + }() + + err := plan.Run(consulAddr) + if err != nil { + t.Fatalf("err: %v", err) + } + + if invoke == 0 { + t.Fatalf("bad: %v", invoke) + } +} + +func TestChecksWatch_Service(t *testing.T) { + if consulAddr == "" { + t.Skip() + } + plan := mustParse(t, "type:checks service:foobar") + invoke := 0 + plan.Handler = func(idx uint64, raw interface{}) { + if invoke == 0 { + if raw == nil { + return + } + v, ok := raw.([]*consulapi.HealthCheck) + if len(v) == 0 { + return + } + if !ok || v[0].CheckID != "foobar" { + t.Fatalf("Bad: %#v", raw) + } + invoke++ + } + } + + go func() { + time.Sleep(20 * time.Millisecond) + + catalog := plan.client.Catalog() + reg := &consulapi.CatalogRegistration{ + Node: "foobar", + Address: "1.1.1.1", + Datacenter: "dc1", + Service: &consulapi.AgentService{ + ID: "foobar", + Service: "foobar", + }, + Check: &consulapi.AgentCheck{ + Node: "foobar", + CheckID: "foobar", + Name: "foobar", + Status: "passing", + ServiceID: "foobar", + }, + } + _, err := catalog.Register(reg, nil) + if err != nil { + t.Fatalf("err: %v", err) + } + + time.Sleep(20 * time.Millisecond) + plan.Stop() + + dereg := &consulapi.CatalogDeregistration{ + Node: "foobar", + Address: "1.1.1.1", + Datacenter: "dc1", + } + catalog.Deregister(dereg, nil) + }() + + err := plan.Run(consulAddr) + if err != nil { + t.Fatalf("err: %v", err) + } + + if invoke == 0 { + t.Fatalf("bad: %v", invoke) + } +} From 723352fc3774daf373587419f43b4fbf3345b30c Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Wed, 20 Aug 2014 16:38:15 -0700 Subject: [PATCH 10/20] watch: support parameter exemption --- watch/watch.go | 19 +++++++++++++++++++ watch/watch_test.go | 14 ++++++++++++++ 2 files changed, 33 insertions(+) diff --git a/watch/watch.go b/watch/watch.go index 9d88f9a35d..91ed51c4a2 100644 --- a/watch/watch.go +++ b/watch/watch.go @@ -17,6 +17,7 @@ type WatchPlan struct { Datacenter string Token string Type string + Exempt map[string][]string Func WatchFunc Handler HandlerFunc @@ -38,6 +39,12 @@ type HandlerFunc func(uint64, interface{}) // Parse takes a watch query and compiles it into a WatchPlan or an error func Parse(query string) (*WatchPlan, error) { + return ParseExempt(query, nil) +} + +// ParseExempt takes a watch query and compiles it into a WatchPlan or an error +// Any exempt parameters are stored in the Exempt map +func ParseExempt(query string, exempt []string) (*WatchPlan, error) { tokens, err := tokenize(query) if err != nil { return nil, fmt.Errorf("Failed to parse: %v", err) @@ -77,6 +84,18 @@ func Parse(query string) (*WatchPlan, error) { } plan.Func = fn + // Remove the exempt parameters + if len(exempt) > 0 { + plan.Exempt = make(map[string][]string) + for _, ex := range exempt { + val, ok := params[ex] + if ok { + plan.Exempt[ex] = val + delete(params, ex) + } + } + } + // Ensure all parameters are consumed if len(params) != 0 { var bad []string diff --git a/watch/watch_test.go b/watch/watch_test.go index 121da7378c..dd0f80f891 100644 --- a/watch/watch_test.go +++ b/watch/watch_test.go @@ -104,3 +104,17 @@ func TestParseBasic(t *testing.T) { t.Fatalf("Bad: %#v", p) } } + +func TestParse_exempt(t *testing.T) { + p, err := ParseExempt("type:key key:foo handler:foobar", []string{"handler"}) + if err != nil { + t.Fatalf("err: %v", err) + } + if p.Type != "key" { + t.Fatalf("Bad: %#v", p) + } + ex := p.Exempt["handler"] + if len(ex) != 1 && ex[0] != "foobar" { + t.Fatalf("bad: %v", ex) + } +} From ee4a1a960f49808131ae0a0993bed8f77db59ca9 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Wed, 20 Aug 2014 16:45:37 -0700 Subject: [PATCH 11/20] watch: Set the ACL token --- watch/plan.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/watch/plan.go b/watch/plan.go index 2faea27d85..d496f9809a 100644 --- a/watch/plan.go +++ b/watch/plan.go @@ -25,7 +25,7 @@ func (p *WatchPlan) Run(address string) error { conf := consulapi.DefaultConfig() conf.Address = address conf.Datacenter = p.Datacenter - // TODO: conf.Token = p.Token + conf.Token = p.Token client, err := consulapi.NewClient(conf) if err != nil { return fmt.Errorf("Failed to connect to agent: %v", err) From ad40ddf361f929263a88f0feb0e7a383fd791571 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Thu, 21 Aug 2014 11:38:44 -0700 Subject: [PATCH 12/20] watch: Remove DSL in place of JSON --- watch/funcs.go | 28 ++++----- watch/funcs_test.go | 14 ++--- watch/plan.go | 2 +- watch/plan_test.go | 7 ++- watch/watch.go | 143 +++++++------------------------------------- watch/watch_test.go | 104 +++++--------------------------- 6 files changed, 59 insertions(+), 239 deletions(-) diff --git a/watch/funcs.go b/watch/funcs.go index 43d3742d0c..a6d1015964 100644 --- a/watch/funcs.go +++ b/watch/funcs.go @@ -2,14 +2,13 @@ package watch import ( "fmt" - "strconv" "github.com/armon/consul-api" ) // watchFactory is a function that can create a new WatchFunc // from a parameter configuration -type watchFactory func(params map[string][]string) (WatchFunc, error) +type watchFactory func(params map[string]interface{}) (WatchFunc, error) // watchFuncFactory maps each type to a factory function var watchFuncFactory map[string]watchFactory @@ -26,7 +25,7 @@ func init() { } // keyWatch is used to return a key watching function -func keyWatch(params map[string][]string) (WatchFunc, error) { +func keyWatch(params map[string]interface{}) (WatchFunc, error) { var key string if err := assignValue(params, "key", &key); err != nil { return nil, err @@ -51,7 +50,7 @@ func keyWatch(params map[string][]string) (WatchFunc, error) { } // keyPrefixWatch is used to return a key prefix watching function -func keyPrefixWatch(params map[string][]string) (WatchFunc, error) { +func keyPrefixWatch(params map[string]interface{}) (WatchFunc, error) { var prefix string if err := assignValue(params, "prefix", &prefix); err != nil { return nil, err @@ -73,7 +72,7 @@ func keyPrefixWatch(params map[string][]string) (WatchFunc, error) { } // servicesWatch is used to watch the list of available services -func servicesWatch(params map[string][]string) (WatchFunc, error) { +func servicesWatch(params map[string]interface{}) (WatchFunc, error) { fn := func(p *WatchPlan) (uint64, interface{}, error) { catalog := p.client.Catalog() opts := consulapi.QueryOptions{WaitIndex: p.lastIndex} @@ -87,7 +86,7 @@ func servicesWatch(params map[string][]string) (WatchFunc, error) { } // nodesWatch is used to watch the list of available nodes -func nodesWatch(params map[string][]string) (WatchFunc, error) { +func nodesWatch(params map[string]interface{}) (WatchFunc, error) { fn := func(p *WatchPlan) (uint64, interface{}, error) { catalog := p.client.Catalog() opts := consulapi.QueryOptions{WaitIndex: p.lastIndex} @@ -101,8 +100,8 @@ func nodesWatch(params map[string][]string) (WatchFunc, error) { } // serviceWatch is used to watch a specific service for changes -func serviceWatch(params map[string][]string) (WatchFunc, error) { - var service, tag, passingRaw string +func serviceWatch(params map[string]interface{}) (WatchFunc, error) { + var service, tag string if err := assignValue(params, "service", &service); err != nil { return nil, err } @@ -114,16 +113,9 @@ func serviceWatch(params map[string][]string) (WatchFunc, error) { return nil, err } - if err := assignValue(params, "passingonly", &passingRaw); err != nil { - return nil, err - } passingOnly := false - if passingRaw != "" { - b, err := strconv.ParseBool(passingRaw) - if err != nil { - return nil, fmt.Errorf("Failed to parse passingonly value: %v", err) - } - passingOnly = b + if err := assignValueBool(params, "passingonly", &passingOnly); err != nil { + return nil, err } fn := func(p *WatchPlan) (uint64, interface{}, error) { @@ -139,7 +131,7 @@ func serviceWatch(params map[string][]string) (WatchFunc, error) { } // checksWatch is used to watch a specific checks in a given state -func checksWatch(params map[string][]string) (WatchFunc, error) { +func checksWatch(params map[string]interface{}) (WatchFunc, error) { var service, state string if err := assignValue(params, "service", &service); err != nil { return nil, err diff --git a/watch/funcs_test.go b/watch/funcs_test.go index fddf9af429..d0dbb0c8f2 100644 --- a/watch/funcs_test.go +++ b/watch/funcs_test.go @@ -18,7 +18,7 @@ func TestKeyWatch(t *testing.T) { if consulAddr == "" { t.Skip() } - plan := mustParse(t, "type:key key:foo/bar/baz") + plan := mustParse(t, `{"type":"key", "key":"foo/bar/baz"}`) invoke := 0 plan.Handler = func(idx uint64, raw interface{}) { if invoke == 0 { @@ -72,7 +72,7 @@ func TestKeyPrefixWatch(t *testing.T) { if consulAddr == "" { t.Skip() } - plan := mustParse(t, "type:keyprefix prefix:foo/") + plan := mustParse(t, `{"type":"keyprefix", "prefix":"foo/"}`) invoke := 0 plan.Handler = func(idx uint64, raw interface{}) { if invoke == 0 { @@ -128,7 +128,7 @@ func TestServicesWatch(t *testing.T) { if consulAddr == "" { t.Skip() } - plan := mustParse(t, "type:services") + plan := mustParse(t, `{"type":"services"}`) invoke := 0 plan.Handler = func(idx uint64, raw interface{}) { if invoke == 0 { @@ -171,7 +171,7 @@ func TestNodesWatch(t *testing.T) { if consulAddr == "" { t.Skip() } - plan := mustParse(t, "type:nodes") + plan := mustParse(t, `{"type":"nodes"}`) invoke := 0 plan.Handler = func(idx uint64, raw interface{}) { if invoke == 0 { @@ -220,7 +220,7 @@ func TestServiceWatch(t *testing.T) { if consulAddr == "" { t.Skip() } - plan := mustParse(t, "type:service service:foo tag:bar passingonly:true") + plan := mustParse(t, `{"type":"service", "service":"foo", "tag":"bar", "passingonly":true}`) invoke := 0 plan.Handler = func(idx uint64, raw interface{}) { if invoke == 0 { @@ -269,7 +269,7 @@ func TestChecksWatch_State(t *testing.T) { if consulAddr == "" { t.Skip() } - plan := mustParse(t, "type:checks state:warning") + plan := mustParse(t, `{"type":"checks", "state":"warning"}`) invoke := 0 plan.Handler = func(idx uint64, raw interface{}) { if invoke == 0 { @@ -329,7 +329,7 @@ func TestChecksWatch_Service(t *testing.T) { if consulAddr == "" { t.Skip() } - plan := mustParse(t, "type:checks service:foobar") + plan := mustParse(t, `{"type":"checks", "service":"foobar"}`) invoke := 0 plan.Handler = func(idx uint64, raw interface{}) { if invoke == 0 { diff --git a/watch/plan.go b/watch/plan.go index d496f9809a..07576dfe11 100644 --- a/watch/plan.go +++ b/watch/plan.go @@ -47,7 +47,7 @@ OUTER: // Handle an error in the watch function if err != nil { - log.Printf("consul.watch: Watch '%s' errored: %v", p.Query, err) + log.Printf("consul.watch: Watch (type: %s) errored: %v", p.Type, err) // Perform an exponential backoff failures++ diff --git a/watch/plan_test.go b/watch/plan_test.go index d898495005..e2dad6d79b 100644 --- a/watch/plan_test.go +++ b/watch/plan_test.go @@ -9,7 +9,7 @@ func init() { watchFuncFactory["noop"] = noopWatch } -func noopWatch(params map[string][]string) (WatchFunc, error) { +func noopWatch(params map[string]interface{}) (WatchFunc, error) { fn := func(p *WatchPlan) (uint64, interface{}, error) { idx := p.lastIndex + 1 return idx, idx, nil @@ -18,7 +18,8 @@ func noopWatch(params map[string][]string) (WatchFunc, error) { } func mustParse(t *testing.T, q string) *WatchPlan { - plan, err := Parse(q) + params := makeParams(t, q) + plan, err := Parse(params) if err != nil { t.Fatalf("err: %v", err) } @@ -26,7 +27,7 @@ func mustParse(t *testing.T, q string) *WatchPlan { } func TestRun_Stop(t *testing.T) { - plan := mustParse(t, "type:noop") + plan := mustParse(t, `{"type":"noop"}`) var expect uint64 = 1 plan.Handler = func(idx uint64, val interface{}) { if idx != expect { diff --git a/watch/watch.go b/watch/watch.go index 91ed51c4a2..58281faae1 100644 --- a/watch/watch.go +++ b/watch/watch.go @@ -2,7 +2,6 @@ package watch import ( "fmt" - "strings" "sync" "github.com/armon/consul-api" @@ -13,11 +12,10 @@ import ( // This view is watched for changes and a handler is invoked to take any // appropriate actions. type WatchPlan struct { - Query string Datacenter string Token string Type string - Exempt map[string][]string + Exempt map[string]interface{} Func WatchFunc Handler HandlerFunc @@ -38,20 +36,14 @@ type WatchFunc func(*WatchPlan) (uint64, interface{}, error) type HandlerFunc func(uint64, interface{}) // Parse takes a watch query and compiles it into a WatchPlan or an error -func Parse(query string) (*WatchPlan, error) { - return ParseExempt(query, nil) +func Parse(params map[string]interface{}) (*WatchPlan, error) { + return ParseExempt(params, nil) } // ParseExempt takes a watch query and compiles it into a WatchPlan or an error // Any exempt parameters are stored in the Exempt map -func ParseExempt(query string, exempt []string) (*WatchPlan, error) { - tokens, err := tokenize(query) - if err != nil { - return nil, fmt.Errorf("Failed to parse: %v", err) - } - params := collapse(tokens) +func ParseExempt(params map[string]interface{}, exempt []string) (*WatchPlan, error) { plan := &WatchPlan{ - Query: query, stopCh: make(chan struct{}), } @@ -86,7 +78,7 @@ func ParseExempt(query string, exempt []string) (*WatchPlan, error) { // Remove the exempt parameters if len(exempt) > 0 { - plan.Exempt = make(map[string][]string) + plan.Exempt = make(map[string]interface{}) for _, ex := range exempt { val, ok := params[ex] if ok { @@ -107,121 +99,28 @@ func ParseExempt(query string, exempt []string) (*WatchPlan, error) { return plan, nil } -// assignValue is used to extract a value ensuring it is only -// defined once -func assignValue(params map[string][]string, name string, out *string) error { - if vals, ok := params[name]; ok { - if len(vals) != 1 { - return fmt.Errorf("Multiple definitions of %s", name) +// assignValue is used to extract a value ensuring it is a string +func assignValue(params map[string]interface{}, name string, out *string) error { + if raw, ok := params[name]; ok { + val, ok := raw.(string) + if !ok { + return fmt.Errorf("Expecting %s to be a string") } - *out = vals[0] + *out = val delete(params, name) } return nil } -// token is used to represent a "datacenter:foobar" pair, where -// datacenter is the param and foobar is the value -type token struct { - param string - val string -} - -func (t *token) GoString() string { - return fmt.Sprintf("%#v", *t) -} - -// tokenize splits a query string into a slice of tokens -func tokenize(query string) ([]*token, error) { - var tokens []*token - for i := 0; i < len(query); i++ { - char := query[i] - - // Ignore whitespace - if char == ' ' || char == '\t' || char == '\n' { - continue +// assignValueBool is used to extract a value ensuring it is a bool +func assignValueBool(params map[string]interface{}, name string, out *bool) error { + if raw, ok := params[name]; ok { + val, ok := raw.(bool) + if !ok { + return fmt.Errorf("Expecting %s to be a boolean") } - - // Read the next token - next, offset, err := readToken(query[i:]) - if err != nil { - return nil, err - } - - // Store the token - tokens = append(tokens, next) - - // Increment the offset - i += offset + *out = val + delete(params, name) } - return tokens, nil -} - -// readToken is used to read a single token -func readToken(query string) (*token, int, error) { - // Get the token - param, offset, err := readParameter(query) - if err != nil { - return nil, 0, err - } - - // Get the value - query = query[offset:] - val, offset2, err := readValue(query) - if err != nil { - return nil, 0, err - } - - // Return the new token - token := &token{ - param: param, - val: val, - } - return token, offset + offset2, nil -} - -// readParameter scans for the next parameter -func readParameter(query string) (string, int, error) { - for i := 0; i < len(query); i++ { - char := query[i] - if char == ':' { - if i == 0 { - return "", 0, fmt.Errorf("Missing parameter name") - } else { - return query[:i], i + 1, nil - } - } - } - return "", 0, fmt.Errorf("Parameter delimiter not found") -} - -// readValue is used to scan for the next value -func readValue(query string) (string, int, error) { - // Handle quoted values - if query[0] == '\'' || query[0] == '"' { - quoteChar := query[0:1] - endChar := strings.Index(query[1:], quoteChar) - if endChar == -1 { - return "", 0, fmt.Errorf("Missing end of quotation") - } - return query[1 : endChar+1], endChar + 2, nil - } - - // Look for white space - endChar := strings.IndexAny(query, " \t\n") - if endChar == -1 { - return query, len(query), nil - } - return query[:endChar], endChar, nil -} - -// collapse is used to collapse a token stream into a map -// of parameter name to list of values. -func collapse(tokens []*token) map[string][]string { - out := make(map[string][]string) - for _, t := range tokens { - existing := out[t.param] - out[t.param] = append(existing, t.val) - } - return out + return nil } diff --git a/watch/watch_test.go b/watch/watch_test.go index dd0f80f891..f4597b46f3 100644 --- a/watch/watch_test.go +++ b/watch/watch_test.go @@ -1,96 +1,14 @@ package watch import ( - "fmt" - "reflect" + "bytes" + "encoding/json" "testing" ) -func TestTokenize(t *testing.T) { - type tcase struct { - in string - out []*token - err error - } - cases := []tcase{ - tcase{ - "", - nil, - nil, - }, - tcase{ - "foo:bar bar:baz zip:zap", - []*token{ - &token{"foo", "bar"}, - &token{"bar", "baz"}, - &token{"zip", "zap"}, - }, - nil, - }, - tcase{ - "foo:\"long input here\" after:this", - []*token{ - &token{"foo", "long input here"}, - &token{"after", "this"}, - }, - nil, - }, - tcase{ - "foo:'long input here' after:this", - []*token{ - &token{"foo", "long input here"}, - &token{"after", "this"}, - }, - nil, - }, - tcase{ - "foo:'long input here after:this", - nil, - fmt.Errorf("Missing end of quotation"), - }, - tcase{ - "foo", - nil, - fmt.Errorf("Parameter delimiter not found"), - }, - tcase{ - ":val", - nil, - fmt.Errorf("Missing parameter name"), - }, - } - - for _, tc := range cases { - tokens, err := tokenize(tc.in) - if err != nil && tc.err == nil { - t.Fatalf("%s: err: %v", tc.in, err) - } else if tc.err != nil && (err == nil || err.Error() != tc.err.Error()) { - t.Fatalf("%s: bad err: %v", tc.in, err) - } - if !reflect.DeepEqual(tokens, tc.out) { - t.Fatalf("%s: bad: %#v %#v", tc.in, tokens, tc.out) - } - } -} - -func TestCollapse(t *testing.T) { - inp := "type:key key:foo key:bar" - tokens, err := tokenize(inp) - if err != nil { - t.Fatalf("err: %v", err) - } - out := collapse(tokens) - expect := map[string][]string{ - "type": []string{"key"}, - "key": []string{"foo", "bar"}, - } - if !reflect.DeepEqual(out, expect) { - t.Fatalf("bad: %#v", out) - } -} - func TestParseBasic(t *testing.T) { - p, err := Parse("type:key datacenter:dc2 token:12345 key:foo") + params := makeParams(t, `{"type":"key", "datacenter":"dc2", "token":"12345", "key":"foo"}`) + p, err := Parse(params) if err != nil { t.Fatalf("err: %v", err) } @@ -106,7 +24,8 @@ func TestParseBasic(t *testing.T) { } func TestParse_exempt(t *testing.T) { - p, err := ParseExempt("type:key key:foo handler:foobar", []string{"handler"}) + params := makeParams(t, `{"type":"key", "key":"foo", "handler": "foobar"}`) + p, err := ParseExempt(params, []string{"handler"}) if err != nil { t.Fatalf("err: %v", err) } @@ -114,7 +33,16 @@ func TestParse_exempt(t *testing.T) { t.Fatalf("Bad: %#v", p) } ex := p.Exempt["handler"] - if len(ex) != 1 && ex[0] != "foobar" { + if ex != "foobar" { t.Fatalf("bad: %v", ex) } } + +func makeParams(t *testing.T, s string) map[string]interface{} { + var out map[string]interface{} + dec := json.NewDecoder(bytes.NewReader([]byte(s))) + if err := dec.Decode(&out); err != nil { + t.Fatalf("err: %v", err) + } + return out +} From e877753162096049b0c8ef3d37ef3a7379e4aad1 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Thu, 21 Aug 2014 11:52:36 -0700 Subject: [PATCH 13/20] agent: Changing to use nested JSON for watches --- command/agent/config.go | 2 +- command/agent/config_test.go | 19 ++++++++++++++++--- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/command/agent/config.go b/command/agent/config.go index e40f4b568b..cba6f3e8f2 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -232,7 +232,7 @@ type Config struct { // Watches are used to monitor various endpoints and to invoke a // handler to act appropriately. These are managed entirely in the // agent layer using the standard APIs. - Watches []string `mapstructure:"watches"` + Watches []map[string]interface{} `mapstructure:"watches"` // AEInterval controls the anti-entropy interval. This is how often // the agent attempts to reconcile it's local state with the server' diff --git a/command/agent/config_test.go b/command/agent/config_test.go index b57037a51e..75c0610a9e 100644 --- a/command/agent/config_test.go +++ b/command/agent/config_test.go @@ -386,7 +386,7 @@ func TestDecodeConfig(t *testing.T) { } // Watches - input = `{"watches": ["type:keyprefix prefix:foo/ handler:foobar"]}` + input = `{"watches": [{"type":"keyprefix", "prefix":"foo/", "handler":"foobar"}]}` config, err = DecodeConfig(bytes.NewReader([]byte(input))) if err != nil { t.Fatalf("err: %s", err) @@ -395,7 +395,14 @@ func TestDecodeConfig(t *testing.T) { if len(config.Watches) != 1 { t.Fatalf("bad: %#v", config) } - if config.Watches[0] != "type:keyprefix prefix:foo/ handler:foobar" { + + out := config.Watches[0] + exp := map[string]interface{}{ + "type": "keyprefix", + "prefix": "foo/", + "handler": "foobar", + } + if !reflect.DeepEqual(out, exp) { t.Fatalf("bad: %#v", config) } } @@ -552,7 +559,13 @@ func TestMergeConfig(t *testing.T) { ACLTTLRaw: "15s", ACLDownPolicy: "deny", ACLDefaultPolicy: "deny", - Watches: []string{"type:keyprefix prefix:foobar/ handler:foo"}, + Watches: []map[string]interface{}{ + map[string]interface{}{ + "type": "keyprefix", + "prefix": "foo/", + "handler": "foobar", + }, + }, } c := MergeConfig(a, b) From 4b547a43d0e399747ae49d86d3f50c23003dbf75 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Thu, 21 Aug 2014 13:09:13 -0700 Subject: [PATCH 14/20] agent: First pass at agent-based watches --- command/agent/command.go | 61 +++++++++++++++++++++++ command/agent/config.go | 20 ++++++++ command/agent/dns.go | 2 +- command/agent/watch_handler.go | 88 ++++++++++++++++++++++++++++++++++ watch/plan.go | 12 ++++- watch/watch.go | 7 ++- 6 files changed, 185 insertions(+), 5 deletions(-) create mode 100644 command/agent/watch_handler.go diff --git a/command/agent/command.go b/command/agent/command.go index 6410383eaf..2a580eaf79 100644 --- a/command/agent/command.go +++ b/command/agent/command.go @@ -14,6 +14,7 @@ import ( "time" "github.com/armon/go-metrics" + "github.com/hashicorp/consul/watch" "github.com/hashicorp/go-syslog" "github.com/hashicorp/logutils" "github.com/mitchellh/cli" @@ -37,6 +38,7 @@ type Command struct { ShutdownCh <-chan struct{} args []string logFilter *logutils.LevelFilter + logOutput io.Writer agent *Agent rpcServer *AgentRPC httpServer *HTTPServer @@ -141,6 +143,25 @@ func (c *Command) readConfig() *Config { return nil } + // Compile all the watches + for _, params := range config.Watches { + // Parse the watches, excluding the handler + wp, err := watch.ParseExempt(params, []string{"handler"}) + if err != nil { + c.Ui.Error(fmt.Sprintf("Failed to parse watch (%#v): %v", params, err)) + return nil + } + + // Get the handler + if err := verifyWatchHandler(wp.Exempt["handler"]); err != nil { + c.Ui.Error(fmt.Sprintf("Failed to setup watch handler (%#v): %v", params, err)) + return nil + } + + // Store the watch plan + config.WatchPlans = append(config.WatchPlans, wp) + } + // Warn if we are in expect mode if config.BootstrapExpect == 1 { c.Ui.Error("WARNING: BootstrapExpect Mode is specified as 1; this is the same as Bootstrap mode.") @@ -206,6 +227,7 @@ func (c *Command) setupLoggers(config *Config) (*GatedWriter, *logWriter, io.Wri } else { logOutput = io.MultiWriter(c.logFilter, logWriter) } + c.logOutput = logOutput return logGate, logWriter, logOutput } @@ -377,6 +399,23 @@ func (c *Command) Run(args []string) int { } } + // Get the new client listener addr + httpAddr, err := config.ClientListenerAddr(config.Ports.HTTP) + if err != nil { + c.Ui.Error(fmt.Sprintf("Failed to determine HTTP address: %v", err)) + } + + // Register the watches + for _, wp := range config.WatchPlans { + go func() { + wp.Handler = makeWatchHandler(logOutput, wp.Exempt["handler"]) + wp.LogOutput = c.logOutput + if err := wp.Run(httpAddr); err != nil { + c.Ui.Error(fmt.Sprintf("Error running watch: %v", err)) + } + }() + } + // Let the agent know we've finished registration c.agent.StartSync() @@ -518,6 +557,28 @@ func (c *Command) handleReload(config *Config) *Config { } } + // Get the new client listener addr + httpAddr, err := newConf.ClientListenerAddr(config.Ports.HTTP) + if err != nil { + c.Ui.Error(fmt.Sprintf("Failed to determine HTTP address: %v", err)) + } + + // Deregister the old watches + for _, wp := range config.WatchPlans { + wp.Stop() + } + + // Register the new watches + for _, wp := range newConf.WatchPlans { + go func() { + wp.Handler = makeWatchHandler(c.logOutput, wp.Exempt["handler"]) + wp.LogOutput = c.logOutput + if err := wp.Run(httpAddr); err != nil { + c.Ui.Error(fmt.Sprintf("Error running watch: %v", err)) + } + }() + } + return newConf } diff --git a/command/agent/config.go b/command/agent/config.go index cba6f3e8f2..f91853cdde 100644 --- a/command/agent/config.go +++ b/command/agent/config.go @@ -13,6 +13,7 @@ import ( "time" "github.com/hashicorp/consul/consul" + "github.com/hashicorp/consul/watch" "github.com/mitchellh/mapstructure" ) @@ -256,6 +257,9 @@ type Config struct { // VersionPrerelease is a label for pre-release builds VersionPrerelease string `mapstructure:"-"` + + // WatchPlans contains the compiled watches + WatchPlans []*watch.WatchPlan `mapstructure:"-" json:"-"` } type dirEnts []os.FileInfo @@ -307,6 +311,19 @@ func (c *Config) ClientListener(port int) (*net.TCPAddr, error) { return &net.TCPAddr{IP: ip, Port: port}, nil } +// ClientListenerAddr is used to format an address for a +// port on a ClientAddr, handling the zero IP. +func (c *Config) ClientListenerAddr(port int) (string, error) { + addr, err := c.ClientListener(port) + if err != nil { + return "", err + } + if addr.IP.IsUnspecified() { + addr.IP = net.ParseIP("127.0.0.1") + } + return addr.String(), nil +} + // DecodeConfig reads the configuration from the given reader in JSON // format and decodes it into a proper Config structure. func DecodeConfig(r io.Reader) (*Config, error) { @@ -656,6 +673,9 @@ func MergeConfig(a, b *Config) *Config { if len(b.Watches) != 0 { result.Watches = append(result.Watches, b.Watches...) } + if len(b.WatchPlans) != 0 { + result.WatchPlans = append(result.WatchPlans, b.WatchPlans...) + } // Copy the start join addresses result.StartJoin = make([]string, 0, len(a.StartJoin)+len(b.StartJoin)) diff --git a/command/agent/dns.go b/command/agent/dns.go index 5e4480ac23..18e2b928b4 100644 --- a/command/agent/dns.go +++ b/command/agent/dns.go @@ -280,7 +280,7 @@ PARSE: // _name._tag.service.consul d.serviceLookup(network, datacenter, labels[n-3][1:], tag, req, resp) - // Consul 0.3 and prior format for SRV queries + // Consul 0.3 and prior format for SRV queries } else { // Support "." in the label, re-join all the parts diff --git a/command/agent/watch_handler.go b/command/agent/watch_handler.go new file mode 100644 index 0000000000..ef9f8a9cf4 --- /dev/null +++ b/command/agent/watch_handler.go @@ -0,0 +1,88 @@ +package agent + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "log" + "os" + "os/exec" + "runtime" + "strconv" + + "github.com/armon/circbuf" + "github.com/hashicorp/consul/watch" +) + +const ( + // Limit the size of a watch handlers's output to the + // last WatchBufSize. Prevents an enormous buffer + // from being captured + WatchBufSize = 4 * 1024 // 4KB +) + +// verifyWatchHandler does the pre-check for our handler configuration +func verifyWatchHandler(params interface{}) error { + if params == nil { + return fmt.Errorf("Must provide watch handler") + } + _, ok := params.(string) + if !ok { + return fmt.Errorf("Watch handler must be a string") + } + return nil +} + +// makeWatchHandler returns a handler for the given watch +func makeWatchHandler(logOutput io.Writer, params interface{}) watch.HandlerFunc { + script := params.(string) + logger := log.New(logOutput, "", log.LstdFlags) + fn := func(idx uint64, data interface{}) { + // Determine the shell invocation based on OS + var shell, flag string + if runtime.GOOS == "windows" { + shell = "cmd" + flag = "/C" + } else { + shell = "/bin/sh" + flag = "-c" + } + + // Create the command + cmd := exec.Command(shell, flag, script) + cmd.Env = append(os.Environ(), + "CONSUL_INDEX="+strconv.FormatUint(idx, 10), + ) + + // Collect the output + output, _ := circbuf.NewBuffer(WatchBufSize) + cmd.Stdout = output + cmd.Stderr = output + + // Setup the input + var inp bytes.Buffer + enc := json.NewEncoder(&inp) + if err := enc.Encode(data); err != nil { + logger.Printf("[ERR] agent: Failed to encode data for watch '%s': %v", script, err) + return + } + cmd.Stdin = &inp + + // Run the handler + if err := cmd.Run(); err != nil { + logger.Printf("[ERR] agent: Failed to invoke watch handler '%s': %v", script, err) + } + + // Get the output, add a message about truncation + outputStr := string(output.Bytes()) + if output.TotalWritten() > output.Size() { + outputStr = fmt.Sprintf("Captured %d of %d bytes\n...\n%s", + output.Size(), output.TotalWritten(), outputStr) + } + + // Log the output + logger.Printf("[DEBUG] agent: watch handler '%s' output: %s", script, outputStr) + } + return fn +} diff --git a/watch/plan.go b/watch/plan.go index 07576dfe11..a6dc057730 100644 --- a/watch/plan.go +++ b/watch/plan.go @@ -3,6 +3,7 @@ package watch import ( "fmt" "log" + "os" "reflect" "time" @@ -32,6 +33,13 @@ func (p *WatchPlan) Run(address string) error { } p.client = client + // Create the logger + output := p.LogOutput + if output == nil { + output = os.Stderr + } + logger := log.New(output, "", log.LstdFlags) + // Loop until we are canceled failures := 0 OUTER: @@ -47,14 +55,14 @@ OUTER: // Handle an error in the watch function if err != nil { - log.Printf("consul.watch: Watch (type: %s) errored: %v", p.Type, err) - // Perform an exponential backoff failures++ retry := retryInterval * time.Duration(failures*failures) if retry > maxBackoffTime { retry = maxBackoffTime } + logger.Printf("consul.watch: Watch (type: %s) errored: %v, retry in %v", + p.Type, err, retry) select { case <-time.After(retry): continue OUTER diff --git a/watch/watch.go b/watch/watch.go index 58281faae1..0b0a69a32e 100644 --- a/watch/watch.go +++ b/watch/watch.go @@ -2,6 +2,7 @@ package watch import ( "fmt" + "io" "sync" "github.com/armon/consul-api" @@ -16,8 +17,10 @@ type WatchPlan struct { Token string Type string Exempt map[string]interface{} - Func WatchFunc - Handler HandlerFunc + + Func WatchFunc + Handler HandlerFunc + LogOutput io.Writer address string client *consulapi.Client From 46a96d9c42645b5262452febd21cc995a32bb3e2 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Thu, 21 Aug 2014 14:28:16 -0700 Subject: [PATCH 15/20] agent: Refactor script invoke --- command/agent/check.go | 20 +++++-------- command/agent/util.go | 16 +++++++++++ command/agent/watch_handler.go | 18 ++++-------- command/agent/watch_handler_test.go | 44 +++++++++++++++++++++++++++++ 4 files changed, 72 insertions(+), 26 deletions(-) create mode 100644 command/agent/watch_handler_test.go diff --git a/command/agent/check.go b/command/agent/check.go index 02c3bd16d7..cf38f30cd6 100644 --- a/command/agent/check.go +++ b/command/agent/check.go @@ -6,7 +6,6 @@ import ( "github.com/hashicorp/consul/consul/structs" "log" "os/exec" - "runtime" "sync" "syscall" "time" @@ -106,18 +105,13 @@ func (c *CheckMonitor) run() { // check is invoked periodically to perform the script check func (c *CheckMonitor) check() { - // Determine the shell invocation based on OS - var shell, flag string - if runtime.GOOS == "windows" { - shell = "cmd" - flag = "/C" - } else { - shell = "/bin/sh" - flag = "-c" - } - // Create the command - cmd := exec.Command(shell, flag, c.Script) + cmd, err := ExecScript(c.Script) + if err != nil { + c.Logger.Printf("[ERR] agent: failed to setup invoke '%s': %s", c.Script, err) + c.Notify.UpdateCheck(c.CheckID, structs.HealthUnknown, err.Error()) + return + } // Collect the output output, _ := circbuf.NewBuffer(CheckBufSize) @@ -140,7 +134,7 @@ func (c *CheckMonitor) check() { time.Sleep(30 * time.Second) errCh <- fmt.Errorf("Timed out running check '%s'", c.Script) }() - err := <-errCh + err = <-errCh // Get the output, add a message about truncation outputStr := string(output.Bytes()) diff --git a/command/agent/util.go b/command/agent/util.go index 8f6103a804..16b3b01907 100644 --- a/command/agent/util.go +++ b/command/agent/util.go @@ -3,6 +3,8 @@ package agent import ( "math" "math/rand" + "os/exec" + "runtime" "time" ) @@ -39,3 +41,17 @@ func strContains(l []string, s string) bool { } return false } + +// ExecScript returns a command to execute a script +func ExecScript(script string) (*exec.Cmd, error) { + var shell, flag string + if runtime.GOOS == "windows" { + shell = "cmd" + flag = "/C" + } else { + shell = "/bin/sh" + flag = "-c" + } + cmd := exec.Command(shell, flag, script) + return cmd, nil +} diff --git a/command/agent/watch_handler.go b/command/agent/watch_handler.go index ef9f8a9cf4..afc4fb94d2 100644 --- a/command/agent/watch_handler.go +++ b/command/agent/watch_handler.go @@ -7,8 +7,6 @@ import ( "io" "log" "os" - "os/exec" - "runtime" "strconv" "github.com/armon/circbuf" @@ -39,18 +37,12 @@ func makeWatchHandler(logOutput io.Writer, params interface{}) watch.HandlerFunc script := params.(string) logger := log.New(logOutput, "", log.LstdFlags) fn := func(idx uint64, data interface{}) { - // Determine the shell invocation based on OS - var shell, flag string - if runtime.GOOS == "windows" { - shell = "cmd" - flag = "/C" - } else { - shell = "/bin/sh" - flag = "-c" - } - // Create the command - cmd := exec.Command(shell, flag, script) + cmd, err := ExecScript(script) + if err != nil { + logger.Printf("[ERR] agent: Failed to setup watch: %v", err) + return + } cmd.Env = append(os.Environ(), "CONSUL_INDEX="+strconv.FormatUint(idx, 10), ) diff --git a/command/agent/watch_handler_test.go b/command/agent/watch_handler_test.go new file mode 100644 index 0000000000..28f1e425f5 --- /dev/null +++ b/command/agent/watch_handler_test.go @@ -0,0 +1,44 @@ +package agent + +import ( + "io/ioutil" + "os" + "testing" +) + +func TestVerifyWatchHandler(t *testing.T) { + if err := verifyWatchHandler(nil); err == nil { + t.Fatalf("should err") + } + if err := verifyWatchHandler(123); err == nil { + t.Fatalf("should err") + } + if err := verifyWatchHandler([]string{"foo"}); err == nil { + t.Fatalf("should err") + } + if err := verifyWatchHandler("foo"); err != nil { + t.Fatalf("err: %v", err) + } +} + +func TestMakeWatchHandler(t *testing.T) { + defer os.Remove("handler_out") + defer os.Remove("handler_index_out") + script := "echo $CONSUL_INDEX >> handler_index_out && cat >> handler_out" + handler := makeWatchHandler(os.Stderr, script) + handler(100, []string{"foo", "bar", "baz"}) + raw, err := ioutil.ReadFile("handler_out") + if err != nil { + t.Fatalf("err: %v", err) + } + if string(raw) != "[\"foo\",\"bar\",\"baz\"]\n" { + t.Fatalf("bad: %s", raw) + } + raw, err = ioutil.ReadFile("handler_index_out") + if err != nil { + t.Fatalf("err: %v", err) + } + if string(raw) != "100\n" { + t.Fatalf("bad: %s", raw) + } +} From dc5dee5ce4f2b416b12e75c0b5d28d4985caf0b7 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Thu, 21 Aug 2014 16:02:41 -0700 Subject: [PATCH 16/20] command/watch: First pass at command --- command/rpc.go | 15 ++++ command/watch.go | 211 +++++++++++++++++++++++++++++++++++++++++++++++ commands.go | 7 ++ 3 files changed, 233 insertions(+) create mode 100644 command/watch.go diff --git a/command/rpc.go b/command/rpc.go index 7b4411b2a6..97a31d083c 100644 --- a/command/rpc.go +++ b/command/rpc.go @@ -2,6 +2,7 @@ package command import ( "flag" + "github.com/armon/consul-api" "github.com/hashicorp/consul/command/agent" ) @@ -16,3 +17,17 @@ func RPCAddrFlag(f *flag.FlagSet) *string { func RPCClient(addr string) (*agent.RPCClient, error) { return agent.NewRPCClient(addr) } + +// HTTPAddrFlag returns a pointer to a string that will be populated +// when the given flagset is parsed with the HTTP address of the Consul. +func HTTPAddrFlag(f *flag.FlagSet) *string { + return f.String("http-addr", "127.0.0.1:8500", + "HTTP address of the Consul agent") +} + +// HTTPClient returns a new Consul HTTP client with the given address. +func HTTPClient(addr string) (*consulapi.Client, error) { + conf := consulapi.DefaultConfig() + conf.Address = addr + return consulapi.NewClient(conf) +} diff --git a/command/watch.go b/command/watch.go new file mode 100644 index 0000000000..30bb57fab6 --- /dev/null +++ b/command/watch.go @@ -0,0 +1,211 @@ +package command + +import ( + "bytes" + "encoding/json" + "flag" + "fmt" + "os" + "strconv" + "strings" + + "github.com/hashicorp/consul/command/agent" + "github.com/hashicorp/consul/watch" + "github.com/mitchellh/cli" +) + +// WatchCommand is a Command implementation that is used to setup +// a "watch" which uses a sub-process +type WatchCommand struct { + ShutdownCh <-chan struct{} + Ui cli.Ui +} + +func (c *WatchCommand) Help() string { + helpText := ` +Usage: consul watch [options] [child...] + + Watches for changes in a given data view from Consul. If a child process + is specified, it will be invoked with the latest results on changes. Otherwise, + the latest values are dumped to stdout and the watch terminates. + + Providing the watch type is required, and other parameters may be required + or supported depending on the watch type. + +Options: + + -http-addr=127.0.0.1:8500 HTTP address of the Consul agent. + -datacenter="" Datacenter to query. Defaults to that of agent. + -token="" ACL token to use. Defaults to that of agent. + +Watch Specification: + + -key=val Specifies the key to watch. Only for 'key' type. + -passingonly=[true|false] Specifies if only hosts passing all checks are displayed. + Optional for 'service' type. Defaults false. + -prefix=val Specifies the key prefix to watch. Only for 'keyprefix' type. + -service=val Specifies the service to watch. Required for 'service' type, + optional for 'checks' type. + -state=val Specifies the states to watch. Optional for 'checks' type. + -tag=val Specifies the service tag to filter on. Optional for 'service' + type. + -type=val Specifies the watch type. One of key, keyprefix + services, nodes, service, or checks. +` + return strings.TrimSpace(helpText) +} + +func (c *WatchCommand) Run(args []string) int { + var watchType, datacenter, token, key, prefix, service, tag, passingOnly, state string + cmdFlags := flag.NewFlagSet("watch", flag.ContinueOnError) + cmdFlags.Usage = func() { c.Ui.Output(c.Help()) } + cmdFlags.StringVar(&watchType, "type", "", "") + cmdFlags.StringVar(&datacenter, "datacenter", "", "") + cmdFlags.StringVar(&token, "token", "", "") + cmdFlags.StringVar(&key, "key", "", "") + cmdFlags.StringVar(&prefix, "prefix", "", "") + cmdFlags.StringVar(&service, "service", "", "") + cmdFlags.StringVar(&tag, "tag", "", "") + cmdFlags.StringVar(&passingOnly, "passingonly", "", "") + cmdFlags.StringVar(&state, "state", "", "") + httpAddr := HTTPAddrFlag(cmdFlags) + if err := cmdFlags.Parse(args); err != nil { + return 1 + } + + // Check for a type + if watchType == "" { + c.Ui.Error("Watch type must be specified") + c.Ui.Error("") + c.Ui.Error(c.Help()) + return 1 + } + + // Grab the script to execute if any + script := strings.Join(cmdFlags.Args(), " ") + + // Compile the watch parameters + params := make(map[string]interface{}) + if watchType != "" { + params["type"] = watchType + } + if datacenter != "" { + params["datacenter"] = datacenter + } + if token != "" { + params["token"] = token + } + if key != "" { + params["key"] = key + } + if prefix != "" { + params["prefix"] = prefix + } + if service != "" { + params["service"] = service + } + if tag != "" { + params["tag"] = tag + } + if state != "" { + params["state"] = state + } + if passingOnly != "" { + b, err := strconv.ParseBool(passingOnly) + if err != nil { + c.Ui.Error(fmt.Sprintf("Failed to parse passingonly flag: %s", err)) + return 1 + } + params["passingonly"] = b + } + + // Create the watch + wp, err := watch.Parse(params) + if err != nil { + c.Ui.Error(fmt.Sprintf("%s", err)) + return 1 + } + + // Create and test the HTTP client + client, err := HTTPClient(*httpAddr) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error connecting to Consul agent: %s", err)) + return 1 + } + _, err = client.Agent().NodeName() + if err != nil { + c.Ui.Error(fmt.Sprintf("Error querying Consul agent: %s", err)) + return 1 + } + + // Setup handler + errExit := false + if script == "" { + wp.Handler = func(idx uint64, data interface{}) { + defer wp.Stop() + buf, err := json.MarshalIndent(data, "", " ") + if err != nil { + c.Ui.Error(fmt.Sprintf("Error encoding output: %s", err)) + errExit = true + } + c.Ui.Output(string(buf)) + } + } else { + wp.Handler = func(idx uint64, data interface{}) { + // Create the command + var buf bytes.Buffer + var err error + cmd, err := agent.ExecScript(script) + if err != nil { + c.Ui.Error(fmt.Sprintf("Error executing handler: %s", err)) + goto ERR + } + cmd.Env = append(os.Environ(), + "CONSUL_INDEX="+strconv.FormatUint(idx, 10), + ) + + // Encode the input + if err = json.NewEncoder(&buf).Encode(data); err != nil { + c.Ui.Error(fmt.Sprintf("Error encoding output: %s", err)) + goto ERR + } + cmd.Stdin = &buf + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + + // Run the handler + if err := cmd.Run(); err != nil { + c.Ui.Error(fmt.Sprintf("Error executing handler: %s", err)) + goto ERR + } + return + ERR: + wp.Stop() + errExit = true + } + } + + // Watch for a shutdown + go func() { + <-c.ShutdownCh + wp.Stop() + os.Exit(0) + }() + + // Run the watch + if err := wp.Run(*httpAddr); err != nil { + c.Ui.Error(fmt.Sprintf("Error querying Consul agent: %s", err)) + return 1 + } + + // Handle an error exit + if errExit { + return 1 + } else { + return 0 + } +} + +func (c *WatchCommand) Synopsis() string { + return "Watch for changes in Consul" +} diff --git a/commands.go b/commands.go index 1bb6af8941..1b98e013af 100644 --- a/commands.go +++ b/commands.go @@ -82,6 +82,13 @@ func init() { Ui: ui, }, nil }, + + "watch": func() (cli.Command, error) { + return &command.WatchCommand{ + ShutdownCh: makeShutdownCh(), + Ui: ui, + }, nil + }, } } From 0cd9faf3a20adc9a8174f87dd407119c1bb68172 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Thu, 21 Aug 2014 16:08:21 -0700 Subject: [PATCH 17/20] command/watch: Adding tests --- command/util_test.go | 33 +++++++++++++++++++++++---------- command/watch_test.go | 29 +++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+), 10 deletions(-) create mode 100644 command/watch_test.go diff --git a/command/util_test.go b/command/util_test.go index 8d492b1733..0366f760bb 100644 --- a/command/util_test.go +++ b/command/util_test.go @@ -22,16 +22,19 @@ func init() { } type agentWrapper struct { - dir string - config *agent.Config - agent *agent.Agent - rpc *agent.AgentRPC - addr string + dir string + config *agent.Config + agent *agent.Agent + rpc *agent.AgentRPC + http *agent.HTTPServer + addr string + httpAddr string } func (a *agentWrapper) Shutdown() { a.rpc.Shutdown() a.agent.Shutdown() + a.http.Shutdown() os.RemoveAll(a.dir) } @@ -59,12 +62,22 @@ func testAgent(t *testing.T) *agentWrapper { } rpc := agent.NewAgentRPC(a, l, mult, lw) + + httpAddr := fmt.Sprintf("127.0.0.1:%d", conf.Ports.HTTP) + http, err := agent.NewHTTPServer(a, "", false, os.Stderr, httpAddr) + if err != nil { + os.RemoveAll(dir) + t.Fatalf(fmt.Sprintf("err: %v", err)) + } + return &agentWrapper{ - dir: dir, - config: conf, - agent: a, - rpc: rpc, - addr: l.Addr().String(), + dir: dir, + config: conf, + agent: a, + rpc: rpc, + http: http, + addr: l.Addr().String(), + httpAddr: httpAddr, } } diff --git a/command/watch_test.go b/command/watch_test.go new file mode 100644 index 0000000000..eaa9376d5a --- /dev/null +++ b/command/watch_test.go @@ -0,0 +1,29 @@ +package command + +import ( + "github.com/mitchellh/cli" + "strings" + "testing" +) + +func TestWatchCommand_implements(t *testing.T) { + var _ cli.Command = &WatchCommand{} +} + +func TestWatchCommandRun(t *testing.T) { + a1 := testAgent(t) + defer a1.Shutdown() + + ui := new(cli.MockUi) + c := &WatchCommand{Ui: ui} + args := []string{"-http-addr=" + a1.httpAddr, "-type=nodes"} + + code := c.Run(args) + if code != 0 { + t.Fatalf("bad: %d. %#v", code, ui.ErrorWriter.String()) + } + + if !strings.Contains(ui.OutputWriter.String(), a1.config.NodeName) { + t.Fatalf("bad: %#v", ui.OutputWriter.String()) + } +} From 94615b0b7e066649d39577f4d9998c6eb9574d03 Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Thu, 21 Aug 2014 17:24:20 -0700 Subject: [PATCH 18/20] watch: Fixing bug with null keys --- watch/plan.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/watch/plan.go b/watch/plan.go index a6dc057730..f0c4e9ff7c 100644 --- a/watch/plan.go +++ b/watch/plan.go @@ -80,8 +80,9 @@ OUTER: } // Update the index, look for change + oldIndex := p.lastIndex p.lastIndex = index - if reflect.DeepEqual(p.lastResult, result) { + if oldIndex != 0 && reflect.DeepEqual(p.lastResult, result) { continue } From dce716f4b2f0f80a6b3aa661b604f3916b6b182a Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Thu, 21 Aug 2014 17:25:42 -0700 Subject: [PATCH 19/20] website: Document watches --- .../source/docs/agent/options.html.markdown | 13 +- .../source/docs/agent/watches.html.markdown | 286 ++++++++++++++++++ .../source/docs/commands/index.html.markdown | 1 + .../source/docs/commands/watch.html.markdown | 53 ++++ website/source/layouts/docs.erb | 8 + 5 files changed, 360 insertions(+), 1 deletion(-) create mode 100644 website/source/docs/agent/watches.html.markdown create mode 100644 website/source/docs/commands/watch.html.markdown diff --git a/website/source/docs/agent/options.html.markdown b/website/source/docs/agent/options.html.markdown index 4e6d66590b..bc37837ccb 100644 --- a/website/source/docs/agent/options.html.markdown +++ b/website/source/docs/agent/options.html.markdown @@ -145,7 +145,13 @@ definitions support being updated during a reload. "data_dir": "/opt/consul", "log_level": "INFO", "node_name": "foobar", - "server": true + "server": true, + "watches": [ + { + "type": "checks", + "handler": "/usr/bin/health-check-handler.sh" + } + ] } @@ -316,6 +322,11 @@ definitions support being updated during a reload. However, because the caches are not actively invalidated, ACL policy may be stale up to the TTL value. +* `watches` - Watches is a list of watch specifications. + These allow an external process to be automatically invoked when a particular + data view is updated. See the [watch documentation](/docs/agent/watches.html) for + more documentation. Watches can be modified when the configuration is reloaded. + ## Ports Used Consul requires up to 5 different ports to work properly, some requiring diff --git a/website/source/docs/agent/watches.html.markdown b/website/source/docs/agent/watches.html.markdown new file mode 100644 index 0000000000..790be11cec --- /dev/null +++ b/website/source/docs/agent/watches.html.markdown @@ -0,0 +1,286 @@ +--- +layout: "docs" +page_title: "Watches" +sidebar_current: "docs-agent-watches" +--- + +# Watches + +Watches are a way of specifying a view of data (list of nodes, KV pairs, +health checks, etc) which is monitored for any updates. When an update +is detected, an external handler handler is invoked. A handler can be any +executable. As an example, you could watch the status of health checks and +notify an external system when a check is critical. + +Watches are implemented using blocking queries in the [HTTP API](/docs/agent/http.html). +Agent's automatically make the proper API calls to watch for changes, +and inform a handler when the data view has updated. + +Watches can can be configured as part of the [agent's configuration](/docs/agent/options.html), +causing them to run once the agent is initialized. Reloading the agent configuration +allows for adding or removing watches dynamically. + +Alternatively, the [watch command](/docs/commands/watch.html) enables a watch to be +started outside of the agent. This can be used by an operator to inspect data in Consul, +or to easily pipe data into processes without being tied to the agent lifecycle. + +In either case, the `type` of the watch must be specified. Each type of watch +supports different parameters, both required and optional. These options are specified +in a JSON body when using agent configuration, or as CLI flags for the watch command. + +## Handlers + +The watch specifiation specifies the view of data to be monitored. +Once that view is updated the specified handler is invoked. The handler +can be any executable. + +A handler should read it's input from stdin, and expect to read +JSON formatted data. The format of the data depends on the type of the +watch. Each watch type documents the format type, and because they +map directly to an HTTP API, handlers should expect the input to +match the format of the API. + +Additionally, the `CONSUL_INDEX` environmental variable will be set. +This maps to the `X-Consul-Index` value from the [HTTP API](/docs/agent/http.html). + +## Global Parameters + +In addition to the parameters supported by each option type, there +are a few global parameters that all watches support: + +* `datacenter` - Can be provided to override the agent's default datacenter. +* `token` - Can be provided to override the agent's default ACL token. +* `handler` - The handler to invoke when the data view updates. + +## Watch Types + +The following types are supported, with more documentation below: + +* `key` - Watch a specific KV pair +* `keyprefix` - Watch a prefix in the KV store +* `services` - Watch the list of available services +* `nodes` - Watch the list of nodes +* `service`- Watch the instances of a service +* `checks` - Watch the value of health checks + + +### Type: key + +The "key" watch type is used to watch a specific key in the KV store. +It requires that the "key" parameter be specified. + +This maps to the `/v1/kv/` API internally. + +Here is an example configuration: + + { + "type": "key", + "key": "foo/bar/baz", + "handler": "/usr/bin/my-key-handler.sh" + } + +Or, using the watch command: + + $ consul watch -type key -key foo/bar/baz /usr/bin/my-key-handler.sh + +An example of the output of this command: + + { + "Key": "foo/bar/baz", + "CreateIndex": 1793, + "ModifyIndex": 1793, + "LockIndex": 0, + "Flags": 0, + "Value": "aGV5", + "Session": "" + } + +### Type: keyprefix + +The "keyprefix" watch type is used to watch a prefix of keys in the KV store. +It requires that the "prefix" parameter be specified. + +This maps to the `/v1/kv/` API internally. + +Here is an example configuration: + + { + "type": "keyprefix", + "prefix": "foo/", + "handler": "/usr/bin/my-prefix-handler.sh" + } + +Or, using the watch command: + + $ consul watch -type keyprefix -prefix foo/ /usr/bin/my-prefix-handler.sh + +An example of the output of this command: + + [ + { + "Key": "foo/bar", + "CreateIndex": 1796, + "ModifyIndex": 1796, + "LockIndex": 0, + "Flags": 0, + "Value": "TU9BUg==", + "Session": "" + }, + { + "Key": "foo/baz", + "CreateIndex": 1795, + "ModifyIndex": 1795, + "LockIndex": 0, + "Flags": 0, + "Value": "YXNkZg==", + "Session": "" + }, + { + "Key": "foo/test", + "CreateIndex": 1793, + "ModifyIndex": 1793, + "LockIndex": 0, + "Flags": 0, + "Value": "aGV5", + "Session": "" + } + ] + + +### Type: services + +The "services" watch type is used to watch the list of available +services. It has no parameters. + +This maps to the `/v1/catalog/services` API internally. + +An example of the output of this command: + + { + "consul": [], + "redis": [], + "web": [] + } + +### Type: nodes + +The "nodes" watch type is used to watch the list of available +nodes. It has no parameters. + +This maps to the `/v1/catalog/nodes` API internally. + +An example of the output of this command: + + [ + { + "Node": "nyc1-consul-1", + "Address": "192.241.159.115" + }, + { + "Node": "nyc1-consul-2", + "Address": "192.241.158.205" + }, + { + "Node": "nyc1-consul-3", + "Address": "198.199.77.133" + }, + { + "Node": "nyc1-worker-1", + "Address": "162.243.162.228" + }, + { + "Node": "nyc1-worker-2", + "Address": "162.243.162.226" + }, + { + "Node": "nyc1-worker-3", + "Address": "162.243.162.229" + } + ] + +### Type: service + +The "service" watch type is used to monitor the providers +of a single service. It requires the "service" parameter, +but optionally takes "tag" and "passingonly". The "tag" parameter +will filter by tag, and "passingonly" is a boolean that will +filter to only the instances passing all health checks. + +This maps to the `/v1/health/service` API internally. + +Here is an example configuration: + + { + "type": "service", + "key": "redis", + "handler": "/usr/bin/my-service-handler.sh" + } + +Or, using the watch command: + + $ consul watch -type service -service redis /usr/bin/my-service-handler.sh + +An example of the output of this command: + + [ + { + "Node": { + "Node": "foobar", + "Address": "10.1.10.12" + }, + "Service": { + "ID": "redis", + "Service": "redis", + "Tags": null, + "Port": 8000 + }, + "Checks": [ + { + "Node": "foobar", + "CheckID": "service:redis", + "Name": "Service 'redis' check", + "Status": "passing", + "Notes": "", + "Output": "", + "ServiceID": "redis", + "ServiceName": "redis" + }, + { + "Node": "foobar", + "CheckID": "serfHealth", + "Name": "Serf Health Status", + "Status": "passing", + "Notes": "", + "Output": "", + "ServiceID": "", + "ServiceName": "" + } + ] + } + ] + +### Type: checks + +The "checks" watch type is used to monitor the checks of a given +service or in a specific state. It optionally takes the "service" +parameter to filter to a specific service, or "state" to filter +to a specific state. By default, it will watch all checks. + +This maps to the `/v1/health/state/` API if monitoring by state, +or `/v1/health/checks/` if monitoring by service. + +An example of the output of this command: + + [ + { + "Node": "foobar", + "CheckID": "service:redis", + "Name": "Service 'redis' check", + "Status": "passing", + "Notes": "", + "Output": "", + "ServiceID": "redis", + "ServiceName": "redis" + } + ] + diff --git a/website/source/docs/commands/index.html.markdown b/website/source/docs/commands/index.html.markdown index afc3d62105..3c43cac470 100644 --- a/website/source/docs/commands/index.html.markdown +++ b/website/source/docs/commands/index.html.markdown @@ -34,6 +34,7 @@ Available commands are: monitor Stream logs from a Consul agent reload Triggers the agent to reload configuration files version Prints the Consul version + watch Watch for changes in Consul ``` To get help for any specific command, pass the `-h` flag to the relevant diff --git a/website/source/docs/commands/watch.html.markdown b/website/source/docs/commands/watch.html.markdown new file mode 100644 index 0000000000..7f565a3594 --- /dev/null +++ b/website/source/docs/commands/watch.html.markdown @@ -0,0 +1,53 @@ +--- +layout: "docs" +page_title: "Commands: Watch" +sidebar_current: "docs-commands-watch" +--- + +# Consul Watch + +Command: `consul watch` + +The watch command provides a mechanism to watch for changes in a particular +data view (list of nodes, service members, key value, etc) and to invoke +a process with the latest values of the view. If no process is specified, +the current values are dumped to stdout which can be a useful way to inspect +data in Consul. + +There is more [documentation on watches here](/docs/agent/watches.html). + +## Usage + +Usage: `consul watch [options] [child...]` + +The only required option is `-type` which specifies the particular +data view. Depending on the type, various options may be required +or optionally provided. There is more documentation on watch +[specifications here](/docs/agent/watches.html). + +The list of available flags are: + +* `-http-addr` - Address to the HTTP server of the agent you want to contact + to send this command. If this isn't specified, the command will contact + "127.0.0.1:8500" which is the default HTTP address of a Consul agent. + +* `-datacenter` - Datacenter to query. Defaults to that of agent. + +* `-token` - ACL token to use. Defaults to that of agent. + +* `-key` - Key to watch. Only for `key` type. + +* `-passingonly=[true|false]` - Should only passing entries be returned. Default false. + only for `service` type. + +* `-prefix` - Key prefix to watch. Only for `keyprefix` type. + +* `-service` - Service to watch. Required for `service` type, optional for `checks` type. + +* `-state` - Check state to filter on. Optional for `checks` type. + +* `-tag` - Service tag to filter on. Optional for `service` type. + +* `-type` - Watch type. Required, one of "key", "keyprefix", "services", + "nodes", "services", or "checks". + diff --git a/website/source/layouts/docs.erb b/website/source/layouts/docs.erb index fcad0930ea..427a7be74c 100644 --- a/website/source/layouts/docs.erb +++ b/website/source/layouts/docs.erb @@ -89,6 +89,10 @@ > reload + + + > + watch @@ -130,6 +134,10 @@ > Telemetry + + + > + Watches From 54316d2b3066e7cb0a579ce6e1599e3c76d412df Mon Sep 17 00:00:00 2001 From: Armon Dadgar Date: Thu, 21 Aug 2014 17:35:05 -0700 Subject: [PATCH 20/20] website: Minor cleanups --- website/source/docs/agent/watches.html.markdown | 2 +- website/source/intro/getting-started/install.html.markdown | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/website/source/docs/agent/watches.html.markdown b/website/source/docs/agent/watches.html.markdown index 790be11cec..a3b03157a6 100644 --- a/website/source/docs/agent/watches.html.markdown +++ b/website/source/docs/agent/watches.html.markdown @@ -8,7 +8,7 @@ sidebar_current: "docs-agent-watches" Watches are a way of specifying a view of data (list of nodes, KV pairs, health checks, etc) which is monitored for any updates. When an update -is detected, an external handler handler is invoked. A handler can be any +is detected, an external handler is invoked. A handler can be any executable. As an example, you could watch the status of health checks and notify an external system when a check is critical. diff --git a/website/source/intro/getting-started/install.html.markdown b/website/source/intro/getting-started/install.html.markdown index 4b72e73ce8..9ef7acaaa3 100644 --- a/website/source/intro/getting-started/install.html.markdown +++ b/website/source/intro/getting-started/install.html.markdown @@ -57,6 +57,7 @@ Available commands are: members Lists the members of a Consul cluster monitor Stream logs from a Consul agent version Prints the Consul version + watch Watch for changes in Consul ``` If you get an error that `consul` could not be found, then your PATH