agent/consul/state: support querying by Connect native

pull/4275/head
Mitchell Hashimoto 7 years ago committed by Jack Pearkes
parent 6b745964c4
commit eb3fcb39b3

@ -17,6 +17,7 @@ import (
"github.com/hashicorp/consul/types"
"github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestCatalog_Register(t *testing.T) {
@ -1823,6 +1824,49 @@ func TestCatalog_ListServiceNodes_ConnectDestination(t *testing.T) {
assert.Equal("", v.ServiceProxyDestination)
}
// Test that calling ServiceNodes with Connect: true will return
// Connect native services.
func TestCatalog_ListServiceNodes_ConnectDestinationNative(t *testing.T) {
t.Parallel()
require := require.New(t)
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()
testrpc.WaitForLeader(t, s1.RPC, "dc1")
// Register the native service
args := structs.TestRegisterRequest(t)
args.Service.ConnectNative = true
var out struct{}
require.Nil(msgpackrpc.CallWithCodec(codec, "Catalog.Register", args, &out))
// List
req := structs.ServiceSpecificRequest{
Connect: true,
Datacenter: "dc1",
ServiceName: args.Service.Service,
}
var resp structs.IndexedServiceNodes
require.Nil(msgpackrpc.CallWithCodec(codec, "Catalog.ServiceNodes", &req, &resp))
require.Len(resp.ServiceNodes, 1)
v := resp.ServiceNodes[0]
require.Equal(args.Service.Service, v.ServiceName)
// List by non-Connect
req = structs.ServiceSpecificRequest{
Datacenter: "dc1",
ServiceName: args.Service.Service,
}
require.Nil(msgpackrpc.CallWithCodec(codec, "Catalog.ServiceNodes", &req, &resp))
require.Len(resp.ServiceNodes, 1)
v = resp.ServiceNodes[0]
require.Equal(args.Service.Service, v.ServiceName)
}
func TestCatalog_ListServiceNodes_ConnectProxy_ACL(t *testing.T) {
t.Parallel()

@ -91,14 +91,11 @@ func servicesTableSchema() *memdb.TableSchema {
Lowercase: true,
},
},
"proxy_destination": &memdb.IndexSchema{
Name: "proxy_destination",
"connect": &memdb.IndexSchema{
Name: "connect",
AllowMissing: true,
Unique: false,
Indexer: &memdb.StringFieldIndex{
Field: "ServiceProxyDestination",
Lowercase: true,
},
Indexer: &IndexConnectService{},
},
},
}
@ -819,7 +816,7 @@ func (s *Store) serviceNodes(ws memdb.WatchSet, serviceName string, connect bool
}
} else {
f = func() (memdb.ResultIterator, error) {
return tx.Get("services", "proxy_destination", serviceName)
return tx.Get("services", "connect", serviceName)
}
}
@ -1540,7 +1537,7 @@ func (s *Store) checkServiceNodes(ws memdb.WatchSet, serviceName string, connect
}
} else {
f = func() (memdb.ResultIterator, error) {
return tx.Get("services", "proxy_destination", serviceName)
return tx.Get("services", "connect", serviceName)
}
}

