Browse Source

consul: use a function for ingesting responses

pull/336/head
Ryan Uber 10 years ago
parent
commit
ef2aabc544
  1. 127
      consul/internal_endpoint.go
  2. 7
      consul/rpc.go

127
consul/internal_endpoint.go

@ -2,6 +2,7 @@ package consul
import ( import (
"github.com/hashicorp/consul/consul/structs" "github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/serf/serf"
) )
// Internal endpoint is used to query the miscellaneous info that // Internal endpoint is used to query the miscellaneous info that
@ -63,6 +64,22 @@ func (m *Internal) EventFire(args *structs.EventFireRequest,
return m.srv.UserEvent(args.Name, args.Payload) return m.srv.UserEvent(args.Name, args.Payload)
} }
func (m *Internal) ingestKeyringResponse(
resp *serf.KeyResponse,
reply *structs.KeyringResponses) {
reply.Responses = append(reply.Responses, &structs.KeyringResponse{
Datacenter: m.srv.config.Datacenter,
Messages: resp.Messages,
Keys: resp.Keys,
NumResp: resp.NumResp,
NumNodes: resp.NumNodes,
NumErr: resp.NumErr,
})
}
// ListKeys will query the WAN and LAN gossip keyrings of all nodes, adding
// results into a collective response as we go.
func (m *Internal) ListKeys( func (m *Internal) ListKeys(
args *structs.KeyringRequest, args *structs.KeyringRequest,
reply *structs.KeyringResponses) error { reply *structs.KeyringResponses) error {
@ -71,28 +88,14 @@ func (m *Internal) ListKeys(
if err != nil { if err != nil {
return err return err
} }
reply.Responses = append(reply.Responses, &structs.KeyringResponse{ m.ingestKeyringResponse(respLAN, reply)
Datacenter: m.srv.config.Datacenter,
Messages: respLAN.Messages,
Keys: respLAN.Keys,
NumResp: respLAN.NumResp,
NumNodes: respLAN.NumNodes,
NumErr: respLAN.NumErr,
})
if !args.Forwarded { if !args.Forwarded {
respWAN, err := m.srv.KeyManagerWAN().ListKeys() respWAN, err := m.srv.KeyManagerWAN().ListKeys()
if err != nil { if err != nil {
return err return err
} }
reply.Responses = append(reply.Responses, &structs.KeyringResponse{ m.ingestKeyringResponse(respWAN, reply)
Datacenter: m.srv.config.Datacenter,
Messages: respWAN.Messages,
Keys: respWAN.Keys,
NumResp: respWAN.NumResp,
NumNodes: respWAN.NumNodes,
NumErr: respWAN.NumErr,
})
// Mark key rotation as being already forwarded, then forward. // Mark key rotation as being already forwarded, then forward.
args.Forwarded = true args.Forwarded = true
@ -102,32 +105,25 @@ func (m *Internal) ListKeys(
return nil return nil
} }
// InstallKey broadcasts a new encryption key to all nodes. This involves
// installing a new key on every node across all datacenters.
func (m *Internal) InstallKey( func (m *Internal) InstallKey(
args *structs.KeyringRequest, args *structs.KeyringRequest,
reply *structs.KeyringResponses) error { reply *structs.KeyringResponses) error {
respLAN, _ := m.srv.KeyManagerLAN().InstallKey(args.Key) respLAN, err := m.srv.KeyManagerLAN().InstallKey(args.Key)
reply.Responses = append(reply.Responses, &structs.KeyringResponse{ if err != nil {
Datacenter: m.srv.config.Datacenter, return err
Messages: respLAN.Messages, }
Keys: respLAN.Keys, m.ingestKeyringResponse(respLAN, reply)
NumResp: respLAN.NumResp,
NumNodes: respLAN.NumNodes,
NumErr: respLAN.NumErr,
})
if !args.Forwarded { if !args.Forwarded {
respWAN, _ := m.srv.KeyManagerWAN().InstallKey(args.Key) respWAN, err := m.srv.KeyManagerWAN().InstallKey(args.Key)
reply.Responses = append(reply.Responses, &structs.KeyringResponse{ if err != nil {
Datacenter: m.srv.config.Datacenter, return err
Messages: respWAN.Messages, }
Keys: respWAN.Keys, m.ingestKeyringResponse(respWAN, reply)
NumResp: respWAN.NumResp,
NumNodes: respWAN.NumNodes,
NumErr: respWAN.NumErr,
})
// Mark key rotation as being already forwarded, then forward.
args.Forwarded = true args.Forwarded = true
return m.srv.forwardAll("Internal.InstallKey", args, reply) return m.srv.forwardAll("Internal.InstallKey", args, reply)
} }
@ -135,32 +131,25 @@ func (m *Internal) InstallKey(
return nil return nil
} }
// UseKey instructs all nodes to change the key they are using to
// encrypt gossip messages.
func (m *Internal) UseKey( func (m *Internal) UseKey(
args *structs.KeyringRequest, args *structs.KeyringRequest,
reply *structs.KeyringResponses) error { reply *structs.KeyringResponses) error {
respLAN, _ := m.srv.KeyManagerLAN().UseKey(args.Key) respLAN, err := m.srv.KeyManagerLAN().UseKey(args.Key)
reply.Responses = append(reply.Responses, &structs.KeyringResponse{ if err != nil {
Datacenter: m.srv.config.Datacenter, return err
Messages: respLAN.Messages, }
Keys: respLAN.Keys, m.ingestKeyringResponse(respLAN, reply)
NumResp: respLAN.NumResp,
NumNodes: respLAN.NumNodes,
NumErr: respLAN.NumErr,
})
if !args.Forwarded { if !args.Forwarded {
respWAN, _ := m.srv.KeyManagerWAN().UseKey(args.Key) respWAN, err := m.srv.KeyManagerWAN().UseKey(args.Key)
reply.Responses = append(reply.Responses, &structs.KeyringResponse{ if err != nil {
Datacenter: m.srv.config.Datacenter, return err
Messages: respWAN.Messages, }
Keys: respWAN.Keys, m.ingestKeyringResponse(respWAN, reply)
NumResp: respWAN.NumResp,
NumNodes: respWAN.NumNodes,
NumErr: respWAN.NumErr,
})
// Mark key rotation as being already forwarded, then forward.
args.Forwarded = true args.Forwarded = true
return m.srv.forwardAll("Internal.UseKey", args, reply) return m.srv.forwardAll("Internal.UseKey", args, reply)
} }
@ -168,32 +157,24 @@ func (m *Internal) UseKey(
return nil return nil
} }
// RemoveKey instructs all nodes to drop the specified key from the keyring.
func (m *Internal) RemoveKey( func (m *Internal) RemoveKey(
args *structs.KeyringRequest, args *structs.KeyringRequest,
reply *structs.KeyringResponses) error { reply *structs.KeyringResponses) error {
respLAN, _ := m.srv.KeyManagerLAN().RemoveKey(args.Key) respLAN, err := m.srv.KeyManagerLAN().RemoveKey(args.Key)
reply.Responses = append(reply.Responses, &structs.KeyringResponse{ if err != nil {
Datacenter: m.srv.config.Datacenter, return err
Messages: respLAN.Messages, }
Keys: respLAN.Keys, m.ingestKeyringResponse(respLAN, reply)
NumResp: respLAN.NumResp,
NumNodes: respLAN.NumNodes,
NumErr: respLAN.NumErr,
})
if !args.Forwarded { if !args.Forwarded {
respWAN, _ := m.srv.KeyManagerWAN().RemoveKey(args.Key) respWAN, err := m.srv.KeyManagerWAN().RemoveKey(args.Key)
reply.Responses = append(reply.Responses, &structs.KeyringResponse{ if err != nil {
Datacenter: m.srv.config.Datacenter, return err
Messages: respWAN.Messages, }
Keys: respWAN.Keys, m.ingestKeyringResponse(respWAN, reply)
NumResp: respWAN.NumResp,
NumNodes: respWAN.NumNodes,
NumErr: respWAN.NumErr,
})
// Mark key rotation as being already forwarded, then forward.
args.Forwarded = true args.Forwarded = true
return m.srv.forwardAll("Internal.RemoveKey", args, reply) return m.srv.forwardAll("Internal.RemoveKey", args, reply)
} }

7
consul/rpc.go

@ -227,9 +227,10 @@ func (s *Server) forwardDC(method, dc string, args interface{}, reply interface{
func (s *Server) forwardAll(method string, args, reply interface{}) error { func (s *Server) forwardAll(method string, args, reply interface{}) error {
for dc, _ := range s.remoteConsuls { for dc, _ := range s.remoteConsuls {
if dc != s.config.Datacenter { if dc != s.config.Datacenter {
if err := s.forwardDC(method, dc, args, reply); err != nil { // Forward the RPC call. Even if an error is returned here, we still
return err // want to continue broadcasting to the remaining DC's to avoid
} // network partitions completely killing us.
go s.forwardDC(method, dc, args, reply)
} }
} }
return nil return nil

Loading…
Cancel
Save