mirror of https://github.com/hashicorp/consul
Merge pull request #2654 from hashicorp/f-metafilter-endpoints
Add node metadata filtering to remaining endpointspull/2656/head
commit
1380fdbb92
|
@ -222,6 +222,36 @@ func TestCatalog_Service(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestCatalog_Service_NodeMetaFilter(t *testing.T) {
|
||||
t.Parallel()
|
||||
meta := map[string]string{"somekey": "somevalue"}
|
||||
c, s := makeClientWithConfig(t, nil, func(conf *testutil.TestServerConfig) {
|
||||
conf.NodeMeta = meta
|
||||
})
|
||||
defer s.Stop()
|
||||
|
||||
catalog := c.Catalog()
|
||||
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
services, meta, err := catalog.Service("consul", "", &QueryOptions{NodeMeta: meta})
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if meta.LastIndex == 0 {
|
||||
return false, fmt.Errorf("Bad: %v", meta)
|
||||
}
|
||||
|
||||
if len(services) == 0 {
|
||||
return false, fmt.Errorf("Bad: %v", services)
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("err: %s", err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestCatalog_Node(t *testing.T) {
|
||||
t.Parallel()
|
||||
c, s := makeClient(t)
|
||||
|
|
|
@ -208,6 +208,46 @@ func TestHealth_Checks(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestHealth_Checks_NodeMetaFilter(t *testing.T) {
|
||||
t.Parallel()
|
||||
meta := map[string]string{"somekey": "somevalue"}
|
||||
c, s := makeClientWithConfig(t, nil, func(conf *testutil.TestServerConfig) {
|
||||
conf.NodeMeta = meta
|
||||
})
|
||||
defer s.Stop()
|
||||
|
||||
agent := c.Agent()
|
||||
health := c.Health()
|
||||
|
||||
// Make a service with a check
|
||||
reg := &AgentServiceRegistration{
|
||||
Name: "foo",
|
||||
Check: &AgentServiceCheck{
|
||||
TTL: "15s",
|
||||
},
|
||||
}
|
||||
if err := agent.ServiceRegister(reg); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
defer agent.ServiceDeregister("foo")
|
||||
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
checks, meta, err := health.Checks("foo", &QueryOptions{NodeMeta: meta})
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if meta.LastIndex == 0 {
|
||||
return false, fmt.Errorf("bad: %v", meta)
|
||||
}
|
||||
if len(checks) == 0 {
|
||||
return false, fmt.Errorf("Bad: %v", checks)
|
||||
}
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("err: %s", err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestHealth_Service(t *testing.T) {
|
||||
c, s := makeClient(t)
|
||||
defer s.Stop()
|
||||
|
@ -235,6 +275,36 @@ func TestHealth_Service(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestHealth_Service_NodeMetaFilter(t *testing.T) {
|
||||
meta := map[string]string{"somekey": "somevalue"}
|
||||
c, s := makeClientWithConfig(t, nil, func(conf *testutil.TestServerConfig) {
|
||||
conf.NodeMeta = meta
|
||||
})
|
||||
defer s.Stop()
|
||||
|
||||
health := c.Health()
|
||||
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
// consul service should always exist...
|
||||
checks, meta, err := health.Service("consul", "", true, &QueryOptions{NodeMeta: meta})
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if meta.LastIndex == 0 {
|
||||
return false, fmt.Errorf("bad: %v", meta)
|
||||
}
|
||||
if len(checks) == 0 {
|
||||
return false, fmt.Errorf("Bad: %v", checks)
|
||||
}
|
||||
if _, ok := checks[0].Node.TaggedAddresses["wan"]; !ok {
|
||||
return false, fmt.Errorf("Bad: %v", checks[0].Node)
|
||||
}
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("err: %s", err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestHealth_State(t *testing.T) {
|
||||
t.Parallel()
|
||||
c, s := makeClient(t)
|
||||
|
@ -258,3 +328,30 @@ func TestHealth_State(t *testing.T) {
|
|||
t.Fatalf("err: %s", err)
|
||||
})
|
||||
}
|
||||
|
||||
func TestHealth_State_NodeMetaFilter(t *testing.T) {
|
||||
t.Parallel()
|
||||
meta := map[string]string{"somekey": "somevalue"}
|
||||
c, s := makeClientWithConfig(t, nil, func(conf *testutil.TestServerConfig) {
|
||||
conf.NodeMeta = meta
|
||||
})
|
||||
defer s.Stop()
|
||||
|
||||
health := c.Health()
|
||||
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
checks, meta, err := health.State("any", &QueryOptions{NodeMeta: meta})
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if meta.LastIndex == 0 {
|
||||
return false, fmt.Errorf("bad: %v", meta)
|
||||
}
|
||||
if len(checks) == 0 {
|
||||
return false, fmt.Errorf("Bad: %v", checks)
|
||||
}
|
||||
return true, nil
|
||||
}, func(err error) {
|
||||
t.Fatalf("err: %s", err)
|
||||
})
|
||||
}
|
||||
|
|
|
@ -103,6 +103,7 @@ func (s *HTTPServer) CatalogServiceNodes(resp http.ResponseWriter, req *http.Req
|
|||
// Set default DC
|
||||
args := structs.ServiceSpecificRequest{}
|
||||
s.parseSource(req, &args.Source)
|
||||
args.NodeMetaFilters = s.parseMetaFilter(req)
|
||||
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
|
||||
return nil, nil
|
||||
}
|
||||
|
|
|
@ -608,6 +608,72 @@ func TestCatalogServiceNodes(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestCatalogServiceNodes_NodeMetaFilter(t *testing.T) {
|
||||
dir, srv := makeHTTPServer(t)
|
||||
defer os.RemoveAll(dir)
|
||||
defer srv.Shutdown()
|
||||
defer srv.agent.Shutdown()
|
||||
|
||||
testutil.WaitForLeader(t, srv.agent.RPC, "dc1")
|
||||
|
||||
// Make sure an empty list is returned, not a nil
|
||||
{
|
||||
req, err := http.NewRequest("GET", "/v1/catalog/service/api?node-meta=somekey:somevalue", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
resp := httptest.NewRecorder()
|
||||
obj, err := srv.CatalogServiceNodes(resp, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
assertIndex(t, resp)
|
||||
|
||||
nodes := obj.(structs.ServiceNodes)
|
||||
if nodes == nil || len(nodes) != 0 {
|
||||
t.Fatalf("bad: %v", obj)
|
||||
}
|
||||
}
|
||||
|
||||
// Register node
|
||||
args := &structs.RegisterRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "foo",
|
||||
Address: "127.0.0.1",
|
||||
NodeMeta: map[string]string{
|
||||
"somekey": "somevalue",
|
||||
},
|
||||
Service: &structs.NodeService{
|
||||
Service: "api",
|
||||
},
|
||||
}
|
||||
|
||||
var out struct{}
|
||||
if err := srv.agent.RPC("Catalog.Register", args, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("GET", "/v1/catalog/service/api?node-meta=somekey:somevalue", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
resp := httptest.NewRecorder()
|
||||
obj, err := srv.CatalogServiceNodes(resp, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
assertIndex(t, resp)
|
||||
|
||||
nodes := obj.(structs.ServiceNodes)
|
||||
if len(nodes) != 1 {
|
||||
t.Fatalf("bad: %v", obj)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCatalogServiceNodes_WanTranslation(t *testing.T) {
|
||||
dir1, srv1 := makeHTTPServerWithConfig(t,
|
||||
func(c *Config) {
|
||||
|
|
|
@ -11,6 +11,7 @@ func (s *HTTPServer) HealthChecksInState(resp http.ResponseWriter, req *http.Req
|
|||
// Set default DC
|
||||
args := structs.ChecksInStateRequest{}
|
||||
s.parseSource(req, &args.Source)
|
||||
args.NodeMetaFilters = s.parseMetaFilter(req)
|
||||
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
|
||||
return nil, nil
|
||||
}
|
||||
|
@ -70,6 +71,7 @@ func (s *HTTPServer) HealthServiceChecks(resp http.ResponseWriter, req *http.Req
|
|||
// Set default DC
|
||||
args := structs.ServiceSpecificRequest{}
|
||||
s.parseSource(req, &args.Source)
|
||||
args.NodeMetaFilters = s.parseMetaFilter(req)
|
||||
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
|
||||
return nil, nil
|
||||
}
|
||||
|
@ -100,6 +102,7 @@ func (s *HTTPServer) HealthServiceNodes(resp http.ResponseWriter, req *http.Requ
|
|||
// Set default DC
|
||||
args := structs.ServiceSpecificRequest{}
|
||||
s.parseSource(req, &args.Source)
|
||||
args.NodeMetaFilters = s.parseMetaFilter(req)
|
||||
if done := s.parse(resp, req, &args.Datacenter, &args.QueryOptions); done {
|
||||
return nil, nil
|
||||
}
|
||||
|
|
|
@ -65,6 +65,49 @@ func TestHealthChecksInState(t *testing.T) {
|
|||
})
|
||||
}
|
||||
|
||||
func TestHealthChecksInState_NodeMetaFilter(t *testing.T) {
|
||||
httpTest(t, func(srv *HTTPServer) {
|
||||
args := &structs.RegisterRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "bar",
|
||||
Address: "127.0.0.1",
|
||||
NodeMeta: map[string]string{"somekey": "somevalue"},
|
||||
Check: &structs.HealthCheck{
|
||||
Node: "bar",
|
||||
Name: "node check",
|
||||
Status: structs.HealthCritical,
|
||||
},
|
||||
}
|
||||
var out struct{}
|
||||
if err := srv.agent.RPC("Catalog.Register", args, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
req, err := http.NewRequest("GET", "/v1/health/state/critical?node-meta=somekey:somevalue", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
testutil.WaitForResult(func() (bool, error) {
|
||||
resp := httptest.NewRecorder()
|
||||
obj, err := srv.HealthChecksInState(resp, req)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if err := checkIndex(resp); err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
// Should be 1 health check for the server
|
||||
nodes := obj.(structs.HealthChecks)
|
||||
if len(nodes) != 1 {
|
||||
return false, fmt.Errorf("bad: %v", obj)
|
||||
}
|
||||
return true, nil
|
||||
}, func(err error) { t.Fatalf("err: %v", err) })
|
||||
})
|
||||
}
|
||||
|
||||
func TestHealthChecksInState_DistanceSort(t *testing.T) {
|
||||
dir, srv := makeHTTPServer(t)
|
||||
defer os.RemoveAll(dir)
|
||||
|
@ -258,6 +301,69 @@ func TestHealthServiceChecks(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestHealthServiceChecks_NodeMetaFilter(t *testing.T) {
|
||||
dir, srv := makeHTTPServer(t)
|
||||
defer os.RemoveAll(dir)
|
||||
defer srv.Shutdown()
|
||||
defer srv.agent.Shutdown()
|
||||
|
||||
testutil.WaitForLeader(t, srv.agent.RPC, "dc1")
|
||||
|
||||
req, err := http.NewRequest("GET", "/v1/health/checks/consul?dc=dc1&node-meta=somekey:somevalue", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
resp := httptest.NewRecorder()
|
||||
obj, err := srv.HealthServiceChecks(resp, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
assertIndex(t, resp)
|
||||
|
||||
// Should be a non-nil empty list
|
||||
nodes := obj.(structs.HealthChecks)
|
||||
if nodes == nil || len(nodes) != 0 {
|
||||
t.Fatalf("bad: %v", obj)
|
||||
}
|
||||
|
||||
// Create a service check
|
||||
args := &structs.RegisterRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: srv.agent.config.NodeName,
|
||||
Address: "127.0.0.1",
|
||||
NodeMeta: map[string]string{"somekey": "somevalue"},
|
||||
Check: &structs.HealthCheck{
|
||||
Node: srv.agent.config.NodeName,
|
||||
Name: "consul check",
|
||||
ServiceID: "consul",
|
||||
},
|
||||
}
|
||||
|
||||
var out struct{}
|
||||
if err = srv.agent.RPC("Catalog.Register", args, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
req, err = http.NewRequest("GET", "/v1/health/checks/consul?dc=dc1&node-meta=somekey:somevalue", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
resp = httptest.NewRecorder()
|
||||
obj, err = srv.HealthServiceChecks(resp, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
assertIndex(t, resp)
|
||||
|
||||
// Should be 1 health check for consul
|
||||
nodes = obj.(structs.HealthChecks)
|
||||
if len(nodes) != 1 {
|
||||
t.Fatalf("bad: %v", obj)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHealthServiceChecks_DistanceSort(t *testing.T) {
|
||||
dir, srv := makeHTTPServer(t)
|
||||
defer os.RemoveAll(dir)
|
||||
|
@ -429,6 +535,69 @@ func TestHealthServiceNodes(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestHealthServiceNodes_NodeMetaFilter(t *testing.T) {
|
||||
dir, srv := makeHTTPServer(t)
|
||||
defer os.RemoveAll(dir)
|
||||
defer srv.Shutdown()
|
||||
defer srv.agent.Shutdown()
|
||||
|
||||
testutil.WaitForLeader(t, srv.agent.RPC, "dc1")
|
||||
|
||||
req, err := http.NewRequest("GET", "/v1/health/service/consul?dc=dc1&node-meta=somekey:somevalue", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
resp := httptest.NewRecorder()
|
||||
obj, err := srv.HealthServiceNodes(resp, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
assertIndex(t, resp)
|
||||
|
||||
// Should be a non-nil empty list
|
||||
nodes := obj.(structs.CheckServiceNodes)
|
||||
if nodes == nil || len(nodes) != 0 {
|
||||
t.Fatalf("bad: %v", obj)
|
||||
}
|
||||
|
||||
args := &structs.RegisterRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "bar",
|
||||
Address: "127.0.0.1",
|
||||
NodeMeta: map[string]string{"somekey": "somevalue"},
|
||||
Service: &structs.NodeService{
|
||||
ID: "test",
|
||||
Service: "test",
|
||||
},
|
||||
}
|
||||
|
||||
var out struct{}
|
||||
if err := srv.agent.RPC("Catalog.Register", args, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
req, err = http.NewRequest("GET", "/v1/health/service/test?dc=dc1&node-meta=somekey:somevalue", nil)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
resp = httptest.NewRecorder()
|
||||
obj, err = srv.HealthServiceNodes(resp, req)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
assertIndex(t, resp)
|
||||
|
||||
// Should be a non-nil empty list for checks
|
||||
nodes = obj.(structs.CheckServiceNodes)
|
||||
if len(nodes) != 1 || nodes[0].Checks == nil || len(nodes[0].Checks) != 0 {
|
||||
t.Fatalf("bad: %v", obj)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHealthServiceNodes_DistanceSort(t *testing.T) {
|
||||
dir, srv := makeHTTPServer(t)
|
||||
defer os.RemoveAll(dir)
|
||||
|
|
|
@ -243,6 +243,15 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru
|
|||
return err
|
||||
}
|
||||
reply.Index, reply.ServiceNodes = index, services
|
||||
if len(args.NodeMetaFilters) > 0 {
|
||||
var filtered structs.ServiceNodes
|
||||
for _, service := range services {
|
||||
if structs.SatisfiesMetaFilters(service.NodeMeta, args.NodeMetaFilters) {
|
||||
filtered = append(filtered, service)
|
||||
}
|
||||
}
|
||||
reply.ServiceNodes = filtered
|
||||
}
|
||||
if err := c.srv.filterACL(args.Token, reply); err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -592,7 +592,7 @@ func TestCatalog_ListNodes(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestCatalog_ListNodes_MetaFilter(t *testing.T) {
|
||||
func TestCatalog_ListNodes_NodeMetaFilter(t *testing.T) {
|
||||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
|
@ -1060,7 +1060,7 @@ func TestCatalog_ListServices(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestCatalog_ListServices_MetaFilter(t *testing.T) {
|
||||
func TestCatalog_ListServices_NodeMetaFilter(t *testing.T) {
|
||||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
|
@ -1308,6 +1308,106 @@ func TestCatalog_ListServiceNodes(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestCatalog_ListServiceNodes_NodeMetaFilter(t *testing.T) {
|
||||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
// Add 2 nodes with specific meta maps
|
||||
node := &structs.Node{Node: "foo", Address: "127.0.0.1", Meta: map[string]string{"somekey": "somevalue", "common": "1"}}
|
||||
if err := s1.fsm.State().EnsureNode(1, node); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
node2 := &structs.Node{Node: "bar", Address: "127.0.0.2", Meta: map[string]string{"common": "1"}}
|
||||
if err := s1.fsm.State().EnsureNode(2, node2); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if err := s1.fsm.State().EnsureService(3, "foo", &structs.NodeService{ID: "db", Service: "db", Tags: []string{"primary"}, Address: "127.0.0.1", Port: 5000}); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
if err := s1.fsm.State().EnsureService(4, "bar", &structs.NodeService{ID: "db2", Service: "db", Tags: []string{"secondary"}, Address: "127.0.0.2", Port: 5000}); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
cases := []struct {
|
||||
filters map[string]string
|
||||
tag string
|
||||
services structs.ServiceNodes
|
||||
}{
|
||||
// Basic meta filter
|
||||
{
|
||||
filters: map[string]string{"somekey": "somevalue"},
|
||||
services: structs.ServiceNodes{&structs.ServiceNode{Node: "foo", ServiceID: "db"}},
|
||||
},
|
||||
// Basic meta filter, tag
|
||||
{
|
||||
filters: map[string]string{"somekey": "somevalue"},
|
||||
tag: "primary",
|
||||
services: structs.ServiceNodes{&structs.ServiceNode{Node: "foo", ServiceID: "db"}},
|
||||
},
|
||||
// Common meta filter
|
||||
{
|
||||
filters: map[string]string{"common": "1"},
|
||||
services: structs.ServiceNodes{
|
||||
&structs.ServiceNode{Node: "bar", ServiceID: "db2"},
|
||||
&structs.ServiceNode{Node: "foo", ServiceID: "db"},
|
||||
},
|
||||
},
|
||||
// Common meta filter, tag
|
||||
{
|
||||
filters: map[string]string{"common": "1"},
|
||||
tag: "secondary",
|
||||
services: structs.ServiceNodes{
|
||||
&structs.ServiceNode{Node: "bar", ServiceID: "db2"},
|
||||
},
|
||||
},
|
||||
// Invalid meta filter
|
||||
{
|
||||
filters: map[string]string{"invalid": "nope"},
|
||||
services: structs.ServiceNodes{},
|
||||
},
|
||||
// Multiple filter values
|
||||
{
|
||||
filters: map[string]string{"somekey": "somevalue", "common": "1"},
|
||||
services: structs.ServiceNodes{&structs.ServiceNode{Node: "foo", ServiceID: "db"}},
|
||||
},
|
||||
// Multiple filter values, tag
|
||||
{
|
||||
filters: map[string]string{"somekey": "somevalue", "common": "1"},
|
||||
tag: "primary",
|
||||
services: structs.ServiceNodes{&structs.ServiceNode{Node: "foo", ServiceID: "db"}},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
args := structs.ServiceSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
NodeMetaFilters: tc.filters,
|
||||
ServiceName: "db",
|
||||
ServiceTag: tc.tag,
|
||||
TagFilter: tc.tag != "",
|
||||
}
|
||||
var out structs.IndexedServiceNodes
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.ServiceNodes", &args, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if len(out.ServiceNodes) != len(tc.services) {
|
||||
t.Fatalf("bad: %v", out)
|
||||
}
|
||||
|
||||
for i, serviceNode := range out.ServiceNodes {
|
||||
if serviceNode.Node != tc.services[i].Node || serviceNode.ServiceID != tc.services[i].ServiceID {
|
||||
t.Fatalf("bad: %v, %v filters: %v", serviceNode, tc.services[i], tc.filters)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestCatalog_ListServiceNodes_DistanceSort(t *testing.T) {
|
||||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
|
|
|
@ -25,7 +25,14 @@ func (h *Health) ChecksInState(args *structs.ChecksInStateRequest,
|
|||
&reply.QueryMeta,
|
||||
state.GetQueryWatch("ChecksInState"),
|
||||
func() error {
|
||||
index, checks, err := state.ChecksInState(args.State)
|
||||
var index uint64
|
||||
var checks structs.HealthChecks
|
||||
var err error
|
||||
if len(args.NodeMetaFilters) > 0 {
|
||||
index, checks, err = state.ChecksInStateByNodeMeta(args.State, args.NodeMetaFilters)
|
||||
} else {
|
||||
index, checks, err = state.ChecksInState(args.State)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -80,7 +87,14 @@ func (h *Health) ServiceChecks(args *structs.ServiceSpecificRequest,
|
|||
&reply.QueryMeta,
|
||||
state.GetQueryWatch("ServiceChecks"),
|
||||
func() error {
|
||||
index, checks, err := state.ServiceChecks(args.ServiceName)
|
||||
var index uint64
|
||||
var checks structs.HealthChecks
|
||||
var err error
|
||||
if len(args.NodeMetaFilters) > 0 {
|
||||
index, checks, err = state.ServiceChecksByNodeMeta(args.ServiceName, args.NodeMetaFilters)
|
||||
} else {
|
||||
index, checks, err = state.ServiceChecks(args.ServiceName)
|
||||
}
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -123,6 +137,15 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc
|
|||
}
|
||||
|
||||
reply.Index, reply.Nodes = index, nodes
|
||||
if len(args.NodeMetaFilters) > 0 {
|
||||
var filtered structs.CheckServiceNodes
|
||||
for _, node := range nodes {
|
||||
if structs.SatisfiesMetaFilters(node.Node.Meta, args.NodeMetaFilters) {
|
||||
filtered = append(filtered, node)
|
||||
}
|
||||
}
|
||||
reply.Nodes = filtered
|
||||
}
|
||||
if err := h.srv.filterACL(args.Token, reply); err != nil {
|
||||
return err
|
||||
}
|
||||
|
|
|
@ -57,6 +57,101 @@ func TestHealth_ChecksInState(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestHealth_ChecksInState_NodeMetaFilter(t *testing.T) {
|
||||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
arg := structs.RegisterRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "foo",
|
||||
Address: "127.0.0.1",
|
||||
NodeMeta: map[string]string{
|
||||
"somekey": "somevalue",
|
||||
"common": "1",
|
||||
},
|
||||
Check: &structs.HealthCheck{
|
||||
Name: "memory utilization",
|
||||
Status: structs.HealthPassing,
|
||||
},
|
||||
}
|
||||
var out struct{}
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
arg = structs.RegisterRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "bar",
|
||||
Address: "127.0.0.2",
|
||||
NodeMeta: map[string]string{
|
||||
"common": "1",
|
||||
},
|
||||
Check: &structs.HealthCheck{
|
||||
Name: "disk space",
|
||||
Status: structs.HealthPassing,
|
||||
},
|
||||
}
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
cases := []struct {
|
||||
filters map[string]string
|
||||
checkNames []string
|
||||
}{
|
||||
// Get foo's check by its unique meta value
|
||||
{
|
||||
filters: map[string]string{"somekey": "somevalue"},
|
||||
checkNames: []string{"memory utilization"},
|
||||
},
|
||||
// Get both foo/bar's checks by their common meta value
|
||||
{
|
||||
filters: map[string]string{"common": "1"},
|
||||
checkNames: []string{"disk space", "memory utilization"},
|
||||
},
|
||||
// Use an invalid meta value, should get empty result
|
||||
{
|
||||
filters: map[string]string{"invalid": "nope"},
|
||||
checkNames: []string{},
|
||||
},
|
||||
// Use multiple filters to get foo's check
|
||||
{
|
||||
filters: map[string]string{
|
||||
"somekey": "somevalue",
|
||||
"common": "1",
|
||||
},
|
||||
checkNames: []string{"memory utilization"},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
var out structs.IndexedHealthChecks
|
||||
inState := structs.ChecksInStateRequest{
|
||||
Datacenter: "dc1",
|
||||
NodeMetaFilters: tc.filters,
|
||||
State: structs.HealthPassing,
|
||||
}
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Health.ChecksInState", &inState, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
checks := out.HealthChecks
|
||||
if len(checks) != len(tc.checkNames) {
|
||||
t.Fatalf("Bad: %v, %v", checks, tc.checkNames)
|
||||
}
|
||||
|
||||
for i, check := range checks {
|
||||
if tc.checkNames[i] != check.Name {
|
||||
t.Fatalf("Bad: %v %v", checks, tc.checkNames)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestHealth_ChecksInState_DistanceSort(t *testing.T) {
|
||||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
|
@ -221,6 +316,111 @@ func TestHealth_ServiceChecks(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestHealth_ServiceChecks_NodeMetaFilter(t *testing.T) {
|
||||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
arg := structs.RegisterRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "foo",
|
||||
Address: "127.0.0.1",
|
||||
NodeMeta: map[string]string{
|
||||
"somekey": "somevalue",
|
||||
"common": "1",
|
||||
},
|
||||
Service: &structs.NodeService{
|
||||
ID: "db",
|
||||
Service: "db",
|
||||
},
|
||||
Check: &structs.HealthCheck{
|
||||
Name: "memory utilization",
|
||||
Status: structs.HealthPassing,
|
||||
ServiceID: "db",
|
||||
},
|
||||
}
|
||||
var out struct{}
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
arg = structs.RegisterRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "bar",
|
||||
Address: "127.0.0.2",
|
||||
NodeMeta: map[string]string{
|
||||
"common": "1",
|
||||
},
|
||||
Service: &structs.NodeService{
|
||||
ID: "db",
|
||||
Service: "db",
|
||||
},
|
||||
Check: &structs.HealthCheck{
|
||||
Name: "disk space",
|
||||
Status: structs.HealthPassing,
|
||||
ServiceID: "db",
|
||||
},
|
||||
}
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
cases := []struct {
|
||||
filters map[string]string
|
||||
checkNames []string
|
||||
}{
|
||||
// Get foo's check by its unique meta value
|
||||
{
|
||||
filters: map[string]string{"somekey": "somevalue"},
|
||||
checkNames: []string{"memory utilization"},
|
||||
},
|
||||
// Get both foo/bar's checks by their common meta value
|
||||
{
|
||||
filters: map[string]string{"common": "1"},
|
||||
checkNames: []string{"disk space", "memory utilization"},
|
||||
},
|
||||
// Use an invalid meta value, should get empty result
|
||||
{
|
||||
filters: map[string]string{"invalid": "nope"},
|
||||
checkNames: []string{},
|
||||
},
|
||||
// Use multiple filters to get foo's check
|
||||
{
|
||||
filters: map[string]string{
|
||||
"somekey": "somevalue",
|
||||
"common": "1",
|
||||
},
|
||||
checkNames: []string{"memory utilization"},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
var out structs.IndexedHealthChecks
|
||||
inState := structs.ServiceSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
NodeMetaFilters: tc.filters,
|
||||
ServiceName: "db",
|
||||
}
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Health.ServiceChecks", &inState, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
checks := out.HealthChecks
|
||||
if len(checks) != len(tc.checkNames) {
|
||||
t.Fatalf("Bad: %v, %v", checks, tc.checkNames)
|
||||
}
|
||||
|
||||
for i, check := range checks {
|
||||
if tc.checkNames[i] != check.Name {
|
||||
t.Fatalf("Bad: %v %v", checks, tc.checkNames)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestHealth_ServiceChecks_DistanceSort(t *testing.T) {
|
||||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
|
@ -392,6 +592,136 @@ func TestHealth_ServiceNodes(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestHealth_ServiceNodes_NodeMetaFilter(t *testing.T) {
|
||||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
defer s1.Shutdown()
|
||||
codec := rpcClient(t, s1)
|
||||
defer codec.Close()
|
||||
|
||||
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
arg := structs.RegisterRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "foo",
|
||||
Address: "127.0.0.1",
|
||||
NodeMeta: map[string]string{
|
||||
"somekey": "somevalue",
|
||||
"common": "1",
|
||||
},
|
||||
Service: &structs.NodeService{
|
||||
ID: "db",
|
||||
Service: "db",
|
||||
},
|
||||
Check: &structs.HealthCheck{
|
||||
Name: "memory utilization",
|
||||
Status: structs.HealthPassing,
|
||||
ServiceID: "db",
|
||||
},
|
||||
}
|
||||
var out struct{}
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
arg = structs.RegisterRequest{
|
||||
Datacenter: "dc1",
|
||||
Node: "bar",
|
||||
Address: "127.0.0.2",
|
||||
NodeMeta: map[string]string{
|
||||
"common": "1",
|
||||
},
|
||||
Service: &structs.NodeService{
|
||||
ID: "db",
|
||||
Service: "db",
|
||||
},
|
||||
Check: &structs.HealthCheck{
|
||||
Name: "disk space",
|
||||
Status: structs.HealthWarning,
|
||||
ServiceID: "db",
|
||||
},
|
||||
}
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &arg, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
cases := []struct {
|
||||
filters map[string]string
|
||||
nodes structs.CheckServiceNodes
|
||||
}{
|
||||
// Get foo's check by its unique meta value
|
||||
{
|
||||
filters: map[string]string{"somekey": "somevalue"},
|
||||
nodes: structs.CheckServiceNodes{
|
||||
structs.CheckServiceNode{
|
||||
Node: &structs.Node{Node: "foo"},
|
||||
Checks: structs.HealthChecks{&structs.HealthCheck{Name: "memory utilization"}},
|
||||
},
|
||||
},
|
||||
},
|
||||
// Get both foo/bar's checks by their common meta value
|
||||
{
|
||||
filters: map[string]string{"common": "1"},
|
||||
nodes: structs.CheckServiceNodes{
|
||||
structs.CheckServiceNode{
|
||||
Node: &structs.Node{Node: "bar"},
|
||||
Checks: structs.HealthChecks{&structs.HealthCheck{Name: "disk space"}},
|
||||
},
|
||||
structs.CheckServiceNode{
|
||||
Node: &structs.Node{Node: "foo"},
|
||||
Checks: structs.HealthChecks{&structs.HealthCheck{Name: "memory utilization"}},
|
||||
},
|
||||
},
|
||||
},
|
||||
// Use an invalid meta value, should get empty result
|
||||
{
|
||||
filters: map[string]string{"invalid": "nope"},
|
||||
nodes: structs.CheckServiceNodes{},
|
||||
},
|
||||
// Use multiple filters to get foo's check
|
||||
{
|
||||
filters: map[string]string{
|
||||
"somekey": "somevalue",
|
||||
"common": "1",
|
||||
},
|
||||
nodes: structs.CheckServiceNodes{
|
||||
structs.CheckServiceNode{
|
||||
Node: &structs.Node{Node: "foo"},
|
||||
Checks: structs.HealthChecks{&structs.HealthCheck{Name: "memory utilization"}},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range cases {
|
||||
var out structs.IndexedCheckServiceNodes
|
||||
req := structs.ServiceSpecificRequest{
|
||||
Datacenter: "dc1",
|
||||
NodeMetaFilters: tc.filters,
|
||||
ServiceName: "db",
|
||||
}
|
||||
if err := msgpackrpc.CallWithCodec(codec, "Health.ServiceNodes", &req, &out); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
|
||||
if len(out.Nodes) != len(tc.nodes) {
|
||||
t.Fatalf("bad: %v, %v, filters: %v", out.Nodes, tc.nodes, tc.filters)
|
||||
}
|
||||
|
||||
for i, node := range out.Nodes {
|
||||
checks := tc.nodes[i].Checks
|
||||
if len(node.Checks) != len(checks) {
|
||||
t.Fatalf("bad: %v, %v, filters: %v", node.Checks, checks, tc.filters)
|
||||
}
|
||||
for j, check := range node.Checks {
|
||||
if check.Name != checks[j].Name {
|
||||
t.Fatalf("bad: %v, %v, filters: %v", check, checks[j], tc.filters)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestHealth_ServiceNodes_DistanceSort(t *testing.T) {
|
||||
dir1, s1 := testServer(t)
|
||||
defer os.RemoveAll(dir1)
|
||||
|
|
|
@ -198,11 +198,8 @@ func (s *StateStore) Nodes() (uint64, structs.Nodes, error) {
|
|||
return idx, results, nil
|
||||
}
|
||||
|
||||
// NodesByMeta is used to return all nodes with the given meta key/value pair.
|
||||
// NodesByMeta is used to return all nodes with the given metadata key/value pairs.
|
||||
func (s *StateStore) NodesByMeta(filters map[string]string) (uint64, structs.Nodes, error) {
|
||||
if len(filters) > 1 {
|
||||
return 0, nil, fmt.Errorf("multiple meta filters not supported")
|
||||
}
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
|
||||
|
@ -213,6 +210,7 @@ func (s *StateStore) NodesByMeta(filters map[string]string) (uint64, structs.Nod
|
|||
var args []interface{}
|
||||
for key, value := range filters {
|
||||
args = append(args, key, value)
|
||||
break
|
||||
}
|
||||
nodes, err := tx.Get("nodes", "meta", args...)
|
||||
if err != nil {
|
||||
|
@ -222,7 +220,10 @@ func (s *StateStore) NodesByMeta(filters map[string]string) (uint64, structs.Nod
|
|||
// Create and return the nodes list.
|
||||
var results structs.Nodes
|
||||
for node := nodes.Next(); node != nil; node = nodes.Next() {
|
||||
results = append(results, node.(*structs.Node))
|
||||
n := node.(*structs.Node)
|
||||
if len(filters) <= 1 || structs.SatisfiesMetaFilters(n.Meta, filters) {
|
||||
results = append(results, n)
|
||||
}
|
||||
}
|
||||
return idx, results, nil
|
||||
}
|
||||
|
@ -437,11 +438,8 @@ func (s *StateStore) Services() (uint64, structs.Services, error) {
|
|||
return idx, results, nil
|
||||
}
|
||||
|
||||
// Services returns all services, filtered by the given node metadata.
|
||||
// ServicesByNodeMeta returns all services, filtered by the given node metadata.
|
||||
func (s *StateStore) ServicesByNodeMeta(filters map[string]string) (uint64, structs.Services, error) {
|
||||
if len(filters) > 1 {
|
||||
return 0, nil, fmt.Errorf("multiple meta filters not supported")
|
||||
}
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
|
||||
|
@ -452,6 +450,7 @@ func (s *StateStore) ServicesByNodeMeta(filters map[string]string) (uint64, stru
|
|||
var args []interface{}
|
||||
for key, value := range filters {
|
||||
args = append(args, key, value)
|
||||
break
|
||||
}
|
||||
nodes, err := tx.Get("nodes", "meta", args...)
|
||||
if err != nil {
|
||||
|
@ -462,6 +461,9 @@ func (s *StateStore) ServicesByNodeMeta(filters map[string]string) (uint64, stru
|
|||
unique := make(map[string]map[string]struct{})
|
||||
for node := nodes.Next(); node != nil; node = nodes.Next() {
|
||||
n := node.(*structs.Node)
|
||||
if len(filters) > 1 && !structs.SatisfiesMetaFilters(n.Meta, filters) {
|
||||
continue
|
||||
}
|
||||
// List all the services on the node
|
||||
services, err := tx.Get("services", "node", n.Node)
|
||||
if err != nil {
|
||||
|
@ -877,6 +879,24 @@ func (s *StateStore) ServiceChecks(serviceName string) (uint64, structs.HealthCh
|
|||
return s.parseChecks(idx, checks)
|
||||
}
|
||||
|
||||
// ServiceChecksByNodeMeta is used to get all checks associated with a
|
||||
// given service ID, filtered by the given node metadata values. The query
|
||||
// is performed against a service _name_ instead of a service ID.
|
||||
func (s *StateStore) ServiceChecksByNodeMeta(serviceName string, filters map[string]string) (uint64, structs.HealthChecks, error) {
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
|
||||
// Get the table index.
|
||||
idx := maxIndexTxn(tx, s.getWatchTables("ServiceChecksByNodeMeta")...)
|
||||
|
||||
// Return the checks.
|
||||
checks, err := tx.Get("checks", "service", serviceName)
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed check lookup: %s", err)
|
||||
}
|
||||
return s.parseChecksByNodeMeta(idx, checks, tx, filters)
|
||||
}
|
||||
|
||||
// ChecksInState is used to query the state store for all checks
|
||||
// which are in the provided state.
|
||||
func (s *StateStore) ChecksInState(state string) (uint64, structs.HealthChecks, error) {
|
||||
|
@ -903,6 +923,34 @@ func (s *StateStore) ChecksInState(state string) (uint64, structs.HealthChecks,
|
|||
return s.parseChecks(idx, checks)
|
||||
}
|
||||
|
||||
// ChecksInStateByNodeMeta is used to query the state store for all checks
|
||||
// which are in the provided state, filtered by the given node metadata values.
|
||||
func (s *StateStore) ChecksInStateByNodeMeta(state string, filters map[string]string) (uint64, structs.HealthChecks, error) {
|
||||
tx := s.db.Txn(false)
|
||||
defer tx.Abort()
|
||||
|
||||
// Get the table index.
|
||||
idx := maxIndexTxn(tx, s.getWatchTables("ChecksInStateByNodeMeta")...)
|
||||
|
||||
// Query all checks if HealthAny is passed
|
||||
var checks memdb.ResultIterator
|
||||
var err error
|
||||
if state == structs.HealthAny {
|
||||
checks, err = tx.Get("checks", "status")
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed check lookup: %s", err)
|
||||
}
|
||||
} else {
|
||||
// Any other state we need to query for explicitly
|
||||
checks, err = tx.Get("checks", "status", state)
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed check lookup: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
return s.parseChecksByNodeMeta(idx, checks, tx, filters)
|
||||
}
|
||||
|
||||
// parseChecks is a helper function used to deduplicate some
|
||||
// repetitive code for returning health checks.
|
||||
func (s *StateStore) parseChecks(idx uint64, iter memdb.ResultIterator) (uint64, structs.HealthChecks, error) {
|
||||
|
@ -914,6 +962,27 @@ func (s *StateStore) parseChecks(idx uint64, iter memdb.ResultIterator) (uint64,
|
|||
return idx, results, nil
|
||||
}
|
||||
|
||||
// parseChecksByNodeMeta is a helper function used to deduplicate some
|
||||
// repetitive code for returning health checks filtered by node metadata fields.
|
||||
func (s *StateStore) parseChecksByNodeMeta(idx uint64, iter memdb.ResultIterator, tx *memdb.Txn,
|
||||
filters map[string]string) (uint64, structs.HealthChecks, error) {
|
||||
var results structs.HealthChecks
|
||||
for check := iter.Next(); check != nil; check = iter.Next() {
|
||||
healthCheck := check.(*structs.HealthCheck)
|
||||
node, err := tx.First("nodes", "id", healthCheck.Node)
|
||||
if err != nil {
|
||||
return 0, nil, fmt.Errorf("failed node lookup: %s", err)
|
||||
}
|
||||
if node == nil {
|
||||
return 0, nil, ErrMissingNode
|
||||
}
|
||||
if structs.SatisfiesMetaFilters(node.(*structs.Node).Meta, filters) {
|
||||
results = append(results, healthCheck)
|
||||
}
|
||||
}
|
||||
return idx, results, nil
|
||||
}
|
||||
|
||||
// DeleteCheck is used to delete a health check registration.
|
||||
func (s *StateStore) DeleteCheck(idx uint64, node string, checkID types.CheckID) error {
|
||||
tx := s.db.Txn(true)
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
|
||||
"github.com/hashicorp/consul/consul/structs"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/consul/types"
|
||||
)
|
||||
|
||||
func TestStateStore_EnsureRegistration(t *testing.T) {
|
||||
|
@ -544,65 +545,50 @@ func TestStateStore_GetNodesByMeta(t *testing.T) {
|
|||
}
|
||||
|
||||
// Create some nodes in the state store
|
||||
node0 := &structs.Node{Node: "node0", Address: "127.0.0.1", Meta: map[string]string{"role": "client", "common": "1"}}
|
||||
if err := s.EnsureNode(0, node0); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
}
|
||||
node1 := &structs.Node{Node: "node1", Address: "127.0.0.1", Meta: map[string]string{"role": "server", "common": "1"}}
|
||||
if err := s.EnsureNode(1, node1); err != nil {
|
||||
t.Fatalf("err: %v", err)
|
||||
testRegisterNodeWithMeta(t, s, 0, "node0", map[string]string{"role": "client"})
|
||||
testRegisterNodeWithMeta(t, s, 1, "node1", map[string]string{"role": "client", "common": "1"})
|
||||
testRegisterNodeWithMeta(t, s, 2, "node2", map[string]string{"role": "server", "common": "1"})
|
||||
|
||||
cases := []struct {
|
||||
filters map[string]string
|
||||
nodes []string
|
||||
}{
|
||||
// Simple meta filter
|
||||
{
|
||||
filters: map[string]string{"role": "server"},
|
||||
nodes: []string{"node2"},
|
||||
},
|
||||
// Common meta filter
|
||||
{
|
||||
filters: map[string]string{"common": "1"},
|
||||
nodes: []string{"node1", "node2"},
|
||||
},
|
||||
// Invalid meta filter
|
||||
{
|
||||
filters: map[string]string{"invalid": "nope"},
|
||||
nodes: []string{},
|
||||
},
|
||||
// Multiple meta filters
|
||||
{
|
||||
filters: map[string]string{"role": "client", "common": "1"},
|
||||
nodes: []string{"node1"},
|
||||
},
|
||||
}
|
||||
|
||||
// Retrieve the node with role=client
|
||||
idx, nodes, err := s.NodesByMeta(map[string]string{"role": "client"})
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if idx != 1 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
|
||||
// Only one node was returned
|
||||
if n := len(nodes); n != 1 {
|
||||
t.Fatalf("bad node count: %d", n)
|
||||
}
|
||||
|
||||
// Make sure the node is correct
|
||||
if nodes[0].CreateIndex != 0 || nodes[0].ModifyIndex != 0 {
|
||||
t.Fatalf("bad node index: %d, %d", nodes[0].CreateIndex, nodes[0].ModifyIndex)
|
||||
}
|
||||
if nodes[0].Node != "node0" {
|
||||
t.Fatalf("bad: %#v", nodes[0])
|
||||
}
|
||||
if !reflect.DeepEqual(nodes[0].Meta, node0.Meta) {
|
||||
t.Fatalf("bad: %v != %v", nodes[0].Meta, node0.Meta)
|
||||
}
|
||||
|
||||
// Retrieve both nodes via their common meta field
|
||||
idx, nodes, err = s.NodesByMeta(map[string]string{"common": "1"})
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if idx != 1 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
|
||||
// All nodes were returned
|
||||
if n := len(nodes); n != 2 {
|
||||
t.Fatalf("bad node count: %d", n)
|
||||
}
|
||||
|
||||
// Make sure the nodes match
|
||||
for i, node := range nodes {
|
||||
if node.CreateIndex != uint64(i) || node.ModifyIndex != uint64(i) {
|
||||
t.Fatalf("bad node index: %d, %d", node.CreateIndex, node.ModifyIndex)
|
||||
for _, tc := range cases {
|
||||
_, result, err := s.NodesByMeta(tc.filters)
|
||||
if err != nil {
|
||||
t.Fatalf("bad: %v", err)
|
||||
}
|
||||
name := fmt.Sprintf("node%d", i)
|
||||
if node.Node != name {
|
||||
t.Fatalf("bad: %#v", node)
|
||||
|
||||
if len(result) != len(tc.nodes) {
|
||||
t.Fatalf("bad: %v %v", result, tc.nodes)
|
||||
}
|
||||
if v, ok := node.Meta["common"]; !ok || v != "1" {
|
||||
t.Fatalf("bad: %v", node.Meta)
|
||||
|
||||
for i, node := range result {
|
||||
if node.Node != tc.nodes[i] {
|
||||
t.Fatalf("bad: %v %v", node.Node, tc.nodes[i])
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -975,13 +961,10 @@ func TestStateStore_ServicesByNodeMeta(t *testing.T) {
|
|||
}
|
||||
|
||||
// Filter the services by the first node's meta value
|
||||
idx, res, err = s.ServicesByNodeMeta(map[string]string{"role": "client"})
|
||||
_, res, err = s.ServicesByNodeMeta(map[string]string{"role": "client"})
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if idx != 3 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
expected := structs.Services{
|
||||
"redis": []string{"master", "prod"},
|
||||
}
|
||||
|
@ -991,13 +974,10 @@ func TestStateStore_ServicesByNodeMeta(t *testing.T) {
|
|||
}
|
||||
|
||||
// Get all services using the common meta value
|
||||
idx, res, err = s.ServicesByNodeMeta(map[string]string{"common": "1"})
|
||||
_, res, err = s.ServicesByNodeMeta(map[string]string{"common": "1"})
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if idx != 3 {
|
||||
t.Fatalf("bad index: %d", idx)
|
||||
}
|
||||
expected = structs.Services{
|
||||
"redis": []string{"master", "prod", "slave"},
|
||||
}
|
||||
|
@ -1005,6 +985,29 @@ func TestStateStore_ServicesByNodeMeta(t *testing.T) {
|
|||
if !reflect.DeepEqual(res, expected) {
|
||||
t.Fatalf("bad: %v %v", res, expected)
|
||||
}
|
||||
|
||||
// Get an empty list for an invalid meta value
|
||||
_, res, err = s.ServicesByNodeMeta(map[string]string{"invalid": "nope"})
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
expected = structs.Services{}
|
||||
if !reflect.DeepEqual(res, expected) {
|
||||
t.Fatalf("bad: %v %v", res, expected)
|
||||
}
|
||||
|
||||
// Get the first node's service instance using multiple meta filters
|
||||
_, res, err = s.ServicesByNodeMeta(map[string]string{"role": "client", "common": "1"})
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
expected = structs.Services{
|
||||
"redis": []string{"master", "prod"},
|
||||
}
|
||||
sort.Strings(res["redis"])
|
||||
if !reflect.DeepEqual(res, expected) {
|
||||
t.Fatalf("bad: %v %v", res, expected)
|
||||
}
|
||||
}
|
||||
|
||||
func TestStateStore_ServiceNodes(t *testing.T) {
|
||||
|
@ -1546,6 +1549,63 @@ func TestStateStore_ServiceChecks(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestStateStore_ServiceChecksByNodeMeta(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
// Create the first node and service with some checks
|
||||
testRegisterNodeWithMeta(t, s, 0, "node1", map[string]string{"somekey": "somevalue", "common": "1"})
|
||||
testRegisterService(t, s, 1, "node1", "service1")
|
||||
testRegisterCheck(t, s, 2, "node1", "service1", "check1", structs.HealthPassing)
|
||||
testRegisterCheck(t, s, 3, "node1", "service1", "check2", structs.HealthPassing)
|
||||
|
||||
// Create a second node/service with a different set of checks
|
||||
testRegisterNodeWithMeta(t, s, 4, "node2", map[string]string{"common": "1"})
|
||||
testRegisterService(t, s, 5, "node2", "service1")
|
||||
testRegisterCheck(t, s, 6, "node2", "service1", "check3", structs.HealthPassing)
|
||||
|
||||
cases := []struct {
|
||||
filters map[string]string
|
||||
checks []string
|
||||
}{
|
||||
// Basic meta filter
|
||||
{
|
||||
filters: map[string]string{"somekey": "somevalue"},
|
||||
checks: []string{"check1", "check2"},
|
||||
},
|
||||
// Common meta field
|
||||
{
|
||||
filters: map[string]string{"common": "1"},
|
||||
checks: []string{"check1", "check2", "check3"},
|
||||
},
|
||||
// Invalid meta filter
|
||||
{
|
||||
filters: map[string]string{"invalid": "nope"},
|
||||
checks: []string{},
|
||||
},
|
||||
// Multiple filters
|
||||
{
|
||||
filters: map[string]string{"somekey": "somevalue", "common": "1"},
|
||||
checks: []string{"check1", "check2"},
|
||||
},
|
||||
}
|
||||
|
||||
// Try querying for all checks associated with service1
|
||||
for _, tc := range cases {
|
||||
_, checks, err := s.ServiceChecksByNodeMeta("service1", tc.filters)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if len(checks) != len(tc.checks) {
|
||||
t.Fatalf("bad checks: %#v", checks)
|
||||
}
|
||||
for i, check := range checks {
|
||||
if check.CheckID != types.CheckID(tc.checks[i]) {
|
||||
t.Fatalf("bad checks: %#v", checks)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestStateStore_ChecksInState(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
|
@ -1585,6 +1645,88 @@ func TestStateStore_ChecksInState(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestStateStore_ChecksInStateByNodeMeta(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
// Querying with no results returns nil
|
||||
idx, res, err := s.ChecksInStateByNodeMeta(structs.HealthPassing, nil)
|
||||
if idx != 0 || res != nil || err != nil {
|
||||
t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err)
|
||||
}
|
||||
|
||||
// Register a node with checks in varied states
|
||||
testRegisterNodeWithMeta(t, s, 0, "node1", map[string]string{"somekey": "somevalue", "common": "1"})
|
||||
testRegisterCheck(t, s, 1, "node1", "", "check1", structs.HealthPassing)
|
||||
testRegisterCheck(t, s, 2, "node1", "", "check2", structs.HealthCritical)
|
||||
|
||||
testRegisterNodeWithMeta(t, s, 3, "node2", map[string]string{"common": "1"})
|
||||
testRegisterCheck(t, s, 4, "node2", "", "check3", structs.HealthPassing)
|
||||
|
||||
cases := []struct {
|
||||
filters map[string]string
|
||||
state string
|
||||
checks []string
|
||||
}{
|
||||
// Basic meta filter, any status
|
||||
{
|
||||
filters: map[string]string{"somekey": "somevalue"},
|
||||
state: structs.HealthAny,
|
||||
checks: []string{"check2", "check1"},
|
||||
},
|
||||
// Basic meta filter, only passing
|
||||
{
|
||||
filters: map[string]string{"somekey": "somevalue"},
|
||||
state: structs.HealthPassing,
|
||||
checks: []string{"check1"},
|
||||
},
|
||||
// Common meta filter, any status
|
||||
{
|
||||
filters: map[string]string{"common": "1"},
|
||||
state: structs.HealthAny,
|
||||
checks: []string{"check2", "check1", "check3"},
|
||||
},
|
||||
// Common meta filter, only passing
|
||||
{
|
||||
filters: map[string]string{"common": "1"},
|
||||
state: structs.HealthPassing,
|
||||
checks: []string{"check1", "check3"},
|
||||
},
|
||||
// Invalid meta filter
|
||||
{
|
||||
filters: map[string]string{"invalid": "nope"},
|
||||
checks: []string{},
|
||||
},
|
||||
// Multiple filters, any status
|
||||
{
|
||||
filters: map[string]string{"somekey": "somevalue", "common": "1"},
|
||||
state: structs.HealthAny,
|
||||
checks: []string{"check2", "check1"},
|
||||
},
|
||||
// Multiple filters, only passing
|
||||
{
|
||||
filters: map[string]string{"somekey": "somevalue", "common": "1"},
|
||||
state: structs.HealthPassing,
|
||||
checks: []string{"check1"},
|
||||
},
|
||||
}
|
||||
|
||||
// Try querying for all checks associated with service1
|
||||
for _, tc := range cases {
|
||||
_, checks, err := s.ChecksInStateByNodeMeta(tc.state, tc.filters)
|
||||
if err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
if len(checks) != len(tc.checks) {
|
||||
t.Fatalf("bad checks: %#v", checks)
|
||||
}
|
||||
for i, check := range checks {
|
||||
if check.CheckID != types.CheckID(tc.checks[i]) {
|
||||
t.Fatalf("bad checks: %#v, %v", checks, tc.checks)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestStateStore_DeleteCheck(t *testing.T) {
|
||||
s := testStateStore(t)
|
||||
|
||||
|
|
|
@ -222,6 +222,8 @@ func (s *StateStore) getWatchTables(method string) []string {
|
|||
return []string{"nodes", "services"}
|
||||
case "NodeCheck", "NodeChecks", "ServiceChecks", "ChecksInState":
|
||||
return []string{"checks"}
|
||||
case "ChecksInStateByNodeMeta", "ServiceChecksByNodeMeta":
|
||||
return []string{"nodes", "checks"}
|
||||
case "CheckServiceNodes", "NodeInfo", "NodeDump":
|
||||
return []string{"nodes", "services", "checks"}
|
||||
case "SessionGet", "SessionList", "NodeSessions":
|
||||
|
|
|
@ -35,7 +35,11 @@ func testStateStore(t *testing.T) *StateStore {
|
|||
}
|
||||
|
||||
func testRegisterNode(t *testing.T, s *StateStore, idx uint64, nodeID string) {
|
||||
node := &structs.Node{Node: nodeID}
|
||||
testRegisterNodeWithMeta(t, s, idx, nodeID, nil)
|
||||
}
|
||||
|
||||
func testRegisterNodeWithMeta(t *testing.T, s *StateStore, idx uint64, nodeID string, meta map[string]string) {
|
||||
node := &structs.Node{Node: nodeID, Meta: meta}
|
||||
if err := s.EnsureNode(idx, node); err != nil {
|
||||
t.Fatalf("err: %s", err)
|
||||
}
|
||||
|
|
|
@ -241,11 +241,12 @@ func (r *DCSpecificRequest) RequestDatacenter() string {
|
|||
|
||||
// ServiceSpecificRequest is used to query about a specific service
|
||||
type ServiceSpecificRequest struct {
|
||||
Datacenter string
|
||||
ServiceName string
|
||||
ServiceTag string
|
||||
TagFilter bool // Controls tag filtering
|
||||
Source QuerySource
|
||||
Datacenter string
|
||||
NodeMetaFilters map[string]string
|
||||
ServiceName string
|
||||
ServiceTag string
|
||||
TagFilter bool // Controls tag filtering
|
||||
Source QuerySource
|
||||
QueryOptions
|
||||
}
|
||||
|
||||
|
@ -266,9 +267,10 @@ func (r *NodeSpecificRequest) RequestDatacenter() string {
|
|||
|
||||
// ChecksInStateRequest is used to query for nodes in a state
|
||||
type ChecksInStateRequest struct {
|
||||
Datacenter string
|
||||
State string
|
||||
Source QuerySource
|
||||
Datacenter string
|
||||
NodeMetaFilters map[string]string
|
||||
State string
|
||||
Source QuerySource
|
||||
QueryOptions
|
||||
}
|
||||
|
||||
|
@ -287,6 +289,15 @@ type Node struct {
|
|||
}
|
||||
type Nodes []*Node
|
||||
|
||||
func SatisfiesMetaFilters(meta map[string]string, filters map[string]string) bool {
|
||||
for key, value := range filters {
|
||||
if v, ok := meta[key]; !ok || v != value {
|
||||
return false
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
// Used to return information about a provided services.
|
||||
// Maps service name to available tags
|
||||
type Services map[string][]string
|
||||
|
|
|
@ -200,7 +200,8 @@ node for the sort.
|
|||
|
||||
In Consul 0.7.3 and later, the optional `?node-meta=` parameter can be
|
||||
provided with a desired node metadata key/value pair of the form `key:value`.
|
||||
This will filter the results to nodes with that pair present.
|
||||
This parameter can be specified multiple times, and will filter the results to
|
||||
nodes with the specified key/value pair(s).
|
||||
|
||||
It returns a JSON body like this:
|
||||
|
||||
|
@ -241,7 +242,8 @@ however, the `dc` can be provided using the `?dc=` query parameter.
|
|||
|
||||
In Consul 0.7.3 and later, the optional `?node-meta=` parameter can be
|
||||
provided with a desired node metadata key/value pair of the form `key:value`.
|
||||
This will filter the results to services with that pair present.
|
||||
This parameter can be specified multiple times, and will filter the results to
|
||||
services on nodes with the specified key/value pair(s).
|
||||
|
||||
It returns a JSON body like this:
|
||||
|
||||
|
@ -276,6 +278,11 @@ the node list in ascending order based on the estimated round trip
|
|||
time from that node. Passing `?near=_agent` will use the agent's
|
||||
node for the sort.
|
||||
|
||||
In Consul 0.7.3 and later, the optional `?node-meta=` parameter can be
|
||||
provided with a desired node metadata key/value pair of the form `key:value`.
|
||||
This parameter can be specified multiple times, and will filter the results to
|
||||
service entries on nodes with the specified key/value pair(s).
|
||||
|
||||
It returns a JSON body like this:
|
||||
|
||||
```javascript
|
||||
|
|
|
@ -75,6 +75,11 @@ the node list in ascending order based on the estimated round trip
|
|||
time from that node. Passing `?near=_agent` will use the agent's
|
||||
node for the sort.
|
||||
|
||||
In Consul 0.7.3 and later, the optional `?node-meta=` parameter can be
|
||||
provided with a desired node metadata key/value pair of the form `key:value`.
|
||||
This parameter can be specified multiple times, and will filter the results to
|
||||
health checks on nodes with the specified key/value pair(s).
|
||||
|
||||
It returns a JSON body like this:
|
||||
|
||||
```javascript
|
||||
|
@ -112,6 +117,11 @@ Providing the `?passing` query parameter, added in Consul 0.2, will
|
|||
filter results to only nodes with all checks in the `passing` state.
|
||||
This can be used to avoid extra filtering logic on the client side.
|
||||
|
||||
In Consul 0.7.3 and later, the optional `?node-meta=` parameter can be
|
||||
provided with a desired node metadata key/value pair of the form `key:value`.
|
||||
This parameter can be specified multiple times, and will filter the results to
|
||||
nodes with the specified key/value pair(s).
|
||||
|
||||
This endpoint is very similar to the `/v1/catalog/service` endpoint; however, this
|
||||
endpoint automatically returns the status of the associated health check
|
||||
as well as any system level health checks. This allows a client to avoid
|
||||
|
@ -182,6 +192,11 @@ the node list in ascending order based on the estimated round trip
|
|||
time from that node. Passing `?near=_agent` will use the agent's
|
||||
node for the sort.
|
||||
|
||||
In Consul 0.7.3 and later, the optional `?node-meta=` parameter can be
|
||||
provided with a desired node metadata key/value pair of the form `key:value`.
|
||||
This parameter can be specified multiple times, and will filter the results to
|
||||
health checks on nodes with the specified key/value pair(s).
|
||||
|
||||
The supported states are `any`, `passing`, `warning`, or `critical`.
|
||||
The `any` state is a wildcard that can be used to return all checks.
|
||||
|
||||
|
|
Loading…
Reference in New Issue