watch: service watcher

pull/298/head
Armon Dadgar 2014-08-20 15:29:31 -07:00
parent 970c606f1a
commit 00358baa7f
2 changed files with 64 additions and 0 deletions

View File

@ -17,6 +17,10 @@ func init() {
watchFuncFactory = map[string]watchFactory{
"key": keyWatch,
"keyprefix": keyPrefixWatch,
"services": servicesWatch,
"nodes": nil,
"service": nil,
"checks": nil,
}
}
@ -64,3 +68,17 @@ func keyPrefixWatch(params map[string][]string) (WatchFunc, error) {
}
return fn, nil
}
// servicesWatch is used to watch the list of available services
func servicesWatch(params map[string][]string) (WatchFunc, error) {
fn := func(p *WatchPlan) (uint64, interface{}, error) {
catalog := p.client.Catalog()
opts := consulapi.QueryOptions{WaitIndex: p.lastIndex}
services, meta, err := catalog.Services(&opts)
if err != nil {
return 0, nil, err
}
return meta.LastIndex, services, err
}
return fn, nil
}

View File

@ -80,6 +80,9 @@ func TestKeyPrefixWatch(t *testing.T) {
return
}
v, ok := raw.(consulapi.KVPairs)
if ok && v == nil {
return
}
if !ok || v == nil || string(v[0].Key) != "foo/bar" {
t.Fatalf("Bad: %#v", raw)
}
@ -120,3 +123,46 @@ func TestKeyPrefixWatch(t *testing.T) {
t.Fatalf("bad: %v", invoke)
}
}
func TestServicesWatch(t *testing.T) {
if consulAddr == "" {
t.Skip()
}
plan := mustParse(t, "type:services")
invoke := 0
plan.Handler = func(idx uint64, raw interface{}) {
if invoke == 0 {
if raw == nil {
return
}
v, ok := raw.(map[string][]string)
if !ok || v["consul"] == nil {
t.Fatalf("Bad: %#v", raw)
}
invoke++
}
}
go func() {
time.Sleep(20 * time.Millisecond)
plan.Stop()
agent := plan.client.Agent()
reg := &consulapi.AgentServiceRegistration{
ID: "foo",
Name: "foo",
}
agent.ServiceRegister(reg)
time.Sleep(20 * time.Millisecond)
agent.ServiceDeregister("foo")
}()
err := plan.Run(consulAddr)
if err != nil {
t.Fatalf("err: %v", err)
}
if invoke == 0 {
t.Fatalf("bad: %v", invoke)
}
}