mirror of https://github.com/hashicorp/consul
state: remove unused Store method receiver
And use ReadTxn interface where appropriate.pull/8482/head
parent
190fcc14a3
commit
d677706625
|
@ -339,7 +339,7 @@ func (s *Store) CanBootstrapACLToken() (bool, uint64, error) {
|
||||||
// to update the name. Unlike the older functions to operate specifically on role or policy links
|
// to update the name. Unlike the older functions to operate specifically on role or policy links
|
||||||
// this function does not itself handle the case where the id cannot be found. Instead the
|
// this function does not itself handle the case where the id cannot be found. Instead the
|
||||||
// getName function should handle that and return an error if necessary
|
// getName function should handle that and return an error if necessary
|
||||||
func resolveACLLinks(tx *txn, links []pbacl.ACLLink, getName func(*txn, string) (string, error)) (int, error) {
|
func resolveACLLinks(tx ReadTxn, links []pbacl.ACLLink, getName func(ReadTxn, string) (string, error)) (int, error) {
|
||||||
var numValid int
|
var numValid int
|
||||||
for linkIndex, link := range links {
|
for linkIndex, link := range links {
|
||||||
if link.ID != "" {
|
if link.ID != "" {
|
||||||
|
@ -365,7 +365,7 @@ func resolveACLLinks(tx *txn, links []pbacl.ACLLink, getName func(*txn, string)
|
||||||
// associated with the ID of the link. Ideally this will be a no-op if the names are already correct
|
// associated with the ID of the link. Ideally this will be a no-op if the names are already correct
|
||||||
// however if a linked resource was renamed it might be stale. This function will treat the incoming
|
// however if a linked resource was renamed it might be stale. This function will treat the incoming
|
||||||
// links with copy-on-write semantics and its output will indicate whether any modifications were made.
|
// links with copy-on-write semantics and its output will indicate whether any modifications were made.
|
||||||
func fixupACLLinks(tx *txn, original []pbacl.ACLLink, getName func(*txn, string) (string, error)) ([]pbacl.ACLLink, bool, error) {
|
func fixupACLLinks(tx ReadTxn, original []pbacl.ACLLink, getName func(ReadTxn, string) (string, error)) ([]pbacl.ACLLink, bool, error) {
|
||||||
owned := false
|
owned := false
|
||||||
links := original
|
links := original
|
||||||
|
|
||||||
|
@ -579,7 +579,7 @@ func resolveRolePolicyLinks(tx *txn, role *structs.ACLRole, allowMissing bool) e
|
||||||
// stale when a linked policy was deleted or renamed. This will correct them and generate a newly allocated
|
// stale when a linked policy was deleted or renamed. This will correct them and generate a newly allocated
|
||||||
// role only when fixes are needed. If the policy links are still accurate then we just return the original
|
// role only when fixes are needed. If the policy links are still accurate then we just return the original
|
||||||
// role.
|
// role.
|
||||||
func fixupRolePolicyLinks(tx *txn, original *structs.ACLRole) (*structs.ACLRole, error) {
|
func fixupRolePolicyLinks(tx ReadTxn, original *structs.ACLRole) (*structs.ACLRole, error) {
|
||||||
owned := false
|
owned := false
|
||||||
role := original
|
role := original
|
||||||
|
|
||||||
|
@ -1201,9 +1201,9 @@ func (s *Store) ACLPolicyBatchGet(ws memdb.WatchSet, ids []string) (uint64, stru
|
||||||
return idx, policies, nil
|
return idx, policies, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type aclPolicyGetFn func(*txn, string, *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error)
|
type aclPolicyGetFn func(ReadTxn, string, *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error)
|
||||||
|
|
||||||
func getPolicyWithTxn(tx *txn, ws memdb.WatchSet, value string, fn aclPolicyGetFn, entMeta *structs.EnterpriseMeta) (*structs.ACLPolicy, error) {
|
func getPolicyWithTxn(tx ReadTxn, ws memdb.WatchSet, value string, fn aclPolicyGetFn, entMeta *structs.EnterpriseMeta) (*structs.ACLPolicy, error) {
|
||||||
watchCh, policy, err := fn(tx, value, entMeta)
|
watchCh, policy, err := fn(tx, value, entMeta)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed acl policy lookup: %v", err)
|
return nil, fmt.Errorf("failed acl policy lookup: %v", err)
|
||||||
|
@ -1391,7 +1391,7 @@ func aclRoleSetTxn(tx *txn, idx uint64, role *structs.ACLRole, allowMissing bool
|
||||||
return aclRoleInsert(tx, role)
|
return aclRoleInsert(tx, role)
|
||||||
}
|
}
|
||||||
|
|
||||||
type aclRoleGetFn func(*txn, string, *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error)
|
type aclRoleGetFn func(ReadTxn, string, *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error)
|
||||||
|
|
||||||
func (s *Store) ACLRoleGetByID(ws memdb.WatchSet, id string, entMeta *structs.EnterpriseMeta) (uint64, *structs.ACLRole, error) {
|
func (s *Store) ACLRoleGetByID(ws memdb.WatchSet, id string, entMeta *structs.EnterpriseMeta) (uint64, *structs.ACLRole, error) {
|
||||||
return s.aclRoleGet(ws, id, aclRoleGetByID, entMeta)
|
return s.aclRoleGet(ws, id, aclRoleGetByID, entMeta)
|
||||||
|
@ -1422,7 +1422,7 @@ func (s *Store) ACLRoleBatchGet(ws memdb.WatchSet, ids []string) (uint64, struct
|
||||||
return idx, roles, nil
|
return idx, roles, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func getRoleWithTxn(tx *txn, ws memdb.WatchSet, value string, fn aclRoleGetFn, entMeta *structs.EnterpriseMeta) (*structs.ACLRole, error) {
|
func getRoleWithTxn(tx ReadTxn, ws memdb.WatchSet, value string, fn aclRoleGetFn, entMeta *structs.EnterpriseMeta) (*structs.ACLRole, error) {
|
||||||
watchCh, rawRole, err := fn(tx, value, entMeta)
|
watchCh, rawRole, err := fn(tx, value, entMeta)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed acl role lookup: %v", err)
|
return nil, fmt.Errorf("failed acl role lookup: %v", err)
|
||||||
|
|
|
@ -218,15 +218,15 @@ func aclPolicyInsert(tx *txn, policy *structs.ACLPolicy) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func aclPolicyGetByID(tx *txn, id string, _ *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) {
|
func aclPolicyGetByID(tx ReadTxn, id string, _ *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) {
|
||||||
return tx.FirstWatch("acl-policies", "id", id)
|
return tx.FirstWatch("acl-policies", "id", id)
|
||||||
}
|
}
|
||||||
|
|
||||||
func aclPolicyGetByName(tx *txn, name string, _ *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) {
|
func aclPolicyGetByName(tx ReadTxn, name string, _ *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) {
|
||||||
return tx.FirstWatch("acl-policies", "name", name)
|
return tx.FirstWatch("acl-policies", "name", name)
|
||||||
}
|
}
|
||||||
|
|
||||||
func aclPolicyList(tx *txn, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
|
func aclPolicyList(tx ReadTxn, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
|
||||||
return tx.Get("acl-policies", "id")
|
return tx.Get("acl-policies", "id")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -343,15 +343,15 @@ func aclRoleInsert(tx *txn, role *structs.ACLRole) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func aclRoleGetByID(tx *txn, id string, _ *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) {
|
func aclRoleGetByID(tx ReadTxn, id string, _ *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) {
|
||||||
return tx.FirstWatch("acl-roles", "id", id)
|
return tx.FirstWatch("acl-roles", "id", id)
|
||||||
}
|
}
|
||||||
|
|
||||||
func aclRoleGetByName(tx *txn, name string, _ *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) {
|
func aclRoleGetByName(tx ReadTxn, name string, _ *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) {
|
||||||
return tx.FirstWatch("acl-roles", "name", name)
|
return tx.FirstWatch("acl-roles", "name", name)
|
||||||
}
|
}
|
||||||
|
|
||||||
func aclRoleList(tx *txn, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
|
func aclRoleList(tx ReadTxn, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
|
||||||
return tx.Get("acl-roles", "id")
|
return tx.Get("acl-roles", "id")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -4105,7 +4105,7 @@ func TestStateStore_resolveACLLinks(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
_, err := resolveACLLinks(tx, links, func(*txn, string) (string, error) {
|
_, err := resolveACLLinks(tx, links, func(ReadTxn, string) (string, error) {
|
||||||
err := fmt.Errorf("Should not be attempting to resolve an empty id")
|
err := fmt.Errorf("Should not be attempting to resolve an empty id")
|
||||||
require.Fail(t, err.Error())
|
require.Fail(t, err.Error())
|
||||||
return "", err
|
return "", err
|
||||||
|
@ -4131,7 +4131,7 @@ func TestStateStore_resolveACLLinks(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
numValid, err := resolveACLLinks(tx, links, func(_ *txn, linkID string) (string, error) {
|
numValid, err := resolveACLLinks(tx, links, func(_ ReadTxn, linkID string) (string, error) {
|
||||||
switch linkID {
|
switch linkID {
|
||||||
case "e81887b4-836b-4053-a1fa-7e8305902be9":
|
case "e81887b4-836b-4053-a1fa-7e8305902be9":
|
||||||
return "foo", nil
|
return "foo", nil
|
||||||
|
@ -4161,7 +4161,7 @@ func TestStateStore_resolveACLLinks(t *testing.T) {
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
numValid, err := resolveACLLinks(tx, links, func(_ *txn, linkID string) (string, error) {
|
numValid, err := resolveACLLinks(tx, links, func(_ ReadTxn, linkID string) (string, error) {
|
||||||
require.Equal(t, "b985e082-25d3-45a9-9dd8-fd1a41b83b0d", linkID)
|
require.Equal(t, "b985e082-25d3-45a9-9dd8-fd1a41b83b0d", linkID)
|
||||||
return "", nil
|
return "", nil
|
||||||
})
|
})
|
||||||
|
@ -4201,7 +4201,7 @@ func TestStateStore_fixupACLLinks(t *testing.T) {
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
newLinks, cloned, err := fixupACLLinks(tx, links, func(_ *txn, linkID string) (string, error) {
|
newLinks, cloned, err := fixupACLLinks(tx, links, func(_ ReadTxn, linkID string) (string, error) {
|
||||||
switch linkID {
|
switch linkID {
|
||||||
case "40b57f86-97ea-40e4-a99a-c399cc81f4dd":
|
case "40b57f86-97ea-40e4-a99a-c399cc81f4dd":
|
||||||
return "foo", nil
|
return "foo", nil
|
||||||
|
@ -4228,7 +4228,7 @@ func TestStateStore_fixupACLLinks(t *testing.T) {
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
newLinks, cloned, err := fixupACLLinks(tx, links, func(_ *txn, linkID string) (string, error) {
|
newLinks, cloned, err := fixupACLLinks(tx, links, func(_ ReadTxn, linkID string) (string, error) {
|
||||||
switch linkID {
|
switch linkID {
|
||||||
case "40b57f86-97ea-40e4-a99a-c399cc81f4dd":
|
case "40b57f86-97ea-40e4-a99a-c399cc81f4dd":
|
||||||
return "foo", nil
|
return "foo", nil
|
||||||
|
@ -4260,7 +4260,7 @@ func TestStateStore_fixupACLLinks(t *testing.T) {
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
newLinks, cloned, err := fixupACLLinks(tx, links, func(_ *txn, linkID string) (string, error) {
|
newLinks, cloned, err := fixupACLLinks(tx, links, func(_ ReadTxn, linkID string) (string, error) {
|
||||||
switch linkID {
|
switch linkID {
|
||||||
case "40b57f86-97ea-40e4-a99a-c399cc81f4dd":
|
case "40b57f86-97ea-40e4-a99a-c399cc81f4dd":
|
||||||
return "foo", nil
|
return "foo", nil
|
||||||
|
@ -4287,7 +4287,7 @@ func TestStateStore_fixupACLLinks(t *testing.T) {
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
_, _, err := fixupACLLinks(tx, links, func(*txn, string) (string, error) {
|
_, _, err := fixupACLLinks(tx, links, func(ReadTxn, string) (string, error) {
|
||||||
return "", fmt.Errorf("Resolver Error")
|
return "", fmt.Errorf("Resolver Error")
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
|
@ -77,7 +77,7 @@ func (s *Store) AutopilotSetConfig(idx uint64, config *autopilot.Config) error {
|
||||||
tx := s.db.WriteTxn(idx)
|
tx := s.db.WriteTxn(idx)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
if err := s.autopilotSetConfigTxn(idx, tx, config); err != nil {
|
if err := autopilotSetConfigTxn(tx, idx, config); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -105,7 +105,7 @@ func (s *Store) AutopilotCASConfig(idx, cidx uint64, config *autopilot.Config) (
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := s.autopilotSetConfigTxn(idx, tx, config); err != nil {
|
if err := autopilotSetConfigTxn(tx, idx, config); err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -113,7 +113,7 @@ func (s *Store) AutopilotCASConfig(idx, cidx uint64, config *autopilot.Config) (
|
||||||
return err == nil, err
|
return err == nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Store) autopilotSetConfigTxn(idx uint64, tx *txn, config *autopilot.Config) error {
|
func autopilotSetConfigTxn(tx *txn, idx uint64, config *autopilot.Config) error {
|
||||||
// Check for an existing config
|
// Check for an existing config
|
||||||
existing, err := tx.First("autopilot-config", "id")
|
existing, err := tx.First("autopilot-config", "id")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -2194,20 +2194,19 @@ func (s *Store) CheckServiceTagNodes(ws memdb.WatchSet, serviceName string, tags
|
||||||
func (s *Store) GatewayServices(ws memdb.WatchSet, gateway string, entMeta *structs.EnterpriseMeta) (uint64, structs.GatewayServices, error) {
|
func (s *Store) GatewayServices(ws memdb.WatchSet, gateway string, entMeta *structs.EnterpriseMeta) (uint64, structs.GatewayServices, error) {
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
var maxIdx uint64
|
|
||||||
|
|
||||||
iter, err := gatewayServices(tx, gateway, entMeta)
|
iter, err := gatewayServices(tx, gateway, entMeta)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil, fmt.Errorf("failed gateway services lookup: %s", err)
|
return 0, nil, fmt.Errorf("failed gateway services lookup: %s", err)
|
||||||
}
|
}
|
||||||
ws.Add(iter.WatchCh())
|
ws.Add(iter.WatchCh())
|
||||||
|
|
||||||
|
var maxIdx uint64
|
||||||
var results structs.GatewayServices
|
var results structs.GatewayServices
|
||||||
for service := iter.Next(); service != nil; service = iter.Next() {
|
for service := iter.Next(); service != nil; service = iter.Next() {
|
||||||
svc := service.(*structs.GatewayService)
|
svc := service.(*structs.GatewayService)
|
||||||
|
|
||||||
if svc.Service.Name != structs.WildcardSpecifier {
|
if svc.Service.Name != structs.WildcardSpecifier {
|
||||||
idx, matches, err := s.checkProtocolMatch(tx, ws, svc)
|
idx, matches, err := checkProtocolMatch(tx, ws, svc)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil, fmt.Errorf("failed checking protocol: %s", err)
|
return 0, nil, fmt.Errorf("failed checking protocol: %s", err)
|
||||||
}
|
}
|
||||||
|
@ -2778,11 +2777,7 @@ func serviceGatewayNodes(tx *txn, ws memdb.WatchSet, service string, kind struct
|
||||||
|
|
||||||
// checkProtocolMatch filters out any GatewayService entries added from a wildcard with a protocol
|
// checkProtocolMatch filters out any GatewayService entries added from a wildcard with a protocol
|
||||||
// that doesn't match the one configured in their discovery chain.
|
// that doesn't match the one configured in their discovery chain.
|
||||||
func (s *Store) checkProtocolMatch(
|
func checkProtocolMatch(tx ReadTxn, ws memdb.WatchSet, svc *structs.GatewayService) (uint64, bool, error) {
|
||||||
tx *txn,
|
|
||||||
ws memdb.WatchSet,
|
|
||||||
svc *structs.GatewayService,
|
|
||||||
) (uint64, bool, error) {
|
|
||||||
if svc.GatewayKind != structs.ServiceKindIngressGateway || !svc.FromWildcard {
|
if svc.GatewayKind != structs.ServiceKindIngressGateway || !svc.FromWildcard {
|
||||||
return 0, true, nil
|
return 0, true, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -223,46 +223,46 @@ func catalogInsertService(tx *txn, svc *structs.ServiceNode) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func catalogServicesMaxIndex(tx *txn, _ *structs.EnterpriseMeta) uint64 {
|
func catalogServicesMaxIndex(tx ReadTxn, _ *structs.EnterpriseMeta) uint64 {
|
||||||
return maxIndexTxn(tx, "services")
|
return maxIndexTxn(tx, "services")
|
||||||
}
|
}
|
||||||
|
|
||||||
func catalogServiceMaxIndex(tx *txn, serviceName string, _ *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) {
|
func catalogServiceMaxIndex(tx ReadTxn, serviceName string, _ *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) {
|
||||||
return tx.FirstWatch("index", "id", serviceIndexName(serviceName, nil))
|
return tx.FirstWatch("index", "id", serviceIndexName(serviceName, nil))
|
||||||
}
|
}
|
||||||
|
|
||||||
func catalogServiceKindMaxIndex(tx *txn, ws memdb.WatchSet, kind structs.ServiceKind, entMeta *structs.EnterpriseMeta) uint64 {
|
func catalogServiceKindMaxIndex(tx ReadTxn, ws memdb.WatchSet, kind structs.ServiceKind, entMeta *structs.EnterpriseMeta) uint64 {
|
||||||
return maxIndexWatchTxn(tx, ws, serviceKindIndexName(kind, nil))
|
return maxIndexWatchTxn(tx, ws, serviceKindIndexName(kind, nil))
|
||||||
}
|
}
|
||||||
|
|
||||||
func catalogServiceList(tx *txn, _ *structs.EnterpriseMeta, _ bool) (memdb.ResultIterator, error) {
|
func catalogServiceList(tx ReadTxn, _ *structs.EnterpriseMeta, _ bool) (memdb.ResultIterator, error) {
|
||||||
return tx.Get("services", "id")
|
return tx.Get("services", "id")
|
||||||
}
|
}
|
||||||
|
|
||||||
func catalogServiceListByKind(tx *txn, kind structs.ServiceKind, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
|
func catalogServiceListByKind(tx ReadTxn, kind structs.ServiceKind, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
|
||||||
return tx.Get("services", "kind", string(kind))
|
return tx.Get("services", "kind", string(kind))
|
||||||
}
|
}
|
||||||
|
|
||||||
func catalogServiceListByNode(tx *txn, node string, _ *structs.EnterpriseMeta, _ bool) (memdb.ResultIterator, error) {
|
func catalogServiceListByNode(tx ReadTxn, node string, _ *structs.EnterpriseMeta, _ bool) (memdb.ResultIterator, error) {
|
||||||
return tx.Get("services", "node", node)
|
return tx.Get("services", "node", node)
|
||||||
}
|
}
|
||||||
|
|
||||||
func catalogServiceNodeList(tx *txn, name string, index string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
|
func catalogServiceNodeList(tx ReadTxn, name string, index string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
|
||||||
return tx.Get("services", index, name)
|
return tx.Get("services", index, name)
|
||||||
}
|
}
|
||||||
|
|
||||||
func catalogServiceLastExtinctionIndex(tx *txn, _ *structs.EnterpriseMeta) (interface{}, error) {
|
func catalogServiceLastExtinctionIndex(tx ReadTxn, _ *structs.EnterpriseMeta) (interface{}, error) {
|
||||||
return tx.First("index", "id", serviceLastExtinctionIndexName)
|
return tx.First("index", "id", serviceLastExtinctionIndexName)
|
||||||
}
|
}
|
||||||
|
|
||||||
func catalogMaxIndex(tx *txn, _ *structs.EnterpriseMeta, checks bool) uint64 {
|
func catalogMaxIndex(tx ReadTxn, _ *structs.EnterpriseMeta, checks bool) uint64 {
|
||||||
if checks {
|
if checks {
|
||||||
return maxIndexTxn(tx, "nodes", "services", "checks")
|
return maxIndexTxn(tx, "nodes", "services", "checks")
|
||||||
}
|
}
|
||||||
return maxIndexTxn(tx, "nodes", "services")
|
return maxIndexTxn(tx, "nodes", "services")
|
||||||
}
|
}
|
||||||
|
|
||||||
func catalogMaxIndexWatch(tx *txn, ws memdb.WatchSet, _ *structs.EnterpriseMeta, checks bool) uint64 {
|
func catalogMaxIndexWatch(tx ReadTxn, ws memdb.WatchSet, _ *structs.EnterpriseMeta, checks bool) uint64 {
|
||||||
if checks {
|
if checks {
|
||||||
return maxIndexWatchTxn(tx, ws, "nodes", "services", "checks")
|
return maxIndexWatchTxn(tx, ws, "nodes", "services", "checks")
|
||||||
}
|
}
|
||||||
|
@ -277,32 +277,32 @@ func catalogUpdateCheckIndexes(tx *txn, idx uint64, _ *structs.EnterpriseMeta) e
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func catalogChecksMaxIndex(tx *txn, _ *structs.EnterpriseMeta) uint64 {
|
func catalogChecksMaxIndex(tx ReadTxn, _ *structs.EnterpriseMeta) uint64 {
|
||||||
return maxIndexTxn(tx, "checks")
|
return maxIndexTxn(tx, "checks")
|
||||||
}
|
}
|
||||||
|
|
||||||
func catalogListChecksByNode(tx *txn, node string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
|
func catalogListChecksByNode(tx ReadTxn, node string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
|
||||||
return tx.Get("checks", "node", node)
|
return tx.Get("checks", "node", node)
|
||||||
}
|
}
|
||||||
|
|
||||||
func catalogListChecksByService(tx *txn, service string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
|
func catalogListChecksByService(tx ReadTxn, service string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
|
||||||
return tx.Get("checks", "service", service)
|
return tx.Get("checks", "service", service)
|
||||||
}
|
}
|
||||||
|
|
||||||
func catalogListChecksInState(tx *txn, state string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
|
func catalogListChecksInState(tx ReadTxn, state string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
|
||||||
// simpler than normal due to the use of the CompoundMultiIndex
|
// simpler than normal due to the use of the CompoundMultiIndex
|
||||||
return tx.Get("checks", "status", state)
|
return tx.Get("checks", "status", state)
|
||||||
}
|
}
|
||||||
|
|
||||||
func catalogListChecks(tx *txn, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
|
func catalogListChecks(tx ReadTxn, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
|
||||||
return tx.Get("checks", "id")
|
return tx.Get("checks", "id")
|
||||||
}
|
}
|
||||||
|
|
||||||
func catalogListNodeChecks(tx *txn, node string) (memdb.ResultIterator, error) {
|
func catalogListNodeChecks(tx ReadTxn, node string) (memdb.ResultIterator, error) {
|
||||||
return tx.Get("checks", "node_service_check", node, false)
|
return tx.Get("checks", "node_service_check", node, false)
|
||||||
}
|
}
|
||||||
|
|
||||||
func catalogListServiceChecks(tx *txn, node string, service string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
|
func catalogListServiceChecks(tx ReadTxn, node string, service string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
|
||||||
return tx.Get("checks", "node_service", node, service)
|
return tx.Get("checks", "node_service", node, service)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -319,14 +319,14 @@ func catalogInsertCheck(tx *txn, chk *structs.HealthCheck, idx uint64) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func catalogChecksForNodeService(tx *txn, node string, service string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
|
func catalogChecksForNodeService(tx ReadTxn, node string, service string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
|
||||||
return tx.Get("checks", "node_service", node, service)
|
return tx.Get("checks", "node_service", node, service)
|
||||||
}
|
}
|
||||||
|
|
||||||
func validateRegisterRequestTxn(tx *txn, args *structs.RegisterRequest) (*structs.EnterpriseMeta, error) {
|
func validateRegisterRequestTxn(_ *txn, _ *structs.RegisterRequest) (*structs.EnterpriseMeta, error) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Store) ValidateRegisterRequest(args *structs.RegisterRequest) (*structs.EnterpriseMeta, error) {
|
func (s *Store) ValidateRegisterRequest(_ *structs.RegisterRequest) (*structs.EnterpriseMeta, error) {
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
|
@ -106,7 +106,7 @@ func (s *Store) ConfigEntry(ws memdb.WatchSet, kind, name string, entMeta *struc
|
||||||
return configEntryTxn(tx, ws, kind, name, entMeta)
|
return configEntryTxn(tx, ws, kind, name, entMeta)
|
||||||
}
|
}
|
||||||
|
|
||||||
func configEntryTxn(tx *txn, ws memdb.WatchSet, kind, name string, entMeta *structs.EnterpriseMeta) (uint64, structs.ConfigEntry, error) {
|
func configEntryTxn(tx ReadTxn, ws memdb.WatchSet, kind, name string, entMeta *structs.EnterpriseMeta) (uint64, structs.ConfigEntry, error) {
|
||||||
// Get the index
|
// Get the index
|
||||||
idx := maxIndexTxn(tx, configTableName)
|
idx := maxIndexTxn(tx, configTableName)
|
||||||
|
|
||||||
|
@ -141,7 +141,7 @@ func (s *Store) ConfigEntriesByKind(ws memdb.WatchSet, kind string, entMeta *str
|
||||||
return configEntriesByKindTxn(tx, ws, kind, entMeta)
|
return configEntriesByKindTxn(tx, ws, kind, entMeta)
|
||||||
}
|
}
|
||||||
|
|
||||||
func configEntriesByKindTxn(tx *txn, ws memdb.WatchSet, kind string, entMeta *structs.EnterpriseMeta) (uint64, []structs.ConfigEntry, error) {
|
func configEntriesByKindTxn(tx ReadTxn, ws memdb.WatchSet, kind string, entMeta *structs.EnterpriseMeta) (uint64, []structs.ConfigEntry, error) {
|
||||||
// Get the index
|
// Get the index
|
||||||
idx := maxIndexTxn(tx, configTableName)
|
idx := maxIndexTxn(tx, configTableName)
|
||||||
|
|
||||||
|
@ -170,7 +170,7 @@ func (s *Store) EnsureConfigEntry(idx uint64, conf structs.ConfigEntry, entMeta
|
||||||
tx := s.db.WriteTxn(idx)
|
tx := s.db.WriteTxn(idx)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
|
|
||||||
if err := s.ensureConfigEntryTxn(tx, idx, conf, entMeta); err != nil {
|
if err := ensureConfigEntryTxn(tx, idx, conf, entMeta); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -178,7 +178,7 @@ func (s *Store) EnsureConfigEntry(idx uint64, conf structs.ConfigEntry, entMeta
|
||||||
}
|
}
|
||||||
|
|
||||||
// ensureConfigEntryTxn upserts a config entry inside of a transaction.
|
// ensureConfigEntryTxn upserts a config entry inside of a transaction.
|
||||||
func (s *Store) ensureConfigEntryTxn(tx *txn, idx uint64, conf structs.ConfigEntry, entMeta *structs.EnterpriseMeta) error {
|
func ensureConfigEntryTxn(tx *txn, idx uint64, conf structs.ConfigEntry, entMeta *structs.EnterpriseMeta) error {
|
||||||
// Check for existing configuration.
|
// Check for existing configuration.
|
||||||
existing, err := firstConfigEntryWithTxn(tx, conf.GetKind(), conf.GetName(), entMeta)
|
existing, err := firstConfigEntryWithTxn(tx, conf.GetKind(), conf.GetName(), entMeta)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -195,7 +195,7 @@ func (s *Store) ensureConfigEntryTxn(tx *txn, idx uint64, conf structs.ConfigEnt
|
||||||
}
|
}
|
||||||
raftIndex.ModifyIndex = idx
|
raftIndex.ModifyIndex = idx
|
||||||
|
|
||||||
err = s.validateProposedConfigEntryInGraph(tx, conf.GetKind(), conf.GetName(), conf, entMeta)
|
err = validateProposedConfigEntryInGraph(tx, conf.GetKind(), conf.GetName(), conf, entMeta)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err // Err is already sufficiently decorated.
|
return err // Err is already sufficiently decorated.
|
||||||
}
|
}
|
||||||
|
@ -234,7 +234,7 @@ func (s *Store) EnsureConfigEntryCAS(idx, cidx uint64, conf structs.ConfigEntry,
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := s.ensureConfigEntryTxn(tx, idx, conf, entMeta); err != nil {
|
if err := ensureConfigEntryTxn(tx, idx, conf, entMeta); err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -266,7 +266,7 @@ func (s *Store) DeleteConfigEntry(idx uint64, kind, name string, entMeta *struct
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
err = s.validateProposedConfigEntryInGraph(tx, kind, name, nil, entMeta)
|
err = validateProposedConfigEntryInGraph(tx, kind, name, nil, entMeta)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err // Err is already sufficiently decorated.
|
return err // Err is already sufficiently decorated.
|
||||||
}
|
}
|
||||||
|
@ -313,8 +313,8 @@ func insertConfigEntryWithTxn(tx *txn, idx uint64, conf structs.ConfigEntry) err
|
||||||
//
|
//
|
||||||
// May return *ConfigEntryGraphValidationError if there is a concern to surface
|
// May return *ConfigEntryGraphValidationError if there is a concern to surface
|
||||||
// to the caller that they can correct.
|
// to the caller that they can correct.
|
||||||
func (s *Store) validateProposedConfigEntryInGraph(
|
func validateProposedConfigEntryInGraph(
|
||||||
tx *txn,
|
tx ReadTxn,
|
||||||
kind, name string,
|
kind, name string,
|
||||||
next structs.ConfigEntry,
|
next structs.ConfigEntry,
|
||||||
entMeta *structs.EnterpriseMeta,
|
entMeta *structs.EnterpriseMeta,
|
||||||
|
@ -346,11 +346,11 @@ func (s *Store) validateProposedConfigEntryInGraph(
|
||||||
return fmt.Errorf("unhandled kind %q during validation of %q", kind, name)
|
return fmt.Errorf("unhandled kind %q during validation of %q", kind, name)
|
||||||
}
|
}
|
||||||
|
|
||||||
return s.validateProposedConfigEntryInServiceGraph(tx, kind, name, next, validateAllChains, entMeta)
|
return validateProposedConfigEntryInServiceGraph(tx, kind, name, next, validateAllChains, entMeta)
|
||||||
}
|
}
|
||||||
|
|
||||||
func checkGatewayClash(
|
func checkGatewayClash(
|
||||||
tx *txn,
|
tx ReadTxn,
|
||||||
name, selfKind, otherKind string,
|
name, selfKind, otherKind string,
|
||||||
entMeta *structs.EnterpriseMeta,
|
entMeta *structs.EnterpriseMeta,
|
||||||
) error {
|
) error {
|
||||||
|
@ -371,8 +371,8 @@ var serviceGraphKinds = []string{
|
||||||
structs.ServiceResolver,
|
structs.ServiceResolver,
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Store) validateProposedConfigEntryInServiceGraph(
|
func validateProposedConfigEntryInServiceGraph(
|
||||||
tx *txn,
|
tx ReadTxn,
|
||||||
kind, name string,
|
kind, name string,
|
||||||
next structs.ConfigEntry,
|
next structs.ConfigEntry,
|
||||||
validateAllChains bool,
|
validateAllChains bool,
|
||||||
|
@ -475,7 +475,7 @@ func (s *Store) validateProposedConfigEntryInServiceGraph(
|
||||||
svcTopNodeType = make(map[structs.ServiceID]string)
|
svcTopNodeType = make(map[structs.ServiceID]string)
|
||||||
)
|
)
|
||||||
for chain := range checkChains {
|
for chain := range checkChains {
|
||||||
protocol, topNode, err := s.testCompileDiscoveryChain(tx, chain.ID, overrides, &chain.EnterpriseMeta)
|
protocol, topNode, err := testCompileDiscoveryChain(tx, chain.ID, overrides, &chain.EnterpriseMeta)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -524,13 +524,13 @@ func (s *Store) validateProposedConfigEntryInServiceGraph(
|
||||||
// testCompileDiscoveryChain speculatively compiles a discovery chain with
|
// testCompileDiscoveryChain speculatively compiles a discovery chain with
|
||||||
// pending modifications to see if it would be valid. Also returns the computed
|
// pending modifications to see if it would be valid. Also returns the computed
|
||||||
// protocol and topmost discovery chain node.
|
// protocol and topmost discovery chain node.
|
||||||
func (s *Store) testCompileDiscoveryChain(
|
func testCompileDiscoveryChain(
|
||||||
tx *txn,
|
tx ReadTxn,
|
||||||
chainName string,
|
chainName string,
|
||||||
overrides map[structs.ConfigEntryKindName]structs.ConfigEntry,
|
overrides map[structs.ConfigEntryKindName]structs.ConfigEntry,
|
||||||
entMeta *structs.EnterpriseMeta,
|
entMeta *structs.EnterpriseMeta,
|
||||||
) (string, *structs.DiscoveryGraphNode, error) {
|
) (string, *structs.DiscoveryGraphNode, error) {
|
||||||
_, speculativeEntries, err := s.readDiscoveryChainConfigEntriesTxn(tx, nil, chainName, overrides, entMeta)
|
_, speculativeEntries, err := readDiscoveryChainConfigEntriesTxn(tx, nil, chainName, overrides, entMeta)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return "", nil, err
|
return "", nil, err
|
||||||
}
|
}
|
||||||
|
@ -587,11 +587,11 @@ func (s *Store) readDiscoveryChainConfigEntries(
|
||||||
) (uint64, *structs.DiscoveryChainConfigEntries, error) {
|
) (uint64, *structs.DiscoveryChainConfigEntries, error) {
|
||||||
tx := s.db.Txn(false)
|
tx := s.db.Txn(false)
|
||||||
defer tx.Abort()
|
defer tx.Abort()
|
||||||
return s.readDiscoveryChainConfigEntriesTxn(tx, ws, serviceName, overrides, entMeta)
|
return readDiscoveryChainConfigEntriesTxn(tx, ws, serviceName, overrides, entMeta)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Store) readDiscoveryChainConfigEntriesTxn(
|
func readDiscoveryChainConfigEntriesTxn(
|
||||||
tx *txn,
|
tx ReadTxn,
|
||||||
ws memdb.WatchSet,
|
ws memdb.WatchSet,
|
||||||
serviceName string,
|
serviceName string,
|
||||||
overrides map[structs.ConfigEntryKindName]structs.ConfigEntry,
|
overrides map[structs.ConfigEntryKindName]structs.ConfigEntry,
|
||||||
|
@ -619,7 +619,7 @@ func (s *Store) readDiscoveryChainConfigEntriesTxn(
|
||||||
sid := structs.NewServiceID(serviceName, entMeta)
|
sid := structs.NewServiceID(serviceName, entMeta)
|
||||||
|
|
||||||
// Grab the proxy defaults if they exist.
|
// Grab the proxy defaults if they exist.
|
||||||
idx, proxy, err := s.getProxyConfigEntryTxn(tx, ws, structs.ProxyConfigGlobal, overrides, structs.DefaultEnterpriseMeta())
|
idx, proxy, err := getProxyConfigEntryTxn(tx, ws, structs.ProxyConfigGlobal, overrides, structs.DefaultEnterpriseMeta())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil, err
|
return 0, nil, err
|
||||||
} else if proxy != nil {
|
} else if proxy != nil {
|
||||||
|
@ -630,7 +630,7 @@ func (s *Store) readDiscoveryChainConfigEntriesTxn(
|
||||||
todoDefaults[sid] = struct{}{}
|
todoDefaults[sid] = struct{}{}
|
||||||
|
|
||||||
// first fetch the router, of which we only collect 1 per chain eval
|
// first fetch the router, of which we only collect 1 per chain eval
|
||||||
_, router, err := s.getRouterConfigEntryTxn(tx, ws, serviceName, overrides, entMeta)
|
_, router, err := getRouterConfigEntryTxn(tx, ws, serviceName, overrides, entMeta)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil, err
|
return 0, nil, err
|
||||||
} else if router != nil {
|
} else if router != nil {
|
||||||
|
@ -660,7 +660,7 @@ func (s *Store) readDiscoveryChainConfigEntriesTxn(
|
||||||
// Yes, even for splitters.
|
// Yes, even for splitters.
|
||||||
todoDefaults[splitID] = struct{}{}
|
todoDefaults[splitID] = struct{}{}
|
||||||
|
|
||||||
_, splitter, err := s.getSplitterConfigEntryTxn(tx, ws, splitID.ID, overrides, &splitID.EnterpriseMeta)
|
_, splitter, err := getSplitterConfigEntryTxn(tx, ws, splitID.ID, overrides, &splitID.EnterpriseMeta)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil, err
|
return 0, nil, err
|
||||||
}
|
}
|
||||||
|
@ -697,7 +697,7 @@ func (s *Store) readDiscoveryChainConfigEntriesTxn(
|
||||||
// And resolvers, too.
|
// And resolvers, too.
|
||||||
todoDefaults[resolverID] = struct{}{}
|
todoDefaults[resolverID] = struct{}{}
|
||||||
|
|
||||||
_, resolver, err := s.getResolverConfigEntryTxn(tx, ws, resolverID.ID, overrides, &resolverID.EnterpriseMeta)
|
_, resolver, err := getResolverConfigEntryTxn(tx, ws, resolverID.ID, overrides, &resolverID.EnterpriseMeta)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil, err
|
return 0, nil, err
|
||||||
}
|
}
|
||||||
|
@ -725,7 +725,7 @@ func (s *Store) readDiscoveryChainConfigEntriesTxn(
|
||||||
continue // already fetched
|
continue // already fetched
|
||||||
}
|
}
|
||||||
|
|
||||||
_, entry, err := s.getServiceConfigEntryTxn(tx, ws, svcID.ID, overrides, &svcID.EnterpriseMeta)
|
_, entry, err := getServiceConfigEntryTxn(tx, ws, svcID.ID, overrides, &svcID.EnterpriseMeta)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return 0, nil, err
|
return 0, nil, err
|
||||||
}
|
}
|
||||||
|
@ -779,8 +779,8 @@ func anyKey(m map[structs.ServiceID]struct{}) (structs.ServiceID, bool) {
|
||||||
// proxy-defaults kind of config entry.
|
// proxy-defaults kind of config entry.
|
||||||
//
|
//
|
||||||
// If an override is returned the index returned will be 0.
|
// If an override is returned the index returned will be 0.
|
||||||
func (s *Store) getProxyConfigEntryTxn(
|
func getProxyConfigEntryTxn(
|
||||||
tx *txn,
|
tx ReadTxn,
|
||||||
ws memdb.WatchSet,
|
ws memdb.WatchSet,
|
||||||
name string,
|
name string,
|
||||||
overrides map[structs.ConfigEntryKindName]structs.ConfigEntry,
|
overrides map[structs.ConfigEntryKindName]structs.ConfigEntry,
|
||||||
|
@ -804,8 +804,8 @@ func (s *Store) getProxyConfigEntryTxn(
|
||||||
// service-defaults kind of config entry.
|
// service-defaults kind of config entry.
|
||||||
//
|
//
|
||||||
// If an override is returned the index returned will be 0.
|
// If an override is returned the index returned will be 0.
|
||||||
func (s *Store) getServiceConfigEntryTxn(
|
func getServiceConfigEntryTxn(
|
||||||
tx *txn,
|
tx ReadTxn,
|
||||||
ws memdb.WatchSet,
|
ws memdb.WatchSet,
|
||||||
serviceName string,
|
serviceName string,
|
||||||
overrides map[structs.ConfigEntryKindName]structs.ConfigEntry,
|
overrides map[structs.ConfigEntryKindName]structs.ConfigEntry,
|
||||||
|
@ -829,8 +829,8 @@ func (s *Store) getServiceConfigEntryTxn(
|
||||||
// service-router kind of config entry.
|
// service-router kind of config entry.
|
||||||
//
|
//
|
||||||
// If an override is returned the index returned will be 0.
|
// If an override is returned the index returned will be 0.
|
||||||
func (s *Store) getRouterConfigEntryTxn(
|
func getRouterConfigEntryTxn(
|
||||||
tx *txn,
|
tx ReadTxn,
|
||||||
ws memdb.WatchSet,
|
ws memdb.WatchSet,
|
||||||
serviceName string,
|
serviceName string,
|
||||||
overrides map[structs.ConfigEntryKindName]structs.ConfigEntry,
|
overrides map[structs.ConfigEntryKindName]structs.ConfigEntry,
|
||||||
|
@ -854,8 +854,8 @@ func (s *Store) getRouterConfigEntryTxn(
|
||||||
// service-splitter kind of config entry.
|
// service-splitter kind of config entry.
|
||||||
//
|
//
|
||||||
// If an override is returned the index returned will be 0.
|
// If an override is returned the index returned will be 0.
|
||||||
func (s *Store) getSplitterConfigEntryTxn(
|
func getSplitterConfigEntryTxn(
|
||||||
tx *txn,
|
tx ReadTxn,
|
||||||
ws memdb.WatchSet,
|
ws memdb.WatchSet,
|
||||||
serviceName string,
|
serviceName string,
|
||||||
overrides map[structs.ConfigEntryKindName]structs.ConfigEntry,
|
overrides map[structs.ConfigEntryKindName]structs.ConfigEntry,
|
||||||
|
@ -879,8 +879,8 @@ func (s *Store) getSplitterConfigEntryTxn(
|
||||||
// service-resolver kind of config entry.
|
// service-resolver kind of config entry.
|
||||||
//
|
//
|
||||||
// If an override is returned the index returned will be 0.
|
// If an override is returned the index returned will be 0.
|
||||||
func (s *Store) getResolverConfigEntryTxn(
|
func getResolverConfigEntryTxn(
|
||||||
tx *txn,
|
tx ReadTxn,
|
||||||
ws memdb.WatchSet,
|
ws memdb.WatchSet,
|
||||||
serviceName string,
|
serviceName string,
|
||||||
overrides map[structs.ConfigEntryKindName]structs.ConfigEntry,
|
overrides map[structs.ConfigEntryKindName]structs.ConfigEntry,
|
||||||
|
@ -901,7 +901,7 @@ func (s *Store) getResolverConfigEntryTxn(
|
||||||
}
|
}
|
||||||
|
|
||||||
func configEntryWithOverridesTxn(
|
func configEntryWithOverridesTxn(
|
||||||
tx *txn,
|
tx ReadTxn,
|
||||||
ws memdb.WatchSet,
|
ws memdb.WatchSet,
|
||||||
kind string,
|
kind string,
|
||||||
name string,
|
name string,
|
||||||
|
@ -923,7 +923,7 @@ func configEntryWithOverridesTxn(
|
||||||
// protocolForService returns the service graph protocol associated to the
|
// protocolForService returns the service graph protocol associated to the
|
||||||
// provided service, checking all relevant config entries.
|
// provided service, checking all relevant config entries.
|
||||||
func protocolForService(
|
func protocolForService(
|
||||||
tx *txn,
|
tx ReadTxn,
|
||||||
ws memdb.WatchSet,
|
ws memdb.WatchSet,
|
||||||
svc structs.ServiceName,
|
svc structs.ServiceName,
|
||||||
) (uint64, string, error) {
|
) (uint64, string, error) {
|
||||||
|
|
|
@ -49,25 +49,27 @@ func configTableSchema() *memdb.TableSchema {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func firstConfigEntryWithTxn(tx *txn,
|
func firstConfigEntryWithTxn(tx ReadTxn, kind, name string, _ *structs.EnterpriseMeta) (interface{}, error) {
|
||||||
kind, name string, entMeta *structs.EnterpriseMeta) (interface{}, error) {
|
|
||||||
return tx.First(configTableName, "id", kind, name)
|
return tx.First(configTableName, "id", kind, name)
|
||||||
}
|
}
|
||||||
|
|
||||||
func firstWatchConfigEntryWithTxn(tx *txn,
|
func firstWatchConfigEntryWithTxn(
|
||||||
kind, name string, entMeta *structs.EnterpriseMeta) (<-chan struct{}, interface{}, error) {
|
tx ReadTxn,
|
||||||
|
kind string,
|
||||||
|
name string,
|
||||||
|
_ *structs.EnterpriseMeta,
|
||||||
|
) (<-chan struct{}, interface{}, error) {
|
||||||
return tx.FirstWatch(configTableName, "id", kind, name)
|
return tx.FirstWatch(configTableName, "id", kind, name)
|
||||||
}
|
}
|
||||||
|
|
||||||
func validateConfigEntryEnterprise(tx *txn, conf structs.ConfigEntry) error {
|
func validateConfigEntryEnterprise(_ ReadTxn, _ structs.ConfigEntry) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func getAllConfigEntriesWithTxn(tx *txn, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
|
func getAllConfigEntriesWithTxn(tx ReadTxn, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
|
||||||
return tx.Get(configTableName, "id")
|
return tx.Get(configTableName, "id")
|
||||||
}
|
}
|
||||||
|
|
||||||
func getConfigEntryKindsWithTxn(tx *txn,
|
func getConfigEntryKindsWithTxn(tx ReadTxn, kind string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
|
||||||
kind string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
|
|
||||||
return tx.Get(configTableName, "kind", kind)
|
return tx.Get(configTableName, "kind", kind)
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,6 +10,8 @@ import (
|
||||||
// ReadTxn is implemented by memdb.Txn to perform read operations.
|
// ReadTxn is implemented by memdb.Txn to perform read operations.
|
||||||
type ReadTxn interface {
|
type ReadTxn interface {
|
||||||
Get(table, index string, args ...interface{}) (memdb.ResultIterator, error)
|
Get(table, index string, args ...interface{}) (memdb.ResultIterator, error)
|
||||||
|
First(table, index string, args ...interface{}) (interface{}, error)
|
||||||
|
FirstWatch(table, index string, args ...interface{}) (<-chan struct{}, interface{}, error)
|
||||||
Abort()
|
Abort()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -258,11 +258,11 @@ func (s *Store) maxIndex(tables ...string) uint64 {
|
||||||
|
|
||||||
// maxIndexTxn is a helper used to retrieve the highest known index
|
// maxIndexTxn is a helper used to retrieve the highest known index
|
||||||
// amongst a set of tables in the db.
|
// amongst a set of tables in the db.
|
||||||
func maxIndexTxn(tx *txn, tables ...string) uint64 {
|
func maxIndexTxn(tx ReadTxn, tables ...string) uint64 {
|
||||||
return maxIndexWatchTxn(tx, nil, tables...)
|
return maxIndexWatchTxn(tx, nil, tables...)
|
||||||
}
|
}
|
||||||
|
|
||||||
func maxIndexWatchTxn(tx *txn, ws memdb.WatchSet, tables ...string) uint64 {
|
func maxIndexWatchTxn(tx ReadTxn, ws memdb.WatchSet, tables ...string) uint64 {
|
||||||
var lindex uint64
|
var lindex uint64
|
||||||
for _, table := range tables {
|
for _, table := range tables {
|
||||||
ch, ti, err := tx.FirstWatch("index", "id", table)
|
ch, ti, err := tx.FirstWatch("index", "id", table)
|
||||||
|
|
|
@ -110,7 +110,7 @@ func (s *Store) txnKVS(tx *txn, idx uint64, op *structs.TxnKVOp) (structs.TxnRes
|
||||||
}
|
}
|
||||||
|
|
||||||
// txnSession handles all Session-related operations.
|
// txnSession handles all Session-related operations.
|
||||||
func (s *Store) txnSession(tx *txn, idx uint64, op *structs.TxnSessionOp) error {
|
func txnSession(tx *txn, idx uint64, op *structs.TxnSessionOp) error {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
switch op.Verb {
|
switch op.Verb {
|
||||||
|
@ -127,7 +127,7 @@ func (s *Store) txnSession(tx *txn, idx uint64, op *structs.TxnSessionOp) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// txnIntention handles all Intention-related operations.
|
// txnIntention handles all Intention-related operations.
|
||||||
func (s *Store) txnIntention(tx *txn, idx uint64, op *structs.TxnIntentionOp) error {
|
func txnIntention(tx *txn, idx uint64, op *structs.TxnIntentionOp) error {
|
||||||
switch op.Op {
|
switch op.Op {
|
||||||
case structs.IntentionOpCreate, structs.IntentionOpUpdate:
|
case structs.IntentionOpCreate, structs.IntentionOpUpdate:
|
||||||
return intentionSetTxn(tx, idx, op.Intention)
|
return intentionSetTxn(tx, idx, op.Intention)
|
||||||
|
@ -344,7 +344,7 @@ func (s *Store) txnDispatch(tx *txn, idx uint64, ops structs.TxnOps) (structs.Tx
|
||||||
case op.KV != nil:
|
case op.KV != nil:
|
||||||
ret, err = s.txnKVS(tx, idx, op.KV)
|
ret, err = s.txnKVS(tx, idx, op.KV)
|
||||||
case op.Intention != nil:
|
case op.Intention != nil:
|
||||||
err = s.txnIntention(tx, idx, op.Intention)
|
err = txnIntention(tx, idx, op.Intention)
|
||||||
case op.Node != nil:
|
case op.Node != nil:
|
||||||
ret, err = s.txnNode(tx, idx, op.Node)
|
ret, err = s.txnNode(tx, idx, op.Node)
|
||||||
case op.Service != nil:
|
case op.Service != nil:
|
||||||
|
@ -352,7 +352,7 @@ func (s *Store) txnDispatch(tx *txn, idx uint64, ops structs.TxnOps) (structs.Tx
|
||||||
case op.Check != nil:
|
case op.Check != nil:
|
||||||
ret, err = s.txnCheck(tx, idx, op.Check)
|
ret, err = s.txnCheck(tx, idx, op.Check)
|
||||||
case op.Session != nil:
|
case op.Session != nil:
|
||||||
err = s.txnSession(tx, idx, op.Session)
|
err = txnSession(tx, idx, op.Session)
|
||||||
default:
|
default:
|
||||||
err = fmt.Errorf("no operation specified")
|
err = fmt.Errorf("no operation specified")
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue