mirror of https://github.com/hashicorp/consul
Adds complete ACL coverage for /v1/coordinate/nodes and Coordinate.Update RPC.
parent
0139bbb963
commit
bcf1ffad99
|
@ -818,14 +818,11 @@ func (a *Agent) sendCoordinate() {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO - Consider adding a distance check so we don't send
|
|
||||||
// an update if the position hasn't changed by more than a
|
|
||||||
// threshold.
|
|
||||||
req := structs.CoordinateUpdateRequest{
|
req := structs.CoordinateUpdateRequest{
|
||||||
Datacenter: a.config.Datacenter,
|
Datacenter: a.config.Datacenter,
|
||||||
Node: a.config.NodeName,
|
Node: a.config.NodeName,
|
||||||
Coord: c,
|
Coord: c,
|
||||||
WriteRequest: structs.WriteRequest{Token: a.config.ACLToken},
|
WriteRequest: structs.WriteRequest{Token: a.config.GetTokenForAgent()},
|
||||||
}
|
}
|
||||||
var reply struct{}
|
var reply struct{}
|
||||||
if err := a.RPC("Coordinate.Update", &req, &reply); err != nil {
|
if err := a.RPC("Coordinate.Update", &req, &reply); err != nil {
|
||||||
|
|
|
@ -413,6 +413,22 @@ func (f *aclFilter) filterCheckServiceNodes(nodes *structs.CheckServiceNodes) {
|
||||||
*nodes = csn
|
*nodes = csn
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// filterCoordinates is used to filter nodes in a coordinate dump based on ACL
|
||||||
|
// rules.
|
||||||
|
func (f *aclFilter) filterCoordinates(coords *structs.Coordinates) {
|
||||||
|
c := *coords
|
||||||
|
for i := 0; i < len(c); i++ {
|
||||||
|
node := c[i].Node
|
||||||
|
if f.allowNode(node) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
f.logger.Printf("[DEBUG] consul: dropping node %q from result due to ACLs", node)
|
||||||
|
c = append(c[:i], c[i+1:]...)
|
||||||
|
i--
|
||||||
|
}
|
||||||
|
*coords = c
|
||||||
|
}
|
||||||
|
|
||||||
// filterNodeDump is used to filter through all parts of a node dump and
|
// filterNodeDump is used to filter through all parts of a node dump and
|
||||||
// remove elements the provided ACL token cannot access.
|
// remove elements the provided ACL token cannot access.
|
||||||
func (f *aclFilter) filterNodeDump(dump *structs.NodeDump) {
|
func (f *aclFilter) filterNodeDump(dump *structs.NodeDump) {
|
||||||
|
@ -448,14 +464,10 @@ func (f *aclFilter) filterNodeDump(dump *structs.NodeDump) {
|
||||||
// filterNodes is used to filter through all parts of a node list and remove
|
// filterNodes is used to filter through all parts of a node list and remove
|
||||||
// elements the provided ACL token cannot access.
|
// elements the provided ACL token cannot access.
|
||||||
func (f *aclFilter) filterNodes(nodes *structs.Nodes) {
|
func (f *aclFilter) filterNodes(nodes *structs.Nodes) {
|
||||||
if !f.enforceVersion8 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
n := *nodes
|
n := *nodes
|
||||||
for i := 0; i < len(n); i++ {
|
for i := 0; i < len(n); i++ {
|
||||||
node := n[i].Node
|
node := n[i].Node
|
||||||
if f.acl.NodeRead(node) {
|
if f.allowNode(node) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
f.logger.Printf("[DEBUG] consul: dropping node %q from result due to ACLs", node)
|
f.logger.Printf("[DEBUG] consul: dropping node %q from result due to ACLs", node)
|
||||||
|
@ -548,6 +560,9 @@ func (s *Server) filterACL(token string, subj interface{}) error {
|
||||||
case *structs.IndexedCheckServiceNodes:
|
case *structs.IndexedCheckServiceNodes:
|
||||||
filt.filterCheckServiceNodes(&v.Nodes)
|
filt.filterCheckServiceNodes(&v.Nodes)
|
||||||
|
|
||||||
|
case *structs.IndexedCoordinates:
|
||||||
|
filt.filterCoordinates(&v.Coordinates)
|
||||||
|
|
||||||
case *structs.IndexedHealthChecks:
|
case *structs.IndexedHealthChecks:
|
||||||
filt.filterHealthChecks(&v.HealthChecks)
|
filt.filterHealthChecks(&v.HealthChecks)
|
||||||
|
|
||||||
|
|
|
@ -1012,6 +1012,41 @@ func TestACL_filterCheckServiceNodes(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestACL_filterCoordinates(t *testing.T) {
|
||||||
|
// Create some coordinates.
|
||||||
|
coords := structs.Coordinates{
|
||||||
|
&structs.Coordinate{
|
||||||
|
Node: "node1",
|
||||||
|
Coord: generateRandomCoordinate(),
|
||||||
|
},
|
||||||
|
&structs.Coordinate{
|
||||||
|
Node: "node2",
|
||||||
|
Coord: generateRandomCoordinate(),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try permissive filtering.
|
||||||
|
filt := newAclFilter(acl.AllowAll(), nil, false)
|
||||||
|
filt.filterCoordinates(&coords)
|
||||||
|
if len(coords) != 2 {
|
||||||
|
t.Fatalf("bad: %#v", coords)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try restrictive filtering without version 8 ACL enforcement.
|
||||||
|
filt = newAclFilter(acl.DenyAll(), nil, false)
|
||||||
|
filt.filterCoordinates(&coords)
|
||||||
|
if len(coords) != 2 {
|
||||||
|
t.Fatalf("bad: %#v", coords)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try restrictive filtering with version 8 ACL enforcement.
|
||||||
|
filt = newAclFilter(acl.DenyAll(), nil, true)
|
||||||
|
filt.filterCoordinates(&coords)
|
||||||
|
if len(coords) != 0 {
|
||||||
|
t.Fatalf("bad: %#v", coords)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestACL_filterNodeDump(t *testing.T) {
|
func TestACL_filterNodeDump(t *testing.T) {
|
||||||
// Create a node dump
|
// Create a node dump
|
||||||
dump := structs.NodeDump{
|
dump := structs.NodeDump{
|
||||||
|
|
|
@ -119,6 +119,17 @@ func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct
|
||||||
return fmt.Errorf("rejected bad coordinate: %v", args.Coord)
|
return fmt.Errorf("rejected bad coordinate: %v", args.Coord)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Fetch the ACL token, if any, and enforce the node policy if enabled.
|
||||||
|
acl, err := c.srv.resolveToken(args.Token)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if acl != nil && c.srv.config.ACLEnforceVersion8 {
|
||||||
|
if !acl.NodeWrite(args.Node) {
|
||||||
|
return permissionDeniedErr
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Add the coordinate to the map of pending updates.
|
// Add the coordinate to the map of pending updates.
|
||||||
c.updatesLock.Lock()
|
c.updatesLock.Lock()
|
||||||
c.updates[args.Node] = args.Coord
|
c.updates[args.Node] = args.Coord
|
||||||
|
@ -173,6 +184,9 @@ func (c *Coordinate) ListNodes(args *structs.DCSpecificRequest, reply *structs.I
|
||||||
}
|
}
|
||||||
|
|
||||||
reply.Index, reply.Coordinates = index, coords
|
reply.Index, reply.Coordinates = index, coords
|
||||||
|
if err := c.srv.filterACL(args.Token, reply); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -187,6 +187,87 @@ func TestCoordinate_Update(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestCoordinate_Update_ACLDeny(t *testing.T) {
|
||||||
|
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||||
|
c.ACLDatacenter = "dc1"
|
||||||
|
c.ACLMasterToken = "root"
|
||||||
|
c.ACLDefaultPolicy = "deny"
|
||||||
|
c.ACLEnforceVersion8 = false
|
||||||
|
})
|
||||||
|
defer os.RemoveAll(dir1)
|
||||||
|
defer s1.Shutdown()
|
||||||
|
codec := rpcClient(t, s1)
|
||||||
|
defer codec.Close()
|
||||||
|
|
||||||
|
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||||
|
|
||||||
|
// Register some nodes.
|
||||||
|
nodes := []string{"node1", "node2"}
|
||||||
|
for _, node := range nodes {
|
||||||
|
req := structs.RegisterRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Node: node,
|
||||||
|
Address: "127.0.0.1",
|
||||||
|
}
|
||||||
|
var reply struct{}
|
||||||
|
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &req, &reply); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send an update for the first node. This should go through since we
|
||||||
|
// don't have version 8 ACLs enforced yet.
|
||||||
|
req := structs.CoordinateUpdateRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Node: "node1",
|
||||||
|
Coord: generateRandomCoordinate(),
|
||||||
|
}
|
||||||
|
var out struct{}
|
||||||
|
if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &req, &out); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now turn on version 8 enforcement and try again.
|
||||||
|
s1.config.ACLEnforceVersion8 = true
|
||||||
|
err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &req, &out)
|
||||||
|
if err == nil || !strings.Contains(err.Error(), permissionDenied) {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create an ACL that can write to the node.
|
||||||
|
arg := structs.ACLRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Op: structs.ACLSet,
|
||||||
|
ACL: structs.ACL{
|
||||||
|
Name: "User token",
|
||||||
|
Type: structs.ACLTypeClient,
|
||||||
|
Rules: `
|
||||||
|
node "node1" {
|
||||||
|
policy = "write"
|
||||||
|
}
|
||||||
|
`,
|
||||||
|
},
|
||||||
|
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||||
|
}
|
||||||
|
var id string
|
||||||
|
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &id); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// With the token, it should now go through.
|
||||||
|
req.Token = id
|
||||||
|
if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &req, &out); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// But it should be blocked for the other node.
|
||||||
|
req.Node = "node2"
|
||||||
|
err = msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &req, &out)
|
||||||
|
if err == nil || !strings.Contains(err.Error(), permissionDenied) {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestCoordinate_ListDatacenters(t *testing.T) {
|
func TestCoordinate_ListDatacenters(t *testing.T) {
|
||||||
dir1, s1 := testServer(t)
|
dir1, s1 := testServer(t)
|
||||||
defer os.RemoveAll(dir1)
|
defer os.RemoveAll(dir1)
|
||||||
|
@ -240,8 +321,7 @@ func TestCoordinate_ListNodes(t *testing.T) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send coordinate updates for a few nodes, waiting a little while for
|
// Send coordinate updates for a few nodes.
|
||||||
// the batch update to run.
|
|
||||||
arg1 := structs.CoordinateUpdateRequest{
|
arg1 := structs.CoordinateUpdateRequest{
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
Node: "foo",
|
Node: "foo",
|
||||||
|
@ -269,9 +349,120 @@ func TestCoordinate_ListNodes(t *testing.T) {
|
||||||
if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg3, &out); err != nil {
|
if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg3, &out); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
time.Sleep(3 * s1.config.CoordinateUpdatePeriod)
|
|
||||||
|
|
||||||
// Now query back for all the nodes.
|
// Now query back for all the nodes.
|
||||||
|
testutil.WaitForResult(func() (bool, error) {
|
||||||
|
arg := structs.DCSpecificRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
}
|
||||||
|
resp := structs.IndexedCoordinates{}
|
||||||
|
if err := msgpackrpc.CallWithCodec(codec, "Coordinate.ListNodes", &arg, &resp); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if len(resp.Coordinates) != 3 ||
|
||||||
|
resp.Coordinates[0].Node != "bar" ||
|
||||||
|
resp.Coordinates[1].Node != "baz" ||
|
||||||
|
resp.Coordinates[2].Node != "foo" {
|
||||||
|
return false, fmt.Errorf("bad: %v", resp.Coordinates)
|
||||||
|
}
|
||||||
|
verifyCoordinatesEqual(t, resp.Coordinates[0].Coord, arg2.Coord) // bar
|
||||||
|
verifyCoordinatesEqual(t, resp.Coordinates[1].Coord, arg3.Coord) // baz
|
||||||
|
verifyCoordinatesEqual(t, resp.Coordinates[2].Coord, arg1.Coord) // foo
|
||||||
|
return true, nil
|
||||||
|
}, func(err error) { t.Fatalf("err: %v", err) })
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestCoordinate_ListNodes_ACLFilter(t *testing.T) {
|
||||||
|
dir1, s1 := testServerWithConfig(t, func(c *Config) {
|
||||||
|
c.ACLDatacenter = "dc1"
|
||||||
|
c.ACLMasterToken = "root"
|
||||||
|
c.ACLDefaultPolicy = "deny"
|
||||||
|
c.ACLEnforceVersion8 = false
|
||||||
|
})
|
||||||
|
defer os.RemoveAll(dir1)
|
||||||
|
defer s1.Shutdown()
|
||||||
|
codec := rpcClient(t, s1)
|
||||||
|
defer codec.Close()
|
||||||
|
|
||||||
|
testutil.WaitForLeader(t, s1.RPC, "dc1")
|
||||||
|
|
||||||
|
// Register some nodes.
|
||||||
|
nodes := []string{"foo", "bar", "baz"}
|
||||||
|
for _, node := range nodes {
|
||||||
|
req := structs.RegisterRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Node: node,
|
||||||
|
Address: "127.0.0.1",
|
||||||
|
WriteRequest: structs.WriteRequest{
|
||||||
|
Token: "root",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
var reply struct{}
|
||||||
|
if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &req, &reply); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send coordinate updates for a few nodes.
|
||||||
|
arg1 := structs.CoordinateUpdateRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Node: "foo",
|
||||||
|
Coord: generateRandomCoordinate(),
|
||||||
|
WriteRequest: structs.WriteRequest{
|
||||||
|
Token: "root",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
var out struct{}
|
||||||
|
if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg1, &out); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
arg2 := structs.CoordinateUpdateRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Node: "bar",
|
||||||
|
Coord: generateRandomCoordinate(),
|
||||||
|
WriteRequest: structs.WriteRequest{
|
||||||
|
Token: "root",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg2, &out); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
arg3 := structs.CoordinateUpdateRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Node: "baz",
|
||||||
|
Coord: generateRandomCoordinate(),
|
||||||
|
WriteRequest: structs.WriteRequest{
|
||||||
|
Token: "root",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg3, &out); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for all the coordinate updates to apply. Since we aren't
|
||||||
|
// enforcing version 8 ACLs, this should also allow us to read
|
||||||
|
// everything back without a token.
|
||||||
|
testutil.WaitForResult(func() (bool, error) {
|
||||||
|
arg := structs.DCSpecificRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
}
|
||||||
|
resp := structs.IndexedCoordinates{}
|
||||||
|
if err := msgpackrpc.CallWithCodec(codec, "Coordinate.ListNodes", &arg, &resp); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if len(resp.Coordinates) == 3 {
|
||||||
|
return true, nil
|
||||||
|
}
|
||||||
|
return false, fmt.Errorf("bad: %v", resp.Coordinates)
|
||||||
|
}, func(err error) { t.Fatalf("err: %v", err) })
|
||||||
|
|
||||||
|
// Now that we've waited for the batch processing to ingest the
|
||||||
|
// coordinates we can do the rest of the requests without the loop. We
|
||||||
|
// will start by turning on version 8 ACL support which should block
|
||||||
|
// everything.
|
||||||
|
s1.config.ACLEnforceVersion8 = true
|
||||||
arg := structs.DCSpecificRequest{
|
arg := structs.DCSpecificRequest{
|
||||||
Datacenter: "dc1",
|
Datacenter: "dc1",
|
||||||
}
|
}
|
||||||
|
@ -279,13 +470,38 @@ func TestCoordinate_ListNodes(t *testing.T) {
|
||||||
if err := msgpackrpc.CallWithCodec(codec, "Coordinate.ListNodes", &arg, &resp); err != nil {
|
if err := msgpackrpc.CallWithCodec(codec, "Coordinate.ListNodes", &arg, &resp); err != nil {
|
||||||
t.Fatalf("err: %v", err)
|
t.Fatalf("err: %v", err)
|
||||||
}
|
}
|
||||||
if len(resp.Coordinates) != 3 ||
|
if len(resp.Coordinates) != 0 {
|
||||||
resp.Coordinates[0].Node != "bar" ||
|
t.Fatalf("bad: %#v", resp.Coordinates)
|
||||||
resp.Coordinates[1].Node != "baz" ||
|
}
|
||||||
resp.Coordinates[2].Node != "foo" {
|
|
||||||
t.Fatalf("bad: %v", resp.Coordinates)
|
// Create an ACL that can read one of the nodes.
|
||||||
|
var id string
|
||||||
|
{
|
||||||
|
req := structs.ACLRequest{
|
||||||
|
Datacenter: "dc1",
|
||||||
|
Op: structs.ACLSet,
|
||||||
|
ACL: structs.ACL{
|
||||||
|
Name: "User token",
|
||||||
|
Type: structs.ACLTypeClient,
|
||||||
|
Rules: `
|
||||||
|
node "foo" {
|
||||||
|
policy = "read"
|
||||||
|
}
|
||||||
|
`,
|
||||||
|
},
|
||||||
|
WriteRequest: structs.WriteRequest{Token: "root"},
|
||||||
|
}
|
||||||
|
if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &req, &id); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// With the token, it should now go through.
|
||||||
|
arg.Token = id
|
||||||
|
if err := msgpackrpc.CallWithCodec(codec, "Coordinate.ListNodes", &arg, &resp); err != nil {
|
||||||
|
t.Fatalf("err: %v", err)
|
||||||
|
}
|
||||||
|
if len(resp.Coordinates) != 1 || resp.Coordinates[0].Node != "foo" {
|
||||||
|
t.Fatalf("bad: %#v", resp.Coordinates)
|
||||||
}
|
}
|
||||||
verifyCoordinatesEqual(t, resp.Coordinates[0].Coord, arg2.Coord) // bar
|
|
||||||
verifyCoordinatesEqual(t, resp.Coordinates[1].Coord, arg3.Coord) // baz
|
|
||||||
verifyCoordinatesEqual(t, resp.Coordinates[2].Coord, arg1.Coord) // foo
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue