watch: Testing plan execution

pull/298/head
Armon Dadgar 10 years ago
parent 2b07355b94
commit 66edf0075a

@ -0,0 +1,41 @@
package watch
import (
"fmt"
"github.com/armon/consul-api"
)
// watchFactory is a function that can create a new WatchFunc
// from a parameter configuration
type watchFactory func(params map[string][]string) (WatchFunc, error)
// watchFuncFactory maps each type to a factory function
var watchFuncFactory map[string]watchFactory
func init() {
watchFuncFactory = map[string]watchFactory{
"key": keyWatch,
}
}
// keyWatch is used to return a key watching function
func keyWatch(params map[string][]string) (WatchFunc, error) {
keys := params["key"]
delete(params, "key")
if len(keys) != 1 {
return nil, fmt.Errorf("Must specify a single key to watch")
}
key := keys[0]
fn := func(p *WatchPlan) (uint64, interface{}, error) {
kv := p.client.KV()
opts := consulapi.QueryOptions{WaitIndex: p.lastIndex}
pair, meta, err := kv.Get(key, &opts)
if err != nil {
return 0, nil, err
}
return meta.LastIndex, pair, err
}
return fn, nil
}

@ -0,0 +1,104 @@
package watch
import (
"fmt"
"log"
"reflect"
"time"
"github.com/armon/consul-api"
)
const (
// retryInterval is the base retry value
retryInterval = 5 * time.Second
// maximum back off time, this is to prevent
// exponential runaway
maxBackoffTime = 180 * time.Second
)
// Run is used to run a watch plan
func (p *WatchPlan) Run(address string) error {
// Setup the client
p.address = address
conf := consulapi.DefaultConfig()
conf.Address = address
conf.Datacenter = p.Datacenter
// TODO: conf.Token = p.Token
client, err := consulapi.NewClient(conf)
if err != nil {
return fmt.Errorf("Failed to connect to agent: %v", err)
}
p.client = client
// Loop until we are canceled
failures := 0
for !p.shouldStop() {
// Invoke the handler
index, result, err := p.Func(p)
// Check if we should terminate since the function
// could have blocked for a while
if p.shouldStop() {
break
}
// Handle an error in the watch function
if err != nil {
log.Printf("consul.watch: Watch '%s' errored: %v", p.Query, err)
// Perform an exponential backoff
failures++
retry := retryInterval * time.Duration(failures*failures)
if retry > maxBackoffTime {
retry = maxBackoffTime
}
select {
case <-time.After(retry):
continue
case <-p.stopCh:
return nil
}
}
// Clear the failures
failures = 0
// If the index is unchanged do nothing
if index == p.lastIndex {
continue
}
// Update the index, look for change
p.lastIndex = index
if reflect.DeepEqual(p.lastResult, result) {
continue
}
// Handle the updated result
p.lastResult = result
p.Handler(index, result)
}
return nil
}
// Stop is used to stop running the watch plan
func (p *WatchPlan) Stop() {
p.stopLock.Lock()
defer p.stopLock.Unlock()
if p.stop {
return
}
p.stop = true
close(p.stopCh)
}
func (p *WatchPlan) shouldStop() bool {
select {
case <-p.stopCh:
return true
default:
return false
}
}

@ -0,0 +1,48 @@
package watch
import (
"testing"
"time"
)
func init() {
watchFuncFactory["noop"] = noopWatch
}
func noopWatch(params map[string][]string) (WatchFunc, error) {
fn := func(p *WatchPlan) (uint64, interface{}, error) {
idx := p.lastIndex + 1
return idx, idx, nil
}
return fn, nil
}
func TestRun_Stop(t *testing.T) {
plan, err := Parse("type:noop")
if err != nil {
t.Fatalf("err: %v", err)
}
var expect uint64 = 1
plan.Handler = func(idx uint64, val interface{}) {
if idx != expect {
t.Fatalf("Bad: %d %d", expect, idx)
}
if val != expect {
t.Fatalf("Bad: %d %d", expect, val)
}
expect++
}
time.AfterFunc(10*time.Millisecond, func() {
plan.Stop()
})
err = plan.Run("127.0.0.1:8500")
if err != nil {
t.Fatalf("err: %v", err)
}
if expect == 1 {
t.Fatalf("Bad: %d", expect)
}
}

@ -3,6 +3,9 @@ package watch
import (
"fmt"
"strings"
"sync"
"github.com/armon/consul-api"
)
// WatchPlan is the parsed version of a watch specification. A watch provides
@ -10,11 +13,29 @@ import (
// This view is watched for changes and a handler is invoked to take any
// appropriate actions.
type WatchPlan struct {
Query string
Datacenter string
Token string
Type string
Func WatchFunc
Handler HandlerFunc
address string
client *consulapi.Client
lastIndex uint64
lastResult interface{}
stop bool
stopCh chan struct{}
stopLock sync.Mutex
}
// WatchFunc is used to watch for a diff
type WatchFunc func(*WatchPlan) (uint64, interface{}, error)
// HandlerFunc is used to handle new data
type HandlerFunc func(uint64, interface{})
// Parse takes a watch query and compiles it into a WatchPlan or an error
func Parse(query string) (*WatchPlan, error) {
tokens, err := tokenize(query)
@ -22,21 +43,48 @@ func Parse(query string) (*WatchPlan, error) {
return nil, fmt.Errorf("Failed to parse: %v", err)
}
params := collapse(tokens)
plan := &WatchPlan{}
plan := &WatchPlan{
Query: query,
stopCh: make(chan struct{}),
}
// Parse the generic parameters
if err := assignValue(params, "datacenter", &plan.Datacenter); err != nil {
return nil, err
}
if err := assignValue(params, "token", &plan.Token); err != nil {
return nil, err
}
if err := assignValue(params, "type", &plan.Type); err != nil {
return nil, err
}
// Ensure there is a watch type
if plan.Type == "" {
return nil, fmt.Errorf("Watch type must be specified")
}
if err := assignValue(params, "datacenter", &plan.Datacenter); err != nil {
return nil, err
// Look for a factory function
factory := watchFuncFactory[plan.Type]
if factory == nil {
return nil, fmt.Errorf("Unsupported watch type: %s", plan.Type)
}
if err := assignValue(params, "token", &plan.Token); err != nil {
// Get the watch func
fn, err := factory(params)
if err != nil {
return nil, err
}
plan.Func = fn
// Ensure all parameters are consumed
if len(params) != 0 {
var bad []string
for key := range params {
bad = append(bad, key)
}
return nil, fmt.Errorf("Invalid parameters: %v", bad)
}
return plan, nil
}

@ -90,7 +90,7 @@ func TestCollapse(t *testing.T) {
}
func TestParseBasic(t *testing.T) {
p, err := Parse("type:key datacenter:dc2 token:12345")
p, err := Parse("type:key datacenter:dc2 token:12345 key:foo")
if err != nil {
t.Fatalf("err: %v", err)
}

Loading…
Cancel
Save