mirror of https://github.com/hashicorp/consul
state: peering ID assignment cannot happen inside of the state store (#13525)
Move peering ID assignment outisde of the FSM, so that the ID is written to the raft log and the same ID is used by all voters, and after restarts.pull/13528/head
parent
cb01702cd2
commit
e8ea3d7c3b
|
@ -476,6 +476,7 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
|
|||
|
||||
// Peerings
|
||||
require.NoError(t, fsm.state.PeeringWrite(31, &pbpeering.Peering{
|
||||
ID: "1fabcd52-1d46-49b0-b1d8-71559aee47f5",
|
||||
Name: "baz",
|
||||
}))
|
||||
|
||||
|
|
|
@ -7,13 +7,13 @@ import (
|
|||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/stretchr/testify/require"
|
||||
"google.golang.org/grpc"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/proto/pbpeering"
|
||||
"github.com/hashicorp/consul/sdk/testutil/retry"
|
||||
"github.com/hashicorp/consul/testrpc"
|
||||
|
@ -62,6 +62,10 @@ func TestLeader_PeeringSync_Lifecycle_ClientDeletion(t *testing.T) {
|
|||
_, found := s1.peeringService.StreamStatus(token.PeerID)
|
||||
require.False(t, found)
|
||||
|
||||
var (
|
||||
s2PeerID = "cc56f0b8-3885-4e78-8d7b-614a0c45712d"
|
||||
)
|
||||
|
||||
// Bring up s2 and store s1's token so that it attempts to dial.
|
||||
_, s2 := testServerWithConfig(t, func(c *Config) {
|
||||
c.NodeName = "s2.dc2"
|
||||
|
@ -73,6 +77,7 @@ func TestLeader_PeeringSync_Lifecycle_ClientDeletion(t *testing.T) {
|
|||
// Simulate a peering initiation event by writing a peering with data from a peering token.
|
||||
// Eventually the leader in dc2 should dial and connect to the leader in dc1.
|
||||
p := &pbpeering.Peering{
|
||||
ID: s2PeerID,
|
||||
Name: "my-peer-s1",
|
||||
PeerID: token.PeerID,
|
||||
PeerCAPems: token.CA,
|
||||
|
@ -92,6 +97,7 @@ func TestLeader_PeeringSync_Lifecycle_ClientDeletion(t *testing.T) {
|
|||
|
||||
// Delete the peering to trigger the termination sequence.
|
||||
deleted := &pbpeering.Peering{
|
||||
ID: s2PeerID,
|
||||
Name: "my-peer-s1",
|
||||
DeletedAt: structs.TimeToProto(time.Now()),
|
||||
}
|
||||
|
@ -151,6 +157,11 @@ func TestLeader_PeeringSync_Lifecycle_ServerDeletion(t *testing.T) {
|
|||
var token structs.PeeringToken
|
||||
require.NoError(t, json.Unmarshal(tokenJSON, &token))
|
||||
|
||||
var (
|
||||
s1PeerID = token.PeerID
|
||||
s2PeerID = "cc56f0b8-3885-4e78-8d7b-614a0c45712d"
|
||||
)
|
||||
|
||||
// Bring up s2 and store s1's token so that it attempts to dial.
|
||||
_, s2 := testServerWithConfig(t, func(c *Config) {
|
||||
c.NodeName = "s2.dc2"
|
||||
|
@ -162,6 +173,7 @@ func TestLeader_PeeringSync_Lifecycle_ServerDeletion(t *testing.T) {
|
|||
// Simulate a peering initiation event by writing a peering with data from a peering token.
|
||||
// Eventually the leader in dc2 should dial and connect to the leader in dc1.
|
||||
p := &pbpeering.Peering{
|
||||
ID: s2PeerID,
|
||||
Name: "my-peer-s1",
|
||||
PeerID: token.PeerID,
|
||||
PeerCAPems: token.CA,
|
||||
|
@ -181,6 +193,7 @@ func TestLeader_PeeringSync_Lifecycle_ServerDeletion(t *testing.T) {
|
|||
|
||||
// Delete the peering from the server peer to trigger the termination sequence.
|
||||
deleted := &pbpeering.Peering{
|
||||
ID: s1PeerID,
|
||||
Name: "my-peer-s2",
|
||||
DeletedAt: structs.TimeToProto(time.Now()),
|
||||
}
|
||||
|
@ -216,6 +229,7 @@ func TestLeader_Peering_DeferredDeletion(t *testing.T) {
|
|||
testrpc.WaitForLeader(t, s1.RPC, "dc1")
|
||||
|
||||
var (
|
||||
peerID = "cc56f0b8-3885-4e78-8d7b-614a0c45712d"
|
||||
peerName = "my-peer-s2"
|
||||
defaultMeta = acl.DefaultEnterpriseMeta()
|
||||
lastIdx = uint64(0)
|
||||
|
@ -224,6 +238,7 @@ func TestLeader_Peering_DeferredDeletion(t *testing.T) {
|
|||
// Simulate a peering initiation event by writing a peering to the state store.
|
||||
lastIdx++
|
||||
require.NoError(t, s1.fsm.State().PeeringWrite(lastIdx, &pbpeering.Peering{
|
||||
ID: peerID,
|
||||
Name: peerName,
|
||||
}))
|
||||
|
||||
|
@ -233,6 +248,7 @@ func TestLeader_Peering_DeferredDeletion(t *testing.T) {
|
|||
// Mark the peering for deletion to trigger the termination sequence.
|
||||
lastIdx++
|
||||
require.NoError(t, s1.fsm.State().PeeringWrite(lastIdx, &pbpeering.Peering{
|
||||
ID: peerID,
|
||||
Name: peerName,
|
||||
DeletedAt: structs.TimeToProto(time.Now()),
|
||||
}))
|
||||
|
|
|
@ -143,6 +143,17 @@ type peeringApply struct {
|
|||
srv *Server
|
||||
}
|
||||
|
||||
func (a *peeringApply) CheckPeeringUUID(id string) (bool, error) {
|
||||
state := a.srv.fsm.State()
|
||||
if _, existing, err := state.PeeringReadByID(nil, id); err != nil {
|
||||
return false, err
|
||||
} else if existing != nil {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
return true, nil
|
||||
}
|
||||
|
||||
func (a *peeringApply) PeeringWrite(req *pbpeering.PeeringWriteRequest) error {
|
||||
_, err := a.srv.raftApplyProtobuf(structs.PeeringWriteType, req)
|
||||
return err
|
||||
|
|
|
@ -1,12 +1,12 @@
|
|||
package state
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/hashicorp/go-memdb"
|
||||
"github.com/hashicorp/go-uuid"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
|
@ -191,50 +191,47 @@ func (s *Store) peeringListTxn(ws memdb.WatchSet, tx ReadTxn, entMeta acl.Enterp
|
|||
return idx, result, nil
|
||||
}
|
||||
|
||||
func generatePeeringUUID(tx ReadTxn) (string, error) {
|
||||
for {
|
||||
uuid, err := uuid.GenerateUUID()
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to generate UUID: %w", err)
|
||||
}
|
||||
existing, err := peeringReadByIDTxn(tx, nil, uuid)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("failed to read peering: %w", err)
|
||||
}
|
||||
if existing == nil {
|
||||
return uuid, nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Store) PeeringWrite(idx uint64, p *pbpeering.Peering) error {
|
||||
tx := s.db.WriteTxn(idx)
|
||||
defer tx.Abort()
|
||||
|
||||
q := Query{
|
||||
Value: p.Name,
|
||||
EnterpriseMeta: *structs.NodeEnterpriseMetaInPartition(p.Partition),
|
||||
// Check that the ID and Name are set.
|
||||
if p.ID == "" {
|
||||
return errors.New("Missing Peering ID")
|
||||
}
|
||||
existingRaw, err := tx.First(tablePeering, indexName, q)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed peering lookup: %w", err)
|
||||
if p.Name == "" {
|
||||
return errors.New("Missing Peering Name")
|
||||
}
|
||||
|
||||
existing, ok := existingRaw.(*pbpeering.Peering)
|
||||
if existingRaw != nil && !ok {
|
||||
return fmt.Errorf("invalid type %T", existingRaw)
|
||||
// ensure the name is unique (cannot conflict with another peering with a different ID)
|
||||
_, existing, err := peeringReadTxn(tx, nil, Query{
|
||||
Value: p.Name,
|
||||
EnterpriseMeta: *structs.NodeEnterpriseMetaInPartition(p.Partition),
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if existing != nil {
|
||||
if p.ID != existing.ID {
|
||||
return fmt.Errorf("A peering already exists with the name %q and a different ID %q", p.Name, existing.ID)
|
||||
}
|
||||
// Prevent modifications to Peering marked for deletion
|
||||
if !existing.IsActive() {
|
||||
return fmt.Errorf("cannot write to peering that is marked for deletion")
|
||||
}
|
||||
|
||||
p.CreateIndex = existing.CreateIndex
|
||||
p.ID = existing.ID
|
||||
|
||||
p.ModifyIndex = idx
|
||||
} else {
|
||||
idMatch, err := peeringReadByIDTxn(tx, nil, p.ID)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if idMatch != nil {
|
||||
return fmt.Errorf("A peering already exists with the ID %q and a different name %q", p.Name, existing.ID)
|
||||
}
|
||||
|
||||
if !p.IsActive() {
|
||||
return fmt.Errorf("cannot create a new peering marked for deletion")
|
||||
}
|
||||
|
@ -242,13 +239,8 @@ func (s *Store) PeeringWrite(idx uint64, p *pbpeering.Peering) error {
|
|||
// TODO(peering): consider keeping PeeringState enum elsewhere?
|
||||
p.State = pbpeering.PeeringState_INITIAL
|
||||
p.CreateIndex = idx
|
||||
|
||||
p.ID, err = generatePeeringUUID(tx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to generate peering id: %w", err)
|
||||
}
|
||||
p.ModifyIndex = idx
|
||||
}
|
||||
p.ModifyIndex = idx
|
||||
|
||||
if err := tx.Insert(tablePeering, p); err != nil {
|
||||
return fmt.Errorf("failed inserting peering: %w", err)
|
||||
|
|
|
@ -1,13 +1,10 @@
|
|||
package state
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/hashicorp/go-memdb"
|
||||
"github.com/hashicorp/go-uuid"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/hashicorp/consul/acl"
|
||||
|
@ -17,6 +14,12 @@ import (
|
|||
"github.com/hashicorp/consul/sdk/testutil"
|
||||
)
|
||||
|
||||
const (
|
||||
testFooPeerID = "9e650110-ac74-4c5a-a6a8-9348b2bed4e9"
|
||||
testBarPeerID = "5ebcff30-5509-4858-8142-a8e580f1863f"
|
||||
testBazPeerID = "432feb2f-5476-4ae2-b33c-e43640ca0e86"
|
||||
)
|
||||
|
||||
func insertTestPeerings(t *testing.T, s *Store) {
|
||||
t.Helper()
|
||||
|
||||
|
@ -26,7 +29,7 @@ func insertTestPeerings(t *testing.T, s *Store) {
|
|||
err := tx.Insert(tablePeering, &pbpeering.Peering{
|
||||
Name: "foo",
|
||||
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
|
||||
ID: "9e650110-ac74-4c5a-a6a8-9348b2bed4e9",
|
||||
ID: testFooPeerID,
|
||||
State: pbpeering.PeeringState_INITIAL,
|
||||
CreateIndex: 1,
|
||||
ModifyIndex: 1,
|
||||
|
@ -36,7 +39,7 @@ func insertTestPeerings(t *testing.T, s *Store) {
|
|||
err = tx.Insert(tablePeering, &pbpeering.Peering{
|
||||
Name: "bar",
|
||||
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
|
||||
ID: "5ebcff30-5509-4858-8142-a8e580f1863f",
|
||||
ID: testBarPeerID,
|
||||
State: pbpeering.PeeringState_FAILING,
|
||||
CreateIndex: 2,
|
||||
ModifyIndex: 2,
|
||||
|
@ -97,16 +100,16 @@ func TestStateStore_PeeringReadByID(t *testing.T) {
|
|||
run := func(t *testing.T, tc testcase) {
|
||||
_, peering, err := s.PeeringReadByID(nil, tc.id)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tc.expect, peering)
|
||||
prototest.AssertDeepEqual(t, tc.expect, peering)
|
||||
}
|
||||
tcs := []testcase{
|
||||
{
|
||||
name: "get foo",
|
||||
id: "9e650110-ac74-4c5a-a6a8-9348b2bed4e9",
|
||||
id: testFooPeerID,
|
||||
expect: &pbpeering.Peering{
|
||||
Name: "foo",
|
||||
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
|
||||
ID: "9e650110-ac74-4c5a-a6a8-9348b2bed4e9",
|
||||
ID: testFooPeerID,
|
||||
State: pbpeering.PeeringState_INITIAL,
|
||||
CreateIndex: 1,
|
||||
ModifyIndex: 1,
|
||||
|
@ -114,11 +117,11 @@ func TestStateStore_PeeringReadByID(t *testing.T) {
|
|||
},
|
||||
{
|
||||
name: "get bar",
|
||||
id: "5ebcff30-5509-4858-8142-a8e580f1863f",
|
||||
id: testBarPeerID,
|
||||
expect: &pbpeering.Peering{
|
||||
Name: "bar",
|
||||
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
|
||||
ID: "5ebcff30-5509-4858-8142-a8e580f1863f",
|
||||
ID: testBarPeerID,
|
||||
State: pbpeering.PeeringState_FAILING,
|
||||
CreateIndex: 2,
|
||||
ModifyIndex: 2,
|
||||
|
@ -149,7 +152,7 @@ func TestStateStore_PeeringRead(t *testing.T) {
|
|||
run := func(t *testing.T, tc testcase) {
|
||||
_, peering, err := s.PeeringRead(nil, tc.query)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tc.expect, peering)
|
||||
prototest.AssertDeepEqual(t, tc.expect, peering)
|
||||
}
|
||||
tcs := []testcase{
|
||||
{
|
||||
|
@ -160,7 +163,7 @@ func TestStateStore_PeeringRead(t *testing.T) {
|
|||
expect: &pbpeering.Peering{
|
||||
Name: "foo",
|
||||
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
|
||||
ID: "9e650110-ac74-4c5a-a6a8-9348b2bed4e9",
|
||||
ID: testFooPeerID,
|
||||
State: pbpeering.PeeringState_INITIAL,
|
||||
CreateIndex: 1,
|
||||
ModifyIndex: 1,
|
||||
|
@ -189,6 +192,7 @@ func TestStore_Peering_Watch(t *testing.T) {
|
|||
|
||||
// set up initial write
|
||||
err := s.PeeringWrite(lastIdx, &pbpeering.Peering{
|
||||
ID: testFooPeerID,
|
||||
Name: "foo",
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
@ -210,6 +214,7 @@ func TestStore_Peering_Watch(t *testing.T) {
|
|||
|
||||
lastIdx++
|
||||
err := s.PeeringWrite(lastIdx, &pbpeering.Peering{
|
||||
ID: testBarPeerID,
|
||||
Name: "bar",
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
@ -229,6 +234,7 @@ func TestStore_Peering_Watch(t *testing.T) {
|
|||
// unrelated write shouldn't fire watch
|
||||
lastIdx++
|
||||
err := s.PeeringWrite(lastIdx, &pbpeering.Peering{
|
||||
ID: testBarPeerID,
|
||||
Name: "bar",
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
@ -237,6 +243,7 @@ func TestStore_Peering_Watch(t *testing.T) {
|
|||
// foo write should fire watch
|
||||
lastIdx++
|
||||
err = s.PeeringWrite(lastIdx, &pbpeering.Peering{
|
||||
ID: testFooPeerID,
|
||||
Name: "foo",
|
||||
DeletedAt: structs.TimeToProto(time.Now()),
|
||||
})
|
||||
|
@ -261,6 +268,7 @@ func TestStore_Peering_Watch(t *testing.T) {
|
|||
// mark for deletion before actually deleting
|
||||
lastIdx++
|
||||
err := s.PeeringWrite(lastIdx, &pbpeering.Peering{
|
||||
ID: testBarPeerID,
|
||||
Name: "bar",
|
||||
DeletedAt: structs.TimeToProto(time.Now()),
|
||||
})
|
||||
|
@ -293,7 +301,7 @@ func TestStore_PeeringList(t *testing.T) {
|
|||
{
|
||||
Name: "foo",
|
||||
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
|
||||
ID: "9e650110-ac74-4c5a-a6a8-9348b2bed4e9",
|
||||
ID: testFooPeerID,
|
||||
State: pbpeering.PeeringState_INITIAL,
|
||||
CreateIndex: 1,
|
||||
ModifyIndex: 1,
|
||||
|
@ -301,7 +309,7 @@ func TestStore_PeeringList(t *testing.T) {
|
|||
{
|
||||
Name: "bar",
|
||||
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
|
||||
ID: "5ebcff30-5509-4858-8142-a8e580f1863f",
|
||||
ID: testBarPeerID,
|
||||
State: pbpeering.PeeringState_FAILING,
|
||||
CreateIndex: 2,
|
||||
ModifyIndex: 2,
|
||||
|
@ -336,6 +344,7 @@ func TestStore_PeeringList_Watch(t *testing.T) {
|
|||
lastIdx++
|
||||
// insert a peering
|
||||
err := s.PeeringWrite(lastIdx, &pbpeering.Peering{
|
||||
ID: testFooPeerID,
|
||||
Name: "foo",
|
||||
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
|
||||
})
|
||||
|
@ -357,6 +366,7 @@ func TestStore_PeeringList_Watch(t *testing.T) {
|
|||
// update peering
|
||||
lastIdx++
|
||||
require.NoError(t, s.PeeringWrite(lastIdx, &pbpeering.Peering{
|
||||
ID: testFooPeerID,
|
||||
Name: "foo",
|
||||
DeletedAt: structs.TimeToProto(time.Now()),
|
||||
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
|
||||
|
@ -422,6 +432,7 @@ func TestStore_PeeringWrite(t *testing.T) {
|
|||
{
|
||||
name: "create baz",
|
||||
input: &pbpeering.Peering{
|
||||
ID: testBazPeerID,
|
||||
Name: "baz",
|
||||
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
|
||||
},
|
||||
|
@ -429,6 +440,7 @@ func TestStore_PeeringWrite(t *testing.T) {
|
|||
{
|
||||
name: "update baz",
|
||||
input: &pbpeering.Peering{
|
||||
ID: testBazPeerID,
|
||||
Name: "baz",
|
||||
State: pbpeering.PeeringState_FAILING,
|
||||
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
|
||||
|
@ -437,6 +449,7 @@ func TestStore_PeeringWrite(t *testing.T) {
|
|||
{
|
||||
name: "mark baz for deletion",
|
||||
input: &pbpeering.Peering{
|
||||
ID: testBazPeerID,
|
||||
Name: "baz",
|
||||
State: pbpeering.PeeringState_TERMINATED,
|
||||
DeletedAt: structs.TimeToProto(time.Now()),
|
||||
|
@ -446,6 +459,7 @@ func TestStore_PeeringWrite(t *testing.T) {
|
|||
{
|
||||
name: "cannot update peering marked for deletion",
|
||||
input: &pbpeering.Peering{
|
||||
ID: testBazPeerID,
|
||||
Name: "baz",
|
||||
// Attempt to add metadata
|
||||
Meta: map[string]string{
|
||||
|
@ -458,6 +472,7 @@ func TestStore_PeeringWrite(t *testing.T) {
|
|||
{
|
||||
name: "cannot create peering marked for deletion",
|
||||
input: &pbpeering.Peering{
|
||||
ID: testFooPeerID,
|
||||
Name: "foo",
|
||||
DeletedAt: structs.TimeToProto(time.Now()),
|
||||
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
|
||||
|
@ -472,54 +487,6 @@ func TestStore_PeeringWrite(t *testing.T) {
|
|||
}
|
||||
}
|
||||
|
||||
func TestStore_PeeringWrite_GenerateUUID(t *testing.T) {
|
||||
rand.Seed(1)
|
||||
|
||||
s := NewStateStore(nil)
|
||||
|
||||
entMeta := structs.NodeEnterpriseMetaInDefaultPartition()
|
||||
partition := entMeta.PartitionOrDefault()
|
||||
|
||||
for i := 1; i < 11; i++ {
|
||||
require.NoError(t, s.PeeringWrite(uint64(i), &pbpeering.Peering{
|
||||
Name: fmt.Sprintf("peering-%d", i),
|
||||
Partition: partition,
|
||||
}))
|
||||
}
|
||||
|
||||
idx, peerings, err := s.PeeringList(nil, *entMeta)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, uint64(10), idx)
|
||||
require.Len(t, peerings, 10)
|
||||
|
||||
// Ensure that all assigned UUIDs are unique.
|
||||
uniq := make(map[string]struct{})
|
||||
for _, p := range peerings {
|
||||
uniq[p.ID] = struct{}{}
|
||||
}
|
||||
require.Len(t, uniq, 10)
|
||||
|
||||
// Ensure that the ID of an existing peering cannot be overwritten.
|
||||
updated := &pbpeering.Peering{
|
||||
Name: peerings[0].Name,
|
||||
Partition: peerings[0].Partition,
|
||||
}
|
||||
|
||||
// Attempt to overwrite ID.
|
||||
updated.ID, err = uuid.GenerateUUID()
|
||||
require.NoError(t, err)
|
||||
require.NoError(t, s.PeeringWrite(11, updated))
|
||||
|
||||
q := Query{
|
||||
Value: updated.Name,
|
||||
EnterpriseMeta: *entMeta,
|
||||
}
|
||||
idx, got, err := s.PeeringRead(nil, q)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, uint64(11), idx)
|
||||
require.Equal(t, peerings[0].ID, got.ID)
|
||||
}
|
||||
|
||||
func TestStore_PeeringDelete(t *testing.T) {
|
||||
s := NewStateStore(nil)
|
||||
insertTestPeerings(t, s)
|
||||
|
@ -532,6 +499,7 @@ func TestStore_PeeringDelete(t *testing.T) {
|
|||
|
||||
testutil.RunStep(t, "can delete after marking for deletion", func(t *testing.T) {
|
||||
require.NoError(t, s.PeeringWrite(11, &pbpeering.Peering{
|
||||
ID: testFooPeerID,
|
||||
Name: "foo",
|
||||
DeletedAt: structs.TimeToProto(time.Now()),
|
||||
}))
|
||||
|
@ -550,7 +518,7 @@ func TestStore_PeeringTerminateByID(t *testing.T) {
|
|||
insertTestPeerings(t, s)
|
||||
|
||||
// id corresponding to default/foo
|
||||
id := "9e650110-ac74-4c5a-a6a8-9348b2bed4e9"
|
||||
const id = testFooPeerID
|
||||
|
||||
require.NoError(t, s.PeeringTerminateByID(10, id))
|
||||
|
||||
|
@ -607,7 +575,7 @@ func TestStateStore_PeeringTrustBundleRead(t *testing.T) {
|
|||
run := func(t *testing.T, tc testcase) {
|
||||
_, ptb, err := s.PeeringTrustBundleRead(nil, tc.query)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, tc.expect, ptb)
|
||||
prototest.AssertDeepEqual(t, tc.expect, ptb)
|
||||
}
|
||||
|
||||
entMeta := structs.NodeEnterpriseMetaInDefaultPartition()
|
||||
|
@ -708,6 +676,7 @@ func TestStateStore_ExportedServicesForPeer(t *testing.T) {
|
|||
|
||||
lastIdx++
|
||||
require.NoError(t, s.PeeringWrite(lastIdx, &pbpeering.Peering{
|
||||
ID: testUUID(),
|
||||
Name: "my-peering",
|
||||
}))
|
||||
|
||||
|
@ -1000,6 +969,9 @@ func TestStateStore_PeeringsForService(t *testing.T) {
|
|||
var lastIdx uint64
|
||||
// Create peerings
|
||||
for _, tp := range tc.peerings {
|
||||
if tp.peering.ID == "" {
|
||||
tp.peering.ID = testUUID()
|
||||
}
|
||||
lastIdx++
|
||||
require.NoError(t, s.PeeringWrite(lastIdx, tp.peering))
|
||||
|
||||
|
@ -1009,6 +981,7 @@ func TestStateStore_PeeringsForService(t *testing.T) {
|
|||
lastIdx++
|
||||
|
||||
copied := pbpeering.Peering{
|
||||
ID: tp.peering.ID,
|
||||
Name: tp.peering.Name,
|
||||
DeletedAt: structs.TimeToProto(time.Now()),
|
||||
}
|
||||
|
@ -1247,6 +1220,11 @@ func TestStore_TrustBundleListByService(t *testing.T) {
|
|||
var lastIdx uint64
|
||||
ws := memdb.NewWatchSet()
|
||||
|
||||
var (
|
||||
peerID1 = testUUID()
|
||||
peerID2 = testUUID()
|
||||
)
|
||||
|
||||
testutil.RunStep(t, "no results on initial setup", func(t *testing.T) {
|
||||
idx, resp, err := store.TrustBundleListByService(ws, "foo", entMeta)
|
||||
require.NoError(t, err)
|
||||
|
@ -1279,6 +1257,7 @@ func TestStore_TrustBundleListByService(t *testing.T) {
|
|||
testutil.RunStep(t, "creating peering does not yield trust bundles", func(t *testing.T) {
|
||||
lastIdx++
|
||||
require.NoError(t, store.PeeringWrite(lastIdx, &pbpeering.Peering{
|
||||
ID: peerID1,
|
||||
Name: "peer1",
|
||||
}))
|
||||
|
||||
|
@ -1377,6 +1356,7 @@ func TestStore_TrustBundleListByService(t *testing.T) {
|
|||
testutil.RunStep(t, "bundles for other peers are ignored", func(t *testing.T) {
|
||||
lastIdx++
|
||||
require.NoError(t, store.PeeringWrite(lastIdx, &pbpeering.Peering{
|
||||
ID: peerID2,
|
||||
Name: "peer2",
|
||||
}))
|
||||
|
||||
|
@ -1431,6 +1411,7 @@ func TestStore_TrustBundleListByService(t *testing.T) {
|
|||
testutil.RunStep(t, "deleting the peering excludes its trust bundle", func(t *testing.T) {
|
||||
lastIdx++
|
||||
require.NoError(t, store.PeeringWrite(lastIdx, &pbpeering.Peering{
|
||||
ID: peerID1,
|
||||
Name: "peer1",
|
||||
DeletedAt: structs.TimeToProto(time.Now()),
|
||||
}))
|
||||
|
@ -1470,7 +1451,7 @@ func TestStateStore_Peering_ListDeleted(t *testing.T) {
|
|||
err := tx.Insert(tablePeering, &pbpeering.Peering{
|
||||
Name: "foo",
|
||||
Partition: acl.DefaultPartitionName,
|
||||
ID: "9e650110-ac74-4c5a-a6a8-9348b2bed4e9",
|
||||
ID: testFooPeerID,
|
||||
DeletedAt: structs.TimeToProto(time.Now()),
|
||||
CreateIndex: 1,
|
||||
ModifyIndex: 1,
|
||||
|
@ -1480,7 +1461,7 @@ func TestStateStore_Peering_ListDeleted(t *testing.T) {
|
|||
err = tx.Insert(tablePeering, &pbpeering.Peering{
|
||||
Name: "bar",
|
||||
Partition: acl.DefaultPartitionName,
|
||||
ID: "5ebcff30-5509-4858-8142-a8e580f1863f",
|
||||
ID: testBarPeerID,
|
||||
CreateIndex: 2,
|
||||
ModifyIndex: 2,
|
||||
})
|
||||
|
@ -1489,7 +1470,7 @@ func TestStateStore_Peering_ListDeleted(t *testing.T) {
|
|||
err = tx.Insert(tablePeering, &pbpeering.Peering{
|
||||
Name: "baz",
|
||||
Partition: acl.DefaultPartitionName,
|
||||
ID: "432feb2f-5476-4ae2-b33c-e43640ca0e86",
|
||||
ID: testBazPeerID,
|
||||
DeletedAt: structs.TimeToProto(time.Now()),
|
||||
CreateIndex: 3,
|
||||
ModifyIndex: 3,
|
||||
|
|
|
@ -24,6 +24,7 @@ import (
|
|||
"github.com/hashicorp/consul/agent/consul/stream"
|
||||
"github.com/hashicorp/consul/agent/dns"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/consul/proto/pbpeering"
|
||||
)
|
||||
|
||||
|
@ -140,6 +141,7 @@ type Store interface {
|
|||
|
||||
// Apply provides a write-only interface for persisting Peering data.
|
||||
type Apply interface {
|
||||
CheckPeeringUUID(id string) (bool, error)
|
||||
PeeringWrite(req *pbpeering.PeeringWriteRequest) error
|
||||
PeeringTerminateByID(req *pbpeering.PeeringTerminateByIDRequest) error
|
||||
PeeringTrustBundleWrite(req *pbpeering.PeeringTrustBundleWriteRequest) error
|
||||
|
@ -189,8 +191,16 @@ func (s *Service) GenerateToken(
|
|||
return nil, err
|
||||
}
|
||||
|
||||
canRetry := true
|
||||
RETRY_ONCE:
|
||||
id, err := s.getExistingOrCreateNewPeerID(req.PeerName, req.Partition)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
writeReq := pbpeering.PeeringWriteRequest{
|
||||
Peering: &pbpeering.Peering{
|
||||
ID: id,
|
||||
Name: req.PeerName,
|
||||
// TODO(peering): Normalize from ACL token once this endpoint is guarded by ACLs.
|
||||
Partition: req.PartitionOrDefault(),
|
||||
|
@ -198,6 +208,15 @@ func (s *Service) GenerateToken(
|
|||
},
|
||||
}
|
||||
if err := s.Backend.Apply().PeeringWrite(&writeReq); err != nil {
|
||||
// There's a possible race where two servers call Generate Token at the
|
||||
// same time with the same peer name for the first time. They both
|
||||
// generate an ID and try to insert and only one wins. This detects the
|
||||
// collision and forces the loser to discard its generated ID and use
|
||||
// the one from the other server.
|
||||
if canRetry && strings.Contains(err.Error(), "A peering already exists with the name") {
|
||||
canRetry = false
|
||||
goto RETRY_ONCE
|
||||
}
|
||||
return nil, fmt.Errorf("failed to write peering: %w", err)
|
||||
}
|
||||
|
||||
|
@ -270,6 +289,11 @@ func (s *Service) Establish(
|
|||
serverAddrs[i] = addr
|
||||
}
|
||||
|
||||
id, err := s.getExistingOrCreateNewPeerID(req.PeerName, req.Partition)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// as soon as a peering is written with a list of ServerAddresses that is
|
||||
// non-empty, the leader routine will see the peering and attempt to
|
||||
// establish a connection with the remote peer.
|
||||
|
@ -278,6 +302,7 @@ func (s *Service) Establish(
|
|||
// RemotePeerID(PeerID) but at this point the other peer does not.
|
||||
writeReq := &pbpeering.PeeringWriteRequest{
|
||||
Peering: &pbpeering.Peering{
|
||||
ID: id,
|
||||
Name: req.PeerName,
|
||||
PeerCAPems: tok.CA,
|
||||
PeerServerAddresses: serverAddrs,
|
||||
|
@ -368,6 +393,16 @@ func (s *Service) PeeringWrite(ctx context.Context, req *pbpeering.PeeringWriteR
|
|||
defer metrics.MeasureSince([]string{"peering", "write"}, time.Now())
|
||||
// TODO(peering): ACL check request token
|
||||
|
||||
if req.Peering == nil {
|
||||
return nil, fmt.Errorf("missing required peering body")
|
||||
}
|
||||
|
||||
id, err := s.getExistingOrCreateNewPeerID(req.Peering.Name, req.Peering.Partition)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
req.Peering.ID = id
|
||||
|
||||
// TODO(peering): handle blocking queries
|
||||
err = s.Backend.Apply().PeeringWrite(req)
|
||||
if err != nil {
|
||||
|
@ -418,6 +453,7 @@ func (s *Service) PeeringDelete(ctx context.Context, req *pbpeering.PeeringDelet
|
|||
// We only need to include the name and partition for the peering to be identified.
|
||||
// All other data associated with the peering can be discarded because once marked
|
||||
// for deletion the peering is effectively gone.
|
||||
ID: existing.ID,
|
||||
Name: req.Name,
|
||||
Partition: req.Partition,
|
||||
DeletedAt: structs.TimeToProto(time.Now().UTC()),
|
||||
|
@ -837,6 +873,26 @@ func getTrustDomain(store Store, logger hclog.Logger) (string, error) {
|
|||
return connect.SpiffeIDSigningForCluster(cfg.ClusterID).Host(), nil
|
||||
}
|
||||
|
||||
func (s *Service) getExistingOrCreateNewPeerID(peerName, partition string) (string, error) {
|
||||
q := state.Query{
|
||||
Value: strings.ToLower(peerName),
|
||||
EnterpriseMeta: *structs.NodeEnterpriseMetaInPartition(partition),
|
||||
}
|
||||
_, peering, err := s.Backend.Store().PeeringRead(nil, q)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
if peering != nil {
|
||||
return peering.ID, nil
|
||||
}
|
||||
|
||||
id, err := lib.GenerateUUID(s.Backend.Apply().CheckPeeringUUID)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
return id, nil
|
||||
}
|
||||
|
||||
func (s *Service) StreamStatus(peer string) (resp StreamStatus, found bool) {
|
||||
return s.streams.streamStatus(peer)
|
||||
}
|
||||
|
|
|
@ -30,6 +30,7 @@ import (
|
|||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/agent/token"
|
||||
"github.com/hashicorp/consul/api"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/consul/proto/pbpeering"
|
||||
"github.com/hashicorp/consul/proto/pbservice"
|
||||
"github.com/hashicorp/consul/proto/prototest"
|
||||
|
@ -224,6 +225,7 @@ func TestPeeringService_Read(t *testing.T) {
|
|||
|
||||
// insert peering directly to state store
|
||||
p := &pbpeering.Peering{
|
||||
ID: testUUID(t),
|
||||
Name: "foo",
|
||||
State: pbpeering.PeeringState_INITIAL,
|
||||
PeerCAPems: nil,
|
||||
|
@ -279,6 +281,7 @@ func TestPeeringService_Delete(t *testing.T) {
|
|||
s := newTestServer(t, nil)
|
||||
|
||||
p := &pbpeering.Peering{
|
||||
ID: testUUID(t),
|
||||
Name: "foo",
|
||||
State: pbpeering.PeeringState_INITIAL,
|
||||
PeerCAPems: nil,
|
||||
|
@ -316,6 +319,7 @@ func TestPeeringService_List(t *testing.T) {
|
|||
// Note that the state store holds reference to the underlying
|
||||
// variables; do not modify them after writing.
|
||||
foo := &pbpeering.Peering{
|
||||
ID: testUUID(t),
|
||||
Name: "foo",
|
||||
State: pbpeering.PeeringState_INITIAL,
|
||||
PeerCAPems: nil,
|
||||
|
@ -324,6 +328,7 @@ func TestPeeringService_List(t *testing.T) {
|
|||
}
|
||||
require.NoError(t, s.Server.FSM().State().PeeringWrite(10, foo))
|
||||
bar := &pbpeering.Peering{
|
||||
ID: testUUID(t),
|
||||
Name: "bar",
|
||||
State: pbpeering.PeeringState_ACTIVE,
|
||||
PeerCAPems: nil,
|
||||
|
@ -405,6 +410,7 @@ func TestPeeringService_TrustBundleListByService(t *testing.T) {
|
|||
|
||||
lastIdx++
|
||||
require.NoError(t, s.Server.FSM().State().PeeringWrite(lastIdx, &pbpeering.Peering{
|
||||
ID: testUUID(t),
|
||||
Name: "foo",
|
||||
State: pbpeering.PeeringState_INITIAL,
|
||||
PeerServerName: "test",
|
||||
|
@ -413,6 +419,7 @@ func TestPeeringService_TrustBundleListByService(t *testing.T) {
|
|||
|
||||
lastIdx++
|
||||
require.NoError(t, s.Server.FSM().State().PeeringWrite(lastIdx, &pbpeering.Peering{
|
||||
ID: testUUID(t),
|
||||
Name: "bar",
|
||||
State: pbpeering.PeeringState_INITIAL,
|
||||
PeerServerName: "test-bar",
|
||||
|
@ -513,6 +520,7 @@ func Test_StreamHandler_UpsertServices(t *testing.T) {
|
|||
)
|
||||
|
||||
require.NoError(t, s.Server.FSM().State().PeeringWrite(0, &pbpeering.Peering{
|
||||
ID: testUUID(t),
|
||||
Name: "my-peer",
|
||||
}))
|
||||
|
||||
|
@ -998,7 +1006,9 @@ func newDefaultDeps(t *testing.T, c *consul.Config) consul.Deps {
|
|||
}
|
||||
|
||||
func setupTestPeering(t *testing.T, store *state.Store, name string, index uint64) string {
|
||||
t.Helper()
|
||||
err := store.PeeringWrite(index, &pbpeering.Peering{
|
||||
ID: testUUID(t),
|
||||
Name: name,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
@ -1009,3 +1019,9 @@ func setupTestPeering(t *testing.T, store *state.Store, name string, index uint6
|
|||
|
||||
return p.ID
|
||||
}
|
||||
|
||||
func testUUID(t *testing.T) string {
|
||||
v, err := lib.GenerateUUID(nil)
|
||||
require.NoError(t, err)
|
||||
return v
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ import (
|
|||
"github.com/hashicorp/consul/agent/consul/state"
|
||||
"github.com/hashicorp/consul/agent/consul/stream"
|
||||
"github.com/hashicorp/consul/agent/structs"
|
||||
"github.com/hashicorp/consul/lib"
|
||||
"github.com/hashicorp/consul/proto/pbcommon"
|
||||
"github.com/hashicorp/consul/proto/pbpeering"
|
||||
"github.com/hashicorp/consul/proto/pbservice"
|
||||
|
@ -1030,6 +1031,10 @@ type testApplier struct {
|
|||
store *state.Store
|
||||
}
|
||||
|
||||
func (a *testApplier) CheckPeeringUUID(id string) (bool, error) {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
||||
func (a *testApplier) PeeringWrite(req *pbpeering.PeeringWriteRequest) error {
|
||||
panic("not implemented")
|
||||
}
|
||||
|
@ -1216,6 +1221,7 @@ func writeEstablishedPeering(t *testing.T, store *state.Store, idx uint64, peerN
|
|||
require.NoError(t, err)
|
||||
|
||||
peering := pbpeering.Peering{
|
||||
ID: testUUID(t),
|
||||
Name: peerName,
|
||||
PeerID: remotePeerID,
|
||||
}
|
||||
|
@ -2169,5 +2175,10 @@ func requireEqualInstances(t *testing.T, expect, got structs.CheckServiceNodes)
|
|||
require.Equal(t, expect[i].Checks[j].PartitionOrDefault(), got[i].Checks[j].PartitionOrDefault(), "partition mismatch")
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func testUUID(t *testing.T) string {
|
||||
v, err := lib.GenerateUUID(nil)
|
||||
require.NoError(t, err)
|
||||
return v
|
||||
}
|
||||
|
|
|
@ -589,6 +589,7 @@ func (b *testSubscriptionBackend) ensureCARoots(t *testing.T, roots ...*structs.
|
|||
|
||||
func setupTestPeering(t *testing.T, store *state.Store, name string, index uint64) string {
|
||||
err := store.PeeringWrite(index, &pbpeering.Peering{
|
||||
ID: testUUID(t),
|
||||
Name: name,
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
|
Loading…
Reference in New Issue