diff --git a/consul/leader.go b/consul/leader.go
index dbc60d8a08..dc41a48792 100644
--- a/consul/leader.go
+++ b/consul/leader.go
@@ -450,11 +450,11 @@ func (s *Server) handleAliveMember(member serf.Member) error {
AFTER_CHECK:
s.logger.Printf("[INFO] consul: member '%s' joined, marking health alive", member.Name)
- // Register with the catalog
+ // Register with the catalog.
req := structs.RegisterRequest{
Datacenter: s.config.Datacenter,
- ID: types.NodeID(member.Tags["id"]),
Node: member.Name,
+ ID: types.NodeID(member.Tags["id"]),
Address: member.Addr.String(),
Service: service,
Check: &structs.HealthCheck{
@@ -464,10 +464,13 @@ AFTER_CHECK:
Status: structs.HealthPassing,
Output: SerfCheckAliveOutput,
},
- WriteRequest: structs.WriteRequest{Token: s.config.GetTokenForAgent()},
+
+ // If there's existing information about the node, do not
+ // clobber it.
+ SkipNodeUpdate: true,
}
- var out struct{}
- return s.endpoints.Catalog.Register(&req, &out)
+ _, err = s.raftApply(structs.RegisterRequestType, &req)
+ return err
}
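
For readability, here is the new body of handleAliveMember reassembled from the two hunks above (a condensed view; the check fields elided between the hunks are filled in as they appear in the surrounding code):

	// Register with the catalog.
	req := structs.RegisterRequest{
		Datacenter: s.config.Datacenter,
		Node:       member.Name,
		ID:         types.NodeID(member.Tags["id"]),
		Address:    member.Addr.String(),
		Service:    service,
		Check: &structs.HealthCheck{
			Node:    member.Name,
			CheckID: SerfCheckID,
			Name:    SerfCheckName,
			Status:  structs.HealthPassing,
			Output:  SerfCheckAliveOutput,
		},

		// If there's existing information about the node, do not
		// clobber it.
		SkipNodeUpdate: true,
	}
	_, err = s.raftApply(structs.RegisterRequestType, &req)
	return err
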
// handleFailedMember is used to mark the node's status
@@ -497,6 +500,7 @@ func (s *Server) handleFailedMember(member serf.Member) error {
req := structs.RegisterRequest{
Datacenter: s.config.Datacenter,
Node: member.Name,
+ ID: types.NodeID(member.Tags["id"]),
Address: member.Addr.String(),
Check: &structs.HealthCheck{
Node: member.Name,
@@ -505,10 +509,13 @@ func (s *Server) handleFailedMember(member serf.Member) error {
Status: structs.HealthCritical,
Output: SerfCheckFailedOutput,
},
- WriteRequest: structs.WriteRequest{Token: s.config.GetTokenForAgent()},
+
+ // If there's existing information about the node, do not
+ // clobber it.
+ SkipNodeUpdate: true,
}
- var out struct{}
- return s.endpoints.Catalog.Register(&req, &out)
+ _, err = s.raftApply(structs.RegisterRequestType, &req)
+ return err
}
// handleLeftMember is used to handle members that gracefully
@@ -553,12 +560,11 @@ func (s *Server) handleDeregisterMember(reason string, member serf.Member) error
// Deregister the node
s.logger.Printf("[INFO] consul: member '%s' %s, deregistering", member.Name, reason)
req := structs.DeregisterRequest{
- Datacenter: s.config.Datacenter,
- Node: member.Name,
- WriteRequest: structs.WriteRequest{Token: s.config.GetTokenForAgent()},
+ Datacenter: s.config.Datacenter,
+ Node: member.Name,
}
- var out struct{}
- return s.endpoints.Catalog.Deregister(&req, &out)
+ _, err = s.raftApply(structs.DeregisterRequestType, &req)
+ return err
}
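
Dropping the WriteRequest tokens is safe because these writes no longer travel through the Catalog RPC endpoints, where ACLs are enforced; the leader now commits them straight to Raft. For context, the existing raftApply helper does roughly the following. This is a simplified sketch, not the real implementation: the timeout constant and error handling are approximated.

// Simplified approximation of the existing helper (not a new function in
// this change): encode the typed request and apply it to the Raft log.
func (s *Server) raftApplySketch(t structs.MessageType, msg interface{}) (interface{}, error) {
	// Prepend the message type byte and msgpack-encode the payload.
	buf, err := structs.Encode(t, msg)
	if err != nil {
		return nil, fmt.Errorf("failed to encode request: %v", err)
	}

	// Apply to the local Raft log and wait (bounded) for it to commit.
	future := s.raft.Apply(buf, 30*time.Second)
	if err := future.Error(); err != nil {
		return nil, err
	}
	return future.Response(), nil
}
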
// joinConsulServer is used to try to join another consul server
@@ -712,10 +718,9 @@ func (s *Server) removeConsulServer(m serf.Member, port int) error {
func (s *Server) reapTombstones(index uint64) {
defer metrics.MeasureSince([]string{"consul", "leader", "reapTombstones"}, time.Now())
req := structs.TombstoneRequest{
- Datacenter: s.config.Datacenter,
- Op: structs.TombstoneReap,
- ReapIndex: index,
- WriteRequest: structs.WriteRequest{Token: s.config.GetTokenForAgent()},
+ Datacenter: s.config.Datacenter,
+ Op: structs.TombstoneReap,
+ ReapIndex: index,
}
_, err := s.raftApply(structs.TombstoneRequestType, &req)
if err != nil {
diff --git a/consul/leader_test.go b/consul/leader_test.go
index 7dc9e0a248..15db7021b8 100644
--- a/consul/leader_test.go
+++ b/consul/leader_test.go
@@ -14,7 +14,12 @@ import (
)
func TestLeader_RegisterMember(t *testing.T) {
- dir1, s1 := testServer(t)
+ dir1, s1 := testServerWithConfig(t, func(c *Config) {
+ c.ACLDatacenter = "dc1"
+ c.ACLMasterToken = "root"
+ c.ACLDefaultPolicy = "deny"
+ c.ACLEnforceVersion8 = true
+ })
defer os.RemoveAll(dir1)
defer s1.Shutdown()
@@ -81,7 +86,12 @@ func TestLeader_RegisterMember(t *testing.T) {
}
func TestLeader_FailedMember(t *testing.T) {
- dir1, s1 := testServer(t)
+ dir1, s1 := testServerWithConfig(t, func(c *Config) {
+ c.ACLDatacenter = "dc1"
+ c.ACLMasterToken = "root"
+ c.ACLDefaultPolicy = "deny"
+ c.ACLEnforceVersion8 = true
+ })
defer os.RemoveAll(dir1)
defer s1.Shutdown()
@@ -140,7 +150,12 @@ func TestLeader_FailedMember(t *testing.T) {
}
func TestLeader_LeftMember(t *testing.T) {
- dir1, s1 := testServer(t)
+ dir1, s1 := testServerWithConfig(t, func(c *Config) {
+ c.ACLDatacenter = "dc1"
+ c.ACLMasterToken = "root"
+ c.ACLDefaultPolicy = "deny"
+ c.ACLEnforceVersion8 = true
+ })
defer os.RemoveAll(dir1)
defer s1.Shutdown()
@@ -185,7 +200,12 @@ func TestLeader_LeftMember(t *testing.T) {
}
func TestLeader_ReapMember(t *testing.T) {
- dir1, s1 := testServer(t)
+ dir1, s1 := testServerWithConfig(t, func(c *Config) {
+ c.ACLDatacenter = "dc1"
+ c.ACLMasterToken = "root"
+ c.ACLDefaultPolicy = "deny"
+ c.ACLEnforceVersion8 = true
+ })
defer os.RemoveAll(dir1)
defer s1.Shutdown()
@@ -244,7 +264,12 @@ func TestLeader_ReapMember(t *testing.T) {
}
func TestLeader_Reconcile_ReapMember(t *testing.T) {
- dir1, s1 := testServer(t)
+ dir1, s1 := testServerWithConfig(t, func(c *Config) {
+ c.ACLDatacenter = "dc1"
+ c.ACLMasterToken = "root"
+ c.ACLDefaultPolicy = "deny"
+ c.ACLEnforceVersion8 = true
+ })
defer os.RemoveAll(dir1)
defer s1.Shutdown()
@@ -261,6 +286,9 @@ func TestLeader_Reconcile_ReapMember(t *testing.T) {
Name: SerfCheckName,
Status: structs.HealthCritical,
},
+ WriteRequest: structs.WriteRequest{
+ Token: "root",
+ },
}
var out struct{}
if err := s1.RPC("Catalog.Register", &dead, &out); err != nil {
@@ -284,7 +312,12 @@ func TestLeader_Reconcile_ReapMember(t *testing.T) {
}
func TestLeader_Reconcile(t *testing.T) {
- dir1, s1 := testServer(t)
+ dir1, s1 := testServerWithConfig(t, func(c *Config) {
+ c.ACLDatacenter = "dc1"
+ c.ACLMasterToken = "root"
+ c.ACLDefaultPolicy = "deny"
+ c.ACLEnforceVersion8 = true
+ })
defer os.RemoveAll(dir1)
defer s1.Shutdown()
@@ -321,6 +354,103 @@ func TestLeader_Reconcile(t *testing.T) {
})
}
+func TestLeader_Reconcile_Races(t *testing.T) {
+ dir1, s1 := testServer(t)
+ defer os.RemoveAll(dir1)
+ defer s1.Shutdown()
+
+ testutil.WaitForLeader(t, s1.RPC, "dc1")
+
+ dir2, c1 := testClient(t)
+ defer os.RemoveAll(dir2)
+ defer c1.Shutdown()
+
+ addr := fmt.Sprintf("127.0.0.1:%d",
+ s1.config.SerfLANConfig.MemberlistConfig.BindPort)
+ if _, err := c1.JoinLAN([]string{addr}); err != nil {
+ t.Fatalf("err: %v", err)
+ }
+
+ // Wait for the server to reconcile the client and register it.
+ state := s1.fsm.State()
+ var nodeAddr string
+ testutil.WaitForResult(func() (bool, error) {
+ _, node, err := state.GetNode(c1.config.NodeName)
+ if err != nil {
+ t.Fatalf("err: %v", err)
+ }
+ if node != nil {
+ nodeAddr = node.Address
+ return true, nil
+ } else {
+ return false, nil
+ }
+ }, func(err error) {
+ t.Fatalf("client should be registered")
+ })
+
+ // Add in some metadata via the catalog (as if the agent synced it
+ there). We also set the serfHealth check to failing so the reconcile
+ will attempt to flip it back.
+ req := structs.RegisterRequest{
+ Datacenter: s1.config.Datacenter,
+ Node: c1.config.NodeName,
+ ID: c1.config.NodeID,
+ Address: nodeAddr,
+ NodeMeta: map[string]string{"hello": "world"},
+ Check: &structs.HealthCheck{
+ Node: c1.config.NodeName,
+ CheckID: SerfCheckID,
+ Name: SerfCheckName,
+ Status: structs.HealthCritical,
+ Output: "",
+ },
+ }
+ var out struct{}
+ if err := s1.RPC("Catalog.Register", &req, &out); err != nil {
+ t.Fatalf("err: %v", err)
+ }
+
+ // Force a reconcile and make sure the metadata stuck around.
+ if err := s1.reconcile(); err != nil {
+ t.Fatalf("err: %v", err)
+ }
+ _, node, err := state.GetNode(c1.config.NodeName)
+ if err != nil {
+ t.Fatalf("err: %v", err)
+ }
+ if node == nil {
+ t.Fatalf("bad")
+ }
+ if hello, ok := node.Meta["hello"]; !ok || hello != "world" {
+ t.Fatalf("bad")
+ }
+
+ // Fail the member and wait for the health to go critical.
+ c1.Shutdown()
+ testutil.WaitForResult(func() (bool, error) {
+ _, checks, err := state.NodeChecks(nil, c1.config.NodeName)
+ if err != nil {
+ t.Fatalf("err: %v", err)
+ }
+ return checks[0].Status == structs.HealthCritical, errors.New(checks[0].Status)
+ }, func(err error) {
+ t.Fatalf("check status is %v, should be critical", err)
+ })
+
+ // Make sure the metadata didn't get clobbered.
+ _, node, err = state.GetNode(c1.config.NodeName)
+ if err != nil {
+ t.Fatalf("err: %v", err)
+ }
+ if node == nil {
+ t.Fatalf("bad")
+ }
+ if hello, ok := node.Meta["hello"]; !ok || hello != "world" {
+ t.Fatalf("bad")
+ }
+}
+
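The race this test covers has two writers with different ownership of the node record. A minimal illustration of the two requests involved (illustrative literals, not code lifted from the agent):

// The agent's anti-entropy sync owns the node-level fields (metadata,
// tagged addresses) and does not set SkipNodeUpdate, so its view of the
// node always applies.
fromAgent := structs.RegisterRequest{
	Datacenter: "dc1",
	Node:       "client-1",
	Address:    "127.0.0.1",
	NodeMeta:   map[string]string{"hello": "world"},
}

// The leader's reconcile loop only asserts the serfHealth check and now
// sets SkipNodeUpdate, so it can no longer wipe the fields above when it
// races with the agent's sync.
fromLeader := structs.RegisterRequest{
	Datacenter: "dc1",
	Node:       "client-1",
	Address:    "127.0.0.1",
	Check: &structs.HealthCheck{
		Node:    "client-1",
		CheckID: SerfCheckID,
		Name:    SerfCheckName,
		Status:  structs.HealthCritical,
	},
	SkipNodeUpdate: true,
}
_, _ = fromAgent, fromLeader
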
func TestLeader_LeftServer(t *testing.T) {
dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
@@ -562,6 +692,9 @@ func TestLeader_TombstoneGC_Reset(t *testing.T) {
func TestLeader_ReapTombstones(t *testing.T) {
dir1, s1 := testServerWithConfig(t, func(c *Config) {
+ c.ACLDatacenter = "dc1"
+ c.ACLMasterToken = "root"
+ c.ACLDefaultPolicy = "deny"
c.TombstoneTTL = 50 * time.Millisecond
c.TombstoneTTLGranularity = 10 * time.Millisecond
})
@@ -579,6 +712,9 @@ func TestLeader_ReapTombstones(t *testing.T) {
Key: "test",
Value: []byte("test"),
},
+ WriteRequest: structs.WriteRequest{
+ Token: "root",
+ },
}
var out bool
if err := msgpackrpc.CallWithCodec(codec, "KVS.Apply", &arg, &out); err != nil {
diff --git a/consul/structs/structs.go b/consul/structs/structs.go
index bd58ee9b3d..0805dbf4cf 100644
--- a/consul/structs/structs.go
+++ b/consul/structs/structs.go
@@ -201,6 +201,14 @@ type RegisterRequest struct {
Service *NodeService
Check *HealthCheck
Checks HealthChecks
+
+ // SkipNodeUpdate can be used when a register request is intended to
+ // update a service and/or checks, but should not overwrite any existing
+ // node information if the node is already registered. If the node
+ // doesn't exist, it will still be created, but if it does exist, the
+ // node portion of this update will not apply.
+ SkipNodeUpdate bool
+
WriteRequest
}
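
The only consumer of the new flag is ChangesNode, updated in the next hunk. On the other side, the state store's registration transaction uses that answer roughly like this (a hedged sketch; the real logic lives in the catalog state store and the names here are approximate):

// Only rebuild the node row from the request when ChangesNode says the
// node-level fields differ (or the node doesn't exist yet).
if req.ChangesNode(existing) {
	node := &structs.Node{
		ID:              req.ID,
		Node:            req.Node,
		Address:         req.Address,
		TaggedAddresses: req.TaggedAddresses,
		Meta:            req.NodeMeta,
	}
	// ... ensure/overwrite the node entry at the new index ...
	_ = node
}
// Services and checks carried by the same request are applied either way,
// which is what the leader's serfHealth-only updates rely on.
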
@@ -217,6 +225,12 @@ func (r *RegisterRequest) ChangesNode(node *Node) bool {
return true
}
+ // If we've been asked to skip the node update, then say there are no
+ // changes.
+ if r.SkipNodeUpdate {
+ return false
+ }
+
// Check if any of the node-level fields are being changed.
if r.ID != node.ID ||
r.Node != node.Node ||
diff --git a/consul/structs/structs_test.go b/consul/structs/structs_test.go
index c7d02c785c..56b6dcf1f6 100644
--- a/consul/structs/structs_test.go
+++ b/consul/structs/structs_test.go
@@ -136,6 +136,16 @@ func TestStructs_RegisterRequest_ChangesNode(t *testing.T) {
t.Fatalf("should change")
}
+ req.SkipNodeUpdate = true
+ if req.ChangesNode(node) {
+ t.Fatalf("should skip")
+ }
+
+ req.SkipNodeUpdate = false
+ if !req.ChangesNode(node) {
+ t.Fatalf("should change")
+ }
+
restore()
if req.ChangesNode(node) {
t.Fatalf("should not change")
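
For completeness, a self-contained sketch of the specific clobber vector the flag closes: a check-only registration carries no NodeMeta, so without the flag ChangesNode reports a difference and the node row (metadata included) would be rewritten. This is an illustrative test under assumed names, not part of the change:

package structs_test

import (
	"testing"

	"github.com/hashicorp/consul/consul/structs"
)

func TestRegisterRequest_SkipNodeUpdate_PreservesMeta(t *testing.T) {
	existing := &structs.Node{
		Node:    "client-1",
		Address: "127.0.0.1",
		Meta:    map[string]string{"hello": "world"},
	}
	req := &structs.RegisterRequest{
		Node:    "client-1",
		Address: "127.0.0.1",
		// No NodeMeta, as in the leader's serfHealth updates.
	}
	if !req.ChangesNode(existing) {
		t.Fatalf("empty meta should count as a node change without the flag")
	}
	req.SkipNodeUpdate = true
	if req.ChangesNode(existing) {
		t.Fatalf("the flag should leave the existing node row alone")
	}
}
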
diff --git a/website/source/docs/agent/options.html.markdown b/website/source/docs/agent/options.html.markdown
index c523c0c0f3..d20079320b 100644
--- a/website/source/docs/agent/options.html.markdown
+++ b/website/source/docs/agent/options.html.markdown
@@ -454,9 +454,8 @@ Consul will not enable TLS for the HTTP API unless the `https` port has been ass
and servers to perform internal operations to the service catalog. If this isn't specified, then
the `acl_token` will be used. This was added in Consul 0.7.2.
- For clients, this token must at least have write access to the node name it will register as. For
- servers, this must have write access to all nodes that are expected to join the cluster, as well
- as write access to the "consul" service, which will be registered automatically on its behalf.
+ This token must at least have write access to the node name it will register as in order to set any
+ of the node-level information in the catalog, such as metadata or the node's tagged addresses.
* `acl_enforce_version_8` -
Used for clients and servers to determine if enforcement should occur for new ACL policies being
diff --git a/website/source/docs/internals/acl.html.markdown b/website/source/docs/internals/acl.html.markdown
index 382566800d..d0231ef846 100644
--- a/website/source/docs/internals/acl.html.markdown
+++ b/website/source/docs/internals/acl.html.markdown
@@ -571,13 +571,9 @@ Two new configuration options are used once complete ACLs are enabled:
tokens during normal operation.
* [`acl_agent_token`](/docs/agent/options.html#acl_agent_token) is used internally by
Consul agents to perform operations to the service catalog when registering themselves
- or sending network coordinates to the servers.
-
-
- For clients, this token must at least have `node` ACL policy `write` access to the node
- name it will register as. For servers, this must have `node` ACL policy `write` access to
- all nodes that are expected to join the cluster, as well as `service` ACL policy `write`
- access to the `consul` service, which will be registered automatically on its behalf.
+ or sending network coordinates to the servers. This token must at least have `node` ACL
+ policy `write` access to the node name it will register as in order to set any
+ node-level information such as metadata or tagged addresses.
Since clients now resolve ACLs locally, the [`acl_down_policy`](/docs/agent/options.html#acl_down_policy)
now applies to Consul clients as well as Consul servers. This will determine what the