fix: persist peering CA updates to dialing clusters (#15243)

fix: persist peering CA updates to dialing clusters
pull/15269/head
Dan Stough 2 years ago committed by GitHub
parent 18d6c338f4
commit 553312ef61
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -482,7 +482,7 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
require.NoError(t, fsm.state.PeeringWrite(31, &pbpeering.PeeringWriteRequest{
Peering: &pbpeering.Peering{
ID: "1fabcd52-1d46-49b0-b1d8-71559aee47f5",
Name: "baz",
Name: "qux",
},
SecretsRequest: &pbpeering.SecretsWriteRequest{
PeerID: "1fabcd52-1d46-49b0-b1d8-71559aee47f5",
@ -821,12 +821,12 @@ func TestFSM_SnapshotRestore_OSS(t *testing.T) {
// Verify peering is restored
idx, prngRestored, err := fsm2.state.PeeringRead(nil, state.Query{
Value: "baz",
Value: "qux",
})
require.NoError(t, err)
require.Equal(t, uint64(31), idx)
require.Equal(t, uint64(32), idx) // This is the index of the PTB write, which updates the peering
require.NotNil(t, prngRestored)
require.Equal(t, "baz", prngRestored.Name)
require.Equal(t, "qux", prngRestored.Name)
// Verify peering secrets are restored
secretsRestored, err := fsm2.state.PeeringSecretsRead(nil, "1fabcd52-1d46-49b0-b1d8-71559aee47f5")

@ -12,6 +12,7 @@ import (
"github.com/hashicorp/consul/acl"
"github.com/hashicorp/consul/agent/configentry"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib"
"github.com/hashicorp/consul/lib/maps"
"github.com/hashicorp/consul/proto/pbpeering"
)
@ -1144,12 +1145,56 @@ func peeringTrustBundleReadTxn(tx ReadTxn, ws memdb.WatchSet, q Query) (uint64,
return ptb.ModifyIndex, ptb, nil
}
// PeeringTrustBundleWrite writes ptb to the state store. If there is an existing trust bundle with the given peer name,
// it will be overwritten.
// PeeringTrustBundleWrite writes ptb to the state store.
// It also updates the corresponding peering object with the new certs.
// If there is an existing trust bundle with the given peer name, it will be overwritten.
// If there is no corresponding peering, then an error is returned.
func (s *Store) PeeringTrustBundleWrite(idx uint64, ptb *pbpeering.PeeringTrustBundle) error {
tx := s.db.WriteTxn(idx)
defer tx.Abort()
if ptb.PeerName == "" {
return errors.New("missing peer name")
}
// Check for the existence of the peering object
_, existingPeering, err := peeringReadTxn(tx, nil, Query{
Value: ptb.PeerName,
EnterpriseMeta: *structs.NodeEnterpriseMetaInPartition(ptb.Partition),
})
if err != nil {
return err
}
if existingPeering == nil {
return fmt.Errorf("cannot write peering trust bundle for unknown peering %s", ptb.PeerName)
}
// Prevent modifications to Peering marked for deletion.
// This blocks generating new peering tokens or re-establishing the peering until the peering is done deleting.
if existingPeering.State == pbpeering.PeeringState_DELETING {
return fmt.Errorf("cannot write to peering that is marked for deletion")
}
c := proto.Clone(existingPeering)
clone, ok := c.(*pbpeering.Peering)
if !ok {
return fmt.Errorf("invalid type %T, expected *pbpeering.Peering", clone)
}
// Update the certs on the peering
rootPEMs := make([]string, 0, len(ptb.RootPEMs))
for _, c := range ptb.RootPEMs {
rootPEMs = append(rootPEMs, lib.EnsureTrailingNewline(c))
}
clone.PeerCAPems = rootPEMs
clone.ModifyIndex = idx
if err := tx.Insert(tablePeering, clone); err != nil {
return fmt.Errorf("failed inserting peering: %w", err)
}
if err := updatePeeringTableIndexes(tx, idx, clone.PartitionOrDefault()); err != nil {
return err
}
// Check for the existing trust bundle and update
q := Query{
Value: ptb.PeerName,
EnterpriseMeta: *structs.NodeEnterpriseMetaInPartition(ptb.Partition),
@ -1159,13 +1204,13 @@ func (s *Store) PeeringTrustBundleWrite(idx uint64, ptb *pbpeering.PeeringTrustB
return fmt.Errorf("failed peering trust bundle lookup: %w", err)
}
existing, ok := existingRaw.(*pbpeering.PeeringTrustBundle)
existingPTB, ok := existingRaw.(*pbpeering.PeeringTrustBundle)
if existingRaw != nil && !ok {
return fmt.Errorf("invalid type %T", existingRaw)
}
if existing != nil {
ptb.CreateIndex = existing.CreateIndex
if existingPTB != nil {
ptb.CreateIndex = existingPTB.CreateIndex
} else {
ptb.CreateIndex = idx

@ -95,13 +95,42 @@ func insertTestPeeringTrustBundles(t *testing.T, s *Store) {
tx := s.db.WriteTxn(0)
defer tx.Abort()
err := tx.Insert(tablePeeringTrustBundles, &pbpeering.PeeringTrustBundle{
// Insert peerings since it is assumed they exist before the trust bundle is created
err := tx.Insert(tablePeering, &pbpeering.Peering{
Name: "foo",
ID: "89b8209d-0b64-45e2-8692-6c60181edbe7",
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
PeerCAPems: []string{},
PeerServerName: "foo.com",
CreateIndex: 1,
ModifyIndex: 1,
})
require.NoError(t, err)
err = tx.Insert(tablePeering, &pbpeering.Peering{
Name: "baz",
ID: "d8230482-ae98-4b82-903f-e1ada3000ad4",
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
PeerCAPems: []string{"old baz certificate bundle"},
PeerServerName: "baz.com",
CreateIndex: 2,
ModifyIndex: 2,
})
require.NoError(t, err)
err = tx.Insert(tableIndex, &IndexEntry{
Key: tablePeering,
Value: 2,
})
require.NoError(t, err)
err = tx.Insert(tablePeeringTrustBundles, &pbpeering.PeeringTrustBundle{
TrustDomain: "foo.com",
PeerName: "foo",
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
RootPEMs: []string{"foo certificate bundle"},
CreateIndex: 1,
ModifyIndex: 1,
CreateIndex: 3,
ModifyIndex: 3,
})
require.NoError(t, err)
@ -110,14 +139,14 @@ func insertTestPeeringTrustBundles(t *testing.T, s *Store) {
PeerName: "bar",
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
RootPEMs: []string{"bar certificate bundle"},
CreateIndex: 2,
ModifyIndex: 2,
CreateIndex: 4,
ModifyIndex: 4,
})
require.NoError(t, err)
err = tx.Insert(tableIndex, &IndexEntry{
Key: tablePeeringTrustBundles,
Value: 2,
Value: 4,
})
require.NoError(t, err)
require.NoError(t, tx.Commit())
@ -1549,16 +1578,16 @@ func TestStateStore_PeeringTrustBundleList(t *testing.T) {
PeerName: "bar",
Partition: entMeta.PartitionOrEmpty(),
RootPEMs: []string{"bar certificate bundle"},
CreateIndex: 2,
ModifyIndex: 2,
CreateIndex: 4,
ModifyIndex: 4,
},
{
TrustDomain: "foo.com",
PeerName: "foo",
Partition: entMeta.PartitionOrEmpty(),
RootPEMs: []string{"foo certificate bundle"},
CreateIndex: 1,
ModifyIndex: 1,
CreateIndex: 3,
ModifyIndex: 3,
},
}
@ -1596,8 +1625,8 @@ func TestStateStore_PeeringTrustBundleRead(t *testing.T) {
PeerName: "foo",
Partition: entMeta.PartitionOrEmpty(),
RootPEMs: []string{"foo certificate bundle"},
CreateIndex: 1,
ModifyIndex: 1,
CreateIndex: 3,
ModifyIndex: 3,
},
},
{
@ -1619,11 +1648,14 @@ func TestStore_PeeringTrustBundleWrite(t *testing.T) {
s := NewStateStore(nil)
insertTestPeeringTrustBundles(t, s)
type testcase struct {
name string
input *pbpeering.PeeringTrustBundle
name string
input *pbpeering.PeeringTrustBundle
expectErr string
}
run := func(t *testing.T, tc testcase) {
require.NoError(t, s.PeeringTrustBundleWrite(10, tc.input))
run := func(t *testing.T, tc testcase) error {
if err := s.PeeringTrustBundleWrite(10, tc.input); err != nil {
return err
}
q := Query{
Value: tc.input.PeerName,
@ -1634,6 +1666,16 @@ func TestStore_PeeringTrustBundleWrite(t *testing.T) {
require.NotNil(t, ptb)
require.Equal(t, tc.input.TrustDomain, ptb.TrustDomain)
require.Equal(t, tc.input.PeerName, ptb.PeerName)
// Validate peering object has certs updated
_, peering, err := s.PeeringRead(nil, Query{
Value: tc.input.PeerName,
})
require.NoError(t, err)
require.NotNil(t, peering)
require.Equal(t, tc.input.RootPEMs, peering.PeerCAPems)
return nil
}
tcs := []testcase{
{
@ -1641,6 +1683,7 @@ func TestStore_PeeringTrustBundleWrite(t *testing.T) {
input: &pbpeering.PeeringTrustBundle{
TrustDomain: "baz.com",
PeerName: "baz",
RootPEMs: []string{"FAKE PEM HERE\n", "FAKE PEM HERE\n"},
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
},
},
@ -1648,14 +1691,37 @@ func TestStore_PeeringTrustBundleWrite(t *testing.T) {
name: "update foo",
input: &pbpeering.PeeringTrustBundle{
TrustDomain: "foo-updated.com",
RootPEMs: []string{"FAKE PEM HERE\n"},
PeerName: "foo",
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
},
},
{
name: "create bar without existing peering",
input: &pbpeering.PeeringTrustBundle{
TrustDomain: "bar.com",
PeerName: "bar",
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
},
expectErr: "cannot write peering trust bundle for unknown peering",
},
{
name: "create without a peer name",
input: &pbpeering.PeeringTrustBundle{
TrustDomain: "bar.com",
Partition: structs.NodeEnterpriseMetaInDefaultPartition().PartitionOrEmpty(),
},
expectErr: "missing peer name",
},
}
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
run(t, tc)
err := run(t, tc)
if err != nil && tc.expectErr != "" {
require.Contains(t, err.Error(), tc.expectErr)
return
}
require.NoError(t, err, "received unexpected test case error")
})
}
}
@ -1668,7 +1734,7 @@ func TestStore_PeeringTrustBundleDelete(t *testing.T) {
require.NoError(t, s.PeeringTrustBundleDelete(10, q))
_, ptb, err := s.PeeringRead(nil, q)
_, ptb, err := s.PeeringTrustBundleRead(nil, q)
require.NoError(t, err)
require.Nil(t, ptb)
}
@ -2675,12 +2741,12 @@ func TestStateStore_Peering_Snapshot_Restore(t *testing.T) {
expectedPeering := &pbpeering.Peering{
ID: "1fabcd52-1d46-49b0-b1d8-71559aee47f5",
Name: "baz",
Name: "example",
}
expectedTrustBundle := &pbpeering.PeeringTrustBundle{
TrustDomain: "example.com",
PeerName: "example",
RootPEMs: []string{"example certificate bundle"},
RootPEMs: []string{"example certificate bundle\n"},
}
expectedSecret := &pbpeering.PeeringSecrets{
PeerID: expectedPeering.ID,
@ -2728,7 +2794,10 @@ func TestStateStore_Peering_Snapshot_Restore(t *testing.T) {
for entry := iter.Next(); entry != nil; entry = iter.Next() {
peeringDump = append(peeringDump, entry.(*pbpeering.Peering))
}
require.Equal(t, []*pbpeering.Peering{expectedPeering}, peeringDump)
expectedPeering.ModifyIndex = expectedTrustBundle.ModifyIndex
expectedPeering.PeerCAPems = expectedTrustBundle.RootPEMs
require.Len(t, peeringDump, 1)
prototest.AssertDeepEqual(t, expectedPeering, peeringDump[0])
}
// Verify trust bundles
{
@ -2771,7 +2840,8 @@ func TestStateStore_Peering_Snapshot_Restore(t *testing.T) {
{
idx, foundPeerings, err := s.PeeringList(nil, *acl.DefaultEnterpriseMeta())
require.NoError(t, err)
require.Equal(t, uint64(1001), idx)
// This is 1002 because the trust bundle write updates the underlying peering
require.Equal(t, uint64(1002), idx)
require.Equal(t, []*pbpeering.Peering{expectedPeering}, foundPeerings)
}
// Verify trust Bundles

@ -25,6 +25,14 @@ func TestServerTrustBundle(t *testing.T) {
store := state.NewStateStore(nil)
// Peering must exist for ptb write to succeed
require.NoError(t, store.PeeringWrite(index-1, &pbpeering.PeeringWriteRequest{
Peering: &pbpeering.Peering{
Name: peerName,
ID: "2ae8c79e-242e-4f4a-afd6-9aede8831c5f",
},
}))
require.NoError(t, store.PeeringTrustBundleWrite(index, &pbpeering.PeeringTrustBundle{
PeerName: peerName,
TrustDomain: "before.com",
@ -67,6 +75,14 @@ func TestServerTrustBundle_ACLEnforcement(t *testing.T) {
store := state.NewStateStore(nil)
// Peering must exist for ptb write to succeed
require.NoError(t, store.PeeringWrite(index-1, &pbpeering.PeeringWriteRequest{
Peering: &pbpeering.Peering{
Name: peerName,
ID: "2ae8c79e-242e-4f4a-afd6-9aede8831c5f",
},
}))
require.NoError(t, store.PeeringTrustBundleWrite(index, &pbpeering.PeeringTrustBundle{
PeerName: peerName,
TrustDomain: "before.com",
@ -186,6 +202,21 @@ func TestServerTrustBundleList(t *testing.T) {
store := state.NewStateStore(nil)
require.NoError(t, store.CASetConfig(index, &structs.CAConfiguration{ClusterID: "cluster-id"}))
// Peering must exist for ptb write to succeed
require.NoError(t, store.PeeringWrite(index, &pbpeering.PeeringWriteRequest{
Peering: &pbpeering.Peering{
Name: "peer1",
ID: "2ae8c79e-242e-4f4a-afd6-9aede8831c5f",
},
}))
require.NoError(t, store.PeeringWrite(index, &pbpeering.PeeringWriteRequest{
Peering: &pbpeering.Peering{
Name: "peer2",
ID: "e69f14e3-f253-43bc-bdbe-888994ca4f81",
},
}))
require.NoError(t, store.PeeringTrustBundleWrite(index, &pbpeering.PeeringTrustBundle{
PeerName: "peer1",
}))
@ -301,6 +332,21 @@ func TestServerTrustBundleList_ACLEnforcement(t *testing.T) {
store := state.NewStateStore(nil)
require.NoError(t, store.CASetConfig(index, &structs.CAConfiguration{ClusterID: "cluster-id"}))
// Peering must exist for ptb write to succeed
require.NoError(t, store.PeeringWrite(index, &pbpeering.PeeringWriteRequest{
Peering: &pbpeering.Peering{
Name: "peer1",
ID: "2ae8c79e-242e-4f4a-afd6-9aede8831c5f",
},
}))
require.NoError(t, store.PeeringWrite(index, &pbpeering.PeeringWriteRequest{
Peering: &pbpeering.Peering{
Name: "peer2",
ID: "e69f14e3-f253-43bc-bdbe-888994ca4f81",
},
}))
require.NoError(t, store.PeeringTrustBundleWrite(index, &pbpeering.PeeringTrustBundle{
PeerName: "peer1",
}))

@ -9,10 +9,13 @@ import (
"github.com/testcontainers/testcontainers-go"
)
const latestEnvoyVersion = "1.23.1"
const envoyEnvKey = "ENVOY_VERSION"
const (
envoyEnvKey = "ENVOY_VERSION"
envoyLogLevel = "info"
envoyVersion = "1.23.1"
const hashicorpDockerProxy = "docker.mirror.hashicorp.services"
hashicorpDockerProxy = "docker.mirror.hashicorp.services"
)
//go:embed assets/Dockerfile-consul-envoy
var consulEnvoyDockerfile string
@ -53,5 +56,5 @@ func getEnvoyVersion() string {
if version, ok := os.LookupEnv(envoyEnvKey); ok && version != "" {
return version
}
return latestEnvoyVersion
return envoyVersion
}

@ -94,7 +94,7 @@ func NewConnectService(ctx context.Context, name string, serviceName string, ser
"-grpc-addr", fmt.Sprintf("%s:8502", nodeIP),
"-http-addr", fmt.Sprintf("%s:8500", nodeIP),
"--",
"--log-level", "trace"},
"--log-level", envoyLogLevel},
ExposedPorts: []string{
fmt.Sprintf("%d/tcp", serviceBindPort), // Envoy Listener
"19000/tcp", // Envoy Admin Port

@ -89,7 +89,7 @@ func NewGatewayService(ctx context.Context, name string, kind string, node libno
fmt.Sprintf("-grpc-addr=%s:%d", nodeIP, 8502),
"-admin-bind", "0.0.0.0:19000",
"--",
"--log-level", "info"},
"--log-level", envoyLogLevel},
Env: map[string]string{"CONSUL_HTTP_ADDR": fmt.Sprintf("%s:%d", nodeIP, 8500)},
ExposedPorts: []string{
"8443/tcp", // Envoy Gateway Listener

@ -122,10 +122,17 @@ func TestPeering_RotateServerAndCAThenFail_(t *testing.T) {
})
t.Run("rotate exporting cluster's root CA", func(t *testing.T) {
config, meta, err := acceptingClient.Connect().CAGetConfig(&api.QueryOptions{})
// we will verify that the peering on the dialing side persists the updates CAs
peeringBefore, peerMeta, err := dialingClient.Peerings().Read(context.Background(), dialingPeerName, &api.QueryOptions{})
require.NoError(t, err)
t.Logf("%+v", config)
_, caMeta, err := acceptingClient.Connect().CAGetConfig(&api.QueryOptions{})
require.NoError(t, err)
// There should be one root cert
rootList, _, err := acceptingClient.Connect().CARoots(&api.QueryOptions{})
require.NoError(t, err)
require.Len(t, rootList.Roots, 1)
req := &api.CAConfig{
Provider: "consul",
@ -139,13 +146,22 @@ func TestPeering_RotateServerAndCAThenFail_(t *testing.T) {
// wait up to 30 seconds for the update
_, _, err = acceptingClient.Connect().CAGetConfig(&api.QueryOptions{
WaitIndex: meta.LastIndex,
WaitIndex: caMeta.LastIndex,
WaitTime: 30 * time.Second,
})
require.NoError(t, err)
// There should be two root certs now
rootList, _, err := acceptingClient.Connect().CARoots(&api.QueryOptions{})
// The peering object should reflect the update
peeringAfter, _, err := dialingClient.Peerings().Read(context.Background(), dialingPeerName, &api.QueryOptions{
WaitIndex: peerMeta.LastIndex,
WaitTime: 30 * time.Second,
})
require.NotEqual(t, peeringBefore.PeerCAPems, peeringAfter.PeerCAPems)
require.Len(t, peeringAfter.PeerCAPems, 2)
require.NoError(t, err)
// There should be two root certs now on the accepting side
rootList, _, err = acceptingClient.Connect().CARoots(&api.QueryOptions{})
require.NoError(t, err)
require.Len(t, rootList.Roots, 2)
@ -154,7 +170,6 @@ func TestPeering_RotateServerAndCAThenFail_(t *testing.T) {
libassert.HTTPServiceEchoes(t, "localhost", port)
verifySidecarHasTwoRootCAs(t, clientSidecarService)
})
t.Run("terminate exporting clusters servers and ensure imported services are still reachable", func(t *testing.T) {

Loading…
Cancel
Save