diff --git a/agent/consul/acl_endpoint.go b/agent/consul/acl_endpoint.go index f4d29ab9f7..6d61d4f857 100644 --- a/agent/consul/acl_endpoint.go +++ b/agent/consul/acl_endpoint.go @@ -115,7 +115,7 @@ func (a *ACL) BootstrapTokens(args *structs.DCSpecificRequest, reply *structs.AC if err := a.aclPreCheck(); err != nil { return err } - if done, err := a.srv.forward("ACL.BootstrapTokens", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.BootstrapTokens", args, args, reply); done { return err } @@ -214,7 +214,7 @@ func (a *ACL) TokenRead(args *structs.ACLTokenGetRequest, reply *structs.ACLToke args.Datacenter = a.srv.config.ACLDatacenter } - if done, err := a.srv.forward("ACL.TokenRead", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.TokenRead", args, args, reply); done { return err } @@ -283,7 +283,7 @@ func (a *ACL) TokenClone(args *structs.ACLTokenSetRequest, reply *structs.ACLTok args.Datacenter = a.srv.config.ACLDatacenter } - if done, err := a.srv.forward("ACL.TokenClone", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.TokenClone", args, args, reply); done { return err } @@ -354,7 +354,7 @@ func (a *ACL) TokenSet(args *structs.ACLTokenSetRequest, reply *structs.ACLToken return fmt.Errorf("Local tokens are disabled") } - if done, err := a.srv.forward("ACL.TokenSet", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.TokenSet", args, args, reply); done { return err } @@ -764,7 +764,7 @@ func (a *ACL) TokenDelete(args *structs.ACLTokenDeleteRequest, reply *string) er args.Datacenter = a.srv.config.ACLDatacenter } - if done, err := a.srv.forward("ACL.TokenDelete", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.TokenDelete", args, args, reply); done { return err } @@ -854,7 +854,7 @@ func (a *ACL) TokenList(args *structs.ACLTokenListRequest, reply *structs.ACLTok args.Datacenter = a.srv.config.ACLDatacenter } - if done, err := a.srv.forward("ACL.TokenList", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.TokenList", args, args, reply); done { return err } @@ -917,7 +917,7 @@ func (a *ACL) TokenBatchRead(args *structs.ACLTokenBatchGetRequest, reply *struc args.Datacenter = a.srv.config.ACLDatacenter } - if done, err := a.srv.forward("ACL.TokenBatchRead", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.TokenBatchRead", args, args, reply); done { return err } @@ -971,7 +971,7 @@ func (a *ACL) PolicyRead(args *structs.ACLPolicyGetRequest, reply *structs.ACLPo return err } - if done, err := a.srv.forward("ACL.PolicyRead", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.PolicyRead", args, args, reply); done { return err } @@ -1009,7 +1009,7 @@ func (a *ACL) PolicyBatchRead(args *structs.ACLPolicyBatchGetRequest, reply *str return err } - if done, err := a.srv.forward("ACL.PolicyBatchRead", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.PolicyBatchRead", args, args, reply); done { return err } @@ -1047,7 +1047,7 @@ func (a *ACL) PolicySet(args *structs.ACLPolicySetRequest, reply *structs.ACLPol args.Datacenter = a.srv.config.ACLDatacenter } - if done, err := a.srv.forward("ACL.PolicySet", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.PolicySet", args, args, reply); done { return err } @@ -1182,7 +1182,7 @@ func (a *ACL) PolicyDelete(args *structs.ACLPolicyDeleteRequest, reply *string) args.Datacenter = a.srv.config.ACLDatacenter } - if done, err := a.srv.forward("ACL.PolicyDelete", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.PolicyDelete", args, args, reply); done { return err } @@ -1241,7 +1241,7 @@ func (a *ACL) PolicyList(args *structs.ACLPolicyListRequest, reply *structs.ACLP return err } - if done, err := a.srv.forward("ACL.PolicyList", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.PolicyList", args, args, reply); done { return err } @@ -1281,7 +1281,7 @@ func (a *ACL) PolicyResolve(args *structs.ACLPolicyBatchGetRequest, reply *struc return err } - if done, err := a.srv.forward("ACL.PolicyResolve", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.PolicyResolve", args, args, reply); done { return err } @@ -1339,7 +1339,7 @@ func makeACLETag(parent string, policy *acl.Policy) string { // GetPolicy is used to retrieve a compiled policy object with a TTL. Does not // support a blocking query. func (a *ACL) GetPolicy(args *structs.ACLPolicyResolveLegacyRequest, reply *structs.ACLPolicyResolveLegacyResponse) error { - if done, err := a.srv.forward("ACL.GetPolicy", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.GetPolicy", args, args, reply); done { return err } @@ -1386,7 +1386,7 @@ func (a *ACL) ReplicationStatus(args *structs.DCSpecificRequest, // re-using a structure where we don't support all the options. args.RequireConsistent = true args.AllowStale = false - if done, err := a.srv.forward("ACL.ReplicationStatus", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.ReplicationStatus", args, args, reply); done { return err } @@ -1414,7 +1414,7 @@ func (a *ACL) RoleRead(args *structs.ACLRoleGetRequest, reply *structs.ACLRoleRe return err } - if done, err := a.srv.forward("ACL.RoleRead", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.RoleRead", args, args, reply); done { return err } @@ -1453,7 +1453,7 @@ func (a *ACL) RoleBatchRead(args *structs.ACLRoleBatchGetRequest, reply *structs return err } - if done, err := a.srv.forward("ACL.RoleBatchRead", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.RoleBatchRead", args, args, reply); done { return err } @@ -1491,7 +1491,7 @@ func (a *ACL) RoleSet(args *structs.ACLRoleSetRequest, reply *structs.ACLRole) e args.Datacenter = a.srv.config.ACLDatacenter } - if done, err := a.srv.forward("ACL.RoleSet", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.RoleSet", args, args, reply); done { return err } @@ -1653,7 +1653,7 @@ func (a *ACL) RoleDelete(args *structs.ACLRoleDeleteRequest, reply *string) erro args.Datacenter = a.srv.config.ACLDatacenter } - if done, err := a.srv.forward("ACL.RoleDelete", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.RoleDelete", args, args, reply); done { return err } @@ -1708,7 +1708,7 @@ func (a *ACL) RoleList(args *structs.ACLRoleListRequest, reply *structs.ACLRoleL return err } - if done, err := a.srv.forward("ACL.RoleList", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.RoleList", args, args, reply); done { return err } @@ -1742,7 +1742,7 @@ func (a *ACL) RoleResolve(args *structs.ACLRoleBatchGetRequest, reply *structs.A return err } - if done, err := a.srv.forward("ACL.RoleResolve", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.RoleResolve", args, args, reply); done { return err } @@ -1807,7 +1807,7 @@ func (a *ACL) BindingRuleRead(args *structs.ACLBindingRuleGetRequest, reply *str return errAuthMethodsRequireTokenReplication } - if done, err := a.srv.forward("ACL.BindingRuleRead", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.BindingRuleRead", args, args, reply); done { return err } @@ -1846,7 +1846,7 @@ func (a *ACL) BindingRuleSet(args *structs.ACLBindingRuleSetRequest, reply *stru return errAuthMethodsRequireTokenReplication } - if done, err := a.srv.forward("ACL.BindingRuleSet", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.BindingRuleSet", args, args, reply); done { return err } @@ -1978,7 +1978,7 @@ func (a *ACL) BindingRuleDelete(args *structs.ACLBindingRuleDeleteRequest, reply return errAuthMethodsRequireTokenReplication } - if done, err := a.srv.forward("ACL.BindingRuleDelete", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.BindingRuleDelete", args, args, reply); done { return err } @@ -2033,7 +2033,7 @@ func (a *ACL) BindingRuleList(args *structs.ACLBindingRuleListRequest, reply *st return errAuthMethodsRequireTokenReplication } - if done, err := a.srv.forward("ACL.BindingRuleList", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.BindingRuleList", args, args, reply); done { return err } @@ -2073,7 +2073,7 @@ func (a *ACL) AuthMethodRead(args *structs.ACLAuthMethodGetRequest, reply *struc return errAuthMethodsRequireTokenReplication } - if done, err := a.srv.forward("ACL.AuthMethodRead", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.AuthMethodRead", args, args, reply); done { return err } @@ -2115,7 +2115,7 @@ func (a *ACL) AuthMethodSet(args *structs.ACLAuthMethodSetRequest, reply *struct return errAuthMethodsRequireTokenReplication } - if done, err := a.srv.forward("ACL.AuthMethodSet", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.AuthMethodSet", args, args, reply); done { return err } @@ -2231,7 +2231,7 @@ func (a *ACL) AuthMethodDelete(args *structs.ACLAuthMethodDeleteRequest, reply * return errAuthMethodsRequireTokenReplication } - if done, err := a.srv.forward("ACL.AuthMethodDelete", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.AuthMethodDelete", args, args, reply); done { return err } @@ -2291,7 +2291,7 @@ func (a *ACL) AuthMethodList(args *structs.ACLAuthMethodListRequest, reply *stru return errAuthMethodsRequireTokenReplication } - if done, err := a.srv.forward("ACL.AuthMethodList", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.AuthMethodList", args, args, reply); done { return err } @@ -2345,7 +2345,7 @@ func (a *ACL) Login(args *structs.ACLLoginRequest, reply *structs.ACLToken) erro return errors.New("do not provide a token when logging in") } - if done, err := a.srv.forward("ACL.Login", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.Login", args, args, reply); done { return err } @@ -2490,7 +2490,7 @@ func (a *ACL) Logout(args *structs.ACLLogoutRequest, reply *bool) error { return acl.ErrNotFound } - if done, err := a.srv.forward("ACL.Logout", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.Logout", args, args, reply); done { return err } @@ -2543,7 +2543,7 @@ func (a *ACL) Authorize(args *structs.RemoteACLAuthorizationRequest, reply *[]st return err } - if done, err := a.srv.forward("ACL.Authorize", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.Authorize", args, args, reply); done { return err } diff --git a/agent/consul/acl_endpoint_legacy.go b/agent/consul/acl_endpoint_legacy.go index 890699b639..2d771cf731 100644 --- a/agent/consul/acl_endpoint_legacy.go +++ b/agent/consul/acl_endpoint_legacy.go @@ -15,7 +15,7 @@ import ( // Bootstrap is used to perform a one-time ACL bootstrap operation on // a cluster to get the first management token. func (a *ACL) Bootstrap(args *structs.DCSpecificRequest, reply *structs.ACL) error { - if done, err := a.srv.forward("ACL.Bootstrap", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.Bootstrap", args, args, reply); done { return err } @@ -153,7 +153,7 @@ func aclApplyInternal(srv *Server, args *structs.ACLRequest, reply *string) erro // Apply is used to apply a modifying request to the data store. This should // only be used for operations that modify the data func (a *ACL) Apply(args *structs.ACLRequest, reply *string) error { - if done, err := a.srv.forward("ACL.Apply", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.Apply", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"acl", "apply"}, time.Now()) @@ -199,7 +199,7 @@ func (a *ACL) Apply(args *structs.ACLRequest, reply *string) error { // Get is used to retrieve a single ACL func (a *ACL) Get(args *structs.ACLSpecificRequest, reply *structs.IndexedACLs) error { - if done, err := a.srv.forward("ACL.Get", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.Get", args, args, reply); done { return err } @@ -245,7 +245,7 @@ func (a *ACL) Get(args *structs.ACLSpecificRequest, // List is used to list all the ACLs func (a *ACL) List(args *structs.DCSpecificRequest, reply *structs.IndexedACLs) error { - if done, err := a.srv.forward("ACL.List", args, args, reply); done { + if done, err := a.srv.ForwardRPC("ACL.List", args, args, reply); done { return err } diff --git a/agent/consul/auto_encrypt_endpoint.go b/agent/consul/auto_encrypt_endpoint.go index a8267da076..78a100acc2 100644 --- a/agent/consul/auto_encrypt_endpoint.go +++ b/agent/consul/auto_encrypt_endpoint.go @@ -24,7 +24,7 @@ func (a *AutoEncrypt) Sign( if !a.srv.config.AutoEncryptAllowTLS { return ErrAutoEncryptAllowTLSNotEnabled } - if done, err := a.srv.forward("AutoEncrypt.Sign", args, args, reply); done { + if done, err := a.srv.ForwardRPC("AutoEncrypt.Sign", args, args, reply); done { return err } diff --git a/agent/consul/catalog_endpoint.go b/agent/consul/catalog_endpoint.go index b4f7ce2790..301e00974c 100644 --- a/agent/consul/catalog_endpoint.go +++ b/agent/consul/catalog_endpoint.go @@ -94,7 +94,7 @@ func checkPreApply(check *structs.HealthCheck) { // Register is used register that a node is providing a given service. func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error { - if done, err := c.srv.forward("Catalog.Register", args, args, reply); done { + if done, err := c.srv.ForwardRPC("Catalog.Register", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"catalog", "register"}, time.Now()) @@ -175,7 +175,7 @@ func (c *Catalog) Register(args *structs.RegisterRequest, reply *struct{}) error // Deregister is used to remove a service registration for a given node. func (c *Catalog) Deregister(args *structs.DeregisterRequest, reply *struct{}) error { - if done, err := c.srv.forward("Catalog.Deregister", args, args, reply); done { + if done, err := c.srv.ForwardRPC("Catalog.Deregister", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"catalog", "deregister"}, time.Now()) @@ -244,7 +244,7 @@ func (c *Catalog) ListDatacenters(args *structs.DatacentersRequest, reply *[]str // ListNodes is used to query the nodes in a DC func (c *Catalog) ListNodes(args *structs.DCSpecificRequest, reply *structs.IndexedNodes) error { - if done, err := c.srv.forward("Catalog.ListNodes", args, args, reply); done { + if done, err := c.srv.ForwardRPC("Catalog.ListNodes", args, args, reply); done { return err } @@ -292,7 +292,7 @@ func isUnmodified(opts structs.QueryOptions, index uint64) bool { // ListServices is used to query the services in a DC func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.IndexedServices) error { - if done, err := c.srv.forward("Catalog.ListServices", args, args, reply); done { + if done, err := c.srv.ForwardRPC("Catalog.ListServices", args, args, reply); done { return err } @@ -331,7 +331,7 @@ func (c *Catalog) ListServices(args *structs.DCSpecificRequest, reply *structs.I } func (c *Catalog) ServiceList(args *structs.DCSpecificRequest, reply *structs.IndexedServiceList) error { - if done, err := c.srv.forward("Catalog.ServiceList", args, args, reply); done { + if done, err := c.srv.ForwardRPC("Catalog.ServiceList", args, args, reply); done { return err } @@ -360,7 +360,7 @@ func (c *Catalog) ServiceList(args *structs.DCSpecificRequest, reply *structs.In // ServiceNodes returns all the nodes registered as part of a service func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *structs.IndexedServiceNodes) error { - if done, err := c.srv.forward("Catalog.ServiceNodes", args, args, reply); done { + if done, err := c.srv.ForwardRPC("Catalog.ServiceNodes", args, args, reply); done { return err } @@ -498,7 +498,7 @@ func (c *Catalog) ServiceNodes(args *structs.ServiceSpecificRequest, reply *stru // NodeServices returns all the services registered as part of a node func (c *Catalog) NodeServices(args *structs.NodeSpecificRequest, reply *structs.IndexedNodeServices) error { - if done, err := c.srv.forward("Catalog.NodeServices", args, args, reply); done { + if done, err := c.srv.ForwardRPC("Catalog.NodeServices", args, args, reply); done { return err } @@ -549,7 +549,7 @@ func (c *Catalog) NodeServices(args *structs.NodeSpecificRequest, reply *structs } func (c *Catalog) NodeServiceList(args *structs.NodeSpecificRequest, reply *structs.IndexedNodeServiceList) error { - if done, err := c.srv.forward("Catalog.NodeServiceList", args, args, reply); done { + if done, err := c.srv.ForwardRPC("Catalog.NodeServiceList", args, args, reply); done { return err } @@ -602,7 +602,7 @@ func (c *Catalog) NodeServiceList(args *structs.NodeSpecificRequest, reply *stru } func (c *Catalog) GatewayServices(args *structs.ServiceSpecificRequest, reply *structs.IndexedGatewayServices) error { - if done, err := c.srv.forward("Catalog.GatewayServices", args, args, reply); done { + if done, err := c.srv.ForwardRPC("Catalog.GatewayServices", args, args, reply); done { return err } diff --git a/agent/consul/config_endpoint.go b/agent/consul/config_endpoint.go index f3c3177ba3..996f3d23b2 100644 --- a/agent/consul/config_endpoint.go +++ b/agent/consul/config_endpoint.go @@ -27,7 +27,7 @@ func (c *ConfigEntry) Apply(args *structs.ConfigEntryRequest, reply *bool) error // be replicated to all the other datacenters. args.Datacenter = c.srv.config.PrimaryDatacenter - if done, err := c.srv.forward("ConfigEntry.Apply", args, args, reply); done { + if done, err := c.srv.ForwardRPC("ConfigEntry.Apply", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"config_entry", "apply"}, time.Now()) @@ -73,7 +73,7 @@ func (c *ConfigEntry) Get(args *structs.ConfigEntryQuery, reply *structs.ConfigE return err } - if done, err := c.srv.forward("ConfigEntry.Get", args, args, reply); done { + if done, err := c.srv.ForwardRPC("ConfigEntry.Get", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"config_entry", "get"}, time.Now()) @@ -120,7 +120,7 @@ func (c *ConfigEntry) List(args *structs.ConfigEntryQuery, reply *structs.Indexe return err } - if done, err := c.srv.forward("ConfigEntry.List", args, args, reply); done { + if done, err := c.srv.ForwardRPC("ConfigEntry.List", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"config_entry", "list"}, time.Now()) @@ -165,7 +165,7 @@ func (c *ConfigEntry) ListAll(args *structs.DCSpecificRequest, reply *structs.In return err } - if done, err := c.srv.forward("ConfigEntry.ListAll", args, args, reply); done { + if done, err := c.srv.ForwardRPC("ConfigEntry.ListAll", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"config_entry", "listAll"}, time.Now()) @@ -209,7 +209,7 @@ func (c *ConfigEntry) Delete(args *structs.ConfigEntryRequest, reply *struct{}) // be replicated to all the other datacenters. args.Datacenter = c.srv.config.PrimaryDatacenter - if done, err := c.srv.forward("ConfigEntry.Delete", args, args, reply); done { + if done, err := c.srv.ForwardRPC("ConfigEntry.Delete", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"config_entry", "delete"}, time.Now()) @@ -245,7 +245,7 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r return err } - if done, err := c.srv.forward("ConfigEntry.ResolveServiceConfig", args, args, reply); done { + if done, err := c.srv.ForwardRPC("ConfigEntry.ResolveServiceConfig", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"config_entry", "resolve_service_config"}, time.Now()) diff --git a/agent/consul/connect_ca_endpoint.go b/agent/consul/connect_ca_endpoint.go index 995cdd3f3d..9691d203a2 100644 --- a/agent/consul/connect_ca_endpoint.go +++ b/agent/consul/connect_ca_endpoint.go @@ -63,7 +63,7 @@ func (s *ConnectCA) ConfigurationGet( return ErrConnectNotEnabled } - if done, err := s.srv.forward("ConnectCA.ConfigurationGet", args, args, reply); done { + if done, err := s.srv.ForwardRPC("ConnectCA.ConfigurationGet", args, args, reply); done { return err } @@ -95,7 +95,7 @@ func (s *ConnectCA) ConfigurationSet( return ErrConnectNotEnabled } - if done, err := s.srv.forward("ConnectCA.ConfigurationSet", args, args, reply); done { + if done, err := s.srv.ForwardRPC("ConnectCA.ConfigurationSet", args, args, reply); done { return err } @@ -312,7 +312,7 @@ func (s *ConnectCA) Roots( args *structs.DCSpecificRequest, reply *structs.IndexedCARoots) error { // Forward if necessary - if done, err := s.srv.forward("ConnectCA.Roots", args, args, reply); done { + if done, err := s.srv.ForwardRPC("ConnectCA.Roots", args, args, reply); done { return err } @@ -387,7 +387,7 @@ func (s *ConnectCA) Sign( return ErrConnectNotEnabled } - if done, err := s.srv.forward("ConnectCA.Sign", args, args, reply); done { + if done, err := s.srv.ForwardRPC("ConnectCA.Sign", args, args, reply); done { return err } @@ -593,7 +593,7 @@ func (s *ConnectCA) SignIntermediate( return ErrConnectNotEnabled } - if done, err := s.srv.forward("ConnectCA.SignIntermediate", args, args, reply); done { + if done, err := s.srv.ForwardRPC("ConnectCA.SignIntermediate", args, args, reply); done { return err } diff --git a/agent/consul/coordinate_endpoint.go b/agent/consul/coordinate_endpoint.go index d31136ebc8..e4a3ac4f79 100644 --- a/agent/consul/coordinate_endpoint.go +++ b/agent/consul/coordinate_endpoint.go @@ -117,7 +117,7 @@ func (c *Coordinate) batchApplyUpdates() error { // Update inserts or updates the LAN coordinate of a node. func (c *Coordinate) Update(args *structs.CoordinateUpdateRequest, reply *struct{}) (err error) { - if done, err := c.srv.forward("Coordinate.Update", args, args, reply); done { + if done, err := c.srv.ForwardRPC("Coordinate.Update", args, args, reply); done { return err } @@ -184,7 +184,7 @@ func (c *Coordinate) ListDatacenters(args *struct{}, reply *[]structs.Datacenter // ListNodes returns the list of nodes with their raw network coordinates (if no // coordinates are available for a node it won't appear in this list). func (c *Coordinate) ListNodes(args *structs.DCSpecificRequest, reply *structs.IndexedCoordinates) error { - if done, err := c.srv.forward("Coordinate.ListNodes", args, args, reply); done { + if done, err := c.srv.ForwardRPC("Coordinate.ListNodes", args, args, reply); done { return err } @@ -207,7 +207,7 @@ func (c *Coordinate) ListNodes(args *structs.DCSpecificRequest, reply *structs.I // Node returns the raw coordinates for a single node. func (c *Coordinate) Node(args *structs.NodeSpecificRequest, reply *structs.IndexedCoordinates) error { - if done, err := c.srv.forward("Coordinate.Node", args, args, reply); done { + if done, err := c.srv.ForwardRPC("Coordinate.Node", args, args, reply); done { return err } diff --git a/agent/consul/discovery_chain_endpoint.go b/agent/consul/discovery_chain_endpoint.go index a4a21ec053..a9933fa453 100644 --- a/agent/consul/discovery_chain_endpoint.go +++ b/agent/consul/discovery_chain_endpoint.go @@ -24,7 +24,7 @@ func (c *DiscoveryChain) Get(args *structs.DiscoveryChainRequest, reply *structs return ErrConnectNotEnabled } - if done, err := c.srv.forward("DiscoveryChain.Get", args, args, reply); done { + if done, err := c.srv.ForwardRPC("DiscoveryChain.Get", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"discovery_chain", "get"}, time.Now()) diff --git a/agent/consul/federation_state_endpoint.go b/agent/consul/federation_state_endpoint.go index cd914e9ba8..a98ab83e8f 100644 --- a/agent/consul/federation_state_endpoint.go +++ b/agent/consul/federation_state_endpoint.go @@ -27,7 +27,7 @@ func (c *FederationState) Apply(args *structs.FederationStateRequest, reply *boo // be replicated to all the other datacenters. args.Datacenter = c.srv.config.PrimaryDatacenter - if done, err := c.srv.forward("FederationState.Apply", args, args, reply); done { + if done, err := c.srv.ForwardRPC("FederationState.Apply", args, args, reply); done { return err } @@ -76,7 +76,7 @@ func (c *FederationState) Apply(args *structs.FederationStateRequest, reply *boo } func (c *FederationState) Get(args *structs.FederationStateQuery, reply *structs.FederationStateResponse) error { - if done, err := c.srv.forward("FederationState.Get", args, args, reply); done { + if done, err := c.srv.ForwardRPC("FederationState.Get", args, args, reply); done { return err } @@ -117,7 +117,7 @@ func (c *FederationState) Get(args *structs.FederationStateQuery, reply *structs // List is the endpoint meant to be used by consul servers performing // replication. func (c *FederationState) List(args *structs.DCSpecificRequest, reply *structs.IndexedFederationStates) error { - if done, err := c.srv.forward("FederationState.List", args, args, reply); done { + if done, err := c.srv.ForwardRPC("FederationState.List", args, args, reply); done { return err } @@ -160,7 +160,7 @@ func (c *FederationState) List(args *structs.DCSpecificRequest, reply *structs.I // in the discovery info for dialing mesh gateways. Analogous to catalog // endpoints. func (c *FederationState) ListMeshGateways(args *structs.DCSpecificRequest, reply *structs.DatacenterIndexedCheckServiceNodes) error { - if done, err := c.srv.forward("FederationState.ListMeshGateways", args, args, reply); done { + if done, err := c.srv.ForwardRPC("FederationState.ListMeshGateways", args, args, reply); done { return err } diff --git a/agent/consul/gateway_locator.go b/agent/consul/gateway_locator.go index 638c9efdf4..7233266462 100644 --- a/agent/consul/gateway_locator.go +++ b/agent/consul/gateway_locator.go @@ -325,7 +325,7 @@ func (g *GatewayLocator) runOnce(lastFetchIndex uint64) (uint64, error) { return queryMeta.Index, nil } -// checkLocalStateIsReady is inlined a bit from (*Server).forward(). We need to +// checkLocalStateIsReady is inlined a bit from (*Server).ForwardRPC(). We need to // wait until our own state machine is safe to read from. func (g *GatewayLocator) checkLocalStateIsReady() error { // Check if we can allow a stale read, ensure our local DB is initialized diff --git a/agent/consul/health_endpoint.go b/agent/consul/health_endpoint.go index 0b8353840f..4706405a83 100644 --- a/agent/consul/health_endpoint.go +++ b/agent/consul/health_endpoint.go @@ -20,7 +20,7 @@ type Health struct { // ChecksInState is used to get all the checks in a given state func (h *Health) ChecksInState(args *structs.ChecksInStateRequest, reply *structs.IndexedHealthChecks) error { - if done, err := h.srv.forward("Health.ChecksInState", args, args, reply); done { + if done, err := h.srv.ForwardRPC("Health.ChecksInState", args, args, reply); done { return err } @@ -71,7 +71,7 @@ func (h *Health) ChecksInState(args *structs.ChecksInStateRequest, // NodeChecks is used to get all the checks for a node func (h *Health) NodeChecks(args *structs.NodeSpecificRequest, reply *structs.IndexedHealthChecks) error { - if done, err := h.srv.forward("Health.NodeChecks", args, args, reply); done { + if done, err := h.srv.ForwardRPC("Health.NodeChecks", args, args, reply); done { return err } @@ -121,7 +121,7 @@ func (h *Health) ServiceChecks(args *structs.ServiceSpecificRequest, } // Potentially forward - if done, err := h.srv.forward("Health.ServiceChecks", args, args, reply); done { + if done, err := h.srv.ForwardRPC("Health.ServiceChecks", args, args, reply); done { return err } @@ -171,7 +171,7 @@ func (h *Health) ServiceChecks(args *structs.ServiceSpecificRequest, // ServiceNodes returns all the nodes registered as part of a service including health info func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *structs.IndexedCheckServiceNodes) error { - if done, err := h.srv.forward("Health.ServiceNodes", args, args, reply); done { + if done, err := h.srv.ForwardRPC("Health.ServiceNodes", args, args, reply); done { return err } diff --git a/agent/consul/intention_endpoint.go b/agent/consul/intention_endpoint.go index a030b4e9e9..4204d1c72f 100644 --- a/agent/consul/intention_endpoint.go +++ b/agent/consul/intention_endpoint.go @@ -197,7 +197,7 @@ func (s *Intention) Apply( args.Datacenter = s.srv.config.PrimaryDatacenter } - if done, err := s.srv.forward("Intention.Apply", args, args, reply); done { + if done, err := s.srv.ForwardRPC("Intention.Apply", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"consul", "intention", "apply"}, time.Now()) @@ -253,7 +253,7 @@ func (s *Intention) Get( args *structs.IntentionQueryRequest, reply *structs.IndexedIntentions) error { // Forward if necessary - if done, err := s.srv.forward("Intention.Get", args, args, reply); done { + if done, err := s.srv.ForwardRPC("Intention.Get", args, args, reply); done { return err } @@ -328,7 +328,7 @@ func (s *Intention) List( args *structs.DCSpecificRequest, reply *structs.IndexedIntentions) error { // Forward if necessary - if done, err := s.srv.forward("Intention.List", args, args, reply); done { + if done, err := s.srv.ForwardRPC("Intention.List", args, args, reply); done { return err } @@ -379,7 +379,7 @@ func (s *Intention) Match( args *structs.IntentionQueryRequest, reply *structs.IndexedIntentionMatches) error { // Forward if necessary - if done, err := s.srv.forward("Intention.Match", args, args, reply); done { + if done, err := s.srv.ForwardRPC("Intention.Match", args, args, reply); done { return err } @@ -448,7 +448,7 @@ func (s *Intention) Check( args *structs.IntentionQueryRequest, reply *structs.IntentionQueryCheckResponse) error { // Forward maybe - if done, err := s.srv.forward("Intention.Check", args, args, reply); done { + if done, err := s.srv.ForwardRPC("Intention.Check", args, args, reply); done { return err } diff --git a/agent/consul/internal_endpoint.go b/agent/consul/internal_endpoint.go index e02c21d9d7..720810f0d8 100644 --- a/agent/consul/internal_endpoint.go +++ b/agent/consul/internal_endpoint.go @@ -24,7 +24,7 @@ type Internal struct { // NodeInfo is used to retrieve information about a specific node. func (m *Internal) NodeInfo(args *structs.NodeSpecificRequest, reply *structs.IndexedNodeDump) error { - if done, err := m.srv.forward("Internal.NodeInfo", args, args, reply); done { + if done, err := m.srv.ForwardRPC("Internal.NodeInfo", args, args, reply); done { return err } @@ -50,7 +50,7 @@ func (m *Internal) NodeInfo(args *structs.NodeSpecificRequest, // NodeDump is used to generate information about all of the nodes. func (m *Internal) NodeDump(args *structs.DCSpecificRequest, reply *structs.IndexedNodeDump) error { - if done, err := m.srv.forward("Internal.NodeDump", args, args, reply); done { + if done, err := m.srv.ForwardRPC("Internal.NodeDump", args, args, reply); done { return err } @@ -89,7 +89,7 @@ func (m *Internal) NodeDump(args *structs.DCSpecificRequest, } func (m *Internal) ServiceDump(args *structs.ServiceDumpRequest, reply *structs.IndexedCheckServiceNodes) error { - if done, err := m.srv.forward("Internal.ServiceDump", args, args, reply); done { + if done, err := m.srv.ForwardRPC("Internal.ServiceDump", args, args, reply); done { return err } @@ -129,7 +129,7 @@ func (m *Internal) ServiceDump(args *structs.ServiceDumpRequest, reply *structs. // GatewayServiceNodes returns all the nodes for services associated with a gateway along with their gateway config func (m *Internal) GatewayServiceDump(args *structs.ServiceSpecificRequest, reply *structs.IndexedServiceDump) error { - if done, err := m.srv.forward("Internal.GatewayServiceDump", args, args, reply); done { + if done, err := m.srv.ForwardRPC("Internal.GatewayServiceDump", args, args, reply); done { return err } @@ -210,7 +210,7 @@ func (m *Internal) GatewayServiceDump(args *structs.ServiceSpecificRequest, repl // triggered in a remote DC. func (m *Internal) EventFire(args *structs.EventFireRequest, reply *structs.EventFireResponse) error { - if done, err := m.srv.forward("Internal.EventFire", args, args, reply); done { + if done, err := m.srv.ForwardRPC("Internal.EventFire", args, args, reply); done { return err } diff --git a/agent/consul/kvs_endpoint.go b/agent/consul/kvs_endpoint.go index 2e93cdbf69..04dee57b62 100644 --- a/agent/consul/kvs_endpoint.go +++ b/agent/consul/kvs_endpoint.go @@ -87,7 +87,7 @@ func kvsPreApply(logger hclog.Logger, srv *Server, authz acl.Authorizer, op api. // Apply is used to apply a KVS update request to the data store. func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error { - if done, err := k.srv.forward("KVS.Apply", args, args, reply); done { + if done, err := k.srv.ForwardRPC("KVS.Apply", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"kvs", "apply"}, time.Now()) @@ -130,7 +130,7 @@ func (k *KVS) Apply(args *structs.KVSRequest, reply *bool) error { // Get is used to lookup a single key. func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) error { - if done, err := k.srv.forward("KVS.Get", args, args, reply); done { + if done, err := k.srv.ForwardRPC("KVS.Get", args, args, reply); done { return err } @@ -175,7 +175,7 @@ func (k *KVS) Get(args *structs.KeyRequest, reply *structs.IndexedDirEntries) er // List is used to list all keys with a given prefix. func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) error { - if done, err := k.srv.forward("KVS.List", args, args, reply); done { + if done, err := k.srv.ForwardRPC("KVS.List", args, args, reply); done { return err } @@ -227,7 +227,7 @@ func (k *KVS) List(args *structs.KeyRequest, reply *structs.IndexedDirEntries) e // of the response so that only a subset of the prefix is returned. In this // mode, the keys which are omitted are still counted in the returned index. func (k *KVS) ListKeys(args *structs.KeyListRequest, reply *structs.IndexedKeyList) error { - if done, err := k.srv.forward("KVS.ListKeys", args, args, reply); done { + if done, err := k.srv.ForwardRPC("KVS.ListKeys", args, args, reply); done { return err } diff --git a/agent/consul/operator_autopilot_endpoint.go b/agent/consul/operator_autopilot_endpoint.go index 47cc8f49a8..c286fd1e95 100644 --- a/agent/consul/operator_autopilot_endpoint.go +++ b/agent/consul/operator_autopilot_endpoint.go @@ -10,7 +10,7 @@ import ( // AutopilotGetConfiguration is used to retrieve the current Autopilot configuration. func (op *Operator) AutopilotGetConfiguration(args *structs.DCSpecificRequest, reply *autopilot.Config) error { - if done, err := op.srv.forward("Operator.AutopilotGetConfiguration", args, args, reply); done { + if done, err := op.srv.ForwardRPC("Operator.AutopilotGetConfiguration", args, args, reply); done { return err } @@ -42,7 +42,7 @@ func (op *Operator) AutopilotGetConfiguration(args *structs.DCSpecificRequest, r // AutopilotSetConfiguration is used to set the current Autopilot configuration. func (op *Operator) AutopilotSetConfiguration(args *structs.AutopilotSetConfigRequest, reply *bool) error { - if done, err := op.srv.forward("Operator.AutopilotSetConfiguration", args, args, reply); done { + if done, err := op.srv.ForwardRPC("Operator.AutopilotSetConfiguration", args, args, reply); done { return err } @@ -81,7 +81,7 @@ func (op *Operator) ServerHealth(args *structs.DCSpecificRequest, reply *autopil // re-using a structure where we don't support all the options. args.RequireConsistent = true args.AllowStale = false - if done, err := op.srv.forward("Operator.ServerHealth", args, args, reply); done { + if done, err := op.srv.ForwardRPC("Operator.ServerHealth", args, args, reply); done { return err } diff --git a/agent/consul/operator_raft_endpoint.go b/agent/consul/operator_raft_endpoint.go index 3fa4a6691a..b3ea4da28e 100644 --- a/agent/consul/operator_raft_endpoint.go +++ b/agent/consul/operator_raft_endpoint.go @@ -13,7 +13,7 @@ import ( // RaftGetConfiguration is used to retrieve the current Raft configuration. func (op *Operator) RaftGetConfiguration(args *structs.DCSpecificRequest, reply *structs.RaftConfigurationResponse) error { - if done, err := op.srv.forward("Operator.RaftGetConfiguration", args, args, reply); done { + if done, err := op.srv.ForwardRPC("Operator.RaftGetConfiguration", args, args, reply); done { return err } @@ -74,7 +74,7 @@ func (op *Operator) RaftGetConfiguration(args *structs.DCSpecificRequest, reply // "IP:port". The reply argument is not used, but it required to fulfill the RPC // interface. func (op *Operator) RaftRemovePeerByAddress(args *structs.RaftRemovePeerRequest, reply *struct{}) error { - if done, err := op.srv.forward("Operator.RaftRemovePeerByAddress", args, args, reply); done { + if done, err := op.srv.ForwardRPC("Operator.RaftRemovePeerByAddress", args, args, reply); done { return err } @@ -146,7 +146,7 @@ REMOVE: // "IP:port". The reply argument is not used, but is required to fulfill the RPC // interface. func (op *Operator) RaftRemovePeerByID(args *structs.RaftRemovePeerRequest, reply *struct{}) error { - if done, err := op.srv.forward("Operator.RaftRemovePeerByID", args, args, reply); done { + if done, err := op.srv.ForwardRPC("Operator.RaftRemovePeerByID", args, args, reply); done { return err } diff --git a/agent/consul/prepared_query_endpoint.go b/agent/consul/prepared_query_endpoint.go index 01957a7c61..bb13ff3cb3 100644 --- a/agent/consul/prepared_query_endpoint.go +++ b/agent/consul/prepared_query_endpoint.go @@ -25,7 +25,7 @@ type PreparedQuery struct { // only be used for operations that modify the data. The ID of the session is // returned in the reply. func (p *PreparedQuery) Apply(args *structs.PreparedQueryRequest, reply *string) (err error) { - if done, err := p.srv.forward("PreparedQuery.Apply", args, args, reply); done { + if done, err := p.srv.ForwardRPC("PreparedQuery.Apply", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"prepared-query", "apply"}, time.Now()) @@ -209,7 +209,7 @@ func parseDNS(dns *structs.QueryDNSOptions) error { // Get returns a single prepared query by ID. func (p *PreparedQuery) Get(args *structs.PreparedQuerySpecificRequest, reply *structs.IndexedPreparedQueries) error { - if done, err := p.srv.forward("PreparedQuery.Get", args, args, reply); done { + if done, err := p.srv.ForwardRPC("PreparedQuery.Get", args, args, reply); done { return err } @@ -253,7 +253,7 @@ func (p *PreparedQuery) Get(args *structs.PreparedQuerySpecificRequest, // List returns all the prepared queries. func (p *PreparedQuery) List(args *structs.DCSpecificRequest, reply *structs.IndexedPreparedQueries) error { - if done, err := p.srv.forward("PreparedQuery.List", args, args, reply); done { + if done, err := p.srv.ForwardRPC("PreparedQuery.List", args, args, reply); done { return err } @@ -277,7 +277,7 @@ func (p *PreparedQuery) List(args *structs.DCSpecificRequest, reply *structs.Ind // will be executed here. func (p *PreparedQuery) Explain(args *structs.PreparedQueryExecuteRequest, reply *structs.PreparedQueryExplainResponse) error { - if done, err := p.srv.forward("PreparedQuery.Explain", args, args, reply); done { + if done, err := p.srv.ForwardRPC("PreparedQuery.Explain", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"prepared-query", "explain"}, time.Now()) @@ -324,7 +324,7 @@ func (p *PreparedQuery) Explain(args *structs.PreparedQueryExecuteRequest, // part of a DNS lookup, or when executing prepared queries from the HTTP API. func (p *PreparedQuery) Execute(args *structs.PreparedQueryExecuteRequest, reply *structs.PreparedQueryExecuteResponse) error { - if done, err := p.srv.forward("PreparedQuery.Execute", args, args, reply); done { + if done, err := p.srv.ForwardRPC("PreparedQuery.Execute", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"prepared-query", "execute"}, time.Now()) @@ -459,7 +459,7 @@ func (p *PreparedQuery) Execute(args *structs.PreparedQueryExecuteRequest, // We don't want things to fan out further than one level. func (p *PreparedQuery) ExecuteRemote(args *structs.PreparedQueryExecuteRemoteRequest, reply *structs.PreparedQueryExecuteResponse) error { - if done, err := p.srv.forward("PreparedQuery.ExecuteRemote", args, args, reply); done { + if done, err := p.srv.ForwardRPC("PreparedQuery.ExecuteRemote", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"prepared-query", "execute_remote"}, time.Now()) diff --git a/agent/consul/rpc.go b/agent/consul/rpc.go index 938f1ee21e..a73f613be8 100644 --- a/agent/consul/rpc.go +++ b/agent/consul/rpc.go @@ -499,9 +499,9 @@ func canRetry(args interface{}, err error) bool { return false } -// forward is used to forward to a remote DC or to forward to the local leader +// ForwardRPC is used to forward an RPC request to a remote DC or to the local leader // Returns a bool of if forwarding was performed, as well as any error -func (s *Server) forward(method string, info structs.RPCInfo, args interface{}, reply interface{}) (bool, error) { +func (s *Server) ForwardRPC(method string, info structs.RPCInfo, args interface{}, reply interface{}) (bool, error) { var firstCheck time.Time // Handle DC forwarding diff --git a/agent/consul/server.go b/agent/consul/server.go index 0888fd537e..a08b9dc805 100644 --- a/agent/consul/server.go +++ b/agent/consul/server.go @@ -1502,11 +1502,6 @@ func (s *Server) DatacenterJoinAddresses(segment string) ([]string, error) { return joinAddrs, nil } -// ForwardRPC is basically just renaming forward, in a future commit I am going to actually do the rename. -func (s *Server) ForwardRPC(method string, info structs.RPCInfo, args, reply interface{}) (bool, error) { - return s.forward(method, info, args, reply) -} - // GetConfig will return the servers configuration - this is needed to satisfy // interfaces: AutoConfigDelegate func (s *Server) GetConfig() *Config { diff --git a/agent/consul/session_endpoint.go b/agent/consul/session_endpoint.go index 2dd0527c29..3ac8b41dc0 100644 --- a/agent/consul/session_endpoint.go +++ b/agent/consul/session_endpoint.go @@ -32,7 +32,7 @@ func fixupSessionSpecificRequest(args *structs.SessionSpecificRequest) { // Apply is used to apply a modifying request to the data store. This should // only be used for operations that modify the data func (s *Session) Apply(args *structs.SessionRequest, reply *string) error { - if done, err := s.srv.forward("Session.Apply", args, args, reply); done { + if done, err := s.srv.ForwardRPC("Session.Apply", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"session", "apply"}, time.Now()) @@ -162,7 +162,7 @@ func (s *Session) Apply(args *structs.SessionRequest, reply *string) error { // Get is used to retrieve a single session func (s *Session) Get(args *structs.SessionSpecificRequest, reply *structs.IndexedSessions) error { - if done, err := s.srv.forward("Session.Get", args, args, reply); done { + if done, err := s.srv.ForwardRPC("Session.Get", args, args, reply); done { return err } @@ -203,7 +203,7 @@ func (s *Session) Get(args *structs.SessionSpecificRequest, // List is used to list all the active sessions func (s *Session) List(args *structs.SessionSpecificRequest, reply *structs.IndexedSessions) error { - if done, err := s.srv.forward("Session.List", args, args, reply); done { + if done, err := s.srv.ForwardRPC("Session.List", args, args, reply); done { return err } @@ -237,7 +237,7 @@ func (s *Session) List(args *structs.SessionSpecificRequest, // NodeSessions is used to get all the sessions for a particular node func (s *Session) NodeSessions(args *structs.NodeSpecificRequest, reply *structs.IndexedSessions) error { - if done, err := s.srv.forward("Session.NodeSessions", args, args, reply); done { + if done, err := s.srv.ForwardRPC("Session.NodeSessions", args, args, reply); done { return err } @@ -271,7 +271,7 @@ func (s *Session) NodeSessions(args *structs.NodeSpecificRequest, // Renew is used to renew the TTL on a single session func (s *Session) Renew(args *structs.SessionSpecificRequest, reply *structs.IndexedSessions) error { - if done, err := s.srv.forward("Session.Renew", args, args, reply); done { + if done, err := s.srv.ForwardRPC("Session.Renew", args, args, reply); done { return err } diff --git a/agent/consul/txn_endpoint.go b/agent/consul/txn_endpoint.go index 83d46be896..9819d63704 100644 --- a/agent/consul/txn_endpoint.go +++ b/agent/consul/txn_endpoint.go @@ -108,7 +108,7 @@ func (t *Txn) preCheck(authorizer acl.Authorizer, ops structs.TxnOps) structs.Tx // Apply is used to apply multiple operations in a single, atomic transaction. func (t *Txn) Apply(args *structs.TxnRequest, reply *structs.TxnResponse) error { - if done, err := t.srv.forward("Txn.Apply", args, args, reply); done { + if done, err := t.srv.ForwardRPC("Txn.Apply", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"txn", "apply"}, time.Now()) @@ -151,7 +151,7 @@ func (t *Txn) Apply(args *structs.TxnRequest, reply *structs.TxnResponse) error // supports staleness, so this should be preferred if you're just performing // reads. func (t *Txn) Read(args *structs.TxnReadRequest, reply *structs.TxnReadResponse) error { - if done, err := t.srv.forward("Txn.Read", args, args, reply); done { + if done, err := t.srv.ForwardRPC("Txn.Read", args, args, reply); done { return err } defer metrics.MeasureSince([]string{"txn", "read"}, time.Now())