From ef2aabc54492137494ad21a64474da360353b589 Mon Sep 17 00:00:00 2001 From: Ryan Uber Date: Thu, 25 Sep 2014 00:22:06 -0700 Subject: [PATCH] consul: use a function for ingesting responses --- consul/internal_endpoint.go | 127 +++++++++++++++--------------------- consul/rpc.go | 7 +- 2 files changed, 58 insertions(+), 76 deletions(-) diff --git a/consul/internal_endpoint.go b/consul/internal_endpoint.go index e7dafb319f..6907076116 100644 --- a/consul/internal_endpoint.go +++ b/consul/internal_endpoint.go @@ -2,6 +2,7 @@ package consul import ( "github.com/hashicorp/consul/consul/structs" + "github.com/hashicorp/serf/serf" ) // Internal endpoint is used to query the miscellaneous info that @@ -63,6 +64,22 @@ func (m *Internal) EventFire(args *structs.EventFireRequest, return m.srv.UserEvent(args.Name, args.Payload) } +func (m *Internal) ingestKeyringResponse( + resp *serf.KeyResponse, + reply *structs.KeyringResponses) { + + reply.Responses = append(reply.Responses, &structs.KeyringResponse{ + Datacenter: m.srv.config.Datacenter, + Messages: resp.Messages, + Keys: resp.Keys, + NumResp: resp.NumResp, + NumNodes: resp.NumNodes, + NumErr: resp.NumErr, + }) +} + +// ListKeys will query the WAN and LAN gossip keyrings of all nodes, adding +// results into a collective response as we go. func (m *Internal) ListKeys( args *structs.KeyringRequest, reply *structs.KeyringResponses) error { @@ -71,28 +88,14 @@ func (m *Internal) ListKeys( if err != nil { return err } - reply.Responses = append(reply.Responses, &structs.KeyringResponse{ - Datacenter: m.srv.config.Datacenter, - Messages: respLAN.Messages, - Keys: respLAN.Keys, - NumResp: respLAN.NumResp, - NumNodes: respLAN.NumNodes, - NumErr: respLAN.NumErr, - }) + m.ingestKeyringResponse(respLAN, reply) if !args.Forwarded { respWAN, err := m.srv.KeyManagerWAN().ListKeys() if err != nil { return err } - reply.Responses = append(reply.Responses, &structs.KeyringResponse{ - Datacenter: m.srv.config.Datacenter, - Messages: respWAN.Messages, - Keys: respWAN.Keys, - NumResp: respWAN.NumResp, - NumNodes: respWAN.NumNodes, - NumErr: respWAN.NumErr, - }) + m.ingestKeyringResponse(respWAN, reply) // Mark key rotation as being already forwarded, then forward. args.Forwarded = true @@ -102,32 +105,25 @@ func (m *Internal) ListKeys( return nil } +// InstallKey broadcasts a new encryption key to all nodes. This involves +// installing a new key on every node across all datacenters. func (m *Internal) InstallKey( args *structs.KeyringRequest, reply *structs.KeyringResponses) error { - respLAN, _ := m.srv.KeyManagerLAN().InstallKey(args.Key) - reply.Responses = append(reply.Responses, &structs.KeyringResponse{ - Datacenter: m.srv.config.Datacenter, - Messages: respLAN.Messages, - Keys: respLAN.Keys, - NumResp: respLAN.NumResp, - NumNodes: respLAN.NumNodes, - NumErr: respLAN.NumErr, - }) + respLAN, err := m.srv.KeyManagerLAN().InstallKey(args.Key) + if err != nil { + return err + } + m.ingestKeyringResponse(respLAN, reply) if !args.Forwarded { - respWAN, _ := m.srv.KeyManagerWAN().InstallKey(args.Key) - reply.Responses = append(reply.Responses, &structs.KeyringResponse{ - Datacenter: m.srv.config.Datacenter, - Messages: respWAN.Messages, - Keys: respWAN.Keys, - NumResp: respWAN.NumResp, - NumNodes: respWAN.NumNodes, - NumErr: respWAN.NumErr, - }) + respWAN, err := m.srv.KeyManagerWAN().InstallKey(args.Key) + if err != nil { + return err + } + m.ingestKeyringResponse(respWAN, reply) - // Mark key rotation as being already forwarded, then forward. args.Forwarded = true return m.srv.forwardAll("Internal.InstallKey", args, reply) } @@ -135,32 +131,25 @@ func (m *Internal) InstallKey( return nil } +// UseKey instructs all nodes to change the key they are using to +// encrypt gossip messages. func (m *Internal) UseKey( args *structs.KeyringRequest, reply *structs.KeyringResponses) error { - respLAN, _ := m.srv.KeyManagerLAN().UseKey(args.Key) - reply.Responses = append(reply.Responses, &structs.KeyringResponse{ - Datacenter: m.srv.config.Datacenter, - Messages: respLAN.Messages, - Keys: respLAN.Keys, - NumResp: respLAN.NumResp, - NumNodes: respLAN.NumNodes, - NumErr: respLAN.NumErr, - }) + respLAN, err := m.srv.KeyManagerLAN().UseKey(args.Key) + if err != nil { + return err + } + m.ingestKeyringResponse(respLAN, reply) if !args.Forwarded { - respWAN, _ := m.srv.KeyManagerWAN().UseKey(args.Key) - reply.Responses = append(reply.Responses, &structs.KeyringResponse{ - Datacenter: m.srv.config.Datacenter, - Messages: respWAN.Messages, - Keys: respWAN.Keys, - NumResp: respWAN.NumResp, - NumNodes: respWAN.NumNodes, - NumErr: respWAN.NumErr, - }) + respWAN, err := m.srv.KeyManagerWAN().UseKey(args.Key) + if err != nil { + return err + } + m.ingestKeyringResponse(respWAN, reply) - // Mark key rotation as being already forwarded, then forward. args.Forwarded = true return m.srv.forwardAll("Internal.UseKey", args, reply) } @@ -168,32 +157,24 @@ func (m *Internal) UseKey( return nil } +// RemoveKey instructs all nodes to drop the specified key from the keyring. func (m *Internal) RemoveKey( args *structs.KeyringRequest, reply *structs.KeyringResponses) error { - respLAN, _ := m.srv.KeyManagerLAN().RemoveKey(args.Key) - reply.Responses = append(reply.Responses, &structs.KeyringResponse{ - Datacenter: m.srv.config.Datacenter, - Messages: respLAN.Messages, - Keys: respLAN.Keys, - NumResp: respLAN.NumResp, - NumNodes: respLAN.NumNodes, - NumErr: respLAN.NumErr, - }) + respLAN, err := m.srv.KeyManagerLAN().RemoveKey(args.Key) + if err != nil { + return err + } + m.ingestKeyringResponse(respLAN, reply) if !args.Forwarded { - respWAN, _ := m.srv.KeyManagerWAN().RemoveKey(args.Key) - reply.Responses = append(reply.Responses, &structs.KeyringResponse{ - Datacenter: m.srv.config.Datacenter, - Messages: respWAN.Messages, - Keys: respWAN.Keys, - NumResp: respWAN.NumResp, - NumNodes: respWAN.NumNodes, - NumErr: respWAN.NumErr, - }) + respWAN, err := m.srv.KeyManagerWAN().RemoveKey(args.Key) + if err != nil { + return err + } + m.ingestKeyringResponse(respWAN, reply) - // Mark key rotation as being already forwarded, then forward. args.Forwarded = true return m.srv.forwardAll("Internal.RemoveKey", args, reply) } diff --git a/consul/rpc.go b/consul/rpc.go index 4526ca75b9..e5b5be6c5e 100644 --- a/consul/rpc.go +++ b/consul/rpc.go @@ -227,9 +227,10 @@ func (s *Server) forwardDC(method, dc string, args interface{}, reply interface{ func (s *Server) forwardAll(method string, args, reply interface{}) error { for dc, _ := range s.remoteConsuls { if dc != s.config.Datacenter { - if err := s.forwardDC(method, dc, args, reply); err != nil { - return err - } + // Forward the RPC call. Even if an error is returned here, we still + // want to continue broadcasting to the remaining DC's to avoid + // network partitions completely killing us. + go s.forwardDC(method, dc, args, reply) } } return nil