Browse Source

watch: support -filter for consul watch: checks, services, nodes, service (#17780)

* watch: support -filter for watch checks

* Add filter for watch nodes, services, and service
- unit test added
- Add changelog
- update doc
pull/17851/head
cskh 1 year ago committed by GitHub
parent
commit
f16c5d87ab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      .changelog/17780.txt
  2. 6
      agent/consul/health_endpoint_test.go
  3. 29
      api/watch/funcs.go
  4. 528
      api/watch/funcs_test.go
  5. 5
      command/watch/watch.go
  6. 5
      website/content/commands/watch.mdx

3
.changelog/17780.txt

@ -0,0 +1,3 @@
```release-note:feature
cli: `consul watch` command uses `-filter` expression to filter response from checks, services, nodes, and service.
```

6
agent/consul/health_endpoint_test.go

@ -1767,5 +1767,11 @@ func TestHealth_RPC_Filter(t *testing.T) {
out = new(structs.IndexedHealthChecks) out = new(structs.IndexedHealthChecks)
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Health.ChecksInState", &args, out)) require.NoError(t, msgpackrpc.CallWithCodec(codec, "Health.ChecksInState", &args, out))
require.Len(t, out.HealthChecks, 1) require.Len(t, out.HealthChecks, 1)
args.State = api.HealthAny
args.Filter = "connect in ServiceTags and v2 in ServiceTags"
out = new(structs.IndexedHealthChecks)
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Health.ChecksInState", &args, out))
require.Len(t, out.HealthChecks, 1)
}) })
} }

29
api/watch/funcs.go

