From 2b5b54bd37753ee51a4adebd3f96f34b97111280 Mon Sep 17 00:00:00 2001 From: Daniel Nephin Date: Thu, 6 May 2021 16:59:53 -0400 Subject: [PATCH] Merge pull request #10075 from hashicorp/dnephin/handle-raft-apply-errors rpc: some cleanup of canRetry and ForwardRPC --- agent/consul/acl_endpoint.go | 64 +++++++++--------- agent/consul/acl_endpoint_legacy.go | 8 +-- agent/consul/auto_config_endpoint.go | 9 +-- agent/consul/auto_config_endpoint_test.go | 13 ++-- agent/consul/auto_encrypt_endpoint.go | 2 +- agent/consul/catalog_endpoint.go | 18 ++--- agent/consul/client.go | 12 ++-- agent/consul/config_endpoint.go | 12 ++-- agent/consul/connect_ca_endpoint.go | 12 ++-- agent/consul/coordinate_endpoint.go | 6 +- agent/consul/discovery_chain_endpoint.go | 5 +- agent/consul/federation_state_endpoint.go | 8 +-- agent/consul/health_endpoint.go | 13 ++-- agent/consul/intention_endpoint.go | 10 +-- agent/consul/internal_endpoint.go | 23 ++++--- agent/consul/kvs_endpoint.go | 8 +-- agent/consul/operator_autopilot_endpoint.go | 8 +-- agent/consul/operator_raft_endpoint.go | 11 +-- agent/consul/prepared_query_endpoint.go | 12 ++-- agent/consul/rpc.go | 38 ++++------- agent/consul/rpc_test.go | 67 +++++++++++++++++-- agent/consul/session_endpoint.go | 10 +-- agent/consul/txn_endpoint.go | 4 +- .../connect/envoy/case-basic/verify.bats | 2 +- .../connect/envoy/case-http/verify.bats | 4 +- 25 files changed, 218 insertions(+), 161 deletions(-) diff --git a/agent/consul/acl_endpoint.go b/agent/consul/acl_endpoint.go index 15143f7276..a49ba0501a 100644 --- a/agent/consul/acl_endpoint.go +++ b/agent/consul/acl_endpoint.go @@ -184,7 +184,7 @@ func (a *ACL) BootstrapTokens(args *structs.DCSpecificRequest, reply *structs.AC if err := a.aclPreCheck(); err != nil { return err } - if done, err := a.srv.ForwardRPC("ACL.BootstrapTokens", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.BootstrapTokens", args, reply); done { return err } @@ -279,7 +279,7 @@ func (a *ACL) TokenRead(args *structs.ACLTokenGetRequest, reply *structs.ACLToke args.Datacenter = a.srv.config.ACLDatacenter } - if done, err := a.srv.ForwardRPC("ACL.TokenRead", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.TokenRead", args, reply); done { return err } @@ -348,7 +348,7 @@ func (a *ACL) TokenClone(args *structs.ACLTokenSetRequest, reply *structs.ACLTok args.Datacenter = a.srv.config.ACLDatacenter } - if done, err := a.srv.ForwardRPC("ACL.TokenClone", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.TokenClone", args, reply); done { return err } @@ -419,7 +419,7 @@ func (a *ACL) TokenSet(args *structs.ACLTokenSetRequest, reply *structs.ACLToken return fmt.Errorf("Local tokens are disabled") } - if done, err := a.srv.ForwardRPC("ACL.TokenSet", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.TokenSet", args, reply); done { return err } @@ -825,7 +825,7 @@ func (a *ACL) TokenDelete(args *structs.ACLTokenDeleteRequest, reply *string) er args.Datacenter = a.srv.config.ACLDatacenter } - if done, err := a.srv.ForwardRPC("ACL.TokenDelete", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.TokenDelete", args, reply); done { return err } @@ -911,7 +911,7 @@ func (a *ACL) TokenList(args *structs.ACLTokenListRequest, reply *structs.ACLTok args.Datacenter = a.srv.config.ACLDatacenter } - if done, err := a.srv.ForwardRPC("ACL.TokenList", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.TokenList", args, reply); done { return err } @@ -974,7 +974,7 @@ func (a *ACL) TokenBatchRead(args *structs.ACLTokenBatchGetRequest, reply *struc args.Datacenter = a.srv.config.ACLDatacenter } - if done, err := a.srv.ForwardRPC("ACL.TokenBatchRead", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.TokenBatchRead", args, reply); done { return err } @@ -1028,7 +1028,7 @@ func (a *ACL) PolicyRead(args *structs.ACLPolicyGetRequest, reply *structs.ACLPo return err } - if done, err := a.srv.ForwardRPC("ACL.PolicyRead", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.PolicyRead", args, reply); done { return err } @@ -1066,7 +1066,7 @@ func (a *ACL) PolicyBatchRead(args *structs.ACLPolicyBatchGetRequest, reply *str return err } - if done, err := a.srv.ForwardRPC("ACL.PolicyBatchRead", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.PolicyBatchRead", args, reply); done { return err } @@ -1104,7 +1104,7 @@ func (a *ACL) PolicySet(args *structs.ACLPolicySetRequest, reply *structs.ACLPol args.Datacenter = a.srv.config.ACLDatacenter } - if done, err := a.srv.ForwardRPC("ACL.PolicySet", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.PolicySet", args, reply); done { return err } @@ -1235,7 +1235,7 @@ func (a *ACL) PolicyDelete(args *structs.ACLPolicyDeleteRequest, reply *string) args.Datacenter = a.srv.config.ACLDatacenter } - if done, err := a.srv.ForwardRPC("ACL.PolicyDelete", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.PolicyDelete", args, reply); done { return err } @@ -1288,7 +1288,7 @@ func (a *ACL) PolicyList(args *structs.ACLPolicyListRequest, reply *structs.ACLP return err } - if done, err := a.srv.ForwardRPC("ACL.PolicyList", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.PolicyList", args, reply); done { return err } @@ -1328,7 +1328,7 @@ func (a *ACL) PolicyResolve(args *structs.ACLPolicyBatchGetRequest, reply *struc return err } - if done, err := a.srv.ForwardRPC("ACL.PolicyResolve", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.PolicyResolve", args, reply); done { return err } @@ -1386,7 +1386,7 @@ func makeACLETag(parent string, policy *acl.Policy) string { // GetPolicy is used to retrieve a compiled policy object with a TTL. Does not // support a blocking query. func (a *ACL) GetPolicy(args *structs.ACLPolicyResolveLegacyRequest, reply *structs.ACLPolicyResolveLegacyResponse) error { - if done, err := a.srv.ForwardRPC("ACL.GetPolicy", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.GetPolicy", args, reply); done { return err } @@ -1433,7 +1433,7 @@ func (a *ACL) ReplicationStatus(args *structs.DCSpecificRequest, // re-using a structure where we don't support all the options. args.RequireConsistent = true args.AllowStale = false - if done, err := a.srv.ForwardRPC("ACL.ReplicationStatus", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.ReplicationStatus", args, reply); done { return err } @@ -1461,7 +1461,7 @@ func (a *ACL) RoleRead(args *structs.ACLRoleGetRequest, reply *structs.ACLRoleRe return err } - if done, err := a.srv.ForwardRPC("ACL.RoleRead", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.RoleRead", args, reply); done { return err } @@ -1500,7 +1500,7 @@ func (a *ACL) RoleBatchRead(args *structs.ACLRoleBatchGetRequest, reply *structs return err } - if done, err := a.srv.ForwardRPC("ACL.RoleBatchRead", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.RoleBatchRead", args, reply); done { return err } @@ -1538,7 +1538,7 @@ func (a *ACL) RoleSet(args *structs.ACLRoleSetRequest, reply *structs.ACLRole) e args.Datacenter = a.srv.config.ACLDatacenter } - if done, err := a.srv.ForwardRPC("ACL.RoleSet", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.RoleSet", args, reply); done { return err } @@ -1696,7 +1696,7 @@ func (a *ACL) RoleDelete(args *structs.ACLRoleDeleteRequest, reply *string) erro args.Datacenter = a.srv.config.ACLDatacenter } - if done, err := a.srv.ForwardRPC("ACL.RoleDelete", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.RoleDelete", args, reply); done { return err } @@ -1745,7 +1745,7 @@ func (a *ACL) RoleList(args *structs.ACLRoleListRequest, reply *structs.ACLRoleL return err } - if done, err := a.srv.ForwardRPC("ACL.RoleList", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.RoleList", args, reply); done { return err } @@ -1779,7 +1779,7 @@ func (a *ACL) RoleResolve(args *structs.ACLRoleBatchGetRequest, reply *structs.A return err } - if done, err := a.srv.ForwardRPC("ACL.RoleResolve", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.RoleResolve", args, reply); done { return err } @@ -1844,7 +1844,7 @@ func (a *ACL) BindingRuleRead(args *structs.ACLBindingRuleGetRequest, reply *str return errAuthMethodsRequireTokenReplication } - if done, err := a.srv.ForwardRPC("ACL.BindingRuleRead", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.BindingRuleRead", args, reply); done { return err } @@ -1883,7 +1883,7 @@ func (a *ACL) BindingRuleSet(args *structs.ACLBindingRuleSetRequest, reply *stru return errAuthMethodsRequireTokenReplication } - if done, err := a.srv.ForwardRPC("ACL.BindingRuleSet", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.BindingRuleSet", args, reply); done { return err } @@ -2012,7 +2012,7 @@ func (a *ACL) BindingRuleDelete(args *structs.ACLBindingRuleDeleteRequest, reply return errAuthMethodsRequireTokenReplication } - if done, err := a.srv.ForwardRPC("ACL.BindingRuleDelete", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.BindingRuleDelete", args, reply); done { return err } @@ -2063,7 +2063,7 @@ func (a *ACL) BindingRuleList(args *structs.ACLBindingRuleListRequest, reply *st return errAuthMethodsRequireTokenReplication } - if done, err := a.srv.ForwardRPC("ACL.BindingRuleList", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.BindingRuleList", args, reply); done { return err } @@ -2103,7 +2103,7 @@ func (a *ACL) AuthMethodRead(args *structs.ACLAuthMethodGetRequest, reply *struc return errAuthMethodsRequireTokenReplication } - if done, err := a.srv.ForwardRPC("ACL.AuthMethodRead", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.AuthMethodRead", args, reply); done { return err } @@ -2145,7 +2145,7 @@ func (a *ACL) AuthMethodSet(args *structs.ACLAuthMethodSetRequest, reply *struct return errAuthMethodsRequireTokenReplication } - if done, err := a.srv.ForwardRPC("ACL.AuthMethodSet", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.AuthMethodSet", args, reply); done { return err } @@ -2257,7 +2257,7 @@ func (a *ACL) AuthMethodDelete(args *structs.ACLAuthMethodDeleteRequest, reply * return errAuthMethodsRequireTokenReplication } - if done, err := a.srv.ForwardRPC("ACL.AuthMethodDelete", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.AuthMethodDelete", args, reply); done { return err } @@ -2313,7 +2313,7 @@ func (a *ACL) AuthMethodList(args *structs.ACLAuthMethodListRequest, reply *stru return errAuthMethodsRequireTokenReplication } - if done, err := a.srv.ForwardRPC("ACL.AuthMethodList", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.AuthMethodList", args, reply); done { return err } @@ -2367,7 +2367,7 @@ func (a *ACL) Login(args *structs.ACLLoginRequest, reply *structs.ACLToken) erro return errors.New("do not provide a token when logging in") } - if done, err := a.srv.ForwardRPC("ACL.Login", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.Login", args, reply); done { return err } @@ -2512,7 +2512,7 @@ func (a *ACL) Logout(args *structs.ACLLogoutRequest, reply *bool) error { return acl.ErrNotFound } - if done, err := a.srv.ForwardRPC("ACL.Logout", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.Logout", args, reply); done { return err } @@ -2559,7 +2559,7 @@ func (a *ACL) Authorize(args *structs.RemoteACLAuthorizationRequest, reply *[]st return err } - if done, err := a.srv.ForwardRPC("ACL.Authorize", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.Authorize", args, reply); done { return err } diff --git a/agent/consul/acl_endpoint_legacy.go b/agent/consul/acl_endpoint_legacy.go index df7dcf8e55..3653ba8f90 100644 --- a/agent/consul/acl_endpoint_legacy.go +++ b/agent/consul/acl_endpoint_legacy.go @@ -24,7 +24,7 @@ var ACLEndpointLegacySummaries = []prometheus.SummaryDefinition{ // Bootstrap is used to perform a one-time ACL bootstrap operation on // a cluster to get the first management token. func (a *ACL) Bootstrap(args *structs.DCSpecificRequest, reply *structs.ACL) error { - if done, err := a.srv.ForwardRPC("ACL.Bootstrap", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.Bootstrap", args, reply); done { return err } @@ -155,7 +155,7 @@ func aclApplyInternal(srv *Server, args *structs.ACLRequest, reply *string) erro // Apply is used to apply a modifying request to the data store. This should // only be used for operations that modify the data func (a *ACL) Apply(args *structs.ACLRequest, reply *string) error { - if done, err := a.srv.ForwardRPC("ACL.Apply", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.Apply", args, reply); done { return err } defer metrics.MeasureSince([]string{"acl", "apply"}, time.Now()) @@ -201,7 +201,7 @@ func (a *ACL) Apply(args *structs.ACLRequest, reply *string) error { // Get is used to retrieve a single ACL func (a *ACL) Get(args *structs.ACLSpecificRequest, reply *structs.IndexedACLs) error { - if done, err := a.srv.ForwardRPC("ACL.Get", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.Get", args, reply); done { return err } @@ -247,7 +247,7 @@ func (a *ACL) Get(args *structs.ACLSpecificRequest, // List is used to list all the ACLs func (a *ACL) List(args *structs.DCSpecificRequest, reply *structs.IndexedACLs) error { - if done, err := a.srv.ForwardRPC("ACL.List", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.List", args, reply); done { return err } diff --git a/agent/consul/auto_config_endpoint.go b/agent/consul/auto_config_endpoint.go index 6cbf0a5397..53007bde73 100644 --- a/agent/consul/auto_config_endpoint.go +++ b/agent/consul/auto_config_endpoint.go @@ -9,6 +9,9 @@ import ( "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/proto" + bexpr "github.com/hashicorp/go-bexpr" + "github.com/mitchellh/mapstructure" + "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/consul/authmethod/ssoauth" "github.com/hashicorp/consul/agent/structs" @@ -17,8 +20,6 @@ import ( "github.com/hashicorp/consul/proto/pbconfig" "github.com/hashicorp/consul/proto/pbconnect" "github.com/hashicorp/consul/tlsutil" - bexpr "github.com/hashicorp/go-bexpr" - "github.com/mitchellh/mapstructure" ) type AutoConfigOptions struct { @@ -107,7 +108,7 @@ func (a *jwtAuthorizer) Authorize(req *pbautoconf.AutoConfigRequest) (AutoConfig type AutoConfigBackend interface { CreateACLToken(template *structs.ACLToken) (*structs.ACLToken, error) DatacenterJoinAddresses(segment string) ([]string, error) - ForwardRPC(method string, info structs.RPCInfo, args, reply interface{}) (bool, error) + ForwardRPC(method string, info structs.RPCInfo, reply interface{}) (bool, error) GetCARoots() (*structs.IndexedCARoots, error) SignCertificate(csr *x509.CertificateRequest, id connect.CertURI) (*structs.IssuedCert, error) @@ -339,7 +340,7 @@ func (ac *AutoConfig) InitialConfiguration(req *pbautoconf.AutoConfigRequest, re } // forward to the leader - if done, err := ac.backend.ForwardRPC("AutoConfig.InitialConfiguration", req, req, resp); done { + if done, err := ac.backend.ForwardRPC("AutoConfig.InitialConfiguration", req, resp); done { return err } diff --git a/agent/consul/auto_config_endpoint_test.go b/agent/consul/auto_config_endpoint_test.go index c1495a7886..563ed42c35 100644 --- a/agent/consul/auto_config_endpoint_test.go +++ b/agent/consul/auto_config_endpoint_test.go @@ -11,6 +11,11 @@ import ( "testing" "time" + "github.com/hashicorp/memberlist" + msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/structs" "github.com/hashicorp/consul/internal/go-sso/oidcauth/oidcauthtest" @@ -18,10 +23,6 @@ import ( "github.com/hashicorp/consul/proto/pbconfig" "github.com/hashicorp/consul/sdk/testutil" "github.com/hashicorp/consul/tlsutil" - "github.com/hashicorp/memberlist" - msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" "gopkg.in/square/go-jose.v2/jwt" ) @@ -44,8 +45,8 @@ func (m *mockAutoConfigBackend) DatacenterJoinAddresses(segment string) ([]strin return addrs, ret.Error(1) } -func (m *mockAutoConfigBackend) ForwardRPC(method string, info structs.RPCInfo, args, reply interface{}) (bool, error) { - ret := m.Called(method, info, args, reply) +func (m *mockAutoConfigBackend) ForwardRPC(method string, req structs.RPCInfo, reply interface{}) (bool, error) { + ret := m.Called(method, req, reply) return ret.Bool(0), ret.Error(1) } diff --git a/agent/consul/auto_encrypt_endpoint.go b/agent/consul/auto_encrypt_endpoint.go index 78a100acc2..2f96efacfc 100644 --- a/agent/consul/auto_encrypt_endpoint.go +++ b/agent/consul/auto_encrypt_endpoint.go @@ -24,7 +24,7 @@ func (a *AutoEncrypt) Sign( if !a.srv.config.AutoEncryptAllowTLS { return ErrAutoEncryptAllowTLSNotEnabled } - if done, err := a.srv.ForwardRPC("AutoEncrypt.Sign", args, args, reply); done { + if done, err := a.srv.ForwardRPC("AutoEncrypt.Sign", args, reply); done { return err } diff --git a/agent/consul/catalog_endpoint.go b/agent/consul/catalog_endpoint.go index e5d430992d..f657608696 100644 --- a/agent/consul/catalog_endpoint.go +++ b/agent/consul/catalog_endpoint.go @@ -142,7 +142,7 @@ func checkPreApply(check *structs.HealthCheck) { // Register is used register that a node is providing a given service. func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error { - if done, err := c.srv.ForwardRPC("Catalog.Register", args, args, reply); done { + if done, err := c.srv.ForwardRPC("Catalog.Register", args, reply); done { return err } defer metrics.MeasureSince([]string{"catalog", "register"}, time.Now()) @@ -217,7 +217,7 @@ func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error // Deregister is used to remove a service registration for a given node. func (c *Catalog) Deregister(args *structs.DeregisterRequest, reply *struct{}) error { - if done, err := c.srv.ForwardRPC("Catalog.Deregister", args, args, reply); done { + if done, err := c.srv.ForwardRPC("Catalog.Deregister", args, reply); done { return err } defer metrics.MeasureSince([]string{"catalog", "deregister"}, time.Now()) @@ -284,7 +284,7 @@ func (c *Catalog) ListDatacenters(args *structs.DatacentersRequest, reply *[]str // ListNodes is used to query the nodes in a DC func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.IndexedNodes) error { - if done, err := c.srv.ForwardRPC("Catalog.ListNodes", args, args, reply); done { + if done, err := c.srv.ForwardRPC("Catalog.ListNodes", args, reply); done { return err } @@ -332,7 +332,7 @@ func isUnmodified(opts structs.QueryOptions, index uint64) bool { // ListServices is used to query the services in a DC func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.IndexedServices) error { - if done, err := c.srv.ForwardRPC("Catalog.ListServices", args, args, reply); done { + if done, err := c.srv.ForwardRPC("Catalog.ListServices", args, reply); done { return err } @@ -373,7 +373,7 @@ func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.I } func (c *Catalog) ServiceList(args *structs.DCSpecificRequest, reply *structs.IndexedServiceList) error { - if done, err := c.srv.ForwardRPC("Catalog.ServiceList", args, args, reply); done { + if done, err := c.srv.ForwardRPC("Catalog.ServiceList", args, reply); done { return err } @@ -402,7 +402,7 @@ func (c *Catalog) ServiceList(args *structs.DCSpecificRequest, reply *structs.In // ServiceNodes returns all the nodes registered as part of a service func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *structs.IndexedServiceNodes) error { - if done, err := c.srv.ForwardRPC("Catalog.ServiceNodes", args, args, reply); done { + if done, err := c.srv.ForwardRPC("Catalog.ServiceNodes", args, reply); done { return err } @@ -540,7 +540,7 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru // NodeServices returns all the services registered as part of a node func (c *Catalog) NodeServices(args *structs.NodeSpecificRequest, reply *structs.IndexedNodeServices) error { - if done, err := c.srv.ForwardRPC("Catalog.NodeServices", args, args, reply); done { + if done, err := c.srv.ForwardRPC("Catalog.NodeServices", args, reply); done { return err } @@ -591,7 +591,7 @@ func (c *Catalog) NodeServices(args *structs.NodeSpecificRequest, reply *structs } func (c *Catalog) NodeServiceList(args *structs.NodeSpecificRequest, reply *structs.IndexedNodeServiceList) error { - if done, err := c.srv.ForwardRPC("Catalog.NodeServiceList", args, args, reply); done { + if done, err := c.srv.ForwardRPC("Catalog.NodeServiceList", args, reply); done { return err } @@ -644,7 +644,7 @@ func (c *Catalog) NodeServiceList(args *structs.NodeSpecificRequest, reply *stru } func (c *Catalog) GatewayServices(args *structs.ServiceSpecificRequest, reply *structs.IndexedGatewayServices) error { - if done, err := c.srv.ForwardRPC("Catalog.GatewayServices", args, args, reply); done { + if done, err := c.srv.ForwardRPC("Catalog.GatewayServices", args, reply); done { return err } diff --git a/agent/consul/client.go b/agent/consul/client.go index bff2565be3..f341f9abc3 100644 --- a/agent/consul/client.go +++ b/agent/consul/client.go @@ -10,6 +10,10 @@ import ( "github.com/armon/go-metrics" "github.com/armon/go-metrics/prometheus" + "github.com/hashicorp/go-hclog" + "github.com/hashicorp/serf/serf" + "golang.org/x/time/rate" + "github.com/hashicorp/consul/agent/pool" "github.com/hashicorp/consul/agent/router" "github.com/hashicorp/consul/agent/structs" @@ -17,9 +21,6 @@ import ( "github.com/hashicorp/consul/logging" "github.com/hashicorp/consul/tlsutil" "github.com/hashicorp/consul/types" - "github.com/hashicorp/go-hclog" - "github.com/hashicorp/serf/serf" - "golang.org/x/time/rate" ) var ClientCounters = []prometheus.CounterDefinition{ @@ -287,7 +288,10 @@ TRY: ) metrics.IncrCounterWithLabels([]string{"client", "rpc", "failed"}, 1, []metrics.Label{{Name: "server", Value: server.Name}}) manager.NotifyFailedServer(server) - if retry := canRetry(args, rpcErr); !retry { + + // Use the zero value for RPCInfo if the request doesn't implement RPCInfo + info, _ := args.(structs.RPCInfo) + if retry := canRetry(info, rpcErr); !retry { return rpcErr } diff --git a/agent/consul/config_endpoint.go b/agent/consul/config_endpoint.go index 4825a78b3a..87feafae58 100644 --- a/agent/consul/config_endpoint.go +++ b/agent/consul/config_endpoint.go @@ -58,7 +58,7 @@ func (c *ConfigEntry) Apply(args *structs.ConfigEntryRequest, reply *bool) error // be replicated to all the other datacenters. args.Datacenter = c.srv.config.PrimaryDatacenter - if done, err := c.srv.ForwardRPC("ConfigEntry.Apply", args, args, reply); done { + if done, err := c.srv.ForwardRPC("ConfigEntry.Apply", args, reply); done { return err } defer metrics.MeasureSince([]string{"config_entry", "apply"}, time.Now()) @@ -105,7 +105,7 @@ func (c *ConfigEntry) Get(args *structs.ConfigEntryQuery, reply *structs.ConfigE return err } - if done, err := c.srv.ForwardRPC("ConfigEntry.Get", args, args, reply); done { + if done, err := c.srv.ForwardRPC("ConfigEntry.Get", args, reply); done { return err } defer metrics.MeasureSince([]string{"config_entry", "get"}, time.Now()) @@ -152,7 +152,7 @@ func (c *ConfigEntry) List(args *structs.ConfigEntryQuery, reply *structs.Indexe return err } - if done, err := c.srv.ForwardRPC("ConfigEntry.List", args, args, reply); done { + if done, err := c.srv.ForwardRPC("ConfigEntry.List", args, reply); done { return err } defer metrics.MeasureSince([]string{"config_entry", "list"}, time.Now()) @@ -209,7 +209,7 @@ func (c *ConfigEntry) ListAll(args *structs.ConfigEntryListAllRequest, reply *st return err } - if done, err := c.srv.ForwardRPC("ConfigEntry.ListAll", args, args, reply); done { + if done, err := c.srv.ForwardRPC("ConfigEntry.ListAll", args, reply); done { return err } defer metrics.MeasureSince([]string{"config_entry", "listAll"}, time.Now()) @@ -271,7 +271,7 @@ func (c *ConfigEntry) Delete(args *structs.ConfigEntryRequest, reply *struct{}) // be replicated to all the other datacenters. args.Datacenter = c.srv.config.PrimaryDatacenter - if done, err := c.srv.ForwardRPC("ConfigEntry.Delete", args, args, reply); done { + if done, err := c.srv.ForwardRPC("ConfigEntry.Delete", args, reply); done { return err } defer metrics.MeasureSince([]string{"config_entry", "delete"}, time.Now()) @@ -305,7 +305,7 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r return err } - if done, err := c.srv.ForwardRPC("ConfigEntry.ResolveServiceConfig", args, args, reply); done { + if done, err := c.srv.ForwardRPC("ConfigEntry.ResolveServiceConfig", args, reply); done { return err } defer metrics.MeasureSince([]string{"config_entry", "resolve_service_config"}, time.Now()) diff --git a/agent/consul/connect_ca_endpoint.go b/agent/consul/connect_ca_endpoint.go index ffecaae1c2..93f3b55fb3 100644 --- a/agent/consul/connect_ca_endpoint.go +++ b/agent/consul/connect_ca_endpoint.go @@ -6,12 +6,12 @@ import ( "time" "github.com/hashicorp/go-hclog" + "github.com/hashicorp/go-memdb" "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/connect" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" - "github.com/hashicorp/go-memdb" ) var ( @@ -56,7 +56,7 @@ func (s *ConnectCA) ConfigurationGet( return ErrConnectNotEnabled } - if done, err := s.srv.ForwardRPC("ConnectCA.ConfigurationGet", args, args, reply); done { + if done, err := s.srv.ForwardRPC("ConnectCA.ConfigurationGet", args, reply); done { return err } @@ -88,7 +88,7 @@ func (s *ConnectCA) ConfigurationSet( return ErrConnectNotEnabled } - if done, err := s.srv.ForwardRPC("ConnectCA.ConfigurationSet", args, args, reply); done { + if done, err := s.srv.ForwardRPC("ConnectCA.ConfigurationSet", args, reply); done { return err } @@ -109,7 +109,7 @@ func (s *ConnectCA) Roots( args *structs.DCSpecificRequest, reply *structs.IndexedCARoots) error { // Forward if necessary - if done, err := s.srv.ForwardRPC("ConnectCA.Roots", args, args, reply); done { + if done, err := s.srv.ForwardRPC("ConnectCA.Roots", args, reply); done { return err } @@ -141,7 +141,7 @@ func (s *ConnectCA) Sign( return ErrConnectNotEnabled } - if done, err := s.srv.ForwardRPC("ConnectCA.Sign", args, args, reply); done { + if done, err := s.srv.ForwardRPC("ConnectCA.Sign", args, reply); done { return err } @@ -209,7 +209,7 @@ func (s *ConnectCA) SignIntermediate( return ErrConnectNotEnabled } - if done, err := s.srv.ForwardRPC("ConnectCA.SignIntermediate", args, args, reply); done { + if done, err := s.srv.ForwardRPC("ConnectCA.SignIntermediate", args, reply); done { return err } diff --git a/agent/consul/coordinate_endpoint.go b/agent/consul/coordinate_endpoint.go index 2fbc10c700..f276215dfe 100644 --- a/agent/consul/coordinate_endpoint.go +++ b/agent/consul/coordinate_endpoint.go @@ -116,7 +116,7 @@ func (c *Coordinate) batchApplyUpdates() error { // Update inserts or updates the LAN coordinate of a node. func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct{}) (err error) { - if done, err := c.srv.ForwardRPC("Coordinate.Update", args, args, reply); done { + if done, err := c.srv.ForwardRPC("Coordinate.Update", args, reply); done { return err } @@ -192,7 +192,7 @@ func (c *Coordinate) ListDatacenters(args *struct{}, reply *[]structs.Datacenter // ListNodes returns the list of nodes with their raw network coordinates (if no // coordinates are available for a node it won't appear in this list). func (c *Coordinate) ListNodes(args *structs.DCSpecificRequest, reply *structs.IndexedCoordinates) error { - if done, err := c.srv.ForwardRPC("Coordinate.ListNodes", args, args, reply); done { + if done, err := c.srv.ForwardRPC("Coordinate.ListNodes", args, reply); done { return err } @@ -215,7 +215,7 @@ func (c *Coordinate) ListNodes(args *structs.DCSpecificRequest, reply *structs.I // Node returns the raw coordinates for a single node. func (c *Coordinate) Node(args *structs.NodeSpecificRequest, reply *structs.IndexedCoordinates) error { - if done, err := c.srv.ForwardRPC("Coordinate.Node", args, args, reply); done { + if done, err := c.srv.ForwardRPC("Coordinate.Node", args, reply); done { return err } diff --git a/agent/consul/discovery_chain_endpoint.go b/agent/consul/discovery_chain_endpoint.go index 23f545ef02..00327d1bb4 100644 --- a/agent/consul/discovery_chain_endpoint.go +++ b/agent/consul/discovery_chain_endpoint.go @@ -5,11 +5,12 @@ import ( "time" metrics "github.com/armon/go-metrics" + memdb "github.com/hashicorp/go-memdb" + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/consul/discoverychain" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" - memdb "github.com/hashicorp/go-memdb" ) type DiscoveryChain struct { @@ -22,7 +23,7 @@ func (c *DiscoveryChain) Get(args *structs.DiscoveryChainRequest, reply *structs return ErrConnectNotEnabled } - if done, err := c.srv.ForwardRPC("DiscoveryChain.Get", args, args, reply); done { + if done, err := c.srv.ForwardRPC("DiscoveryChain.Get", args, reply); done { return err } defer metrics.MeasureSince([]string{"discovery_chain", "get"}, time.Now()) diff --git a/agent/consul/federation_state_endpoint.go b/agent/consul/federation_state_endpoint.go index f5fa358bb4..d4eafe8350 100644 --- a/agent/consul/federation_state_endpoint.go +++ b/agent/consul/federation_state_endpoint.go @@ -48,7 +48,7 @@ func (c *FederationState) Apply(args *structs.FederationStateRequest, reply *boo // be replicated to all the other datacenters. args.Datacenter = c.srv.config.PrimaryDatacenter - if done, err := c.srv.ForwardRPC("FederationState.Apply", args, args, reply); done { + if done, err := c.srv.ForwardRPC("FederationState.Apply", args, reply); done { return err } @@ -94,7 +94,7 @@ func (c *FederationState) Apply(args *structs.FederationStateRequest, reply *boo } func (c *FederationState) Get(args *structs.FederationStateQuery, reply *structs.FederationStateResponse) error { - if done, err := c.srv.ForwardRPC("FederationState.Get", args, args, reply); done { + if done, err := c.srv.ForwardRPC("FederationState.Get", args, reply); done { return err } @@ -135,7 +135,7 @@ func (c *FederationState) Get(args *structs.FederationStateQuery, reply *structs // List is the endpoint meant to be used by consul servers performing // replication. func (c *FederationState) List(args *structs.DCSpecificRequest, reply *structs.IndexedFederationStates) error { - if done, err := c.srv.ForwardRPC("FederationState.List", args, args, reply); done { + if done, err := c.srv.ForwardRPC("FederationState.List", args, reply); done { return err } @@ -178,7 +178,7 @@ func (c *FederationState) List(args *structs.DCSpecificRequest, reply *structs.I // in the discovery info for dialing mesh gateways. Analogous to catalog // endpoints. func (c *FederationState) ListMeshGateways(args *structs.DCSpecificRequest, reply *structs.DatacenterIndexedCheckServiceNodes) error { - if done, err := c.srv.ForwardRPC("FederationState.ListMeshGateways", args, args, reply); done { + if done, err := c.srv.ForwardRPC("FederationState.ListMeshGateways", args, reply); done { return err } diff --git a/agent/consul/health_endpoint.go b/agent/consul/health_endpoint.go index 4706405a83..2dd7d575b8 100644 --- a/agent/consul/health_endpoint.go +++ b/agent/consul/health_endpoint.go @@ -5,11 +5,12 @@ import ( "sort" "github.com/armon/go-metrics" + bexpr "github.com/hashicorp/go-bexpr" + "github.com/hashicorp/go-memdb" + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/structs" - bexpr "github.com/hashicorp/go-bexpr" - "github.com/hashicorp/go-memdb" ) // Health endpoint is used to query the health information @@ -20,7 +21,7 @@ type Health struct { // ChecksInState is used to get all the checks in a given state func (h *Health) ChecksInState(args *structs.ChecksInStateRequest, reply *structs.IndexedHealthChecks) error { - if done, err := h.srv.ForwardRPC("Health.ChecksInState", args, args, reply); done { + if done, err := h.srv.ForwardRPC("Health.ChecksInState", args, reply); done { return err } @@ -71,7 +72,7 @@ func (h *Health) ChecksInState(args *structs.ChecksInStateRequest, // NodeChecks is used to get all the checks for a node func (h *Health) NodeChecks(args *structs.NodeSpecificRequest, reply *structs.IndexedHealthChecks) error { - if done, err := h.srv.ForwardRPC("Health.NodeChecks", args, args, reply); done { + if done, err := h.srv.ForwardRPC("Health.NodeChecks", args, reply); done { return err } @@ -121,7 +122,7 @@ func (h *Health) ServiceChecks(args *structs.ServiceSpecificRequest, } // Potentially forward - if done, err := h.srv.ForwardRPC("Health.ServiceChecks", args, args, reply); done { + if done, err := h.srv.ForwardRPC("Health.ServiceChecks", args, reply); done { return err } @@ -171,7 +172,7 @@ func (h *Health) ServiceChecks(args *structs.ServiceSpecificRequest, // ServiceNodes returns all the nodes registered as part of a service including health info func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *structs.IndexedCheckServiceNodes) error { - if done, err := h.srv.ForwardRPC("Health.ServiceNodes", args, args, reply); done { + if done, err := h.srv.ForwardRPC("Health.ServiceNodes", args, reply); done { return err } diff --git a/agent/consul/intention_endpoint.go b/agent/consul/intention_endpoint.go index 83c7f4bcdb..a8b71b7248 100644 --- a/agent/consul/intention_endpoint.go +++ b/agent/consul/intention_endpoint.go @@ -80,7 +80,7 @@ func (s *Intention) Apply(args *structs.IntentionRequest, reply *string) error { // datacenter. These will then be replicated to all the other datacenters. args.Datacenter = s.srv.config.PrimaryDatacenter - if done, err := s.srv.ForwardRPC("Intention.Apply", args, args, reply); done { + if done, err := s.srv.ForwardRPC("Intention.Apply", args, reply); done { return err } defer metrics.MeasureSince([]string{"consul", "intention", "apply"}, time.Now()) @@ -423,7 +423,7 @@ func (s *Intention) Get(args *structs.IntentionQueryRequest, reply *structs.Inde } // Forward if necessary - if done, err := s.srv.ForwardRPC("Intention.Get", args, args, reply); done { + if done, err := s.srv.ForwardRPC("Intention.Get", args, reply); done { return err } @@ -501,7 +501,7 @@ func (s *Intention) List(args *structs.IntentionListRequest, reply *structs.Inde } // Forward if necessary - if done, err := s.srv.ForwardRPC("Intention.List", args, args, reply); done { + if done, err := s.srv.ForwardRPC("Intention.List", args, reply); done { return err } @@ -571,7 +571,7 @@ func (s *Intention) Match(args *structs.IntentionQueryRequest, reply *structs.In } // Forward if necessary - if done, err := s.srv.ForwardRPC("Intention.Match", args, args, reply); done { + if done, err := s.srv.ForwardRPC("Intention.Match", args, reply); done { return err } @@ -645,7 +645,7 @@ func (s *Intention) Check(args *structs.IntentionQueryRequest, reply *structs.In } // Forward maybe - if done, err := s.srv.ForwardRPC("Intention.Check", args, args, reply); done { + if done, err := s.srv.ForwardRPC("Intention.Check", args, reply); done { return err } diff --git a/agent/consul/internal_endpoint.go b/agent/consul/internal_endpoint.go index 9a82f136b1..dad798dfc6 100644 --- a/agent/consul/internal_endpoint.go +++ b/agent/consul/internal_endpoint.go @@ -3,14 +3,15 @@ package consul import ( "fmt" - "github.com/hashicorp/consul/acl" - "github.com/hashicorp/consul/agent/consul/state" - "github.com/hashicorp/consul/agent/structs" bexpr "github.com/hashicorp/go-bexpr" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-memdb" "github.com/hashicorp/go-multierror" "github.com/hashicorp/serf/serf" + + "github.com/hashicorp/consul/acl" + "github.com/hashicorp/consul/agent/consul/state" + "github.com/hashicorp/consul/agent/structs" ) // Internal endpoint is used to query the miscellaneous info that @@ -24,7 +25,7 @@ type Internal struct { // NodeInfo is used to retrieve information about a specific node. func (m *Internal) NodeInfo(args *structs.NodeSpecificRequest, reply *structs.IndexedNodeDump) error { - if done, err := m.srv.ForwardRPC("Internal.NodeInfo", args, args, reply); done { + if done, err := m.srv.ForwardRPC("Internal.NodeInfo", args, reply); done { return err } @@ -50,7 +51,7 @@ func (m *Internal) NodeInfo(args *structs.NodeSpecificRequest, // NodeDump is used to generate information about all of the nodes. func (m *Internal) NodeDump(args *structs.DCSpecificRequest, reply *structs.IndexedNodeDump) error { - if done, err := m.srv.ForwardRPC("Internal.NodeDump", args, args, reply); done { + if done, err := m.srv.ForwardRPC("Internal.NodeDump", args, reply); done { return err } @@ -89,7 +90,7 @@ func (m *Internal) NodeDump(args *structs.DCSpecificRequest, } func (m *Internal) ServiceDump(args *structs.ServiceDumpRequest, reply *structs.IndexedNodesWithGateways) error { - if done, err := m.srv.ForwardRPC("Internal.ServiceDump", args, args, reply); done { + if done, err := m.srv.ForwardRPC("Internal.ServiceDump", args, reply); done { return err } @@ -145,7 +146,7 @@ func (m *Internal) ServiceDump(args *structs.ServiceDumpRequest, reply *structs. } func (m *Internal) ServiceTopology(args *structs.ServiceSpecificRequest, reply *structs.IndexedServiceTopology) error { - if done, err := m.srv.ForwardRPC("Internal.ServiceTopology", args, args, reply); done { + if done, err := m.srv.ForwardRPC("Internal.ServiceTopology", args, reply); done { return err } if args.ServiceName == "" { @@ -199,7 +200,7 @@ func (m *Internal) IntentionUpstreams(args *structs.ServiceSpecificRequest, repl if args.ServiceName == "" { return fmt.Errorf("Must provide a service name") } - if done, err := m.srv.ForwardRPC("Internal.IntentionUpstreams", args, args, reply); done { + if done, err := m.srv.ForwardRPC("Internal.IntentionUpstreams", args, reply); done { return err } @@ -233,7 +234,7 @@ func (m *Internal) IntentionUpstreams(args *structs.ServiceSpecificRequest, repl // GatewayServiceNodes returns all the nodes for services associated with a gateway along with their gateway config func (m *Internal) GatewayServiceDump(args *structs.ServiceSpecificRequest, reply *structs.IndexedServiceDump) error { - if done, err := m.srv.ForwardRPC("Internal.GatewayServiceDump", args, args, reply); done { + if done, err := m.srv.ForwardRPC("Internal.GatewayServiceDump", args, reply); done { return err } @@ -312,7 +313,7 @@ func (m *Internal) GatewayServiceDump(args *structs.ServiceSpecificRequest, repl // Match returns the set of intentions that match the given source/destination. func (m *Internal) GatewayIntentions(args *structs.IntentionQueryRequest, reply *structs.IndexedIntentions) error { // Forward if necessary - if done, err := m.srv.ForwardRPC("Internal.GatewayIntentions", args, args, reply); done { + if done, err := m.srv.ForwardRPC("Internal.GatewayIntentions", args, reply); done { return err } @@ -398,7 +399,7 @@ func (m *Internal) GatewayIntentions(args *structs.IntentionQueryRequest, reply // triggered in a remote DC. func (m *Internal) EventFire(args *structs.EventFireRequest, reply *structs.EventFireResponse) error { - if done, err := m.srv.ForwardRPC("Internal.EventFire", args, args, reply); done { + if done, err := m.srv.ForwardRPC("Internal.EventFire", args, reply); done { return err } diff --git a/agent/consul/kvs_endpoint.go b/agent/consul/kvs_endpoint.go index d52ba169c4..df168c5c55 100644 --- a/agent/consul/kvs_endpoint.go +++ b/agent/consul/kvs_endpoint.go @@ -96,7 +96,7 @@ func kvsPreApply(logger hclog.Logger, srv *Server, authz acl.Authorizer, op api. // Apply is used to apply a KVS update request to the data store. func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error { - if done, err := k.srv.ForwardRPC("KVS.Apply", args, args, reply); done { + if done, err := k.srv.ForwardRPC("KVS.Apply", args, reply); done { return err } defer metrics.MeasureSince([]string{"kvs", "apply"}, time.Now()) @@ -135,7 +135,7 @@ func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error { // Get is used to lookup a single key. func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) error { - if done, err := k.srv.ForwardRPC("KVS.Get", args, args, reply); done { + if done, err := k.srv.ForwardRPC("KVS.Get", args, reply); done { return err } @@ -180,7 +180,7 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er // List is used to list all keys with a given prefix. func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) error { - if done, err := k.srv.ForwardRPC("KVS.List", args, args, reply); done { + if done, err := k.srv.ForwardRPC("KVS.List", args, reply); done { return err } @@ -232,7 +232,7 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e // of the response so that only a subset of the prefix is returned. In this // mode, the keys which are omitted are still counted in the returned index. func (k *KVS) ListKeys(args *structs.KeyListRequest, reply *structs.IndexedKeyList) error { - if done, err := k.srv.ForwardRPC("KVS.ListKeys", args, args, reply); done { + if done, err := k.srv.ForwardRPC("KVS.ListKeys", args, reply); done { return err } diff --git a/agent/consul/operator_autopilot_endpoint.go b/agent/consul/operator_autopilot_endpoint.go index 29cdbe912c..b415a7cc29 100644 --- a/agent/consul/operator_autopilot_endpoint.go +++ b/agent/consul/operator_autopilot_endpoint.go @@ -12,7 +12,7 @@ import ( // AutopilotGetConfiguration is used to retrieve the current Autopilot configuration. func (op *Operator) AutopilotGetConfiguration(args *structs.DCSpecificRequest, reply *structs.AutopilotConfig) error { - if done, err := op.srv.ForwardRPC("Operator.AutopilotGetConfiguration", args, args, reply); done { + if done, err := op.srv.ForwardRPC("Operator.AutopilotGetConfiguration", args, reply); done { return err } @@ -44,7 +44,7 @@ func (op *Operator) AutopilotGetConfiguration(args *structs.DCSpecificRequest, r // AutopilotSetConfiguration is used to set the current Autopilot configuration. func (op *Operator) AutopilotSetConfiguration(args *structs.AutopilotSetConfigRequest, reply *bool) error { - if done, err := op.srv.ForwardRPC("Operator.AutopilotSetConfiguration", args, args, reply); done { + if done, err := op.srv.ForwardRPC("Operator.AutopilotSetConfiguration", args, reply); done { return err } @@ -79,7 +79,7 @@ func (op *Operator) ServerHealth(args *structs.DCSpecificRequest, reply *structs // re-using a structure where we don't support all the options. args.RequireConsistent = true args.AllowStale = false - if done, err := op.srv.ForwardRPC("Operator.ServerHealth", args, args, reply); done { + if done, err := op.srv.ForwardRPC("Operator.ServerHealth", args, reply); done { return err } @@ -146,7 +146,7 @@ func (op *Operator) AutopilotState(args *structs.DCSpecificRequest, reply *autop // re-using a structure where we don't support all the options. args.RequireConsistent = true args.AllowStale = false - if done, err := op.srv.ForwardRPC("Operator.AutopilotState", args, args, reply); done { + if done, err := op.srv.ForwardRPC("Operator.AutopilotState", args, reply); done { return err } diff --git a/agent/consul/operator_raft_endpoint.go b/agent/consul/operator_raft_endpoint.go index 71fa1bbde8..d7ceb7e9a2 100644 --- a/agent/consul/operator_raft_endpoint.go +++ b/agent/consul/operator_raft_endpoint.go @@ -4,16 +4,17 @@ import ( "fmt" "net" + "github.com/hashicorp/raft" + "github.com/hashicorp/serf/serf" + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/consul/agent/structs" - "github.com/hashicorp/raft" - "github.com/hashicorp/serf/serf" ) // RaftGetConfiguration is used to retrieve the current Raft configuration. func (op *Operator) RaftGetConfiguration(args *structs.DCSpecificRequest, reply *structs.RaftConfigurationResponse) error { - if done, err := op.srv.ForwardRPC("Operator.RaftGetConfiguration", args, args, reply); done { + if done, err := op.srv.ForwardRPC("Operator.RaftGetConfiguration", args, reply); done { return err } @@ -74,7 +75,7 @@ func (op *Operator) RaftGetConfiguration(args *structs.DCSpecificRequest, reply // "IP:port". The reply argument is not used, but it required to fulfill the RPC // interface. func (op *Operator) RaftRemovePeerByAddress(args *structs.RaftRemovePeerRequest, reply *struct{}) error { - if done, err := op.srv.ForwardRPC("Operator.RaftRemovePeerByAddress", args, args, reply); done { + if done, err := op.srv.ForwardRPC("Operator.RaftRemovePeerByAddress", args, reply); done { return err } @@ -127,7 +128,7 @@ REMOVE: // "IP:port". The reply argument is not used, but is required to fulfill the RPC // interface. func (op *Operator) RaftRemovePeerByID(args *structs.RaftRemovePeerRequest, reply *struct{}) error { - if done, err := op.srv.ForwardRPC("Operator.RaftRemovePeerByID", args, args, reply); done { + if done, err := op.srv.ForwardRPC("Operator.RaftRemovePeerByID", args, reply); done { return err } diff --git a/agent/consul/prepared_query_endpoint.go b/agent/consul/prepared_query_endpoint.go index e4dc05e9a6..c05de898f7 100644 --- a/agent/consul/prepared_query_endpoint.go +++ b/agent/consul/prepared_query_endpoint.go @@ -46,7 +46,7 @@ type PreparedQuery struct { // only be used for operations that modify the data. The ID of the session is // returned in the reply. func (p *PreparedQuery) Apply(args *structs.PreparedQueryRequest, reply *string) (err error) { - if done, err := p.srv.ForwardRPC("PreparedQuery.Apply", args, args, reply); done { + if done, err := p.srv.ForwardRPC("PreparedQuery.Apply", args, reply); done { return err } defer metrics.MeasureSince([]string{"prepared-query", "apply"}, time.Now()) @@ -225,7 +225,7 @@ func parseDNS(dns *structs.QueryDNSOptions) error { // Get returns a single prepared query by ID. func (p *PreparedQuery) Get(args *structs.PreparedQuerySpecificRequest, reply *structs.IndexedPreparedQueries) error { - if done, err := p.srv.ForwardRPC("PreparedQuery.Get", args, args, reply); done { + if done, err := p.srv.ForwardRPC("PreparedQuery.Get", args, reply); done { return err } @@ -269,7 +269,7 @@ func (p *PreparedQuery) Get(args *structs.PreparedQuerySpecificRequest, // List returns all the prepared queries. func (p *PreparedQuery) List(args *structs.DCSpecificRequest, reply *structs.IndexedPreparedQueries) error { - if done, err := p.srv.ForwardRPC("PreparedQuery.List", args, args, reply); done { + if done, err := p.srv.ForwardRPC("PreparedQuery.List", args, reply); done { return err } @@ -293,7 +293,7 @@ func (p *PreparedQuery) List(args *structs.DCSpecificRequest, reply *structs.Ind // will be executed here. func (p *PreparedQuery) Explain(args *structs.PreparedQueryExecuteRequest, reply *structs.PreparedQueryExplainResponse) error { - if done, err := p.srv.ForwardRPC("PreparedQuery.Explain", args, args, reply); done { + if done, err := p.srv.ForwardRPC("PreparedQuery.Explain", args, reply); done { return err } defer metrics.MeasureSince([]string{"prepared-query", "explain"}, time.Now()) @@ -340,7 +340,7 @@ func (p *PreparedQuery) Explain(args *structs.PreparedQueryExecuteRequest, // part of a DNS lookup, or when executing prepared queries from the HTTP API. func (p *PreparedQuery) Execute(args *structs.PreparedQueryExecuteRequest, reply *structs.PreparedQueryExecuteResponse) error { - if done, err := p.srv.ForwardRPC("PreparedQuery.Execute", args, args, reply); done { + if done, err := p.srv.ForwardRPC("PreparedQuery.Execute", args, reply); done { return err } defer metrics.MeasureSince([]string{"prepared-query", "execute"}, time.Now()) @@ -475,7 +475,7 @@ func (p *PreparedQuery) Execute(args *structs.PreparedQueryExecuteRequest, // We don't want things to fan out further than one level. func (p *PreparedQuery) ExecuteRemote(args *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error { - if done, err := p.srv.ForwardRPC("PreparedQuery.ExecuteRemote", args, args, reply); done { + if done, err := p.srv.ForwardRPC("PreparedQuery.ExecuteRemote", args, reply); done { return err } defer metrics.MeasureSince([]string{"prepared-query", "execute_remote"}, time.Now()) diff --git a/agent/consul/rpc.go b/agent/consul/rpc.go index 80d5b95e6c..0766afe2e7 100644 --- a/agent/consul/rpc.go +++ b/agent/consul/rpc.go @@ -92,9 +92,7 @@ const ( enqueueLimit = 30 * time.Second ) -var ( - ErrChunkingResubmit = errors.New("please resubmit call for rechunking") -) +var ErrChunkingResubmit = errors.New("please resubmit call for rechunking") func (s *Server) rpcLogger() hclog.Logger { return s.loggers.Named(logging.RPC) @@ -527,8 +525,8 @@ func (c *limitedConn) Read(b []byte) (n int, err error) { return c.lr.Read(b) } -// canRetry returns true if the given situation is safe for a retry. -func canRetry(args interface{}, err error) bool { +// canRetry returns true if the request and error indicate that a retry is safe. +func canRetry(info structs.RPCInfo, err error) bool { // No leader errors are always safe to retry since no state could have // been changed. if structs.IsErrNoLeader(err) { @@ -542,26 +540,21 @@ func canRetry(args interface{}, err error) bool { // Reads are safe to retry for stream errors, such as if a server was // being shut down. - info, ok := args.(structs.RPCInfo) - if ok && info.IsRead() && lib.IsErrEOF(err) { - return true - } - - return false + return info != nil && info.IsRead() && lib.IsErrEOF(err) } // ForwardRPC is used to forward an RPC request to a remote DC or to the local leader // Returns a bool of if forwarding was performed, as well as any error -func (s *Server) ForwardRPC(method string, info structs.RPCInfo, args interface{}, reply interface{}) (bool, error) { +func (s *Server) ForwardRPC(method string, req structs.RPCInfo, reply interface{}) (bool, error) { var firstCheck time.Time // Handle DC forwarding - dc := info.RequestDatacenter() + dc := req.RequestDatacenter() if dc != s.config.Datacenter { // Local tokens only work within the current datacenter. Check to see // if we are attempting to forward one to a remote datacenter and strip // it, falling back on the anonymous token on the other end. - if token := info.TokenSecret(); token != "" { + if token := req.TokenSecret(); token != "" { done, ident, err := s.ResolveIdentityFromToken(token) if done { if err != nil && !acl.IsErrNotFound(err) { @@ -569,18 +562,18 @@ func (s *Server) ForwardRPC(method string, info structs.RPCInfo, args interface{ } if ident != nil && ident.IsLocal() { // Strip it from the request. - info.SetTokenSecret("") - defer info.SetTokenSecret(token) + req.SetTokenSecret("") + defer req.SetTokenSecret(token) } } } - err := s.forwardDC(method, dc, args, reply) + err := s.forwardDC(method, dc, req, reply) return true, err } // Check if we can allow a stale read, ensure our local DB is initialized - if info.IsRead() && info.AllowStaleRead() && !s.raft.LastContact().IsZero() { + if req.IsRead() && req.AllowStaleRead() && !s.raft.LastContact().IsZero() { return false, nil } @@ -603,8 +596,8 @@ CHECK_LEADER: // Handle the case of a known leader if leader != nil { rpcErr = s.connPool.RPC(s.config.Datacenter, leader.ShortName, leader.Addr, - method, args, reply) - if rpcErr != nil && canRetry(info, rpcErr) { + method, req, reply) + if rpcErr != nil && canRetry(req, rpcErr) { goto RETRY } return true, rpcErr @@ -790,11 +783,6 @@ func (s *Server) raftApplyWithEncoder( // In this case we didn't apply all chunks successfully, possibly due // to a term change; resubmit if resp == nil { - // This returns the error in the interface because the raft library - // returns errors from the FSM via the future, not via err from the - // apply function. Downstream client code expects to see any error - // from the FSM (as opposed to the apply itself) and decide whether - // it can retry in the future's response. return nil, ErrChunkingResubmit } // We expect that this conversion should always work diff --git a/agent/consul/rpc_test.go b/agent/consul/rpc_test.go index 4c283867e9..48981234be 100644 --- a/agent/consul/rpc_test.go +++ b/agent/consul/rpc_test.go @@ -4,6 +4,8 @@ import ( "bytes" "encoding/binary" "errors" + "fmt" + "io" "math" "net" "os" @@ -12,6 +14,11 @@ import ( "testing" "time" + "github.com/hashicorp/go-memdb" + msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/hashicorp/consul/acl" "github.com/hashicorp/consul/agent/consul/state" "github.com/hashicorp/consul/agent/pool" @@ -20,10 +27,6 @@ import ( "github.com/hashicorp/consul/api" "github.com/hashicorp/consul/sdk/testutil/retry" "github.com/hashicorp/consul/testrpc" - "github.com/hashicorp/go-memdb" - msgpackrpc "github.com/hashicorp/net-rpc-msgpackrpc" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" ) func TestRPC_NoLeader_Fail(t *testing.T) { @@ -952,3 +955,59 @@ func TestRPC_LocalTokenStrippedOnForward(t *testing.T) { require.NoError(t, err) require.Equal(t, localToken2.SecretID, arg.WriteRequest.Token, "token should not be stripped") } + +func TestCanRetry(t *testing.T) { + type testCase struct { + name string + req structs.RPCInfo + err error + expected bool + } + + run := func(t *testing.T, tc testCase) { + require.Equal(t, tc.expected, canRetry(tc.req, tc.err)) + } + + var testCases = []testCase{ + { + name: "unexpected error", + err: fmt.Errorf("some arbitrary error"), + expected: false, + }, + { + name: "checking error", + err: fmt.Errorf("some wrapping :%w", ErrChunkingResubmit), + expected: true, + }, + { + name: "no leader error", + err: fmt.Errorf("some wrapping: %w", structs.ErrNoLeader), + expected: true, + }, + { + name: "EOF on read request", + req: isReadRequest{}, + err: io.EOF, + expected: true, + }, + { + name: "EOF on write request", + err: io.EOF, + expected: false, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + run(t, tc) + }) + } +} + +type isReadRequest struct { + structs.RPCInfo +} + +func (r isReadRequest) IsRead() bool { + return true +} diff --git a/agent/consul/session_endpoint.go b/agent/consul/session_endpoint.go index 1287ca7960..ca81e5cdca 100644 --- a/agent/consul/session_endpoint.go +++ b/agent/consul/session_endpoint.go @@ -45,7 +45,7 @@ func fixupSessionSpecificRequest(args *structs.SessionSpecificRequest) { // Apply is used to apply a modifying request to the data store. This should // only be used for operations that modify the data func (s *Session) Apply(args *structs.SessionRequest, reply *string) error { - if done, err := s.srv.ForwardRPC("Session.Apply", args, args, reply); done { + if done, err := s.srv.ForwardRPC("Session.Apply", args, reply); done { return err } defer metrics.MeasureSince([]string{"session", "apply"}, time.Now()) @@ -170,7 +170,7 @@ func (s *Session) Apply(args *structs.SessionRequest, reply *string) error { // Get is used to retrieve a single session func (s *Session) Get(args *structs.SessionSpecificRequest, reply *structs.IndexedSessions) error { - if done, err := s.srv.ForwardRPC("Session.Get", args, args, reply); done { + if done, err := s.srv.ForwardRPC("Session.Get", args, reply); done { return err } @@ -211,7 +211,7 @@ func (s *Session) Get(args *structs.SessionSpecificRequest, // List is used to list all the active sessions func (s *Session) List(args *structs.SessionSpecificRequest, reply *structs.IndexedSessions) error { - if done, err := s.srv.ForwardRPC("Session.List", args, args, reply); done { + if done, err := s.srv.ForwardRPC("Session.List", args, reply); done { return err } @@ -245,7 +245,7 @@ func (s *Session) List(args *structs.SessionSpecificRequest, // NodeSessions is used to get all the sessions for a particular node func (s *Session) NodeSessions(args *structs.NodeSpecificRequest, reply *structs.IndexedSessions) error { - if done, err := s.srv.ForwardRPC("Session.NodeSessions", args, args, reply); done { + if done, err := s.srv.ForwardRPC("Session.NodeSessions", args, reply); done { return err } @@ -279,7 +279,7 @@ func (s *Session) NodeSessions(args *structs.NodeSpecificRequest, // Renew is used to renew the TTL on a single session func (s *Session) Renew(args *structs.SessionSpecificRequest, reply *structs.IndexedSessions) error { - if done, err := s.srv.ForwardRPC("Session.Renew", args, args, reply); done { + if done, err := s.srv.ForwardRPC("Session.Renew", args, reply); done { return err } diff --git a/agent/consul/txn_endpoint.go b/agent/consul/txn_endpoint.go index da1a224cb5..6f6db77375 100644 --- a/agent/consul/txn_endpoint.go +++ b/agent/consul/txn_endpoint.go @@ -121,7 +121,7 @@ func (t *Txn) preCheck(authorizer acl.Authorizer, ops structs.TxnOps) structs.Tx // Apply is used to apply multiple operations in a single, atomic transaction. func (t *Txn) Apply(args *structs.TxnRequest, reply *structs.TxnResponse) error { - if done, err := t.srv.ForwardRPC("Txn.Apply", args, args, reply); done { + if done, err := t.srv.ForwardRPC("Txn.Apply", args, reply); done { return err } defer metrics.MeasureSince([]string{"txn", "apply"}, time.Now()) @@ -160,7 +160,7 @@ func (t *Txn) Apply(args *structs.TxnRequest, reply *structs.TxnResponse) error // supports staleness, so this should be preferred if you're just performing // reads. func (t *Txn) Read(args *structs.TxnReadRequest, reply *structs.TxnReadResponse) error { - if done, err := t.srv.ForwardRPC("Txn.Read", args, args, reply); done { + if done, err := t.srv.ForwardRPC("Txn.Read", args, reply); done { return err } defer metrics.MeasureSince([]string{"txn", "read"}, time.Now()) diff --git a/test/integration/connect/envoy/case-basic/verify.bats b/test/integration/connect/envoy/case-basic/verify.bats index a788a8e48d..47787f5560 100644 --- a/test/integration/connect/envoy/case-basic/verify.bats +++ b/test/integration/connect/envoy/case-basic/verify.bats @@ -39,7 +39,7 @@ load helpers @test "s1 proxy should have been configured with one rbac listener filter at L4" { LISTEN_FILTERS=$(get_envoy_listener_filters localhost:19000) PUB=$(echo "$LISTEN_FILTERS" | grep -E "^public_listener:" | cut -f 2 -d ' ' ) - UPS=$(echo "$LISTEN_FILTERS" | grep -E "^(default\/)?s2:" | cut -f 2 -d ' ' ) + UPS=$(echo "$LISTEN_FILTERS" | grep -E "^(\/default\/)?s2:" | cut -f 2 -d ' ' ) echo "LISTEN_FILTERS = $LISTEN_FILTERS" echo "PUB = $PUB" diff --git a/test/integration/connect/envoy/case-http/verify.bats b/test/integration/connect/envoy/case-http/verify.bats index 551d5a60fa..ce20b268af 100644 --- a/test/integration/connect/envoy/case-http/verify.bats +++ b/test/integration/connect/envoy/case-http/verify.bats @@ -36,7 +36,7 @@ load helpers @test "s1 proxy should have been configured with http connection managers" { LISTEN_FILTERS=$(get_envoy_listener_filters localhost:19000) PUB=$(echo "$LISTEN_FILTERS" | grep -E "^public_listener:" | cut -f 2 -d ' ' ) - UPS=$(echo "$LISTEN_FILTERS" | grep -E "^(default\/)?s2:" | cut -f 2 -d ' ' ) + UPS=$(echo "$LISTEN_FILTERS" | grep -E "^(\/default\/)?s2:" | cut -f 2 -d ' ' ) echo "LISTEN_FILTERS = $LISTEN_FILTERS" echo "PUB = $PUB" @@ -59,7 +59,7 @@ load helpers @test "s1 proxy should have been configured with http rbac filters" { HTTP_FILTERS=$(get_envoy_http_filters localhost:19000) PUB=$(echo "$HTTP_FILTERS" | grep -E "^public_listener:" | cut -f 2 -d ' ' ) - UPS=$(echo "$HTTP_FILTERS" | grep -E "^(default\/)?s2:" | cut -f 2 -d ' ' ) + UPS=$(echo "$HTTP_FILTERS" | grep -E "^(\/default\/)?s2:" | cut -f 2 -d ' ' ) echo "HTTP_FILTERS = $HTTP_FILTERS" echo "PUB = $PUB"