mirror of https://github.com/hashicorp/consul
Merge pull request #9796 from hashicorp/dnephin/state-cleanup-catalog-index-oss
state: remove duplicate tableCheck indexespull/9863/head
commit
948d1a317d
|
@ -1316,9 +1316,14 @@ func (s *Store) deleteServiceTxn(tx WriteTxn, idx uint64, nodeName, serviceID st
|
|||
return nil
|
||||
}
|
||||
|
||||
// TODO: accept a non-pointer value for EnterpriseMeta
|
||||
if entMeta == nil {
|
||||
entMeta = structs.DefaultEnterpriseMeta()
|
||||
}
|
||||
// Delete any checks associated with the service. This will invalidate
|
||||
// sessions as necessary.
|
||||
checks, err := catalogChecksForNodeService(tx, nodeName, serviceID, entMeta)
|
||||
q := NodeServiceQuery{Node: nodeName, Service: serviceID, EnterpriseMeta: *entMeta}
|
||||
checks, err := tx.Get(tableChecks, indexNodeService, q)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed service check lookup: %s", err)
|
||||
}
|
||||
|
@ -1767,6 +1772,13 @@ func (s *Store) deleteCheckCASTxn(tx WriteTxn, idx, cidx uint64, node string, ch
|
|||
return true, nil
|
||||
}
|
||||
|
||||
// NodeServiceQuery is a type used to query the checks table.
|
||||
type NodeServiceQuery struct {
|
||||
Node string
|
||||
Service string
|
||||
structs.EnterpriseMeta
|
||||
}
|
||||
|
||||
// deleteCheckTxn is the inner method used to call a health
|
||||
// check deletion within an existing transaction.
|
||||
func (s *Store) deleteCheckTxn(tx WriteTxn, idx uint64, node string, checkID types.CheckID, entMeta *structs.EnterpriseMeta) error {
|
||||
|
@ -2142,7 +2154,8 @@ func parseCheckServiceNodes(
|
|||
// First add the node-level checks. These always apply to any
|
||||
// service on the node.
|
||||
var checks structs.HealthChecks
|
||||
iter, err := catalogListNodeChecks(tx, sn.Node)
|
||||
q := NodeServiceQuery{Node: sn.Node, EnterpriseMeta: *structs.DefaultEnterpriseMeta()}
|
||||
iter, err := tx.Get(tableChecks, indexNodeService, q)
|
||||
if err != nil {
|
||||
return 0, nil, err
|
||||
}
|
||||
|
@ -2152,7 +2165,8 @@ func parseCheckServiceNodes(
|
|||
}
|
||||
|
||||
// Now add the service-specific checks.
|
||||
iter, err = catalogListServiceChecks(tx, sn.Node, sn.ServiceID, &sn.EnterpriseMeta)
|
||||
q = NodeServiceQuery{Node: sn.Node, Service: sn.ServiceID, EnterpriseMeta: sn.EnterpriseMeta}
|
||||
iter, err = tx.Get(tableChecks, indexNodeService, q)
|
||||
if err != nil {
|
||||
return 0, nil, err
|
||||
}
|
||||
|
|
|
@ -4,6 +4,7 @@ package state
|
|||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
memdb "github.com/hashicorp/go-memdb"
|
||||
|
||||
|
@ -12,6 +13,34 @@ import (
|
|||
|
||||
func withEnterpriseSchema(_ *memdb.DBSchema) {}
|
||||
|
||||
func indexNodeServiceFromHealthCheck(raw interface{}) ([]byte, error) {
|
||||
hc, ok := raw.(*structs.HealthCheck)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("unexpected type %T for structs.HealthCheck index", raw)
|
||||
}
|
||||
|
||||
if hc.Node == "" {
|
||||
return nil, errMissingValueForIndex
|
||||
}
|
||||
|
||||
var b indexBuilder
|
||||
b.String(strings.ToLower(hc.Node))
|
||||
b.String(strings.ToLower(hc.ServiceID))
|
||||
return b.Bytes(), nil
|
||||
}
|
||||
|
||||
func indexFromNodeServiceQuery(arg interface{}) ([]byte, error) {
|
||||
hc, ok := arg.(NodeServiceQuery)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("unexpected type %T for NodeServiceQuery index", arg)
|
||||
}
|
||||
|
||||
var b indexBuilder
|
||||
b.String(strings.ToLower(hc.Node))
|
||||
b.String(strings.ToLower(hc.Service))
|
||||
return b.Bytes(), nil
|
||||
}
|
||||
|
||||
func serviceIndexName(name string, _ *structs.EnterpriseMeta) string {
|
||||
return fmt.Sprintf("service.%s", name)
|
||||
}
|
||||
|
@ -156,14 +185,6 @@ func catalogListChecks(tx ReadTxn, _ *structs.EnterpriseMeta) (memdb.ResultItera
|
|||
return tx.Get("checks", "id")
|
||||
}
|
||||
|
||||
func catalogListNodeChecks(tx ReadTxn, node string) (memdb.ResultIterator, error) {
|
||||
return tx.Get("checks", "node_service_check", node, false)
|
||||
}
|
||||
|
||||
func catalogListServiceChecks(tx ReadTxn, node string, service string, _ *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
|
||||
return tx.Get("checks", "node_service", node, service)
|
||||
}
|
||||
|
||||
func catalogInsertCheck(tx WriteTxn, chk *structs.HealthCheck, idx uint64) error {
|
||||
// Insert the check
|
||||
if err := tx.Insert("checks", chk); err != nil {
|
||||
|
@ -177,10 +198,6 @@ func catalogInsertCheck(tx WriteTxn, chk *structs.HealthCheck, idx uint64) error
|
|||
return nil
|
||||
}
|
||||
|
||||
func catalogChecksForNodeService(tx ReadTxn, node string, service string, entMeta *structs.EnterpriseMeta) (memdb.ResultIterator, error) {
|
||||
return tx.Get("checks", "node_service", node, service)
|
||||
}
|
||||
|
||||
func validateRegisterRequestTxn(_ ReadTxn, _ *structs.RegisterRequest, _ bool) (*structs.EnterpriseMeta, error) {
|
||||
return nil, nil
|
||||
}
|
||||
|
|
|
@ -0,0 +1,26 @@
|
|||
// +build !consulent
|
||||
|
||||
package state
|
||||
|
||||
import "github.com/hashicorp/consul/agent/structs"
|
||||
|
||||
func testIndexerTableChecks() map[string]indexerTestCase {
|
||||
return map[string]indexerTestCase{
|
||||
indexNodeService: {
|
||||
read: indexValue{
|
||||
source: NodeServiceQuery{
|
||||
Node: "NoDe",
|
||||
Service: "SeRvIcE",
|
||||
},
|
||||
expected: []byte("node\x00service\x00"),
|
||||
},
|
||||
write: indexValue{
|
||||
source: &structs.HealthCheck{
|
||||
Node: "NoDe",
|
||||
ServiceID: "SeRvIcE",
|
||||
},
|
||||
expected: []byte("node\x00service\x00"),
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
|
@ -17,13 +17,12 @@ const (
|
|||
tableGatewayServices = "gateway-services"
|
||||
tableMeshTopology = "mesh-topology"
|
||||
|
||||
indexID = "id"
|
||||
indexServiceName = "service"
|
||||
indexConnect = "connect"
|
||||
indexKind = "kind"
|
||||
indexStatus = "status"
|
||||
indexNodeServiceCheck = "node_service_check"
|
||||
indexNodeService = "node_service"
|
||||
indexID = "id"
|
||||
indexServiceName = "service"
|
||||
indexConnect = "connect"
|
||||
indexKind = "kind"
|
||||
indexStatus = "status"
|
||||
indexNodeService = "node_service"
|
||||
)
|
||||
|
||||
// nodesTableSchema returns a new table schema used for storing node
|
||||
|
@ -170,37 +169,13 @@ func checksTableSchema() *memdb.TableSchema {
|
|||
Lowercase: true,
|
||||
},
|
||||
},
|
||||
indexNodeServiceCheck: {
|
||||
Name: indexNodeServiceCheck,
|
||||
AllowMissing: true,
|
||||
Unique: false,
|
||||
Indexer: &memdb.CompoundIndex{
|
||||
Indexes: []memdb.Indexer{
|
||||
&memdb.StringFieldIndex{
|
||||
Field: "Node",
|
||||
Lowercase: true,
|
||||
},
|
||||
&memdb.FieldSetIndex{
|
||||
Field: "ServiceID",
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
indexNodeService: {
|
||||
Name: indexNodeService,
|
||||
AllowMissing: true,
|
||||
Unique: false,
|
||||
Indexer: &memdb.CompoundIndex{
|
||||
Indexes: []memdb.Indexer{
|
||||
&memdb.StringFieldIndex{
|
||||
Field: "Node",
|
||||
Lowercase: true,
|
||||
},
|
||||
&memdb.StringFieldIndex{
|
||||
Field: "ServiceID",
|
||||
Lowercase: true,
|
||||
},
|
||||
},
|
||||
Indexer: indexerSingle{
|
||||
readIndex: readIndex(indexFromNodeServiceQuery),
|
||||
writeIndex: writeIndex(indexNodeServiceFromHealthCheck),
|
||||
},
|
||||
},
|
||||
},
|
||||
|
|
|
@ -0,0 +1,94 @@
|
|||
package state
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"fmt"
|
||||
)
|
||||
|
||||
// indexerSingle implements both memdb.Indexer and memdb.SingleIndexer. It may
|
||||
// be used in a memdb.IndexSchema to specify functions that generate the index
|
||||
// value for memdb.Txn operations.
|
||||
type indexerSingle struct {
|
||||
// readIndex is used by memdb for Txn.Get, Txn.First, and other operations
|
||||
// that read data.
|
||||
readIndex
|
||||
// writeIndex is used by memdb for Txn.Insert, Txn.Delete, for operations
|
||||
// that write data to the index.
|
||||
writeIndex
|
||||
}
|
||||
|
||||
// indexerMulti implements both memdb.Indexer and memdb.MultiIndexer. It may
|
||||
// be used in a memdb.IndexSchema to specify functions that generate the index
|
||||
// value for memdb.Txn operations.
|
||||
type indexerMulti struct {
|
||||
// readIndex is used by memdb for Txn.Get, Txn.First, and other operations
|
||||
// that read data.
|
||||
readIndex
|
||||
// writeIndexMulti is used by memdb for Txn.Insert, Txn.Delete, for operations
|
||||
// that write data to the index.
|
||||
writeIndexMulti
|
||||
}
|
||||
|
||||
// readIndex implements memdb.Indexer. It exists so that a function can be used
|
||||
// to provide the interface.
|
||||
//
|
||||
// Unlike memdb.Indexer, a readIndex function accepts only a single argument. To
|
||||
// generate an index from multiple values, use a struct type with multiple fields.
|
||||
type readIndex func(arg interface{}) ([]byte, error)
|
||||
|
||||
func (f readIndex) FromArgs(args ...interface{}) ([]byte, error) {
|
||||
if len(args) != 1 {
|
||||
return nil, fmt.Errorf("index supports only a single arg")
|
||||
}
|
||||
return f(args[0])
|
||||
}
|
||||
|
||||
var errMissingValueForIndex = fmt.Errorf("object is missing a value for this index")
|
||||
|
||||
// writeIndex implements memdb.SingleIndexer. It is used so that a function
|
||||
// can be used to provide this interface.
|
||||
//
|
||||
// Instead of a bool return value, writeIndex expects errMissingValueForIndex to
|
||||
// indicate that an index could not be build for the object. It will translate
|
||||
// this error into a false value to satisfy the memdb.SingleIndexer interface.
|
||||
type writeIndex func(raw interface{}) ([]byte, error)
|
||||
|
||||
func (f writeIndex) FromObject(raw interface{}) (bool, []byte, error) {
|
||||
v, err := f(raw)
|
||||
if errors.Is(err, errMissingValueForIndex) {
|
||||
return false, nil, nil
|
||||
}
|
||||
return err == nil, v, err
|
||||
}
|
||||
|
||||
// writeIndexMulti implements memdb.MultiIndexer. It is used so that a function
|
||||
// can be used to provide this interface.
|
||||
//
|
||||
// Instead of a bool return value, writeIndexMulti expects errMissingValueForIndex to
|
||||
// indicate that an index could not be build for the object. It will translate
|
||||
// this error into a false value to satisfy the memdb.MultiIndexer interface.
|
||||
type writeIndexMulti func(raw interface{}) ([][]byte, error)
|
||||
|
||||
func (f writeIndexMulti) FromObject(raw interface{}) (bool, [][]byte, error) {
|
||||
v, err := f(raw)
|
||||
if errors.Is(err, errMissingValueForIndex) {
|
||||
return false, nil, nil
|
||||
}
|
||||
return err == nil, v, err
|
||||
}
|
||||
|
||||
const null = "\x00"
|
||||
|
||||
// indexBuilder is a buffer used to construct memdb index values.
|
||||
type indexBuilder bytes.Buffer
|
||||
|
||||
// String appends the string and a null terminator to the buffer.
|
||||
func (b *indexBuilder) String(v string) {
|
||||
(*bytes.Buffer)(b).WriteString(v)
|
||||
(*bytes.Buffer)(b).WriteString(null)
|
||||
}
|
||||
|
||||
func (b *indexBuilder) Bytes() []byte {
|
||||
return (*bytes.Buffer)(b).Bytes()
|
||||
}
|
|
@ -14,7 +14,9 @@ import (
|
|||
"github.com/hashicorp/consul/internal/testing/golden"
|
||||
)
|
||||
|
||||
func TestStateStoreSchema(t *testing.T) {
|
||||
// TODO: once TestNewDBSchema_Indexers has test cases for all tables and indexes
|
||||
// it is probably safe to remove this test
|
||||
func TestNewDBSchema(t *testing.T) {
|
||||
schema := newDBSchema()
|
||||
require.NoError(t, schema.Validate())
|
||||
|
||||
|
@ -67,25 +69,30 @@ func formatIndexer(buf *bytes.Buffer, indexer memdb.Indexer) {
|
|||
for i := 0; i < typ.NumField(); i++ {
|
||||
fmt.Fprintf(buf, " %v=", typ.Field(i).Name)
|
||||
|
||||
field := v.Field(i)
|
||||
switch typ.Field(i).Type.Kind() {
|
||||
case reflect.Slice:
|
||||
buf.WriteString("[")
|
||||
for j := 0; j < field.Len(); j++ {
|
||||
if j != 0 {
|
||||
buf.WriteString(", ")
|
||||
}
|
||||
// TODO: handle other types of slices
|
||||
formatIndexer(buf, v.Field(i).Index(j).Interface().(memdb.Indexer))
|
||||
formatField(buf, v.Field(i))
|
||||
}
|
||||
}
|
||||
|
||||
func formatField(buf *bytes.Buffer, field reflect.Value) {
|
||||
switch field.Type().Kind() {
|
||||
case reflect.Slice:
|
||||
buf.WriteString("[")
|
||||
for j := 0; j < field.Len(); j++ {
|
||||
if j != 0 {
|
||||
buf.WriteString(", ")
|
||||
}
|
||||
buf.WriteString("]")
|
||||
case reflect.Func:
|
||||
// Functions are printed as pointer addresses, which change frequently.
|
||||
// Instead use the name.
|
||||
buf.WriteString(runtime.FuncForPC(field.Pointer()).Name())
|
||||
default:
|
||||
fmt.Fprintf(buf, "%v", field)
|
||||
// TODO: handle other types of slices
|
||||
formatIndexer(buf, field.Index(j).Interface().(memdb.Indexer))
|
||||
}
|
||||
buf.WriteString("]")
|
||||
case reflect.Func:
|
||||
// Functions are printed as pointer addresses, which change frequently.
|
||||
// Instead use the name.
|
||||
buf.WriteString(runtime.FuncForPC(field.Pointer()).Name())
|
||||
case reflect.Interface:
|
||||
formatField(buf, field.Elem())
|
||||
default:
|
||||
fmt.Fprintf(buf, "%v", field)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -98,3 +105,82 @@ func indexNames(table *memdb.TableSchema) []string {
|
|||
sort.Strings(indexes)
|
||||
return indexes
|
||||
}
|
||||
|
||||
type indexerTestCase struct {
|
||||
read indexValue
|
||||
write indexValue
|
||||
prefix []indexValue
|
||||
writeMulti indexValueMulti
|
||||
}
|
||||
|
||||
type indexValue struct {
|
||||
source interface{}
|
||||
expected []byte
|
||||
}
|
||||
|
||||
type indexValueMulti struct {
|
||||
source interface{}
|
||||
expected [][]byte
|
||||
}
|
||||
|
||||
func TestNewDBSchema_Indexers(t *testing.T) {
|
||||
schema := newDBSchema()
|
||||
require.NoError(t, schema.Validate())
|
||||
|
||||
var testcases = map[string]func() map[string]indexerTestCase{
|
||||
tableChecks: testIndexerTableChecks,
|
||||
}
|
||||
|
||||
for _, table := range schema.Tables {
|
||||
if testcases[table.Name] == nil {
|
||||
continue
|
||||
}
|
||||
t.Run(table.Name, func(t *testing.T) {
|
||||
tableTCs := testcases[table.Name]()
|
||||
|
||||
for _, index := range table.Indexes {
|
||||
t.Run(index.Name, func(t *testing.T) {
|
||||
indexer := index.Indexer
|
||||
tc, ok := tableTCs[index.Name]
|
||||
if !ok {
|
||||
t.Skip("TODO: missing test case")
|
||||
}
|
||||
|
||||
args := []interface{}{tc.read.source}
|
||||
if s, ok := tc.read.source.([]interface{}); ok {
|
||||
// Indexes using memdb.CompoundIndex must be expanded to multiple args
|
||||
args = s
|
||||
}
|
||||
|
||||
actual, err := indexer.FromArgs(args...)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tc.read.expected, actual)
|
||||
|
||||
if i, ok := indexer.(memdb.SingleIndexer); ok {
|
||||
valid, actual, err := i.FromObject(tc.write.source)
|
||||
require.NoError(t, err)
|
||||
require.True(t, valid)
|
||||
require.Equal(t, tc.write.expected, actual)
|
||||
}
|
||||
|
||||
if i, ok := indexer.(memdb.PrefixIndexer); ok {
|
||||
for _, c := range tc.prefix {
|
||||
t.Run("", func(t *testing.T) {
|
||||
actual, err := i.PrefixFromArgs(c.source)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, c.expected, actual)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
if i, ok := indexer.(memdb.MultiIndexer); ok {
|
||||
valid, actual, err := i.FromObject(tc.writeMulti.source)
|
||||
require.NoError(t, err)
|
||||
require.True(t, valid)
|
||||
require.Equal(t, tc.writeMulti.expected, actual)
|
||||
}
|
||||
})
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
|
@ -52,9 +52,7 @@ table=checks
|
|||
index=node allow-missing
|
||||
indexer=github.com/hashicorp/go-memdb.StringFieldIndex Field=Node Lowercase=true
|
||||
index=node_service allow-missing
|
||||
indexer=github.com/hashicorp/go-memdb.CompoundIndex Indexes=[github.com/hashicorp/go-memdb.StringFieldIndex Field=Node Lowercase=true, github.com/hashicorp/go-memdb.StringFieldIndex Field=ServiceID Lowercase=true] AllowMissing=false
|
||||
index=node_service_check allow-missing
|
||||
indexer=github.com/hashicorp/go-memdb.CompoundIndex Indexes=[github.com/hashicorp/go-memdb.StringFieldIndex Field=Node Lowercase=true, github.com/hashicorp/go-memdb.FieldSetIndex Field=ServiceID] AllowMissing=false
|
||||
indexer=github.com/hashicorp/consul/agent/consul/state.indexerSingle readIndex=github.com/hashicorp/consul/agent/consul/state.indexFromNodeServiceQuery writeIndex=github.com/hashicorp/consul/agent/consul/state.indexNodeServiceFromHealthCheck
|
||||
index=service allow-missing
|
||||
indexer=github.com/hashicorp/go-memdb.StringFieldIndex Field=ServiceName Lowercase=true
|
||||
index=status
|
||||
|
|
Loading…
Reference in New Issue