diff --git a/watch/funcs.go b/watch/funcs.go index d031d94a76..a63527af24 100644 --- a/watch/funcs.go +++ b/watch/funcs.go @@ -18,7 +18,7 @@ func init() { "key": keyWatch, "keyprefix": keyPrefixWatch, "services": servicesWatch, - "nodes": nil, + "nodes": nodesWatch, "service": nil, "checks": nil, } @@ -82,3 +82,17 @@ func servicesWatch(params map[string][]string) (WatchFunc, error) { } return fn, nil } + +// nodesWatch is used to watch the list of available nodes +func nodesWatch(params map[string][]string) (WatchFunc, error) { + fn := func(p *WatchPlan) (uint64, interface{}, error) { + catalog := p.client.Catalog() + opts := consulapi.QueryOptions{WaitIndex: p.lastIndex} + nodes, meta, err := catalog.Nodes(&opts) + if err != nil { + return 0, nil, err + } + return meta.LastIndex, nodes, err + } + return fn, nil +} diff --git a/watch/funcs_test.go b/watch/funcs_test.go index 1cfc06c6db..2b06d2a71a 100644 --- a/watch/funcs_test.go +++ b/watch/funcs_test.go @@ -166,3 +166,52 @@ func TestServicesWatch(t *testing.T) { t.Fatalf("bad: %v", invoke) } } + +func TestNodesWatch(t *testing.T) { + if consulAddr == "" { + t.Skip() + } + plan := mustParse(t, "type:nodes") + invoke := 0 + plan.Handler = func(idx uint64, raw interface{}) { + if invoke == 0 { + if raw == nil { + return + } + v, ok := raw.([]*consulapi.Node) + if !ok || len(v) == 0 { + t.Fatalf("Bad: %#v", raw) + } + invoke++ + } + } + + go func() { + time.Sleep(20 * time.Millisecond) + plan.Stop() + + catalog := plan.client.Catalog() + reg := &consulapi.CatalogRegistration{ + Node: "foobar", + Address: "1.1.1.1", + Datacenter: "dc1", + } + catalog.Register(reg, nil) + time.Sleep(20 * time.Millisecond) + dereg := &consulapi.CatalogDeregistration{ + Node: "foobar", + Address: "1.1.1.1", + Datacenter: "dc1", + } + catalog.Deregister(dereg, nil) + }() + + err := plan.Run(consulAddr) + if err != nil { + t.Fatalf("err: %v", err) + } + + if invoke == 0 { + t.Fatalf("bad: %v", invoke) + } +}