From d428bc63c1130dda2f4092c74bb177977b273a0c Mon Sep 17 00:00:00 2001 From: James Phillips Date: Thu, 23 Mar 2017 13:34:30 -0700 Subject: [PATCH 1/2] Modifies server reconcile path to not use the server's token for internal operations. --- consul/leader.go | 26 ++++------ consul/leader_test.go | 51 ++++++++++++++++--- .../source/docs/agent/options.html.markdown | 5 +- .../source/docs/internals/acl.html.markdown | 10 ++-- 4 files changed, 61 insertions(+), 31 deletions(-) diff --git a/consul/leader.go b/consul/leader.go index dbc60d8a08..124a14a258 100644 --- a/consul/leader.go +++ b/consul/leader.go @@ -464,10 +464,9 @@ AFTER_CHECK: Status: structs.HealthPassing, Output: SerfCheckAliveOutput, }, - WriteRequest: structs.WriteRequest{Token: s.config.GetTokenForAgent()}, } - var out struct{} - return s.endpoints.Catalog.Register(&req, &out) + _, err = s.raftApply(structs.RegisterRequestType, &req) + return err } // handleFailedMember is used to mark the node's status @@ -505,10 +504,9 @@ func (s *Server) handleFailedMember(member serf.Member) error { Status: structs.HealthCritical, Output: SerfCheckFailedOutput, }, - WriteRequest: structs.WriteRequest{Token: s.config.GetTokenForAgent()}, } - var out struct{} - return s.endpoints.Catalog.Register(&req, &out) + _, err = s.raftApply(structs.RegisterRequestType, &req) + return err } // handleLeftMember is used to handle members that gracefully @@ -553,12 +551,11 @@ func (s *Server) handleDeregisterMember(reason string, member serf.Member) error // Deregister the node s.logger.Printf("[INFO] consul: member '%s' %s, deregistering", member.Name, reason) req := structs.DeregisterRequest{ - Datacenter: s.config.Datacenter, - Node: member.Name, - WriteRequest: structs.WriteRequest{Token: s.config.GetTokenForAgent()}, + Datacenter: s.config.Datacenter, + Node: member.Name, } - var out struct{} - return s.endpoints.Catalog.Deregister(&req, &out) + _, err = s.raftApply(structs.DeregisterRequestType, &req) + return err } // 
joinConsulServer is used to try to join another consul server @@ -712,10 +709,9 @@ func (s *Server) removeConsulServer(m serf.Member, port int) error { func (s *Server) reapTombstones(index uint64) { defer metrics.MeasureSince([]string{"consul", "leader", "reapTombstones"}, time.Now()) req := structs.TombstoneRequest{ - Datacenter: s.config.Datacenter, - Op: structs.TombstoneReap, - ReapIndex: index, - WriteRequest: structs.WriteRequest{Token: s.config.GetTokenForAgent()}, + Datacenter: s.config.Datacenter, + Op: structs.TombstoneReap, + ReapIndex: index, } _, err := s.raftApply(structs.TombstoneRequestType, &req) if err != nil { diff --git a/consul/leader_test.go b/consul/leader_test.go index 7dc9e0a248..243420e592 100644 --- a/consul/leader_test.go +++ b/consul/leader_test.go @@ -14,7 +14,12 @@ import ( ) func TestLeader_RegisterMember(t *testing.T) { - dir1, s1 := testServer(t) + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.ACLDatacenter = "dc1" + c.ACLMasterToken = "root" + c.ACLDefaultPolicy = "deny" + c.ACLEnforceVersion8 = true + }) defer os.RemoveAll(dir1) defer s1.Shutdown() @@ -81,7 +86,12 @@ func TestLeader_RegisterMember(t *testing.T) { } func TestLeader_FailedMember(t *testing.T) { - dir1, s1 := testServer(t) + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.ACLDatacenter = "dc1" + c.ACLMasterToken = "root" + c.ACLDefaultPolicy = "deny" + c.ACLEnforceVersion8 = true + }) defer os.RemoveAll(dir1) defer s1.Shutdown() @@ -140,7 +150,12 @@ func TestLeader_FailedMember(t *testing.T) { } func TestLeader_LeftMember(t *testing.T) { - dir1, s1 := testServer(t) + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.ACLDatacenter = "dc1" + c.ACLMasterToken = "root" + c.ACLDefaultPolicy = "deny" + c.ACLEnforceVersion8 = true + }) defer os.RemoveAll(dir1) defer s1.Shutdown() @@ -185,7 +200,12 @@ func TestLeader_LeftMember(t *testing.T) { } func TestLeader_ReapMember(t *testing.T) { - dir1, s1 := testServer(t) + dir1, s1 := 
testServerWithConfig(t, func(c *Config) { + c.ACLDatacenter = "dc1" + c.ACLMasterToken = "root" + c.ACLDefaultPolicy = "deny" + c.ACLEnforceVersion8 = true + }) defer os.RemoveAll(dir1) defer s1.Shutdown() @@ -244,7 +264,12 @@ func TestLeader_ReapMember(t *testing.T) { } func TestLeader_Reconcile_ReapMember(t *testing.T) { - dir1, s1 := testServer(t) + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.ACLDatacenter = "dc1" + c.ACLMasterToken = "root" + c.ACLDefaultPolicy = "deny" + c.ACLEnforceVersion8 = true + }) defer os.RemoveAll(dir1) defer s1.Shutdown() @@ -261,6 +286,9 @@ func TestLeader_Reconcile_ReapMember(t *testing.T) { Name: SerfCheckName, Status: structs.HealthCritical, }, + WriteRequest: structs.WriteRequest{ + Token: "root", + }, } var out struct{} if err := s1.RPC("Catalog.Register", &dead, &out); err != nil { @@ -284,7 +312,12 @@ func TestLeader_Reconcile_ReapMember(t *testing.T) { } func TestLeader_Reconcile(t *testing.T) { - dir1, s1 := testServer(t) + dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.ACLDatacenter = "dc1" + c.ACLMasterToken = "root" + c.ACLDefaultPolicy = "deny" + c.ACLEnforceVersion8 = true + }) defer os.RemoveAll(dir1) defer s1.Shutdown() @@ -562,6 +595,9 @@ func TestLeader_TombstoneGC_Reset(t *testing.T) { func TestLeader_ReapTombstones(t *testing.T) { dir1, s1 := testServerWithConfig(t, func(c *Config) { + c.ACLDatacenter = "dc1" + c.ACLMasterToken = "root" + c.ACLDefaultPolicy = "deny" c.TombstoneTTL = 50 * time.Millisecond c.TombstoneTTLGranularity = 10 * time.Millisecond }) @@ -579,6 +615,9 @@ func TestLeader_ReapTombstones(t *testing.T) { Key: "test", Value: []byte("test"), }, + WriteRequest: structs.WriteRequest{ + Token: "root", + }, } var out bool if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil { diff --git a/website/source/docs/agent/options.html.markdown b/website/source/docs/agent/options.html.markdown index c523c0c0f3..d20079320b 100644 --- 
a/website/source/docs/agent/options.html.markdown +++ b/website/source/docs/agent/options.html.markdown @@ -454,9 +454,8 @@ Consul will not enable TLS for the HTTP API unless the `https` port has been ass and servers to perform internal operations to the service catalog. If this isn't specified, then the `acl_token` will be used. This was added in Consul 0.7.2.

- For clients, this token must at least have write access to the node name it will register as. For - servers, this must have write access to all nodes that are expected to join the cluster, as well - as write access to the "consul" service, which will be registered automatically on its behalf. + This token must at least have write access to the node name it will register as in order to set any + of the node-level information in the catalog such as metadata, or the node's tagged addresses. * `acl_enforce_version_8` - Used for clients and servers to determine if enforcement should occur for new ACL policies being diff --git a/website/source/docs/internals/acl.html.markdown b/website/source/docs/internals/acl.html.markdown index 382566800d..d0231ef846 100644 --- a/website/source/docs/internals/acl.html.markdown +++ b/website/source/docs/internals/acl.html.markdown @@ -571,13 +571,9 @@ Two new configuration options are used once complete ACLs are enabled: tokens during normal operation. * [`acl_agent_token`](/docs/agent/options.html#acl_agent_token) is used internally by Consul agents to perform operations to the service catalog when registering themselves - or sending network coordinates to the servers. -
-
- For clients, this token must at least have `node` ACL policy `write` access to the node - name it will register as. For servers, this must have `node` ACL policy `write` access to - all nodes that are expected to join the cluster, as well as `service` ACL policy `write` - access to the `consul` service, which will be registered automatically on its behalf. + or sending network coordinates to the servers. This token must at least have `node` ACL + policy `write` access to the node name it will register as in order to register any + node-level information like metadata or tagged addresses. Since clients now resolve ACLs locally, the [`acl_down_policy`](/docs/agent/options.html#acl_down_policy) now applies to Consul clients as well as Consul servers. This will determine what the From 3f1c4a6f4453c8959dca67709cbc309bbae4bc82 Mon Sep 17 00:00:00 2001 From: James Phillips Date: Thu, 23 Mar 2017 15:01:46 -0700 Subject: [PATCH 2/2] Fixes an issue where servers would delete catalog information set by the node when they were trying to reconcile a member. --- consul/leader.go | 13 ++++- consul/leader_test.go | 97 ++++++++++++++++++++++++++++++++++ consul/structs/structs.go | 14 +++++ consul/structs/structs_test.go | 10 ++++ 4 files changed, 132 insertions(+), 2 deletions(-) diff --git a/consul/leader.go b/consul/leader.go index 124a14a258..dc41a48792 100644 --- a/consul/leader.go +++ b/consul/leader.go @@ -450,11 +450,11 @@ func (s *Server) handleAliveMember(member serf.Member) error { AFTER_CHECK: s.logger.Printf("[INFO] consul: member '%s' joined, marking health alive", member.Name) - // Register with the catalog + // Register with the catalog. 
req := structs.RegisterRequest{ Datacenter: s.config.Datacenter, - ID: types.NodeID(member.Tags["id"]), Node: member.Name, + ID: types.NodeID(member.Tags["id"]), Address: member.Addr.String(), Service: service, Check: &structs.HealthCheck{ @@ -464,6 +464,10 @@ AFTER_CHECK: Status: structs.HealthPassing, Output: SerfCheckAliveOutput, }, + + // If there's existing information about the node, do not + // clobber it. + SkipNodeUpdate: true, } _, err = s.raftApply(structs.RegisterRequestType, &req) return err @@ -496,6 +500,7 @@ func (s *Server) handleFailedMember(member serf.Member) error { req := structs.RegisterRequest{ Datacenter: s.config.Datacenter, Node: member.Name, + ID: types.NodeID(member.Tags["id"]), Address: member.Addr.String(), Check: &structs.HealthCheck{ Node: member.Name, @@ -504,6 +509,10 @@ func (s *Server) handleFailedMember(member serf.Member) error { Status: structs.HealthCritical, Output: SerfCheckFailedOutput, }, + + // If there's existing information about the node, do not + // clobber it. + SkipNodeUpdate: true, } _, err = s.raftApply(structs.RegisterRequestType, &req) return err diff --git a/consul/leader_test.go b/consul/leader_test.go index 243420e592..15db7021b8 100644 --- a/consul/leader_test.go +++ b/consul/leader_test.go @@ -354,6 +354,103 @@ func TestLeader_Reconcile(t *testing.T) { }) } +func TestLeader_Reconcile_Races(t *testing.T) { + dir1, s1 := testServer(t) + defer os.RemoveAll(dir1) + defer s1.Shutdown() + + testutil.WaitForLeader(t, s1.RPC, "dc1") + + dir2, c1 := testClient(t) + defer os.RemoveAll(dir2) + defer c1.Shutdown() + + addr := fmt.Sprintf("127.0.0.1:%d", + s1.config.SerfLANConfig.MemberlistConfig.BindPort) + if _, err := c1.JoinLAN([]string{addr}); err != nil { + t.Fatalf("err: %v", err) + } + + // Wait for the server to reconcile the client and register it. 
+ state := s1.fsm.State() + var nodeAddr string + testutil.WaitForResult(func() (bool, error) { + _, node, err := state.GetNode(c1.config.NodeName) + if err != nil { + t.Fatalf("err: %v", err) + } + if node != nil { + nodeAddr = node.Address + return true, nil + } else { + return false, nil + } + }, func(err error) { + t.Fatalf("client should be registered") + }) + + // Add in some metadata via the catalog (as if the agent synced it + there). We also set the serfHealth check to failing so the reconcile + will attempt to flip it back. + req := structs.RegisterRequest{ + Datacenter: s1.config.Datacenter, + Node: c1.config.NodeName, + ID: c1.config.NodeID, + Address: nodeAddr, + NodeMeta: map[string]string{"hello": "world"}, + Check: &structs.HealthCheck{ + Node: c1.config.NodeName, + CheckID: SerfCheckID, + Name: SerfCheckName, + Status: structs.HealthCritical, + Output: "", + }, + } + var out struct{} + if err := s1.RPC("Catalog.Register", &req, &out); err != nil { + t.Fatalf("err: %v", err) + } + + // Force a reconcile and make sure the metadata stuck around. + if err := s1.reconcile(); err != nil { + t.Fatalf("err: %v", err) + } + _, node, err := state.GetNode(c1.config.NodeName) + if err != nil { + t.Fatalf("err: %v", err) + } + if node == nil { + t.Fatalf("bad") + } + if hello, ok := node.Meta["hello"]; !ok || hello != "world" { + t.Fatalf("bad") + } + + // Fail the member and wait for the health to go critical. + c1.Shutdown() + testutil.WaitForResult(func() (bool, error) { + _, checks, err := state.NodeChecks(nil, c1.config.NodeName) + if err != nil { + t.Fatalf("err: %v", err) + } + return checks[0].Status == structs.HealthCritical, errors.New(checks[0].Status) + }, func(err error) { + t.Fatalf("check status is %v, should be critical", err) + }) + + // Make sure the metadata didn't get clobbered. 
+ _, node, err = state.GetNode(c1.config.NodeName) + if err != nil { + t.Fatalf("err: %v", err) + } + if node == nil { + t.Fatalf("bad") + } + if hello, ok := node.Meta["hello"]; !ok || hello != "world" { + t.Fatalf("bad") + } +} + func TestLeader_LeftServer(t *testing.T) { dir1, s1 := testServer(t) defer os.RemoveAll(dir1) diff --git a/consul/structs/structs.go b/consul/structs/structs.go index bd58ee9b3d..0805dbf4cf 100644 --- a/consul/structs/structs.go +++ b/consul/structs/structs.go @@ -201,6 +201,14 @@ type RegisterRequest struct { Service *NodeService Check *HealthCheck Checks HealthChecks + + // SkipNodeUpdate can be used when a register request is intended for + // updating a service and/or checks, but doesn't want to overwrite any + // node information if the node is already registered. If the node + // doesn't exist, it will still be created, but if the node exists, any + // node portion of this update will not apply. + SkipNodeUpdate bool + WriteRequest } @@ -217,6 +225,12 @@ func (r *RegisterRequest) ChangesNode(node *Node) bool { return true } + // If we've been asked to skip the node update, then say there are no + // changes. + if r.SkipNodeUpdate { + return false + } + // Check if any of the node-level fields are being changed. if r.ID != node.ID || r.Node != node.Node || diff --git a/consul/structs/structs_test.go b/consul/structs/structs_test.go index c7d02c785c..56b6dcf1f6 100644 --- a/consul/structs/structs_test.go +++ b/consul/structs/structs_test.go @@ -136,6 +136,16 @@ func TestStructs_RegisterRequest_ChangesNode(t *testing.T) { t.Fatalf("should change") } + req.SkipNodeUpdate = true + if req.ChangesNode(node) { + t.Fatalf("should skip") + } + + req.SkipNodeUpdate = false + if !req.ChangesNode(node) { + t.Fatalf("should change") + } + restore() if req.ChangesNode(node) { t.Fatalf("should not change")