@ -3,6 +3,7 @@ package watch
import (
"context"
"fmt"
"log"
consulapi "github.com/hashicorp/consul/api"
)
@ -16,13 +17,16 @@ var watchFuncFactory map[string]watchFactory
func init ( ) {
watchFuncFactory = map [ string ] watchFactory {
"key" : keyWatch ,
"keyprefix" : keyPrefixWatch ,
"services" : servicesWatch ,
"nodes" : nodesWatch ,
"service" : serviceWatch ,
"checks" : checksWatch ,
"event" : eventWatch ,
"key" : keyWatch ,
"keyprefix" : keyPrefixWatch ,
"services" : servicesWatch ,
"nodes" : nodesWatch ,
"service" : serviceWatch ,
"checks" : checksWatch ,
"event" : eventWatch ,
"connect_roots" : connectRootsWatch ,
"connect_leaf" : connectLeafWatch ,
"connect_proxy_config" : connectProxyConfigWatch ,
}
}
@ -40,18 +44,18 @@ func keyWatch(params map[string]interface{}) (WatcherFunc, error) {
if key == "" {
return nil , fmt . Errorf ( "Must specify a single key to watch" )
}
fn := func ( p * Plan ) ( uint64 , interface { } , error ) {
fn := func ( p * Plan ) ( BlockingParam , interface { } , error ) {
kv := p . client . KV ( )
opts := makeQueryOptionsWithContext ( p , stale )
defer p . cancelFunc ( )
pair , meta , err := kv . Get ( key , & opts )
if err != nil {
return 0 , nil , err
return nil , nil , err
}
if pair == nil {
return meta . LastIndex , nil , err
return WaitIndexVal ( meta . LastIndex ) , nil , err
}
return meta . LastIndex , pair , err
return WaitIndexVal ( meta . LastIndex ) , pair , err
}
return fn , nil
}
@ -70,15 +74,15 @@ func keyPrefixWatch(params map[string]interface{}) (WatcherFunc, error) {
if prefix == "" {
return nil , fmt . Errorf ( "Must specify a single prefix to watch" )
}
fn := func ( p * Plan ) ( uint64 , interface { } , error ) {
fn := func ( p * Plan ) ( BlockingParam , interface { } , error ) {
kv := p . client . KV ( )
opts := makeQueryOptionsWithContext ( p , stale )
defer p . cancelFunc ( )
pairs , meta , err := kv . List ( prefix , & opts )
if err != nil {
return 0 , nil , err
return nil , nil , err
}
return meta . LastIndex , pairs , err
return WaitIndexVal ( meta . LastIndex ) , pairs , err
}
return fn , nil
}
@ -90,15 +94,15 @@ func servicesWatch(params map[string]interface{}) (WatcherFunc, error) {
return nil , err
}
fn := func ( p * Plan ) ( uint64 , interface { } , error ) {
fn := func ( p * Plan ) ( BlockingParam , interface { } , error ) {
catalog := p . client . Catalog ( )
opts := makeQueryOptionsWithContext ( p , stale )
defer p . cancelFunc ( )
services , meta , err := catalog . Services ( & opts )
if err != nil {
return 0 , nil , err
return nil , nil , err
}
return meta . LastIndex , services , err
return WaitIndexVal ( meta . LastIndex ) , services , err
}
return fn , nil
}
@ -110,15 +114,15 @@ func nodesWatch(params map[string]interface{}) (WatcherFunc, error) {
return nil , err
}
fn := func ( p * Plan ) ( uint64 , interface { } , error ) {
fn := func ( p * Plan ) ( BlockingParam , interface { } , error ) {
catalog := p . client . Catalog ( )
opts := makeQueryOptionsWithContext ( p , stale )
defer p . cancelFunc ( )
nodes , meta , err := catalog . Nodes ( & opts )
if err != nil {
return 0 , nil , err
return nil , nil , err
}
return meta . LastIndex , nodes , err
return WaitIndexVal ( meta . LastIndex ) , nodes , err
}
return fn , nil
}
@ -147,15 +151,15 @@ func serviceWatch(params map[string]interface{}) (WatcherFunc, error) {
return nil , err
}
fn := func ( p * Plan ) ( uint64 , interface { } , error ) {
fn := func ( p * Plan ) ( BlockingParam , interface { } , error ) {
health := p . client . Health ( )
opts := makeQueryOptionsWithContext ( p , stale )
defer p . cancelFunc ( )
nodes , meta , err := health . Service ( service , tag , passingOnly , & opts )
if err != nil {
return 0 , nil , err
return nil , nil , err
}
return meta . LastIndex , nodes , err
return WaitIndexVal ( meta . LastIndex ) , nodes , err
}
return fn , nil
}
@ -181,7 +185,7 @@ func checksWatch(params map[string]interface{}) (WatcherFunc, error) {
state = "any"
}
fn := func ( p * Plan ) ( uint64 , interface { } , error ) {
fn := func ( p * Plan ) ( BlockingParam , interface { } , error ) {
health := p . client . Health ( )
opts := makeQueryOptionsWithContext ( p , stale )
defer p . cancelFunc ( )
@ -194,9 +198,9 @@ func checksWatch(params map[string]interface{}) (WatcherFunc, error) {
checks , meta , err = health . Checks ( service , & opts )
}
if err != nil {
return 0 , nil , err
return nil , nil , err
}
return meta . LastIndex , checks , err
return WaitIndexVal ( meta . LastIndex ) , checks , err
}
return fn , nil
}
@ -210,23 +214,98 @@ func eventWatch(params map[string]interface{}) (WatcherFunc, error) {
return nil , err
}
fn := func ( p * Plan ) ( uint64 , interface { } , error ) {
fn := func ( p * Plan ) ( BlockingParam , interface { } , error ) {
event := p . client . Event ( )
opts := makeQueryOptionsWithContext ( p , false )
defer p . cancelFunc ( )
events , meta , err := event . List ( name , & opts )
if err != nil {
return 0 , nil , err
return nil , nil , err
}
// Prune to only the new events
for i := 0 ; i < len ( events ) ; i ++ {
if event . IDToIndex ( events [ i ] . ID ) == p . lastIndex {
if WaitIndexVal ( event . IDToIndex ( events [ i ] . ID ) ) . Equal ( p . lastParamVal ) {
events = events [ i + 1 : ]
break
}
}
return meta . LastIndex , events , err
return WaitIndexVal ( meta . LastIndex ) , events , err
}
return fn , nil
}
// connectRootsWatch is used to watch for changes to Connect Root certificates.
func connectRootsWatch ( params map [ string ] interface { } ) ( WatcherFunc , error ) {
// We don't support stale since roots are likely to be cached locally in the
// agent anyway.
fn := func ( p * Plan ) ( BlockingParam , interface { } , error ) {
agent := p . client . Agent ( )
opts := makeQueryOptionsWithContext ( p , false )
defer p . cancelFunc ( )
roots , meta , err := agent . ConnectCARoots ( & opts )
if err != nil {
return nil , nil , err
}
return WaitIndexVal ( meta . LastIndex ) , roots , err
}
return fn , nil
}
// connectLeafWatch is used to watch for changes to Connect Leaf certificates
// for given local service id.
func connectLeafWatch ( params map [ string ] interface { } ) ( WatcherFunc , error ) {
// We don't support stale since certs are likely to be cached locally in the
// agent anyway.
var serviceID string
if err := assignValue ( params , "service_id" , & serviceID ) ; err != nil {
return nil , err
}
fn := func ( p * Plan ) ( BlockingParam , interface { } , error ) {
agent := p . client . Agent ( )
opts := makeQueryOptionsWithContext ( p , false )
defer p . cancelFunc ( )
leaf , meta , err := agent . ConnectCALeaf ( serviceID , & opts )
if err != nil {
return nil , nil , err
}
return WaitIndexVal ( meta . LastIndex ) , leaf , err
}
return fn , nil
}
// connectProxyConfigWatch is used to watch for changes to Connect managed proxy
// configuration. Note that this state is agent-local so the watch mechanism
// uses `hash` rather than `index` for deciding whether to block.
func connectProxyConfigWatch ( params map [ string ] interface { } ) ( WatcherFunc , error ) {
// We don't support consistency modes since it's agent local data
var proxyServiceID string
if err := assignValue ( params , "proxy_service_id" , & proxyServiceID ) ; err != nil {
return nil , err
}
fn := func ( p * Plan ) ( BlockingParam , interface { } , error ) {
agent := p . client . Agent ( )
opts := makeQueryOptionsWithContext ( p , false )
defer p . cancelFunc ( )
log . Printf ( "DEBUG: id: %s, opts: %v" , proxyServiceID , opts )
config , _ , err := agent . ConnectProxyConfig ( proxyServiceID , & opts )
if err != nil {
return nil , nil , err
}
// Return string ContentHash since we don't have Raft indexes to block on.
return WaitHashVal ( config . ContentHash ) , config , err
}
return fn , nil
}
@ -234,6 +313,12 @@ func eventWatch(params map[string]interface{}) (WatcherFunc, error) {
func makeQueryOptionsWithContext ( p * Plan , stale bool ) consulapi . QueryOptions {
ctx , cancel := context . WithCancel ( context . Background ( ) )
p . cancelFunc = cancel
opts := consulapi . QueryOptions { AllowStale : stale , WaitIndex : p . lastIndex }
opts := consulapi . QueryOptions { AllowStale : stale }
switch param := p . lastParamVal . ( type ) {
case WaitIndexVal :
opts . WaitIndex = uint64 ( param )
case WaitHashVal :
opts . WaitHash = string ( param )
}
return * opts . WithContext ( ctx )
}