@ -92,13 +92,20 @@ func keyPrefixWatch(params map[string]interface{}) (WatcherFunc, error) {
// servicesWatch is used to watch the list of available services // servicesWatch is used to watch the list of available services
func servicesWatch(params map[string]interface{}) (WatcherFunc, error) { func servicesWatch(params map[string]interface{}) (WatcherFunc, error) {
stale := false stale := false
filter := ""
if err := assignValueBool(params, "stale", &stale); err != nil { if err := assignValueBool(params, "stale", &stale); err != nil {
return nil, err return nil, err
} }
if err := assignValue(params, "filter", &filter); err != nil {
return nil, err
}
fn := func(p *Plan) (BlockingParamVal, interface{}, error) { fn := func(p *Plan) (BlockingParamVal, interface{}, error) {
catalog := p.client.Catalog() catalog := p.client.Catalog()
opts := makeQueryOptionsWithContext(p, stale) opts := makeQueryOptionsWithContext(p, stale)
if filter != "" {
opts.Filter = filter
}
defer p.cancelFunc() defer p.cancelFunc()
services, meta, err := catalog.Services(&opts) services, meta, err := catalog.Services(&opts)
if err != nil { if err != nil {
@ -112,13 +119,20 @@ func servicesWatch(params map[string]interface{}) (WatcherFunc, error) {
// nodesWatch is used to watch the list of available nodes // nodesWatch is used to watch the list of available nodes
func nodesWatch(params map[string]interface{}) (WatcherFunc, error) { func nodesWatch(params map[string]interface{}) (WatcherFunc, error) {
stale := false stale := false
filter := ""
if err := assignValueBool(params, "stale", &stale); err != nil { if err := assignValueBool(params, "stale", &stale); err != nil {
return nil, err return nil, err
} }
if err := assignValue(params, "filter", &filter); err != nil {
return nil, err
}
fn := func(p *Plan) (BlockingParamVal, interface{}, error) { fn := func(p *Plan) (BlockingParamVal, interface{}, error) {
catalog := p.client.Catalog() catalog := p.client.Catalog()
opts := makeQueryOptionsWithContext(p, stale) opts := makeQueryOptionsWithContext(p, stale)
if filter != "" {
opts.Filter = filter
}
defer p.cancelFunc() defer p.cancelFunc()
nodes, meta, err := catalog.Nodes(&opts) nodes, meta, err := catalog.Nodes(&opts)
if err != nil { if err != nil {
@ -132,9 +146,13 @@ func nodesWatch(params map[string]interface{}) (WatcherFunc, error) {
// serviceWatch is used to watch a specific service for changes // serviceWatch is used to watch a specific service for changes
func serviceWatch(params map[string]interface{}) (WatcherFunc, error) { func serviceWatch(params map[string]interface{}) (WatcherFunc, error) {
stale := false stale := false
filter := ""
if err := assignValueBool(params, "stale", &stale); err != nil { if err := assignValueBool(params, "stale", &stale); err != nil {
return nil, err return nil, err
} }
if err := assignValue(params, "filter", &filter); err != nil {
return nil, err
}
var ( var (
service string service string
@ -158,6 +176,9 @@ func serviceWatch(params map[string]interface{}) (WatcherFunc, error) {
fn := func(p *Plan) (BlockingParamVal, interface{}, error) { fn := func(p *Plan) (BlockingParamVal, interface{}, error) {
health := p.client.Health() health := p.client.Health()
opts := makeQueryOptionsWithContext(p, stale) opts := makeQueryOptionsWithContext(p, stale)
if filter != "" {
opts.Filter = filter
}
defer p.cancelFunc() defer p.cancelFunc()
nodes, meta, err := health.ServiceMultipleTags(service, tags, passingOnly, &opts) nodes, meta, err := health.ServiceMultipleTags(service, tags, passingOnly, &opts)
if err != nil { if err != nil {
@ -175,13 +196,16 @@ func checksWatch(params map[string]interface{}) (WatcherFunc, error) {
return nil, err return nil, err
} }
var service, state string var service, state, filter string
if err := assignValue(params, "service", &service); err != nil { if err := assignValue(params, "service", &service); err != nil {
return nil, err return nil, err
} }
if err := assignValue(params, "state", &state); err != nil { if err := assignValue(params, "state", &state); err != nil {
return nil, err return nil, err
} }
if err := assignValue(params, "filter", &filter); err != nil {
return nil, err
}
if service != "" && state != "" { if service != "" && state != "" {
return nil, fmt.Errorf("Cannot specify service and state") return nil, fmt.Errorf("Cannot specify service and state")
} }
@ -196,6 +220,9 @@ func checksWatch(params map[string]interface{}) (WatcherFunc, error) {
var checks []*consulapi.HealthCheck var checks []*consulapi.HealthCheck
var meta *consulapi.QueryMeta var meta *consulapi.QueryMeta
var err error var err error
if filter != "" {
opts.Filter = filter
}
if state != "" { if state != "" {
checks, meta, err = health.State(state, &opts) checks, meta, err = health.State(state, &opts)
} else { } else {

528
api/watch/funcs_test.go

@ -378,6 +378,82 @@ func TestServicesWatch(t *testing.T) {
} }
func TestServicesWatch_Filter(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
s.WaitForSerfCheck(t)
var (
wakeups []map[string][]string
notifyCh = make(chan struct{})
)
plan := mustParse(t, `{"type":"services", "filter":"b in ServiceTags and a in ServiceTags"}`)
plan.Handler = func(idx uint64, raw interface{}) {
if raw == nil {
return // ignore
}
v, ok := raw.(map[string][]string)
if !ok {
return // ignore
}
wakeups = append(wakeups, v)
notifyCh <- struct{}{}
}
// Register some services
{
agent := c.Agent()
// we don't want to find this
reg := &api.AgentServiceRegistration{
ID: "foo",
Name: "foo",
Tags: []string{"b"},
}
if err := agent.ServiceRegister(reg); err != nil {
t.Fatalf("err: %v", err)
}
// // we want to find this
reg = &api.AgentServiceRegistration{
ID: "bar",
Name: "bar",
Tags: []string{"a", "b"},
}
if err := agent.ServiceRegister(reg); err != nil {
t.Fatalf("err: %v", err)
}
}
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
if err := plan.Run(s.HTTPAddr); err != nil {
t.Errorf("err: %v", err)
}
}()
defer plan.Stop()
// Wait for second wakeup.
<-notifyCh
plan.Stop()
wg.Wait()
require.Len(t, wakeups, 1)
{
v := wakeups[0]
require.Len(t, v, 1)
_, ok := v["bar"]
require.True(t, ok)
}
}
func TestNodesWatch(t *testing.T) { func TestNodesWatch(t *testing.T) {
t.Parallel() t.Parallel()
c, s := makeClient(t) c, s := makeClient(t)
@ -453,6 +529,82 @@ func TestNodesWatch(t *testing.T) {
} }
} }
func TestNodesWatch_Filter(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
s.WaitForSerfCheck(t) // wait for AE to sync
var (
wakeups [][]*api.Node
notifyCh = make(chan struct{})
)
plan := mustParse(t, `{"type":"nodes", "filter":"Node == foo"}`)
plan.Handler = func(idx uint64, raw interface{}) {
if raw == nil {
return // ignore
}
v, ok := raw.([]*api.Node)
if !ok {
return // ignore
}
wakeups = append(wakeups, v)
notifyCh <- struct{}{}
}
// Register 2 nodes
{
catalog := c.Catalog()
// we want to find this node
reg := &api.CatalogRegistration{
Node: "foo",
Address: "1.1.1.1",
Datacenter: "dc1",
}
if _, err := catalog.Register(reg, nil); err != nil {
t.Fatalf("err: %v", err)
}
// we don't want to find this node
reg = &api.CatalogRegistration{
Node: "bar",
Address: "2.2.2.2",
Datacenter: "dc1",
}
if _, err := catalog.Register(reg, nil); err != nil {
t.Fatalf("err: %v", err)
}
}
var wg sync.WaitGroup
wg.Add(1)
// Start the watch nodes plan
go func() {
defer wg.Done()
if err := plan.Run(s.HTTPAddr); err != nil {
t.Errorf("err: %v", err)
}
}()
defer plan.Stop()
// Wait for first wakeup.
<-notifyCh
plan.Stop()
wg.Wait()
require.Len(t, wakeups, 1)
{
v := wakeups[0]
require.Len(t, v, 1)
require.Equal(t, "foo", v[0].Node)
}
}
func TestServiceWatch(t *testing.T) { func TestServiceWatch(t *testing.T) {
t.Parallel() t.Parallel()
c, s := makeClient(t) c, s := makeClient(t)
@ -616,6 +768,94 @@ func TestServiceMultipleTagsWatch(t *testing.T) {
} }
} }
func TestServiceWatch_Filter(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
s.WaitForSerfCheck(t)
var (
wakeups [][]*api.ServiceEntry
notifyCh = make(chan struct{})
)
plan := mustParse(t, `{"type":"service", "service":"foo", "filter":"bar in Service.Tags and buzz in Service.Tags"}`)
plan.Handler = func(idx uint64, raw interface{}) {
if raw == nil {
return // ignore
}
v, ok := raw.([]*api.ServiceEntry)
if !ok {
return // ignore
}
wakeups = append(wakeups, v)
notifyCh <- struct{}{}
}
// register some services
{
agent := c.Agent()
// we do not want to find this one.
reg := &api.AgentServiceRegistration{
ID: "foobarbiff",
Name: "foo",
Tags: []string{"bar", "biff"},
}
if err := agent.ServiceRegister(reg); err != nil {
t.Fatalf("err: %v", err)
}
// we do not want to find this one.
reg = &api.AgentServiceRegistration{
ID: "foobuzzbiff",
Name: "foo",
Tags: []string{"buzz", "biff"},
}
if err := agent.ServiceRegister(reg); err != nil {
t.Fatalf("err: %v", err)
}
// we want to find this one
reg = &api.AgentServiceRegistration{
ID: "foobarbuzzbiff",
Name: "foo",
Tags: []string{"bar", "buzz", "biff"},
}
if err := agent.ServiceRegister(reg); err != nil {
t.Fatalf("err: %v", err)
}
}
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
if err := plan.Run(s.HTTPAddr); err != nil {
t.Errorf("err: %v", err)
}
}()
defer plan.Stop()
// Wait for second wakeup.
<-notifyCh
plan.Stop()
wg.Wait()
require.Len(t, wakeups, 1)
{
v := wakeups[0]
require.Len(t, v, 1)
require.Equal(t, "foobarbuzzbiff", v[0].Service.ID)
require.ElementsMatch(t, []string{"bar", "buzz", "biff"}, v[0].Service.Tags)
}
}
func TestChecksWatch_State(t *testing.T) { func TestChecksWatch_State(t *testing.T) {
t.Parallel() t.Parallel()
c, s := makeClient(t) c, s := makeClient(t)
@ -772,6 +1012,294 @@ func TestChecksWatch_Service(t *testing.T) {
} }
} }
func TestChecksWatch_Service_Filter(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
s.WaitForSerfCheck(t)
var (
wakeups [][]*api.HealthCheck
notifyCh = make(chan struct{})
)
plan := mustParse(t, `{"type":"checks", "filter":"b in ServiceTags and a in ServiceTags"}`)
plan.Handler = func(idx uint64, raw interface{}) {
if raw == nil {
return // ignore
}
v, ok := raw.([]*api.HealthCheck)
if !ok {
return // ignore
}
wakeups = append(wakeups, v)
notifyCh <- struct{}{}
}
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
if err := plan.Run(s.HTTPAddr); err != nil {
t.Errorf("err: %v", err)
}
}()
defer plan.Stop()
// Wait for first wakeup.
<-notifyCh
{
catalog := c.Catalog()
reg := &api.CatalogRegistration{
Node: "foobar",
Address: "1.1.1.1",
Datacenter: "dc1",
Service: &api.AgentService{
ID: "foobar",
Service: "foobar",
Tags: []string{"a", "b"},
},
Check: &api.AgentCheck{
Node: "foobar",
CheckID: "foobar",
Name: "foobar",
Status: api.HealthPassing,
ServiceID: "foobar",
},
}
if _, err := catalog.Register(reg, nil); err != nil {
t.Fatalf("err: %v", err)
}
}
// Wait for second wakeup.
<-notifyCh
plan.Stop()
wg.Wait()
require.Len(t, wakeups, 2)
{
v := wakeups[0]
require.Len(t, v, 0)
}
{
v := wakeups[1]
require.Len(t, v, 1)
require.Equal(t, "foobar", v[0].CheckID)
}
}
func TestChecksWatch_Filter(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
s.WaitForSerfCheck(t)
var (
wakeups [][]*api.HealthCheck
notifyCh = make(chan struct{})
)
plan := mustParse(t, `{"type":"checks", "filter":"b in ServiceTags and a in ServiceTags"}`)
plan.Handler = func(idx uint64, raw interface{}) {
if raw == nil {
return // ignore
}
v, ok := raw.([]*api.HealthCheck)
if !ok {
return // ignore
}
wakeups = append(wakeups, v)
notifyCh <- struct{}{}
}
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
if err := plan.Run(s.HTTPAddr); err != nil {
t.Errorf("err: %v", err)
}
}()
defer plan.Stop()
// Wait for first wakeup.
<-notifyCh
{
catalog := c.Catalog()
// we don't want to find this one
reg := &api.CatalogRegistration{
Node: "foo",
Address: "1.1.1.1",
Datacenter: "dc1",
Service: &api.AgentService{
ID: "foo",
Service: "foo",
Tags: []string{"a"},
},
Check: &api.AgentCheck{
Node: "foo",
CheckID: "foo",
Name: "foo",
Status: api.HealthPassing,
ServiceID: "foo",
},
}
if _, err := catalog.Register(reg, nil); err != nil {
t.Fatalf("err: %v", err)
}
// we want to find this one
reg = &api.CatalogRegistration{
Node: "bar",
Address: "2.2.2.2",
Datacenter: "dc1",
Service: &api.AgentService{
ID: "bar",
Service: "bar",
Tags: []string{"a", "b"},
},
Check: &api.AgentCheck{
Node: "bar",
CheckID: "bar",
Name: "bar",
Status: api.HealthPassing,
ServiceID: "bar",
},
}
if _, err := catalog.Register(reg, nil); err != nil {
t.Fatalf("err: %v", err)
}
}
// Wait for second wakeup.
<-notifyCh
plan.Stop()
wg.Wait()
require.Len(t, wakeups, 2)
{
v := wakeups[0]
require.Len(t, v, 0)
}
{
v := wakeups[1]
require.Len(t, v, 1)
require.Equal(t, "bar", v[0].CheckID)
}
}
func TestChecksWatch_Filter_by_ServiceNameStatus(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
s.WaitForSerfCheck(t)
var (
wakeups [][]*api.HealthCheck
notifyCh = make(chan struct{})
)
plan := mustParse(t, `{"type":"checks", "filter":"ServiceName == bar and Status == critical"}`)
plan.Handler = func(idx uint64, raw interface{}) {
if raw == nil {
return // ignore
}
v, ok := raw.([]*api.HealthCheck)
if !ok {
return // ignore
}
wakeups = append(wakeups, v)
notifyCh <- struct{}{}
}
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
if err := plan.Run(s.HTTPAddr); err != nil {
t.Errorf("err: %v", err)
}
}()
defer plan.Stop()
// Wait for first wakeup.
<-notifyCh
{
catalog := c.Catalog()
// we don't want to find this one
reg := &api.CatalogRegistration{
Node: "foo",
Address: "1.1.1.1",
Datacenter: "dc1",
Service: &api.AgentService{
ID: "foo",
Service: "foo",
Tags: []string{"a"},
},
Check: &api.AgentCheck{
Node: "foo",
CheckID: "foo",
Name: "foo",
Status: api.HealthPassing,
ServiceID: "foo",
},
}
if _, err := catalog.Register(reg, nil); err != nil {
t.Fatalf("err: %v", err)
}
// we want to find this one
reg = &api.CatalogRegistration{
Node: "bar",
Address: "2.2.2.2",
Datacenter: "dc1",
Service: &api.AgentService{
ID: "bar",
Service: "bar",
Tags: []string{"a", "b"},
},
Check: &api.AgentCheck{
Node: "bar",
CheckID: "bar",
Name: "bar",
Status: api.HealthCritical,
ServiceID: "bar",
},
}
if _, err := catalog.Register(reg, nil); err != nil {
t.Fatalf("err: %v", err)
}
}
// Wait for second wakeup.
<-notifyCh
plan.Stop()
wg.Wait()
require.Len(t, wakeups, 2)
{
v := wakeups[0]
require.Len(t, v, 0)
}
{
v := wakeups[1]
require.Len(t, v, 1)
require.Equal(t, "bar", v[0].CheckID)
}
}
func TestEventWatch(t *testing.T) { func TestEventWatch(t *testing.T) {
t.Parallel() t.Parallel()
c, s := makeClient(t) c, s := makeClient(t)

5
command/watch/watch.go

@ -45,6 +45,7 @@ type cmd struct {
state string state string
name string name string
shell bool shell bool
filter string
} }
func (c *cmd) init() { func (c *cmd) init() {
@ -71,6 +72,7 @@ func (c *cmd) init() {
"Specifies the states to watch. Optional for 'checks' type.") "Specifies the states to watch. Optional for 'checks' type.")
c.flags.StringVar(&c.name, "name", "", c.flags.StringVar(&c.name, "name", "",
"Specifies an event name to watch. Only for 'event' type.") "Specifies an event name to watch. Only for 'event' type.")
c.flags.StringVar(&c.filter, "filter", "", "Filter to use with the request")
c.http = &flags.HTTPFlags{} c.http = &flags.HTTPFlags{}
flags.Merge(c.flags, c.http.ClientFlags()) flags.Merge(c.flags, c.http.ClientFlags())
@ -128,6 +130,9 @@ func (c *cmd) Run(args []string) int {
if c.service != "" { if c.service != "" {
params["service"] = c.service params["service"] = c.service
} }
if c.filter != "" {
params["filter"] = c.filter
}
if len(c.tag) > 0 { if len(c.tag) > 0 {
params["tag"] = c.tag params["tag"] = c.tag
} }

5
website/content/commands/watch.mdx

@ -53,6 +53,11 @@ or optionally provided. There is more documentation on watch
- `-type` - Watch type. Required, one of "`key`, `keyprefix`, `services`, - `-type` - Watch type. Required, one of "`key`, `keyprefix`, `services`,
`nodes`, `service`, `checks`, or `event`. `nodes`, `service`, `checks`, or `event`.
- `-filter=<filter>` - Expression to use for filtering the results. Optional for
`checks` `nodes`, `services`, and `service` type.
See the [`/catalog/nodes` API documentation](/consul/api-docs/catalog#filtering) for a
description of what is filterable.
#### API Options #### API Options
@include 'http_api_options_client.mdx' @include 'http_api_options_client.mdx'

Loading…
Cancel
Save