consul: break rpc forwarding and response ingestion out of internal endpoints

pull/336/head
Ryan Uber 10 years ago
parent 6a3271980e
commit 74c7b1239b

@ -2,7 +2,6 @@ package consul
import (
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/serf/serf"
)
// Internal endpoint is used to query the miscellaneous info that
@ -64,67 +63,23 @@ func (m *Internal) EventFire(args *structs.EventFireRequest,
return m.srv.UserEvent(args.Name, args.Payload)
}
// ingestKeyringResponse is a helper method to pick the relative information
// from a Serf message and stuff it into a KeyringResponse.
func (m *Internal) ingestKeyringResponse(
serfResp *serf.KeyResponse,
reply *structs.KeyringResponses,
err error, wan bool) {
errStr := ""
if err != nil {
errStr = err.Error()
}
reply.Responses = append(reply.Responses, &structs.KeyringResponse{
WAN: wan,
Datacenter: m.srv.config.Datacenter,
Messages: serfResp.Messages,
Keys: serfResp.Keys,
NumNodes: serfResp.NumNodes,
Error: errStr,
})
}
func (m *Internal) forwardKeyring(
method string,
args *structs.KeyringRequest,
replies *structs.KeyringResponses) error {
for dc, _ := range m.srv.remoteConsuls {
if dc == m.srv.config.Datacenter {
continue
}
rr := structs.KeyringResponses{}
if err := m.srv.forwardDC(method, dc, args, &rr); err != nil {
return err
}
for _, r := range rr.Responses {
replies.Responses = append(replies.Responses, r)
}
}
return nil
}
// ListKeys will query the WAN and LAN gossip keyrings of all nodes, adding
// results into a collective response as we go.
func (m *Internal) ListKeys(
args *structs.KeyringRequest,
reply *structs.KeyringResponses) error {
m.srv.setQueryMeta(&reply.QueryMeta)
dc := m.srv.config.Datacenter
respLAN, err := m.srv.KeyManagerLAN().ListKeys()
m.ingestKeyringResponse(respLAN, reply, err, false)
ingestKeyringResponse(respLAN, reply, dc, false, err)
if !args.Forwarded {
respWAN, err := m.srv.KeyManagerWAN().ListKeys()
m.ingestKeyringResponse(respWAN, reply, err, true)
ingestKeyringResponse(respWAN, reply, dc, true, err)
// Mark key rotation as being already forwarded, then forward.
args.Forwarded = true
m.forwardKeyring("Internal.ListKeys", args, reply)
m.srv.forwardKeyringRPC("Internal.ListKeys", args, reply)
}
return nil
@ -136,16 +91,16 @@ func (m *Internal) InstallKey(
args *structs.KeyringRequest,
reply *structs.KeyringResponses) error {
dc := m.srv.config.Datacenter
respLAN, err := m.srv.KeyManagerLAN().InstallKey(args.Key)
m.ingestKeyringResponse(respLAN, reply, err, false)
ingestKeyringResponse(respLAN, reply, dc, false, err)
if !args.Forwarded {
respWAN, err := m.srv.KeyManagerWAN().InstallKey(args.Key)
m.ingestKeyringResponse(respWAN, reply, err, true)
ingestKeyringResponse(respWAN, reply, dc, true, err)
// Mark key rotation as being already forwarded, then forward.
args.Forwarded = true
m.forwardKeyring("Internal.InstallKey", args, reply)
m.srv.forwardKeyringRPC("Internal.InstallKey", args, reply)
}
return nil
@ -157,16 +112,16 @@ func (m *Internal) UseKey(
args *structs.KeyringRequest,
reply *structs.KeyringResponses) error {
dc := m.srv.config.Datacenter
respLAN, err := m.srv.KeyManagerLAN().UseKey(args.Key)
m.ingestKeyringResponse(respLAN, reply, err, false)
ingestKeyringResponse(respLAN, reply, dc, false, err)
if !args.Forwarded {
respWAN, err := m.srv.KeyManagerWAN().UseKey(args.Key)
m.ingestKeyringResponse(respWAN, reply, err, true)
ingestKeyringResponse(respWAN, reply, dc, true, err)
// Mark key rotation as being already forwarded, then forward.
args.Forwarded = true
m.forwardKeyring("Internal.UseKey", args, reply)
m.srv.forwardKeyringRPC("Internal.UseKey", args, reply)
}
return nil
@ -177,16 +132,16 @@ func (m *Internal) RemoveKey(
args *structs.KeyringRequest,
reply *structs.KeyringResponses) error {
dc := m.srv.config.Datacenter
respLAN, err := m.srv.KeyManagerLAN().RemoveKey(args.Key)
m.ingestKeyringResponse(respLAN, reply, err, false)
ingestKeyringResponse(respLAN, reply, dc, false, err)
if !args.Forwarded {
respWAN, err := m.srv.KeyManagerWAN().RemoveKey(args.Key)
m.ingestKeyringResponse(respWAN, reply, err, true)
ingestKeyringResponse(respWAN, reply, dc, true, err)
// Mark key rotation as being already forwarded, then forward.
args.Forwarded = true
m.forwardKeyring("Internal.RemoveKey", args, reply)
m.srv.forwardKeyringRPC("Internal.RemoveKey", args, reply)
}
return nil

@ -0,0 +1,55 @@
package consul
import (
"github.com/hashicorp/consul/consul/structs"
"github.com/hashicorp/serf/serf"
)
// ingestKeyringResponse is a helper method to pick the relative information
// from a Serf message and stuff it into a KeyringResponse.
func ingestKeyringResponse(
serfResp *serf.KeyResponse, reply *structs.KeyringResponses,
dc string, wan bool, err error) {
errStr := ""
if err != nil {
errStr = err.Error()
}
reply.Responses = append(reply.Responses, &structs.KeyringResponse{
WAN: wan,
Datacenter: dc,
Messages: serfResp.Messages,
Keys: serfResp.Keys,
NumNodes: serfResp.NumNodes,
Error: errStr,
})
}
// forwardKeyringRPC is used to forward a keyring-related RPC request to one
// server in each datacenter. Since the net/rpc package writes replies in-place,
// we use this specialized method for dealing with keyring-related replies
// specifically by appending them to a wrapper response struct.
//
// This will only error for RPC-related errors. Otherwise, application-level
// errors are returned inside of the inner response objects.
func (s *Server) forwardKeyringRPC(
method string,
args *structs.KeyringRequest,
replies *structs.KeyringResponses) error {
for dc, _ := range s.remoteConsuls {
if dc == s.config.Datacenter {
continue
}
rr := structs.KeyringResponses{}
if err := s.forwardDC(method, dc, args, &rr); err != nil {
return err
}
for _, r := range rr.Responses {
replies.Responses = append(replies.Responses, r)
}
}
return nil
}
Loading…
Cancel
Save