Browse Source

state: refactor some node/coordinate state store functions to take an EnterpriseMeta (#10687)

Note the field is not used yet.
pull/10693/head
R.B. Boyer 3 years ago committed by GitHub
parent
commit
3343c7cb3a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      agent/consul/catalog_endpoint.go
  2. 6
      agent/consul/coordinate_endpoint.go
  3. 22
      agent/consul/coordinate_endpoint_test.go
  4. 2
      agent/consul/fsm/commands_oss.go
  5. 18
      agent/consul/fsm/commands_oss_test.go
  6. 7
      agent/consul/fsm/snapshot_oss.go
  7. 6
      agent/consul/fsm/snapshot_oss_test.go
  8. 12
      agent/consul/leader.go
  9. 57
      agent/consul/leader_test.go
  10. 3
      agent/consul/prepared_query_endpoint.go
  11. 14
      agent/consul/rtt.go
  12. 20
      agent/consul/state/catalog.go
  13. 72
      agent/consul/state/catalog_test.go
  14. 6
      agent/consul/state/coordinate.go
  15. 35
      agent/consul/state/coordinate_test.go
  16. 9
      agent/consul/state/session_test.go
  17. 5
      agent/consul/state/txn_test.go
  18. 7
      agent/consul/state/usage_test.go
  19. 8
      agent/consul/txn_endpoint_test.go

5
agent/consul/catalog_endpoint.go

@ -298,10 +298,11 @@ func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.Inde
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
var err error
// TODO(partitions)
if len(args.NodeMetaFilters) > 0 {
reply.Index, reply.Nodes, err = state.NodesByMeta(ws, args.NodeMetaFilters)
reply.Index, reply.Nodes, err = state.NodesByMeta(ws, args.NodeMetaFilters, nil)
} else {
reply.Index, reply.Nodes, err = state.Nodes(ws)
reply.Index, reply.Nodes, err = state.Nodes(ws, nil)
}
if err != nil {
return err

6
agent/consul/coordinate_endpoint.go

@ -199,7 +199,8 @@ func (c *Coordinate) ListNodes(args *structs.DCSpecificRequest, reply *structs.I
return c.srv.blockingQuery(&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
index, coords, err := state.Coordinates(ws)
// TODO(partitions)
index, coords, err := state.Coordinates(ws, nil)
if err != nil {
return err
}
@ -236,7 +237,8 @@ func (c *Coordinate) Node(args *structs.NodeSpecificRequest, reply *structs.Inde
return c.srv.blockingQuery(&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
index, nodeCoords, err := state.Coordinate(args.Node, ws)
// TODO(partitions)
index, nodeCoords, err := state.Coordinate(ws, args.Node, nil)
if err != nil {
return err
}

22
agent/consul/coordinate_endpoint_test.go

@ -10,14 +10,15 @@ import (
"testing"
"time"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/serf/coordinate"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/serf/coordinate"
"github.com/stretchr/testify/require"
)
// generateRandomCoordinate creates a random coordinate. This mucks with the
@ -83,13 +84,15 @@ func TestCoordinate_Update(t *testing.T) {
// Make sure the updates did not yet apply because the update period
// hasn't expired.
state := s1.fsm.State()
_, c, err := state.Coordinate("node1", nil)
// TODO(partitions)
_, c, err := state.Coordinate(nil, "node1", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
require.Equal(t, lib.CoordinateSet{}, c)
_, c, err = state.Coordinate("node2", nil)
// TODO(partitions)
_, c, err = state.Coordinate(nil, "node2", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -104,7 +107,8 @@ func TestCoordinate_Update(t *testing.T) {
// Wait a while and the updates should get picked up.
time.Sleep(3 * s1.config.CoordinateUpdatePeriod)
_, c, err = state.Coordinate("node1", nil)
// TODO(partitions)
_, c, err = state.Coordinate(nil, "node1", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -113,7 +117,8 @@ func TestCoordinate_Update(t *testing.T) {
}
require.Equal(t, expected, c)
_, c, err = state.Coordinate("node2", nil)
// TODO(partitions)
_, c, err = state.Coordinate(nil, "node2", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -152,7 +157,8 @@ func TestCoordinate_Update(t *testing.T) {
time.Sleep(3 * s1.config.CoordinateUpdatePeriod)
numDropped := 0
for i := 0; i < spamLen; i++ {
_, c, err = state.Coordinate(fmt.Sprintf("bogusnode%d", i), nil)
// TODO(partitions)
_, c, err = state.Coordinate(nil, fmt.Sprintf("bogusnode%d", i), nil)
if err != nil {
t.Fatalf("err: %v", err)
}

2
agent/consul/fsm/commands_oss.go

@ -169,7 +169,7 @@ func (c *FSM) applyDeregister(buf []byte, index uint64) interface{} {
return err
}
} else {
if err := c.state.DeleteNode(index, req.Node); err != nil {
if err := c.state.DeleteNode(index, req.Node, &req.EnterpriseMeta); err != nil {
c.logger.Warn("DeleteNode failed", "error", err)
return err
}

18
agent/consul/fsm/commands_oss_test.go

@ -68,7 +68,7 @@ func TestFSM_RegisterNode(t *testing.T) {
}
// Verify we are registered
_, node, err := fsm.state.GetNode("foo")
_, node, err := fsm.state.GetNode("foo", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -126,7 +126,7 @@ func TestFSM_RegisterNode_Service(t *testing.T) {
}
// Verify we are registered
_, node, err := fsm.state.GetNode("foo")
_, node, err := fsm.state.GetNode("foo", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -198,7 +198,7 @@ func TestFSM_DeregisterService(t *testing.T) {
}
// Verify we are registered
_, node, err := fsm.state.GetNode("foo")
_, node, err := fsm.state.GetNode("foo", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -261,7 +261,7 @@ func TestFSM_DeregisterCheck(t *testing.T) {
}
// Verify we are registered
_, node, err := fsm.state.GetNode("foo")
_, node, err := fsm.state.GetNode("foo", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -330,7 +330,7 @@ func TestFSM_DeregisterNode(t *testing.T) {
}
// Verify we are not registered
_, node, err := fsm.state.GetNode("foo")
_, node, err := fsm.state.GetNode("foo", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -730,7 +730,7 @@ func TestFSM_CoordinateUpdate(t *testing.T) {
}
// Read back the two coordinates to make sure they got updated.
_, coords, err := fsm.state.Coordinates(nil)
_, coords, err := fsm.state.Coordinates(nil, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -1543,7 +1543,7 @@ func TestFSM_Chunking_Lifecycle(t *testing.T) {
// Verify we are not registered
for i := 0; i < 10; i++ {
_, node, err := fsm.state.GetNode(fmt.Sprintf("foo%d", i))
_, node, err := fsm.state.GetNode(fmt.Sprintf("foo%d", i), nil)
require.NoError(err)
assert.Nil(node)
}
@ -1566,7 +1566,7 @@ func TestFSM_Chunking_Lifecycle(t *testing.T) {
// Verify we are still not registered
for i := 0; i < 10; i++ {
_, node, err := fsm2.state.GetNode(fmt.Sprintf("foo%d", i))
_, node, err := fsm2.state.GetNode(fmt.Sprintf("foo%d", i), nil)
require.NoError(err)
assert.Nil(node)
}
@ -1590,7 +1590,7 @@ func TestFSM_Chunking_Lifecycle(t *testing.T) {
// Verify we are registered
for i := 0; i < 10; i++ {
_, node, err := fsm2.state.GetNode(fmt.Sprintf("foo%d", i))
_, node, err := fsm2.state.GetNode(fmt.Sprintf("foo%d", i), nil)
require.NoError(err)
assert.NotNil(node)

7
agent/consul/fsm/snapshot_oss.go

@ -115,7 +115,8 @@ func (s *snapshot) persistNodes(sink raft.SnapshotSink,
}
// Register each service this node has
services, err := s.state.Services(n.Node)
// TODO(partitions)
services, err := s.state.Services(n.Node, nil)
if err != nil {
return err
}
@ -131,7 +132,8 @@ func (s *snapshot) persistNodes(sink raft.SnapshotSink,
// Register each check this node has
req.Service = nil
checks, err := s.state.Checks(n.Node)
// TODO(partitions)
checks, err := s.state.Checks(n.Node, nil)
if err != nil {
return err
}
@ -153,6 +155,7 @@ func (s *snapshot) persistNodes(sink raft.SnapshotSink,
if err != nil {
return err
}
// TODO(partitions)
for coord := coords.Next(); coord != nil; coord = coords.Next() {
if _, err := sink.Write([]byte{byte(structs.CoordinateBatchUpdateType)}); err != nil {
return err

6
agent/consul/fsm/snapshot_oss_test.go

@ -489,7 +489,7 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
require.NoError(t, fsm2.Restore(sink))
// Verify the contents
_, nodes, err := fsm2.state.Nodes(nil)
_, nodes, err := fsm2.state.Nodes(nil, nil)
require.NoError(t, err)
require.Len(t, nodes, 2, "incorect number of nodes: %v", nodes)
@ -625,7 +625,7 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
}()
// Verify coordinates are restored
_, coords, err := fsm2.state.Coordinates(nil)
_, coords, err := fsm2.state.Coordinates(nil, nil)
require.NoError(t, err)
require.Equal(t, updates, coords)
@ -749,7 +749,7 @@ func TestFSM_BadRestore_OSS(t *testing.T) {
require.Error(t, fsm.Restore(sink))
// Verify the contents didn't get corrupted.
_, nodes, err := fsm.state.Nodes(nil)
_, nodes, err := fsm.state.Nodes(nil, nil)
require.NoError(t, err)
require.Len(t, nodes, 1)
require.Equal(t, "foo", nodes[0].Node)

12
agent/consul/leader.go

@ -1139,7 +1139,8 @@ func (s *Server) reconcileReaped(known map[string]struct{}) error {
CHECKS:
for _, service := range services.Services {
if service.ID == structs.ConsulServiceID {
_, node, err := state.GetNode(check.Node)
// TODO(partitions)
_, node, err := state.GetNode(check.Node, nil)
if err != nil {
s.logger.Error("Unable to look up node with name", "name", check.Node, "error", err)
continue CHECKS
@ -1262,7 +1263,8 @@ func (s *Server) handleAliveMember(member serf.Member) error {
// Check if the node exists
state := s.fsm.State()
_, node, err := state.GetNode(member.Name)
// TODO(partitions)
_, node, err := state.GetNode(member.Name, nil)
if err != nil {
return err
}
@ -1330,7 +1332,8 @@ AFTER_CHECK:
func (s *Server) handleFailedMember(member serf.Member) error {
// Check if the node exists
state := s.fsm.State()
_, node, err := state.GetNode(member.Name)
// TODO(partitions)
_, node, err := state.GetNode(member.Name, nil)
if err != nil {
return err
}
@ -1407,7 +1410,8 @@ func (s *Server) handleDeregisterMember(reason string, member serf.Member) error
// Check if the node does not exist
state := s.fsm.State()
_, node, err := state.GetNode(member.Name)
// TODO(partitions)
_, node, err := state.GetNode(member.Name, nil)
if err != nil {
return err
}

57
agent/consul/leader_test.go

@ -9,16 +9,17 @@ import (
"testing"
"time"
"github.com/hashicorp/go-hclog"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/serf/serf"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/structs"
tokenStore "github.com/hashicorp/consul/agent/token"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/go-hclog"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/hashicorp/serf/serf"
"github.com/stretchr/testify/require"
)
func TestLeader_RegisterMember(t *testing.T) {
@ -48,7 +49,8 @@ func TestLeader_RegisterMember(t *testing.T) {
// Client should be registered
state := s1.fsm.State()
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(c1.config.NodeName)
// TODO(partitions)
_, node, err := state.GetNode(c1.config.NodeName, nil)
if err != nil {
r.Fatalf("err: %v", err)
}
@ -77,7 +79,8 @@ func TestLeader_RegisterMember(t *testing.T) {
// Server should be registered
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(s1.config.NodeName)
// TODO(partitions)
_, node, err := state.GetNode(s1.config.NodeName, nil)
if err != nil {
r.Fatalf("err: %v", err)
}
@ -126,7 +129,8 @@ func TestLeader_FailedMember(t *testing.T) {
// Should be registered
state := s1.fsm.State()
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(c1.config.NodeName)
// TODO(partitions)
_, node, err := state.GetNode(c1.config.NodeName, nil)
if err != nil {
r.Fatalf("err: %v", err)
}
@ -187,7 +191,8 @@ func TestLeader_LeftMember(t *testing.T) {
// Should be registered
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(c1.config.NodeName)
// TODO(partitions)
_, node, err := state.GetNode(c1.config.NodeName, nil)
if err != nil {
r.Fatalf("err: %v", err)
}
@ -202,7 +207,8 @@ func TestLeader_LeftMember(t *testing.T) {
// Should be deregistered
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(c1.config.NodeName)
// TODO(partitions)
_, node, err := state.GetNode(c1.config.NodeName, nil)
if err != nil {
r.Fatalf("err: %v", err)
}
@ -237,7 +243,8 @@ func TestLeader_ReapMember(t *testing.T) {
// Should be registered
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(c1.config.NodeName)
// TODO(partitions)
_, node, err := state.GetNode(c1.config.NodeName, nil)
if err != nil {
r.Fatalf("err: %v", err)
}
@ -262,7 +269,8 @@ func TestLeader_ReapMember(t *testing.T) {
// anti-entropy will put it back.
reaped := false
for start := time.Now(); time.Since(start) < 5*time.Second; {
_, node, err := state.GetNode(c1.config.NodeName)
// TODO(partitions)
_, node, err := state.GetNode(c1.config.NodeName, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -431,7 +439,8 @@ func TestLeader_ReapServer(t *testing.T) {
// s3 should be registered
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(s3.config.NodeName)
// TODO(partitions)
_, node, err := state.GetNode(s3.config.NodeName, nil)
if err != nil {
r.Fatalf("err: %v", err)
}
@ -452,7 +461,8 @@ func TestLeader_ReapServer(t *testing.T) {
}
// s3 should be deregistered
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(s3.config.NodeName)
// TODO(partitions)
_, node, err := state.GetNode(s3.config.NodeName, nil)
if err != nil {
r.Fatalf("err: %v", err)
}
@ -507,7 +517,8 @@ func TestLeader_Reconcile_ReapMember(t *testing.T) {
// Node should be gone
state := s1.fsm.State()
_, node, err := state.GetNode("no-longer-around")
// TODO(partitions)
_, node, err := state.GetNode("no-longer-around", nil)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -540,7 +551,8 @@ func TestLeader_Reconcile(t *testing.T) {
// Should not be registered
state := s1.fsm.State()
_, node, err := state.GetNode(c1.config.NodeName)
// TODO(partitions)
_, node, err := state.GetNode(c1.config.NodeName, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -550,7 +562,8 @@ func TestLeader_Reconcile(t *testing.T) {
// Should be registered
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(c1.config.NodeName)
// TODO(partitions)
_, node, err := state.GetNode(c1.config.NodeName, nil)
if err != nil {
r.Fatalf("err: %v", err)
}
@ -582,7 +595,8 @@ func TestLeader_Reconcile_Races(t *testing.T) {
state := s1.fsm.State()
var nodeAddr string
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(c1.config.NodeName)
// TODO(partitions)
_, node, err := state.GetNode(c1.config.NodeName, nil)
if err != nil {
r.Fatalf("err: %v", err)
}
@ -618,7 +632,8 @@ func TestLeader_Reconcile_Races(t *testing.T) {
if err := s1.reconcile(); err != nil {
t.Fatalf("err: %v", err)
}
_, node, err := state.GetNode(c1.config.NodeName)
// TODO(partitions)
_, node, err := state.GetNode(c1.config.NodeName, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -642,7 +657,8 @@ func TestLeader_Reconcile_Races(t *testing.T) {
})
// Make sure the metadata didn't get clobbered.
_, node, err = state.GetNode(c1.config.NodeName)
// TODO(partitions)
_, node, err = state.GetNode(c1.config.NodeName, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -757,7 +773,8 @@ func TestLeader_LeftLeader(t *testing.T) {
// Verify the old leader is deregistered
state := remain.fsm.State()
retry.Run(t, func(r *retry.R) {
_, node, err := state.GetNode(leader.config.NodeName)
// TODO(partitions)
_, node, err := state.GetNode(leader.config.NodeName, nil)
if err != nil {
r.Fatalf("err: %v", err)
}

3
agent/consul/prepared_query_endpoint.go

@ -402,7 +402,8 @@ func (p *PreparedQuery) Execute(args *structs.PreparedQueryExecuteRequest,
qs.Node = args.Agent.Node
} else if qs.Node == "_ip" {
if args.Source.Ip != "" {
_, nodes, err := state.Nodes(nil)
// TODO(partitions)
_, nodes, err := state.Nodes(nil, nil)
if err != nil {
return err
}

14
agent/consul/rtt.go

@ -22,7 +22,8 @@ func (s *Server) newNodeSorter(cs lib.CoordinateSet, nodes structs.Nodes) (sort.
state := s.fsm.State()
vec := make([]float64, len(nodes))
for i, node := range nodes {
_, other, err := state.Coordinate(node.Node, nil)
// TODO(partitions)
_, other, err := state.Coordinate(nil, node.Node, nil)
if err != nil {
return nil, err
}
@ -62,7 +63,8 @@ func (s *Server) newServiceNodeSorter(cs lib.CoordinateSet, nodes structs.Servic
state := s.fsm.State()
vec := make([]float64, len(nodes))
for i, node := range nodes {
_, other, err := state.Coordinate(node.Node, nil)
// TODO(partitions)
_, other, err := state.Coordinate(nil, node.Node, nil)
if err != nil {
return nil, err
}
@ -102,7 +104,8 @@ func (s *Server) newHealthCheckSorter(cs lib.CoordinateSet, checks structs.Healt
state := s.fsm.State()
vec := make([]float64, len(checks))
for i, check := range checks {
_, other, err := state.Coordinate(check.Node, nil)
// TODO(partitions)
_, other, err := state.Coordinate(nil, check.Node, nil)
if err != nil {
return nil, err
}
@ -142,7 +145,8 @@ func (s *Server) newCheckServiceNodeSorter(cs lib.CoordinateSet, nodes structs.C
state := s.fsm.State()
vec := make([]float64, len(nodes))
for i, node := range nodes {
_, other, err := state.Coordinate(node.Node.Node, nil)
// TODO(partitions)
_, other, err := state.Coordinate(nil, node.Node.Node, nil)
if err != nil {
return nil, err
}
@ -203,7 +207,7 @@ func (s *Server) sortNodesByDistanceFrom(source structs.QuerySource, subj interf
// There won't always be coordinates for the source node. If there are
// none then we can bail out because there's no meaning for the sort.
state := s.fsm.State()
_, cs, err := state.Coordinate(source.Node, nil)
_, cs, err := state.Coordinate(nil, source.Node, source.NodeEnterpriseMeta())
if err != nil {
return err
}

20
agent/consul/state/catalog.go

@ -49,13 +49,15 @@ func (s *Snapshot) Nodes() (memdb.ResultIterator, error) {
// Services is used to pull the full list of services for a given node for use
// during snapshots.
func (s *Snapshot) Services(node string) (memdb.ResultIterator, error) {
func (s *Snapshot) Services(node string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
// TODO(partitions): use the provided entmeta
return s.tx.Get(tableServices, indexNode, Query{Value: node})
}
// Checks is used to pull the full list of checks for a given node for use
// during snapshots.
func (s *Snapshot) Checks(node string) (memdb.ResultIterator, error) {
func (s *Snapshot) Checks(node string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
// TODO(partitions): use the provided entmeta
return s.tx.Get(tableChecks, indexNode, Query{Value: node})
}
@ -329,7 +331,8 @@ func (s *Store) ensureNodeTxn(tx WriteTxn, idx uint64, preserveIndexes bool, nod
}
// GetNode is used to retrieve a node registration by node name ID.
func (s *Store) GetNode(id string) (uint64, *structs.Node, error) {
func (s *Store) GetNode(nodeNameOrID string, _ *structs.EnterpriseMeta) (uint64, *structs.Node, error) {
// TODO(partitions): use the provided entmeta
tx := s.db.Txn(false)
defer tx.Abort()
@ -337,7 +340,7 @@ func (s *Store) GetNode(id string) (uint64, *structs.Node, error) {
idx := maxIndexTxn(tx, "nodes")
// Retrieve the node from the state store
node, err := getNodeTxn(tx, id)
node, err := getNodeTxn(tx, nodeNameOrID)
if err != nil {
return 0, nil, fmt.Errorf("node lookup failed: %s", err)
}
@ -386,7 +389,8 @@ func (s *Store) GetNodeID(id types.NodeID) (uint64, *structs.Node, error) {
}
// Nodes is used to return all of the known nodes.
func (s *Store) Nodes(ws memdb.WatchSet) (uint64, structs.Nodes, error) {
func (s *Store) Nodes(ws memdb.WatchSet, _ *structs.EnterpriseMeta) (uint64, structs.Nodes, error) {
// TODO(partitions): use the provided entmeta
tx := s.db.Txn(false)
defer tx.Abort()
@ -409,7 +413,8 @@ func (s *Store) Nodes(ws memdb.WatchSet) (uint64, structs.Nodes, error) {
}
// NodesByMeta is used to return all nodes with the given metadata key/value pairs.
func (s *Store) NodesByMeta(ws memdb.WatchSet, filters map[string]string) (uint64, structs.Nodes, error) {
func (s *Store) NodesByMeta(ws memdb.WatchSet, filters map[string]string, _ *structs.EnterpriseMeta) (uint64, structs.Nodes, error) {
// TODO(partitions): use the provided entmeta
tx := s.db.Txn(false)
defer tx.Abort()
@ -440,7 +445,8 @@ func (s *Store) NodesByMeta(ws memdb.WatchSet, filters map[string]string) (uint6
}
// DeleteNode is used to delete a given node by its ID.
func (s *Store) DeleteNode(idx uint64, nodeName string) error {
func (s *Store) DeleteNode(idx uint64, nodeName string, _ *structs.EnterpriseMeta) error {
// TODO(partitions): use the provided entmeta
tx := s.db.WriteTxn(idx)
defer tx.Abort()

72
agent/consul/state/catalog_test.go

@ -176,7 +176,7 @@ func TestStateStore_EnsureRegistration(t *testing.T) {
RaftIndex: structs.RaftIndex{CreateIndex: 1, ModifyIndex: 1},
}
_, out, err := s.GetNode("node1")
_, out, err := s.GetNode("node1", nil)
if err != nil {
t.Fatalf("got err %s want nil", err)
}
@ -392,7 +392,7 @@ func TestStateStore_EnsureRegistration_Restore(t *testing.T) {
// Retrieve the node and verify its contents.
verifyNode := func(nodeLookup string) {
_, out, err := s.GetNode(nodeLookup)
_, out, err := s.GetNode(nodeLookup, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -550,7 +550,7 @@ func deprecatedEnsureNodeWithoutIDCanRegister(t *testing.T, s *Store, nodeName s
if err := s.EnsureNode(txIdx, in); err != nil {
t.Fatalf("err: %s", err)
}
idx, out, err := s.GetNode(nodeName)
idx, out, err := s.GetNode(nodeName, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -580,7 +580,7 @@ func TestStateStore_EnsureNodeDeprecated(t *testing.T) {
t.Fatalf("err: %v", err)
}
// Retrieve the node again
idx, out, err := s.GetNode(firstNodeName)
idx, out, err := s.GetNode(firstNodeName, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -603,7 +603,7 @@ func TestStateStore_EnsureNodeDeprecated(t *testing.T) {
t.Fatalf("err: %v", err)
}
// Retrieve the node again
idx, out, err = s.GetNode(firstNodeName)
idx, out, err = s.GetNode(firstNodeName, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -725,7 +725,7 @@ func TestNodeRenamingNodes(t *testing.T) {
}
// Retrieve the node again
idx, out, err := s.GetNode("node2bis")
idx, out, err := s.GetNode("node2bis", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -749,7 +749,7 @@ func TestStateStore_EnsureNode(t *testing.T) {
s := testStateStore(t)
// Fetching a non-existent node returns nil
if _, node, err := s.GetNode("node1"); node != nil || err != nil {
if _, node, err := s.GetNode("node1", nil); node != nil || err != nil {
t.Fatalf("expected (nil, nil), got: (%#v, %#v)", node, err)
}
@ -766,7 +766,7 @@ func TestStateStore_EnsureNode(t *testing.T) {
}
// Retrieve the node again
idx, out, err := s.GetNode("node1")
idx, out, err := s.GetNode("node1", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -795,7 +795,7 @@ func TestStateStore_EnsureNode(t *testing.T) {
}
// Retrieve the node
idx, out, err = s.GetNode("node1")
idx, out, err = s.GetNode("node1", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -812,7 +812,7 @@ func TestStateStore_EnsureNode(t *testing.T) {
if err := s.EnsureNode(3, in2); err != nil {
t.Fatalf("err: %s", err)
}
_, out, err = s.GetNode("node1")
_, out, err = s.GetNode("node1", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -829,7 +829,7 @@ func TestStateStore_EnsureNode(t *testing.T) {
if err := s.EnsureNode(3, in3); err != nil {
t.Fatalf("err: %s", err)
}
idx, out, err = s.GetNode("node1")
idx, out, err = s.GetNode("node1", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -856,13 +856,13 @@ func TestStateStore_EnsureNode(t *testing.T) {
}
// Retrieve the node
_, out, err = s.GetNode("node1")
_, out, err = s.GetNode("node1", nil)
require.NoError(t, err)
if out != nil {
t.Fatalf("Node should not exist anymore: %q", out)
}
idx, out, err = s.GetNode("node1-renamed")
idx, out, err = s.GetNode("node1-renamed", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -921,7 +921,7 @@ func TestStateStore_EnsureNode(t *testing.T) {
}
// Retrieve the node
_, out, err = s.GetNode("Node1bis")
_, out, err = s.GetNode("Node1bis", nil)
require.NoError(t, err)
if out == nil {
t.Fatalf("Node should exist, but was null")
@ -937,7 +937,7 @@ func TestStateStore_EnsureNode(t *testing.T) {
t.Fatalf("err: %s", err)
}
idx, out, err = s.GetNode("Node1bis")
idx, out, err = s.GetNode("Node1bis", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -980,7 +980,7 @@ func TestStateStore_EnsureNode(t *testing.T) {
if err := s.EnsureNode(12, in); err != nil {
t.Fatalf("err: %s", err)
}
idx, out, err = s.GetNode("Node1-Renamed2")
idx, out, err = s.GetNode("Node1-Renamed2", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -1010,7 +1010,7 @@ func TestStateStore_EnsureNode(t *testing.T) {
if err := s.EnsureNode(15, in); err != nil {
t.Fatalf("[DEPRECATED] it should work, err:= %q", err)
}
_, out, err = s.GetNode("Node1-Renamed2")
_, out, err = s.GetNode("Node1-Renamed2", nil)
if err != nil {
t.Fatalf("[DEPRECATED] err: %s", err)
}
@ -1027,7 +1027,7 @@ func TestStateStore_GetNodes(t *testing.T) {
// Listing with no results returns nil.
ws := memdb.NewWatchSet()
idx, res, err := s.Nodes(ws)
idx, res, err := s.Nodes(ws, nil)
if idx != 0 || res != nil || err != nil {
t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err)
}
@ -1042,7 +1042,7 @@ func TestStateStore_GetNodes(t *testing.T) {
// Retrieve the nodes.
ws = memdb.NewWatchSet()
idx, nodes, err := s.Nodes(ws)
idx, nodes, err := s.Nodes(ws, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -1072,7 +1072,7 @@ func TestStateStore_GetNodes(t *testing.T) {
if watchFired(ws) {
t.Fatalf("bad")
}
if err := s.DeleteNode(3, "node1"); err != nil {
if err := s.DeleteNode(3, "node1", nil); err != nil {
t.Fatalf("err: %s", err)
}
if !watchFired(ws) {
@ -1092,7 +1092,7 @@ func BenchmarkGetNodes(b *testing.B) {
ws := memdb.NewWatchSet()
for i := 0; i < b.N; i++ {
s.Nodes(ws)
s.Nodes(ws, nil)
}
}
@ -1101,7 +1101,7 @@ func TestStateStore_GetNodesByMeta(t *testing.T) {
// Listing with no results returns nil
ws := memdb.NewWatchSet()
idx, res, err := s.NodesByMeta(ws, map[string]string{"somekey": "somevalue"})
idx, res, err := s.NodesByMeta(ws, map[string]string{"somekey": "somevalue"}, nil)
if idx != 0 || res != nil || err != nil {
t.Fatalf("expected (0, nil, nil), got: (%d, %#v, %#v)", idx, res, err)
}
@ -1141,7 +1141,7 @@ func TestStateStore_GetNodesByMeta(t *testing.T) {
}
for _, tc := range cases {
_, result, err := s.NodesByMeta(nil, tc.filters)
_, result, err := s.NodesByMeta(nil, tc.filters, nil)
if err != nil {
t.Fatalf("bad: %v", err)
}
@ -1159,7 +1159,7 @@ func TestStateStore_GetNodesByMeta(t *testing.T) {
// Set up a watch.
ws = memdb.NewWatchSet()
_, _, err = s.NodesByMeta(ws, map[string]string{"role": "client"})
_, _, err = s.NodesByMeta(ws, map[string]string{"role": "client"}, nil)
if err != nil {
t.Fatalf("err: %v", err)
}
@ -1285,12 +1285,12 @@ func TestStateStore_DeleteNode(t *testing.T) {
testRegisterCheck(t, s, 2, "node1", "", "check1", api.HealthPassing)
// Delete the node
if err := s.DeleteNode(3, "node1"); err != nil {
if err := s.DeleteNode(3, "node1", nil); err != nil {
t.Fatalf("err: %s", err)
}
// The node was removed
if idx, n, err := s.GetNode("node1"); err != nil || n != nil || idx != 3 {
if idx, n, err := s.GetNode("node1", nil); err != nil || n != nil || idx != 3 {
t.Fatalf("bad: %#v %d (err: %#v)", n, idx, err)
}
@ -1324,7 +1324,7 @@ func TestStateStore_DeleteNode(t *testing.T) {
// Deleting a nonexistent node should be idempotent and not return
// an error
if err := s.DeleteNode(4, "node1"); err != nil {
if err := s.DeleteNode(4, "node1", nil); err != nil {
t.Fatalf("err: %s", err)
}
if idx := s.maxIndex("nodes"); idx != 3 {
@ -1618,7 +1618,7 @@ func TestStateStore_Services(t *testing.T) {
}
// Deleting a node with a service should fire the watch.
if err := s.DeleteNode(6, "node1"); err != nil {
if err := s.DeleteNode(6, "node1", nil); err != nil {
t.Fatalf("err: %s", err)
}
if !watchFired(ws) {
@ -1868,7 +1868,7 @@ func TestStateStore_ServiceNodes(t *testing.T) {
}
// But removing a node with the "db" service should fire the watch.
if err := s.DeleteNode(18, "bar"); err != nil {
if err := s.DeleteNode(18, "bar", nil); err != nil {
t.Fatalf("err: %s", err)
}
if !watchFired(ws) {
@ -1972,7 +1972,7 @@ func TestStateStore_ServiceTagNodes(t *testing.T) {
}
// But removing a node with the "db:primary" service should fire the watch.
if err := s.DeleteNode(21, "foo"); err != nil {
if err := s.DeleteNode(21, "foo", nil); err != nil {
t.Fatalf("err: %s", err)
}
if !watchFired(ws) {
@ -2134,7 +2134,7 @@ func TestStateStore_ConnectServiceNodes(t *testing.T) {
assert.False(watchFired(ws))
// But removing a node with the "db" service should fire the watch.
assert.Nil(s.DeleteNode(18, "bar"))
assert.Nil(s.DeleteNode(18, "bar", nil))
assert.True(watchFired(ws))
}
@ -2293,7 +2293,7 @@ func TestStateStore_Service_Snapshot(t *testing.T) {
if idx := snap.LastIndex(); idx != 4 {
t.Fatalf("bad index: %d", idx)
}
services, err := snap.Services("node1")
services, err := snap.Services("node1", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -3875,7 +3875,7 @@ func TestStateStore_Check_Snapshot(t *testing.T) {
if idx := snap.LastIndex(); idx != 5 {
t.Fatalf("bad index: %d", idx)
}
iter, err := snap.Checks("node1")
iter, err := snap.Checks("node1", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -4131,7 +4131,7 @@ func TestStateStore_ServiceDump(t *testing.T) {
{
name: "delete a node",
modFn: func(t *testing.T) {
s.DeleteNode(12, "node2")
s.DeleteNode(12, "node2", nil)
},
allFired: true, // fires due to "index"
kindFired: true, // fires due to "index"
@ -6791,7 +6791,7 @@ func TestCatalog_topologyCleanupPanic(t *testing.T) {
assert.True(t, watchFired(ws))
// Now delete the node Foo, and this would panic because of the deletion within an iterator
require.NoError(t, s.DeleteNode(3, "foo"))
require.NoError(t, s.DeleteNode(3, "foo", nil))
assert.True(t, watchFired(ws))
}
@ -7074,7 +7074,7 @@ func TestCatalog_cleanupGatewayWildcards_panic(t *testing.T) {
require.NoError(t, s.EnsureService(5, "foo", &api2))
// Now delete the node "foo", and this would panic because of the deletion within an iterator
require.NoError(t, s.DeleteNode(6, "foo"))
require.NoError(t, s.DeleteNode(6, "foo", nil))
}
func TestCatalog_DownstreamsForService(t *testing.T) {

6
agent/consul/state/coordinate.go

@ -82,7 +82,8 @@ func (s *Restore) Coordinates(idx uint64, updates structs.Coordinates) error {
// Coordinate returns a map of coordinates for the given node, indexed by
// network segment.
func (s *Store) Coordinate(node string, ws memdb.WatchSet) (uint64, lib.CoordinateSet, error) {
func (s *Store) Coordinate(ws memdb.WatchSet, node string, _ *structs.EnterpriseMeta) (uint64, lib.CoordinateSet, error) {
// TODO(partitions): use the provided entmeta
tx := s.db.Txn(false)
defer tx.Abort()
@ -103,7 +104,8 @@ func (s *Store) Coordinate(node string, ws memdb.WatchSet) (uint64, lib.Coordina
}
// Coordinates queries for all nodes with coordinates.
func (s *Store) Coordinates(ws memdb.WatchSet) (uint64, structs.Coordinates, error) {
func (s *Store) Coordinates(ws memdb.WatchSet, _ *structs.EnterpriseMeta) (uint64, structs.Coordinates, error) {
// TODO(partitions): use the provided entmeta
tx := s.db.Txn(false)
defer tx.Abort()

35
agent/consul/state/coordinate_test.go

@ -5,13 +5,16 @@ import (
"math/rand"
"testing"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/serf/coordinate"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib"
)
// TODO(partitions): test partitioned nodes here
// generateRandomCoordinate creates a random coordinate. This mucks with the
// underlying structure directly, so it's not really useful for any particular
// position in the network, but it's a good payload to send through to make
@ -37,7 +40,7 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
// Make sure the coordinates list starts out empty, and that a query for
// a per-node coordinate for a nonexistent node doesn't do anything bad.
ws := memdb.NewWatchSet()
idx, all, err := s.Coordinates(ws)
idx, all, err := s.Coordinates(ws, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -47,7 +50,7 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
require.Nil(t, all)
coordinateWs := memdb.NewWatchSet()
_, coords, err := s.Coordinate("nope", coordinateWs)
_, coords, err := s.Coordinate(coordinateWs, "nope", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -75,7 +78,7 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
// Should still be empty, though applying an empty batch does bump
// the table index.
ws = memdb.NewWatchSet()
idx, all, err = s.Coordinates(ws)
idx, all, err = s.Coordinates(ws, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -85,7 +88,7 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
require.Nil(t, all)
coordinateWs = memdb.NewWatchSet()
idx, _, err = s.Coordinate("node1", coordinateWs)
idx, _, err = s.Coordinate(coordinateWs, "node1", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -105,7 +108,7 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
// Should go through now.
ws = memdb.NewWatchSet()
idx, all, err = s.Coordinates(ws)
idx, all, err = s.Coordinates(ws, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -118,7 +121,7 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
nodeWs := make([]memdb.WatchSet, len(updates))
for i, update := range updates {
nodeWs[i] = memdb.NewWatchSet()
idx, coords, err := s.Coordinate(update.Node, nodeWs[i])
idx, coords, err := s.Coordinate(nodeWs[i], update.Node, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -146,7 +149,7 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
}
// Verify it got applied.
idx, all, err = s.Coordinates(nil)
idx, all, err = s.Coordinates(nil, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -157,7 +160,7 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
// And check the per-node coordinate version of the same thing.
for _, update := range updates {
idx, coords, err := s.Coordinate(update.Node, nil)
idx, coords, err := s.Coordinate(nil, update.Node, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -183,7 +186,7 @@ func TestStateStore_Coordinate_Updates(t *testing.T) {
// Verify we are at the previous state, though the empty batch does bump
// the table index.
idx, all, err = s.Coordinates(nil)
idx, all, err = s.Coordinates(nil, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -215,7 +218,7 @@ func TestStateStore_Coordinate_Cleanup(t *testing.T) {
}
// Make sure it's in there.
_, coords, err := s.Coordinate("node1", nil)
_, coords, err := s.Coordinate(nil, "node1", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -226,19 +229,19 @@ func TestStateStore_Coordinate_Cleanup(t *testing.T) {
require.Equal(t, expected, coords)
// Now delete the node.
if err := s.DeleteNode(3, "node1"); err != nil {
if err := s.DeleteNode(3, "node1", nil); err != nil {
t.Fatalf("err: %s", err)
}
// Make sure the coordinate is gone.
_, coords, err = s.Coordinate("node1", nil)
_, coords, err = s.Coordinate(nil, "node1", nil)
if err != nil {
t.Fatalf("err: %s", err)
}
require.Equal(t, lib.CoordinateSet{}, coords)
// Make sure the index got updated.
idx, all, err := s.Coordinates(nil)
idx, all, err := s.Coordinates(nil, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
@ -326,7 +329,7 @@ func TestStateStore_Coordinate_Snapshot_Restore(t *testing.T) {
restore.Commit()
// Read the restored coordinates back out and verify that they match.
idx, res, err := s.Coordinates(nil)
idx, res, err := s.Coordinates(nil, nil)
if err != nil {
t.Fatalf("err: %s", err)
}

9
agent/consul/state/session_test.go

@ -9,10 +9,11 @@ import (
"github.com/stretchr/testify/assert"
"github.com/hashicorp/go-memdb"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/types"
"github.com/hashicorp/go-memdb"
)
func TestStateStore_SessionCreate_SessionGet(t *testing.T) {
@ -552,7 +553,7 @@ func TestStateStore_Session_Invalidate_DeleteNode(t *testing.T) {
if err != nil {
t.Fatalf("err: %v", err)
}
if err := s.DeleteNode(15, "foo"); err != nil {
if err := s.DeleteNode(15, "foo", nil); err != nil {
t.Fatalf("err: %v", err)
}
if !watchFired(ws) {
@ -776,7 +777,7 @@ func TestStateStore_Session_Invalidate_Key_Unlock_Behavior(t *testing.T) {
if err != nil {
t.Fatalf("err: %v", err)
}
if err := s.DeleteNode(6, "foo"); err != nil {
if err := s.DeleteNode(6, "foo", nil); err != nil {
t.Fatalf("err: %v", err)
}
if !watchFired(ws) {
@ -858,7 +859,7 @@ func TestStateStore_Session_Invalidate_Key_Delete_Behavior(t *testing.T) {
if err != nil {
t.Fatalf("err: %v", err)
}
if err := s.DeleteNode(6, "foo"); err != nil {
if err := s.DeleteNode(6, "foo", nil); err != nil {
t.Fatalf("err: %v", err)
}
if !watchFired(ws) {

5
agent/consul/state/txn_test.go

@ -5,10 +5,11 @@ import (
"strings"
"testing"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/types"
"github.com/stretchr/testify/require"
)
//nolint:staticcheck
@ -195,7 +196,7 @@ func TestStateStore_Txn_Node(t *testing.T) {
require.Equal(t, expected, results)
// Pull the resulting state store contents.
idx, actual, err := s.Nodes(nil)
idx, actual, err := s.Nodes(nil, nil)
require.NoError(t, err)
if idx != 8 {
t.Fatalf("bad index: %d", idx)

7
agent/consul/state/usage_test.go

@ -3,9 +3,10 @@ package state
import (
"testing"
"github.com/hashicorp/consul/agent/structs"
memdb "github.com/hashicorp/go-memdb"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/agent/structs"
)
func TestStateStore_Usage_NodeCount(t *testing.T) {
@ -37,7 +38,7 @@ func TestStateStore_Usage_NodeCount_Delete(t *testing.T) {
require.Equal(t, idx, uint64(1))
require.Equal(t, count, 2)
require.NoError(t, s.DeleteNode(2, "node2"))
require.NoError(t, s.DeleteNode(2, "node2", nil))
idx, count, err = s.NodeCount()
require.NoError(t, err)
require.Equal(t, idx, uint64(2))
@ -83,7 +84,7 @@ func TestStateStore_Usage_ServiceUsage_DeleteNode(t *testing.T) {
require.Equal(t, usage.Services, 1)
require.Equal(t, usage.ServiceInstances, 2)
require.NoError(t, s.DeleteNode(3, "node1"))
require.NoError(t, s.DeleteNode(3, "node1", nil))
idx, usage, err = s.ServiceUsage()
require.NoError(t, err)

8
agent/consul/txn_endpoint_test.go

@ -8,13 +8,14 @@ import (
"testing"
"time"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/stretchr/testify/require"
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/testrpc"
"github.com/hashicorp/consul/types"
msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc"
"github.com/stretchr/testify/require"
)
var testTxnRules = `
@ -233,7 +234,8 @@ func TestTxn_Apply(t *testing.T) {
t.Fatalf("bad: %v", d)
}
_, n, err := state.GetNode("foo")
// TODO(partitions)
_, n, err := state.GetNode("foo", nil)
if err != nil {
t.Fatalf("err: %v", err)
}

Loading…
Cancel
Save