From bcf1ffad994830d9c054f70bc6b42e6c3adb8696 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Mon, 12 Dec 2016 11:58:31 -0800 Subject: [PATCH] Adds complete ACL coverage for /v1/coordinate/nodes and Coordinate.Update RPC. --- command/agent/agent.go | 5 +- consul/acl.go | 25 ++- consul/acl_test.go | 35 +++++ consul/coordinate_endpoint.go | 14 ++ consul/coordinate_endpoint_test.go | 238 +++++++++++++++++++++++++++-- 5 files changed, 297 insertions(+), 20 deletions(-) diff --git a/command/agent/agent.go b/command/agent/agent.go index 7e464b2fa4..6b9bcaa50d 100644 --- a/command/agent/agent.go +++ b/command/agent/agent.go @@ -818,14 +818,11 @@ func (a *Agent) sendCoordinate() { continue } - // TODO - Consider adding a distance check so we don't send - // an update if the position hasn't changed by more than a - // threshold. req := structs.CoordinateUpdateRequest{ Datacenter: a.config.Datacenter, Node: a.config.NodeName, Coord: c, - WriteRequest: structs.WriteRequest{Token: a.config.ACLToken}, + WriteRequest: structs.WriteRequest{Token: a.config.GetTokenForAgent()}, } var reply struct{} if err := a.RPC("Coordinate.Update", &req, &reply); err != nil { diff --git a/consul/acl.go b/consul/acl.go index cfbf744f77..ba51e2a8f2 100644 --- a/consul/acl.go +++ b/consul/acl.go @@ -413,6 +413,22 @@ func (f *aclFilter) filterCheckServiceNodes(nodes *structs.CheckServiceNodes) { *nodes = csn } +// filterCoordinates is used to filter nodes in a coordinate dump based on ACL +// rules. +func (f *aclFilter) filterCoordinates(coords *structs.Coordinates) { + c := *coords + for i := 0; i < len(c); i++ { + node := c[i].Node + if f.allowNode(node) { + continue + } + f.logger.Printf("[DEBUG] consul: dropping node %q from result due to ACLs", node) + c = append(c[:i], c[i+1:]...) + i-- + } + *coords = c +} + // filterNodeDump is used to filter through all parts of a node dump and // remove elements the provided ACL token cannot access. func (f *aclFilter) filterNodeDump(dump *structs.NodeDump) { @@ -448,14 +464,10 @@ func (f *aclFilter) filterNodeDump(dump *structs.NodeDump) { // filterNodes is used to filter through all parts of a node list and remove // elements the provided ACL token cannot access. func (f *aclFilter) filterNodes(nodes *structs.Nodes) { - if !f.enforceVersion8 { - return - } - n := *nodes for i := 0; i < len(n); i++ { node := n[i].Node - if f.acl.NodeRead(node) { + if f.allowNode(node) { continue } f.logger.Printf("[DEBUG] consul: dropping node %q from result due to ACLs", node) @@ -548,6 +560,9 @@ func (s *Server) filterACL(token string, subj interface{}) error { case *structs.IndexedCheckServiceNodes: filt.filterCheckServiceNodes(&v.Nodes) + case *structs.IndexedCoordinates: + filt.filterCoordinates(&v.Coordinates) + case *structs.IndexedHealthChecks: filt.filterHealthChecks(&v.HealthChecks) diff --git a/consul/acl_test.go b/consul/acl_test.go index 221e09fe97..9a6834c9df 100644 --- a/consul/acl_test.go +++ b/consul/acl_test.go @@ -1012,6 +1012,41 @@ func TestACL_filterCheckServiceNodes(t *testing.T) { } } +func TestACL_filterCoordinates(t *testing.T) { + // Create some coordinates. + coords := structs.Coordinates{ + &structs.Coordinate{ + Node: "node1", + Coord: generateRandomCoordinate(), + }, + &structs.Coordinate{ + Node: "node2", + Coord: generateRandomCoordinate(), + }, + } + + // Try permissive filtering. + filt := newAclFilter(acl.AllowAll(), nil, false) + filt.filterCoordinates(&coords) + if len(coords) != 2 { + t.Fatalf("bad: %#v", coords) + } + + // Try restrictive filtering without version 8 ACL enforcement. + filt = newAclFilter(acl.DenyAll(), nil, false) + filt.filterCoordinates(&coords) + if len(coords) != 2 { + t.Fatalf("bad: %#v", coords) + } + + // Try restrictive filtering with version 8 ACL enforcement. + filt = newAclFilter(acl.DenyAll(), nil, true) + filt.filterCoordinates(&coords) + if len(coords) != 0 { + t.Fatalf("bad: %#v", coords) + } +} + func TestACL_filterNodeDump(t *testing.T) { // Create a node dump dump := structs.NodeDump{ diff --git a/consul/coordinate_endpoint.go b/consul/coordinate_endpoint.go index 9e0df58212..6a30baa452 100644 --- a/consul/coordinate_endpoint.go +++ b/consul/coordinate_endpoint.go @@ -119,6 +119,17 @@ func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct return fmt.Errorf("rejected bad coordinate: %v", args.Coord) } + // Fetch the ACL token, if any, and enforce the node policy if enabled. + acl, err := c.srv.resolveToken(args.Token) + if err != nil { + return err + } + if acl != nil && c.srv.config.ACLEnforceVersion8 { + if !acl.NodeWrite(args.Node) { + return permissionDeniedErr + } + } + // Add the coordinate to the map of pending updates. c.updatesLock.Lock() c.updates[args.Node] = args.Coord @@ -173,6 +184,9 @@ func (c *Coordinate) ListNodes(args *structs.DCSpecificRequest, reply *structs.I } reply.Index, reply.Coordinates = index, coords + if err := c.srv.filterACL(args.Token, reply); err != nil { + return err + } return nil }) } diff --git a/consul/coordinate_endpoint_test.go b/consul/coordinate_endpoint_test.go index 88171a7c8f..42e41c0dcb 100644 --- a/consul/coordinate_endpoint_test.go +++ b/consul/coordinate_endpoint_test.go @@ -187,6 +187,87 @@ func TestCoordinate_Update(t *testing.T) { } } +func TestCoordinate_Update_ACLDeny(t *testing.T) { + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.ACLDatacenter = "dc1" + c.ACLMasterToken = "root" + c.ACLDefaultPolicy = "deny" + c.ACLEnforceVersion8 = false + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + testutil.WaitForLeader(t, s1.RPC, "dc1") + + // Register some nodes. + nodes := []string{"node1", "node2"} + for _, node := range nodes { + req := structs.RegisterRequest{ + Datacenter: "dc1", + Node: node, + Address: "127.0.0.1", + } + var reply struct{} + if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &req, &reply); err != nil { + t.Fatalf("err: %v", err) + } + } + + // Send an update for the first node. This should go through since we + // don't have version 8 ACLs enforced yet. + req := structs.CoordinateUpdateRequest{ + Datacenter: "dc1", + Node: "node1", + Coord: generateRandomCoordinate(), + } + var out struct{} + if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &req, &out); err != nil { + t.Fatalf("err: %v", err) + } + + // Now turn on version 8 enforcement and try again. + s1.config.ACLEnforceVersion8 = true + err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &req, &out) + if err == nil || !strings.Contains(err.Error(), permissionDenied) { + t.Fatalf("err: %v", err) + } + + // Create an ACL that can write to the node. + arg := structs.ACLRequest{ + Datacenter: "dc1", + Op: structs.ACLSet, + ACL: structs.ACL{ + Name: "User token", + Type: structs.ACLTypeClient, + Rules: ` +node "node1" { + policy = "write" +} +`, + }, + WriteRequest: structs.WriteRequest{Token: "root"}, + } + var id string + if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &arg, &id); err != nil { + t.Fatalf("err: %v", err) + } + + // With the token, it should now go through. + req.Token = id + if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &req, &out); err != nil { + t.Fatalf("err: %v", err) + } + + // But it should be blocked for the other node. + req.Node = "node2" + err = msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &req, &out) + if err == nil || !strings.Contains(err.Error(), permissionDenied) { + t.Fatalf("err: %v", err) + } +} + func TestCoordinate_ListDatacenters(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) @@ -240,8 +321,7 @@ func TestCoordinate_ListNodes(t *testing.T) { } } - // Send coordinate updates for a few nodes, waiting a little while for - // the batch update to run. + // Send coordinate updates for a few nodes. arg1 := structs.CoordinateUpdateRequest{ Datacenter: "dc1", Node: "foo", @@ -269,9 +349,120 @@ func TestCoordinate_ListNodes(t *testing.T) { if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg3, &out); err != nil { t.Fatalf("err: %v", err) } - time.Sleep(3 * s1.config.CoordinateUpdatePeriod) // Now query back for all the nodes. + testutil.WaitForResult(func() (bool, error) { + arg := structs.DCSpecificRequest{ + Datacenter: "dc1", + } + resp := structs.IndexedCoordinates{} + if err := msgpackrpc.CallWithCodec(codec, "Coordinate.ListNodes", &arg, &resp); err != nil { + t.Fatalf("err: %v", err) + } + if len(resp.Coordinates) != 3 || + resp.Coordinates[0].Node != "bar" || + resp.Coordinates[1].Node != "baz" || + resp.Coordinates[2].Node != "foo" { + return false, fmt.Errorf("bad: %v", resp.Coordinates) + } + verifyCoordinatesEqual(t, resp.Coordinates[0].Coord, arg2.Coord) // bar + verifyCoordinatesEqual(t, resp.Coordinates[1].Coord, arg3.Coord) // baz + verifyCoordinatesEqual(t, resp.Coordinates[2].Coord, arg1.Coord) // foo + return true, nil + }, func(err error) { t.Fatalf("err: %v", err) }) +} + +func TestCoordinate_ListNodes_ACLFilter(t *testing.T) { + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.ACLDatacenter = "dc1" + c.ACLMasterToken = "root" + c.ACLDefaultPolicy = "deny" + c.ACLEnforceVersion8 = false + }) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + codec := rpcClient(t, s1) + defer codec.Close() + + testutil.WaitForLeader(t, s1.RPC, "dc1") + + // Register some nodes. + nodes := []string{"foo", "bar", "baz"} + for _, node := range nodes { + req := structs.RegisterRequest{ + Datacenter: "dc1", + Node: node, + Address: "127.0.0.1", + WriteRequest: structs.WriteRequest{ + Token: "root", + }, + } + var reply struct{} + if err := msgpackrpc.CallWithCodec(codec, "Catalog.Register", &req, &reply); err != nil { + t.Fatalf("err: %v", err) + } + } + + // Send coordinate updates for a few nodes. + arg1 := structs.CoordinateUpdateRequest{ + Datacenter: "dc1", + Node: "foo", + Coord: generateRandomCoordinate(), + WriteRequest: structs.WriteRequest{ + Token: "root", + }, + } + var out struct{} + if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg1, &out); err != nil { + t.Fatalf("err: %v", err) + } + + arg2 := structs.CoordinateUpdateRequest{ + Datacenter: "dc1", + Node: "bar", + Coord: generateRandomCoordinate(), + WriteRequest: structs.WriteRequest{ + Token: "root", + }, + } + if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg2, &out); err != nil { + t.Fatalf("err: %v", err) + } + + arg3 := structs.CoordinateUpdateRequest{ + Datacenter: "dc1", + Node: "baz", + Coord: generateRandomCoordinate(), + WriteRequest: structs.WriteRequest{ + Token: "root", + }, + } + if err := msgpackrpc.CallWithCodec(codec, "Coordinate.Update", &arg3, &out); err != nil { + t.Fatalf("err: %v", err) + } + + // Wait for all the coordinate updates to apply. Since we aren't + // enforcing version 8 ACLs, this should also allow us to read + // everything back without a token. + testutil.WaitForResult(func() (bool, error) { + arg := structs.DCSpecificRequest{ + Datacenter: "dc1", + } + resp := structs.IndexedCoordinates{} + if err := msgpackrpc.CallWithCodec(codec, "Coordinate.ListNodes", &arg, &resp); err != nil { + t.Fatalf("err: %v", err) + } + if len(resp.Coordinates) == 3 { + return true, nil + } + return false, fmt.Errorf("bad: %v", resp.Coordinates) + }, func(err error) { t.Fatalf("err: %v", err) }) + + // Now that we've waited for the batch processing to ingest the + // coordinates we can do the rest of the requests without the loop. We + // will start by turning on version 8 ACL support which should block + // everything. + s1.config.ACLEnforceVersion8 = true arg := structs.DCSpecificRequest{ Datacenter: "dc1", } @@ -279,13 +470,38 @@ func TestCoordinate_ListNodes(t *testing.T) { if err := msgpackrpc.CallWithCodec(codec, "Coordinate.ListNodes", &arg, &resp); err != nil { t.Fatalf("err: %v", err) } - if len(resp.Coordinates) != 3 || - resp.Coordinates[0].Node != "bar" || - resp.Coordinates[1].Node != "baz" || - resp.Coordinates[2].Node != "foo" { - t.Fatalf("bad: %v", resp.Coordinates) + if len(resp.Coordinates) != 0 { + t.Fatalf("bad: %#v", resp.Coordinates) + } + + // Create an ACL that can read one of the nodes. + var id string + { + req := structs.ACLRequest{ + Datacenter: "dc1", + Op: structs.ACLSet, + ACL: structs.ACL{ + Name: "User token", + Type: structs.ACLTypeClient, + Rules: ` +node "foo" { + policy = "read" +} +`, + }, + WriteRequest: structs.WriteRequest{Token: "root"}, + } + if err := msgpackrpc.CallWithCodec(codec, "ACL.Apply", &req, &id); err != nil { + t.Fatalf("err: %v", err) + } + } + + // With the token, it should now go through. + arg.Token = id + if err := msgpackrpc.CallWithCodec(codec, "Coordinate.ListNodes", &arg, &resp); err != nil { + t.Fatalf("err: %v", err) + } + if len(resp.Coordinates) != 1 || resp.Coordinates[0].Node != "foo" { + t.Fatalf("bad: %#v", resp.Coordinates) } - verifyCoordinatesEqual(t, resp.Coordinates[0].Coord, arg2.Coord) // bar - verifyCoordinatesEqual(t, resp.Coordinates[1].Coord, arg3.Coord) // baz - verifyCoordinatesEqual(t, resp.Coordinates[2].Coord, arg1.Coord) // foo }