Backport of watch: support -filter for consul watch: checks, services, nodes, service into release/1.15.x (#17986)

* backport to 1.15.x

---------

Co-authored-by: cskh <hui.kang@hashicorp.com>
pull/17994/head
hc-github-team-consul-core 2023-06-30 15:26:31 -05:00 committed by GitHub
parent 20b8427f8c
commit 0a465dd0d2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 466 additions and 1 deletions

3
.changelog/17780.txt Normal file
View File

@ -0,0 +1,3 @@
```release-note:feature
cli: `consul watch` command uses `-filter` expression to filter response from checks, services, nodes, and service.
```

View File

@ -1765,5 +1765,11 @@ func TestHealth_RPC_Filter(t *testing.T) {
out = new(structs.IndexedHealthChecks)
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Health.ChecksInState", &args, out))
require.Len(t, out.HealthChecks, 1)
args.State = api.HealthAny
args.Filter = "connect in ServiceTags and v2 in ServiceTags"
out = new(structs.IndexedHealthChecks)
require.NoError(t, msgpackrpc.CallWithCodec(codec, "Health.ChecksInState", &args, out))
require.Len(t, out.HealthChecks, 1)
})
}

View File

@ -89,13 +89,20 @@ func keyPrefixWatch(params map[string]interface{}) (WatcherFunc, error) {
// servicesWatch is used to watch the list of available services
func servicesWatch(params map[string]interface{}) (WatcherFunc, error) {
stale := false
filter := ""
if err := assignValueBool(params, "stale", &stale); err != nil {
return nil, err
}
if err := assignValue(params, "filter", &filter); err != nil {
return nil, err
}
fn := func(p *Plan) (BlockingParamVal, interface{}, error) {
catalog := p.client.Catalog()
opts := makeQueryOptionsWithContext(p, stale)
if filter != "" {
opts.Filter = filter
}
defer p.cancelFunc()
services, meta, err := catalog.Services(&opts)
if err != nil {
@ -109,13 +116,20 @@ func servicesWatch(params map[string]interface{}) (WatcherFunc, error) {
// nodesWatch is used to watch the list of available nodes
func nodesWatch(params map[string]interface{}) (WatcherFunc, error) {
stale := false
filter := ""
if err := assignValueBool(params, "stale", &stale); err != nil {
return nil, err
}
if err := assignValue(params, "filter", &filter); err != nil {
return nil, err
}
fn := func(p *Plan) (BlockingParamVal, interface{}, error) {
catalog := p.client.Catalog()
opts := makeQueryOptionsWithContext(p, stale)
if filter != "" {
opts.Filter = filter
}
defer p.cancelFunc()
nodes, meta, err := catalog.Nodes(&opts)
if err != nil {
@ -129,9 +143,13 @@ func nodesWatch(params map[string]interface{}) (WatcherFunc, error) {
// serviceWatch is used to watch a specific service for changes
func serviceWatch(params map[string]interface{}) (WatcherFunc, error) {
stale := false
filter := ""
if err := assignValueBool(params, "stale", &stale); err != nil {
return nil, err
}
if err := assignValue(params, "filter", &filter); err != nil {
return nil, err
}
var (
service string
@ -155,6 +173,9 @@ func serviceWatch(params map[string]interface{}) (WatcherFunc, error) {
fn := func(p *Plan) (BlockingParamVal, interface{}, error) {
health := p.client.Health()
opts := makeQueryOptionsWithContext(p, stale)
if filter != "" {
opts.Filter = filter
}
defer p.cancelFunc()
nodes, meta, err := health.ServiceMultipleTags(service, tags, passingOnly, &opts)
if err != nil {
@ -172,13 +193,16 @@ func checksWatch(params map[string]interface{}) (WatcherFunc, error) {
return nil, err
}
var service, state string
var service, state, filter string
if err := assignValue(params, "service", &service); err != nil {
return nil, err
}
if err := assignValue(params, "state", &state); err != nil {
return nil, err
}
if err := assignValue(params, "filter", &filter); err != nil {
return nil, err
}
if service != "" && state != "" {
return nil, fmt.Errorf("Cannot specify service and state")
}
@ -193,6 +217,9 @@ func checksWatch(params map[string]interface{}) (WatcherFunc, error) {
var checks []*consulapi.HealthCheck
var meta *consulapi.QueryMeta
var err error
if filter != "" {
opts.Filter = filter
}
if state != "" {
checks, meta, err = health.State(state, &opts)
} else {

View File

@ -375,6 +375,82 @@ func TestServicesWatch(t *testing.T) {
}
func TestServicesWatch_Filter(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
s.WaitForSerfCheck(t)
var (
wakeups []map[string][]string
notifyCh = make(chan struct{})
)
plan := mustParse(t, `{"type":"services", "filter":"b in ServiceTags and a in ServiceTags"}`)
plan.Handler = func(idx uint64, raw interface{}) {
if raw == nil {
return // ignore
}
v, ok := raw.(map[string][]string)
if !ok {
return // ignore
}
wakeups = append(wakeups, v)
notifyCh <- struct{}{}
}
// Register some services
{
agent := c.Agent()
// we don't want to find this
reg := &api.AgentServiceRegistration{
ID: "foo",
Name: "foo",
Tags: []string{"b"},
}
if err := agent.ServiceRegister(reg); err != nil {
t.Fatalf("err: %v", err)
}
// // we want to find this
reg = &api.AgentServiceRegistration{
ID: "bar",
Name: "bar",
Tags: []string{"a", "b"},
}
if err := agent.ServiceRegister(reg); err != nil {
t.Fatalf("err: %v", err)
}
}
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
if err := plan.Run(s.HTTPAddr); err != nil {
t.Errorf("err: %v", err)
}
}()
defer plan.Stop()
// Wait for second wakeup.
<-notifyCh
plan.Stop()
wg.Wait()
require.Len(t, wakeups, 1)
{
v := wakeups[0]
require.Len(t, v, 1)
_, ok := v["bar"]
require.True(t, ok)
}
}
func TestNodesWatch(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
@ -450,6 +526,82 @@ func TestNodesWatch(t *testing.T) {
}
}
func TestNodesWatch_Filter(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
s.WaitForSerfCheck(t) // wait for AE to sync
var (
wakeups [][]*api.Node
notifyCh = make(chan struct{})
)
plan := mustParse(t, `{"type":"nodes", "filter":"Node == foo"}`)
plan.Handler = func(idx uint64, raw interface{}) {
if raw == nil {
return // ignore
}
v, ok := raw.([]*api.Node)
if !ok {
return // ignore
}
wakeups = append(wakeups, v)
notifyCh <- struct{}{}
}
// Register 2 nodes
{
catalog := c.Catalog()
// we want to find this node
reg := &api.CatalogRegistration{
Node: "foo",
Address: "1.1.1.1",
Datacenter: "dc1",
}
if _, err := catalog.Register(reg, nil); err != nil {
t.Fatalf("err: %v", err)
}
// we don't want to find this node
reg = &api.CatalogRegistration{
Node: "bar",
Address: "2.2.2.2",
Datacenter: "dc1",
}
if _, err := catalog.Register(reg, nil); err != nil {
t.Fatalf("err: %v", err)
}
}
var wg sync.WaitGroup
wg.Add(1)
// Start the watch nodes plan
go func() {
defer wg.Done()
if err := plan.Run(s.HTTPAddr); err != nil {
t.Errorf("err: %v", err)
}
}()
defer plan.Stop()
// Wait for first wakeup.
<-notifyCh
plan.Stop()
wg.Wait()
require.Len(t, wakeups, 1)
{
v := wakeups[0]
require.Len(t, v, 1)
require.Equal(t, "foo", v[0].Node)
}
}
func TestServiceWatch(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
@ -613,6 +765,94 @@ func TestServiceMultipleTagsWatch(t *testing.T) {
}
}
func TestServiceWatch_Filter(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
s.WaitForSerfCheck(t)
var (
wakeups [][]*api.ServiceEntry
notifyCh = make(chan struct{})
)
plan := mustParse(t, `{"type":"service", "service":"foo", "filter":"bar in Service.Tags and buzz in Service.Tags"}`)
plan.Handler = func(idx uint64, raw interface{}) {
if raw == nil {
return // ignore
}
v, ok := raw.([]*api.ServiceEntry)
if !ok {
return // ignore
}
wakeups = append(wakeups, v)
notifyCh <- struct{}{}
}
// register some services
{
agent := c.Agent()
// we do not want to find this one.
reg := &api.AgentServiceRegistration{
ID: "foobarbiff",
Name: "foo",
Tags: []string{"bar", "biff"},
}
if err := agent.ServiceRegister(reg); err != nil {
t.Fatalf("err: %v", err)
}
// we do not want to find this one.
reg = &api.AgentServiceRegistration{
ID: "foobuzzbiff",
Name: "foo",
Tags: []string{"buzz", "biff"},
}
if err := agent.ServiceRegister(reg); err != nil {
t.Fatalf("err: %v", err)
}
// we want to find this one
reg = &api.AgentServiceRegistration{
ID: "foobarbuzzbiff",
Name: "foo",
Tags: []string{"bar", "buzz", "biff"},
}
if err := agent.ServiceRegister(reg); err != nil {
t.Fatalf("err: %v", err)
}
}
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
if err := plan.Run(s.HTTPAddr); err != nil {
t.Errorf("err: %v", err)
}
}()
defer plan.Stop()
// Wait for second wakeup.
<-notifyCh
plan.Stop()
wg.Wait()
require.Len(t, wakeups, 1)
{
v := wakeups[0]
require.Len(t, v, 1)
require.Equal(t, "foobarbuzzbiff", v[0].Service.ID)
require.ElementsMatch(t, []string{"bar", "buzz", "biff"}, v[0].Service.Tags)
}
}
func TestChecksWatch_State(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
@ -769,6 +1009,190 @@ func TestChecksWatch_Service(t *testing.T) {
}
}
func TestChecksWatch_Service_Filter(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
s.WaitForSerfCheck(t)
var (
wakeups [][]*api.HealthCheck
notifyCh = make(chan struct{})
)
plan := mustParse(t, `{"type":"checks", "filter":"b in ServiceTags and a in ServiceTags"}`)
plan.Handler = func(idx uint64, raw interface{}) {
if raw == nil {
return // ignore
}
v, ok := raw.([]*api.HealthCheck)
if !ok {
return // ignore
}
wakeups = append(wakeups, v)
notifyCh <- struct{}{}
}
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
if err := plan.Run(s.HTTPAddr); err != nil {
t.Errorf("err: %v", err)
}
}()
defer plan.Stop()
// Wait for first wakeup.
<-notifyCh
{
catalog := c.Catalog()
reg := &api.CatalogRegistration{
Node: "foobar",
Address: "1.1.1.1",
Datacenter: "dc1",
Service: &api.AgentService{
ID: "foobar",
Service: "foobar",
Tags: []string{"a", "b"},
},
Check: &api.AgentCheck{
Node: "foobar",
CheckID: "foobar",
Name: "foobar",
Status: api.HealthPassing,
ServiceID: "foobar",
},
}
if _, err := catalog.Register(reg, nil); err != nil {
t.Fatalf("err: %v", err)
}
}
// Wait for second wakeup.
<-notifyCh
plan.Stop()
wg.Wait()
require.Len(t, wakeups, 2)
{
v := wakeups[0]
require.Len(t, v, 0)
}
{
v := wakeups[1]
require.Len(t, v, 1)
require.Equal(t, "foobar", v[0].CheckID)
}
}
func TestChecksWatch_Filter(t *testing.T) {
t.Parallel()
c, s := makeClient(t)
defer s.Stop()
s.WaitForSerfCheck(t)
var (
wakeups [][]*api.HealthCheck
notifyCh = make(chan struct{})
)
plan := mustParse(t, `{"type":"checks", "filter":"b in ServiceTags and a in ServiceTags"}`)
plan.Handler = func(idx uint64, raw interface{}) {
if raw == nil {
return // ignore
}
v, ok := raw.([]*api.HealthCheck)
if !ok {
return // ignore
}
wakeups = append(wakeups, v)
notifyCh <- struct{}{}
}
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
if err := plan.Run(s.HTTPAddr); err != nil {
t.Errorf("err: %v", err)
}
}()
defer plan.Stop()
// Wait for first wakeup.
<-notifyCh
{
catalog := c.Catalog()
// we don't want to find this one
reg := &api.CatalogRegistration{
Node: "foo",
Address: "1.1.1.1",
Datacenter: "dc1",
Service: &api.AgentService{
ID: "foo",
Service: "foo",
Tags: []string{"a"},
},
Check: &api.AgentCheck{
Node: "foo",
CheckID: "foo",
Name: "foo",
Status: api.HealthPassing,
ServiceID: "foo",
},
}
if _, err := catalog.Register(reg, nil); err != nil {
t.Fatalf("err: %v", err)
}
// we want to find this one
reg = &api.CatalogRegistration{
Node: "bar",
Address: "2.2.2.2",
Datacenter: "dc1",
Service: &api.AgentService{
ID: "bar",
Service: "bar",
Tags: []string{"a", "b"},
},
Check: &api.AgentCheck{
Node: "bar",
CheckID: "bar",
Name: "bar",
Status: api.HealthPassing,
ServiceID: "bar",
},
}
if _, err := catalog.Register(reg, nil); err != nil {
t.Fatalf("err: %v", err)
}
}
// Wait for second wakeup.
<-notifyCh
plan.Stop()
wg.Wait()
require.Len(t, wakeups, 2)
{
v := wakeups[0]
require.Len(t, v, 0)
}
{
v := wakeups[1]
require.Len(t, v, 1)
require.Equal(t, "bar", v[0].CheckID)
}
}
func TestEventWatch(t *testing.T) {
t.Parallel()
c, s := makeClient(t)

View File

@ -42,6 +42,7 @@ type cmd struct {
state string
name string
shell bool
filter string
}
func (c *cmd) init() {
@ -68,6 +69,7 @@ func (c *cmd) init() {
"Specifies the states to watch. Optional for 'checks' type.")
c.flags.StringVar(&c.name, "name", "",
"Specifies an event name to watch. Only for 'event' type.")
c.flags.StringVar(&c.filter, "filter", "", "Filter to use with the request")
c.http = &flags.HTTPFlags{}
flags.Merge(c.flags, c.http.ClientFlags())
@ -125,6 +127,9 @@ func (c *cmd) Run(args []string) int {
if c.service != "" {
params["service"] = c.service
}
if c.filter != "" {
params["filter"] = c.filter
}
if len(c.tag) > 0 {
params["tag"] = c.tag
}