@ -1590,7 +1590,8 @@ func TestStateStore_ConnectServiceNodes(t *testing.T) {
assert.Nil(s.EnsureService(13, "bar", &structs.NodeService{ID: "api", Service: "api", Tags: nil, Address: "", Port: 5000}))
assert.Nil(s.EnsureService(14, "foo", &structs.NodeService{Kind: structs.ServiceKindConnectProxy, ID: "proxy", Service: "proxy", ProxyDestination: "db", Port: 8000}))
assert.Nil(s.EnsureService(15, "bar", &structs.NodeService{Kind: structs.ServiceKindConnectProxy, ID: "proxy", Service: "proxy", ProxyDestination: "db", Port: 8000}))
assert.Nil(s.EnsureService(16, "bar", &structs.NodeService{ID: "db2", Service: "db", Tags: []string{"slave"}, Address: "", Port: 8001}))
assert.Nil(s.EnsureService(16, "bar", &structs.NodeService{ID: "native-db", Service: "db", ConnectNative: true}))
assert.Nil(s.EnsureService(17, "bar", &structs.NodeService{ID: "db2", Service: "db", Tags: []string{"slave"}, Address: "", Port: 8001}))
assert.True(watchFired(ws))
// Read everything back.
@ -1598,11 +1599,13 @@ func TestStateStore_ConnectServiceNodes(t *testing.T) {
idx, nodes, err = s.ConnectServiceNodes(ws, "db")
assert.Nil(err)
assert.Equal(idx, uint64(idx))
assert.Len(nodes, 2)
assert.Len(nodes, 3)
for _, n := range nodes {
assert.Equal(structs.ServiceKindConnectProxy, n.ServiceKind)
assert.Equal("db", n.ServiceProxyDestination)
assert.True(
n.ServiceKind == structs.ServiceKindConnectProxy ||
n.ServiceConnectNative,
"either proxy or connect native")
}
// Registering some unrelated node should not fire the watch.

@ -0,0 +1,54 @@
package state
import (
"fmt"
"strings"
"github.com/hashicorp/consul/agent/structs"
)
// IndexConnectService indexes a *struct.ServiceNode for querying by
// services that support Connect to some target service. This will
// properly index the proxy destination for proxies and the service name
// for native services.
type IndexConnectService struct{}
func (idx *IndexConnectService) FromObject(obj interface{}) (bool, []byte, error) {
sn, ok := obj.(*structs.ServiceNode)
if !ok {
return false, nil, fmt.Errorf("Object must be ServiceNode, got %T", obj)
}
var result []byte
switch {
case sn.ServiceKind == structs.ServiceKindConnectProxy:
// For proxies, this service supports Connect for the destination
result = []byte(strings.ToLower(sn.ServiceProxyDestination))
case sn.ServiceConnectNative:
// For native, this service supports Connect directly
result = []byte(strings.ToLower(sn.ServiceName))
default:
// Doesn't support Connect at all
return false, nil, nil
}
// Return the result with the null terminator appended so we can
// differentiate prefix vs. non-prefix matches.
return true, append(result, '\x00'), nil
}
func (idx *IndexConnectService) FromArgs(args ...interface{}) ([]byte, error) {
if len(args) != 1 {
return nil, fmt.Errorf("must provide only a single argument")
}
arg, ok := args[0].(string)
if !ok {
return nil, fmt.Errorf("argument must be a string: %#v", args[0])
}
// Add the null character as a terminator
return append([]byte(strings.ToLower(arg)), '\x00'), nil
}

@ -0,0 +1,122 @@
package state
import (
"testing"
"github.com/hashicorp/consul/agent/structs"
"github.com/stretchr/testify/require"
)
func TestIndexConnectService_FromObject(t *testing.T) {
cases := []struct {
Name string
Input interface{}
ExpectMatch bool
ExpectVal []byte
ExpectErr string
}{
{
"not a ServiceNode",
42,
false,
nil,
"ServiceNode",
},
{
"typical service, not native",
&structs.ServiceNode{
ServiceName: "db",
},
false,
nil,
"",
},
{
"typical service, is native",
&structs.ServiceNode{
ServiceName: "dB",
ServiceConnectNative: true,
},
true,
[]byte("db\x00"),
"",
},
{
"proxy service",
&structs.ServiceNode{
ServiceKind: structs.ServiceKindConnectProxy,
ServiceName: "db",
ServiceProxyDestination: "fOo",
},
true,
[]byte("foo\x00"),
"",
},
}
for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
require := require.New(t)
var idx IndexConnectService
match, val, err := idx.FromObject(tc.Input)
if tc.ExpectErr != "" {
require.Error(err)
require.Contains(err.Error(), tc.ExpectErr)
return
}
require.NoError(err)
require.Equal(tc.ExpectMatch, match)
require.Equal(tc.ExpectVal, val)
})
}
}
func TestIndexConnectService_FromArgs(t *testing.T) {
cases := []struct {
Name string
Args []interface{}
ExpectVal []byte
ExpectErr string
}{
{
"multiple arguments",
[]interface{}{"foo", "bar"},
nil,
"single",
},
{
"not a string",
[]interface{}{42},
nil,
"must be a string",
},
{
"string",
[]interface{}{"fOO"},
[]byte("foo\x00"),
"",
},
}
for _, tc := range cases {
t.Run(tc.Name, func(t *testing.T) {
require := require.New(t)
var idx IndexConnectService
val, err := idx.FromArgs(tc.Args...)
if tc.ExpectErr != "" {
require.Error(err)
require.Contains(err.Error(), tc.ExpectErr)
return
}
require.NoError(err)
require.Equal(tc.ExpectVal, val)
})
}
}

@ -555,6 +555,11 @@ func (s *NodeService) Validate() error {
result = multierror.Append(result, fmt.Errorf(
"Port must be set for a Connect proxy"))
}
if s.ConnectNative {
result = multierror.Append(result, fmt.Errorf(
"A Proxy cannot also be ConnectNative, only typical services"))
}
}
return result

@ -246,6 +246,12 @@ func TestStructs_NodeService_ValidateConnectProxy(t *testing.T) {
func(x *NodeService) { x.Port = 0 },
"Port must",
},
{
"connect-proxy: ConnectNative set",
func(x *NodeService) { x.ConnectNative = true },
"cannot also be",
},
}
for _, tc := range cases {

Loading…
Cancel
Save