mirror of https://github.com/hashicorp/consul
watch: supporting service watch
parent
8d70128761
commit
f3c8873009
|
@ -2,6 +2,7 @@ package watch
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"strconv"
|
||||||
|
|
||||||
"github.com/armon/consul-api"
|
"github.com/armon/consul-api"
|
||||||
)
|
)
|
||||||
|
@ -19,19 +20,20 @@ func init() {
|
||||||
"keyprefix": keyPrefixWatch,
|
"keyprefix": keyPrefixWatch,
|
||||||
"services": servicesWatch,
|
"services": servicesWatch,
|
||||||
"nodes": nodesWatch,
|
"nodes": nodesWatch,
|
||||||
"service": nil,
|
"service": serviceWatch,
|
||||||
"checks": nil,
|
"checks": nil,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// keyWatch is used to return a key watching function
|
// keyWatch is used to return a key watching function
|
||||||
func keyWatch(params map[string][]string) (WatchFunc, error) {
|
func keyWatch(params map[string][]string) (WatchFunc, error) {
|
||||||
keys := params["key"]
|
var key string
|
||||||
delete(params, "key")
|
if err := assignValue(params, "key", &key); err != nil {
|
||||||
if len(keys) != 1 {
|
return nil, err
|
||||||
|
}
|
||||||
|
if key == "" {
|
||||||
return nil, fmt.Errorf("Must specify a single key to watch")
|
return nil, fmt.Errorf("Must specify a single key to watch")
|
||||||
}
|
}
|
||||||
key := keys[0]
|
|
||||||
|
|
||||||
fn := func(p *WatchPlan) (uint64, interface{}, error) {
|
fn := func(p *WatchPlan) (uint64, interface{}, error) {
|
||||||
kv := p.client.KV()
|
kv := p.client.KV()
|
||||||
|
@ -50,12 +52,13 @@ func keyWatch(params map[string][]string) (WatchFunc, error) {
|
||||||
|
|
||||||
// keyPrefixWatch is used to return a key prefix watching function
|
// keyPrefixWatch is used to return a key prefix watching function
|
||||||
func keyPrefixWatch(params map[string][]string) (WatchFunc, error) {
|
func keyPrefixWatch(params map[string][]string) (WatchFunc, error) {
|
||||||
list := params["prefix"]
|
var prefix string
|
||||||
delete(params, "prefix")
|
if err := assignValue(params, "prefix", &prefix); err != nil {
|
||||||
if len(list) != 1 {
|
return nil, err
|
||||||
|
}
|
||||||
|
if prefix == "" {
|
||||||
return nil, fmt.Errorf("Must specify a single prefix to watch")
|
return nil, fmt.Errorf("Must specify a single prefix to watch")
|
||||||
}
|
}
|
||||||
prefix := list[0]
|
|
||||||
|
|
||||||
fn := func(p *WatchPlan) (uint64, interface{}, error) {
|
fn := func(p *WatchPlan) (uint64, interface{}, error) {
|
||||||
kv := p.client.KV()
|
kv := p.client.KV()
|
||||||
|
@ -96,3 +99,41 @@ func nodesWatch(params map[string][]string) (WatchFunc, error) {
|
||||||
}
|
}
|
||||||
return fn, nil
|
return fn, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// serviceWatch is used to watch a specific service for changes
|
||||||
|
func serviceWatch(params map[string][]string) (WatchFunc, error) {
|
||||||
|
var service, tag, passingRaw string
|
||||||
|
if err := assignValue(params, "service", &service); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if service == "" {
|
||||||
|
return nil, fmt.Errorf("Must specify a single service to watch")
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := assignValue(params, "tag", &tag); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := assignValue(params, "passingonly", &passingRaw); err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
passingOnly := false
|
||||||
|
if passingRaw != "" {
|
||||||
|
b, err := strconv.ParseBool(passingRaw)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("Failed to parse passingonly value: %v", err)
|
||||||
|
}
|
||||||
|
passingOnly = b
|
||||||
|
}
|
||||||
|
|
||||||
|
fn := func(p *WatchPlan) (uint64, interface{}, error) {
|
||||||
|
health := p.client.Health()
|
||||||
|
opts := consulapi.QueryOptions{WaitIndex: p.lastIndex}
|
||||||
|
nodes, meta, err := health.Service(service, tag, passingOnly, &opts)
|
||||||
|
if err != nil {
|
||||||
|
return 0, nil, err
|
||||||
|
}
|
||||||
|
return meta.LastIndex, nodes, err
|
||||||
|
}
|
||||||
|
return fn, nil
|
||||||
|
}
|
||||||
|
|
|
@ -215,3 +215,52 @@ func TestNodesWatch(t *testing.T) {
|
||||||
t.Fatalf("bad: %v", invoke)
|
t.Fatalf("bad: %v", invoke)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestServiceWatch(t *testing.T) {
|
||||||
|
if consulAddr == "" {
|
||||||
|
t.Skip()
|
||||||
|
}
|
||||||
|
plan := mustParse(t, "type:service service:foo tag:bar passingonly:true")
|
||||||
|
invoke := 0
|
||||||
|
plan.Handler = func(idx uint64, raw interface{}) {
|
||||||
|
if invoke == 0 {
|
||||||
|
if raw == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
v, ok := raw.([]*consulapi.ServiceEntry)
|
||||||
|
if ok && len(v) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if !ok || v[0].Service.ID != "foo" {
|
||||||
|
t.Fatalf("Bad: %#v", raw)
|
||||||
|
}
|
||||||
|
invoke++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
time.Sleep(20 * time.Millisecond)
|
||||||
|
|
||||||
|
agent := plan.client.Agent()
|
||||||
|
reg := &consulapi.AgentServiceRegistration{
|
||||||
|
ID: "foo",
|
||||||
|
Name: "foo",
|
||||||
|
Tags: []string{"bar"},
|
||||||
|
}
|
||||||
|
agent.ServiceRegister(reg)
|
||||||
|
|
||||||
|
time.Sleep(20 * time.Millisecond)
|
||||||
|
plan.Stop()
|
||||||
|
|
||||||
|
agent.ServiceDeregister("foo")
|
||||||
|
}()
|
||||||
|
|
||||||
|
err := plan.Run(consulAddr)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if invoke == 0 {
|
||||||
|
t.Fatalf("bad: %v", invoke)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue