Avoid deleting peerings marked as terminated.

When our peer deletes the peering, it is locally marked as terminated.
This termination should kick off deleting all imported data, but it should
not delete the peering object itself.

Keeping peerings marked as terminated acts as a signal that the action
took place.
pull/13445/head · freddygv · 2 years ago
commit f3843809da
Changed files:
  1. agent/consul/leader_peering.go (33 changes)
  2. agent/consul/leader_peering_test.go (4 changes)
  3. agent/peering_endpoint_test.go (5 changes)
  4. agent/rpc/peering/service_test.go (2 changes)
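
The change is easiest to see as a decision about what gets deleted when a peering goes away. Below is a minimal, hedged sketch of that decision in Go; cleanupPeering, deleteImportedData, deletePeering, and the trimmed-down types are hypothetical stand-ins for the real removePeeringAndData path shown in the diff, not Consul's actual API.

package main

import (
	"context"
	"fmt"
)

// PeeringState is a trimmed-down stand-in for pbpeering.PeeringState.
type PeeringState int

const (
	PeeringStateActive PeeringState = iota
	// PeeringStateTerminated means our peer deleted the peering on their side.
	PeeringStateTerminated
)

// Peering is a simplified stand-in for *pbpeering.Peering.
type Peering struct {
	Name  string
	State PeeringState
}

// deleteImportedData and deletePeering are hypothetical stubs for the real
// helpers touched in the diff below (deleteAllNodes, deleteTrustBundleFromPeer,
// and the raft-applied PeeringDeleteRequest).
func deleteImportedData(ctx context.Context, p *Peering) error {
	fmt.Printf("deleting imported nodes, services, checks, and trust bundle for %q\n", p.Name)
	return nil
}

func deletePeering(ctx context.Context, p *Peering) error {
	fmt.Printf("deleting peering object %q\n", p.Name)
	return nil
}

// cleanupPeering always removes data imported from the peer, but keeps the
// peering object itself when the peer terminated it, so the local operator
// can see that the termination happened.
func cleanupPeering(ctx context.Context, p *Peering) error {
	if err := deleteImportedData(ctx, p); err != nil {
		return err
	}
	if p.State == PeeringStateTerminated {
		// Terminated by our peer: keep the local record as a signal.
		return nil
	}
	return deletePeering(ctx, p)
}

func main() {
	ctx := context.Background()
	_ = cleanupPeering(ctx, &Peering{Name: "my-peer-s2", State: PeeringStateTerminated})
}

The point of the sketch is the early return: data imported from the peer is always removed, but a TERMINATED peering record is intentionally left behind as evidence for the operator.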

agent/consul/leader_peering.go (33 changes)

@@ -120,7 +120,7 @@ func (s *Server) syncPeeringsAndBlock(ctx context.Context, logger hclog.Logger,
 	logger.Trace("evaluating stored peer", "peer", peer.Name, "should_dial", peer.ShouldDial(), "sequence_id", seq)
 	if !peer.IsActive() {
-		// The peering is marked for deletion, no need to dial or track them.
+		// The peering was marked for deletion by ourselves or our peer, no need to dial or track them.
 		continue
 	}
@@ -334,22 +334,23 @@ func (s *Server) runPeeringDeletions(ctx context.Context) error {
 		}
 		for _, p := range peerings {
-			s.removePeeringAndData(ctx, logger, raftLimiter, p.Name, acl.PartitionOrDefault(p.Partition))
+			s.removePeeringAndData(ctx, logger, raftLimiter, p)
 		}
 	}
 }

 // removePeeringAndData removes data imported for a peering and the peering itself.
-func (s *Server) removePeeringAndData(ctx context.Context, logger hclog.Logger, limiter *rate.Limiter, peer string, partition string) {
-	logger = logger.With("peer", peer, "partition", partition)
+func (s *Server) removePeeringAndData(ctx context.Context, logger hclog.Logger, limiter *rate.Limiter, peer *pbpeering.Peering) {
+	logger = logger.With("peer_name", peer.Name, "peer_id", peer.ID)
+	entMeta := *structs.NodeEnterpriseMetaInPartition(peer.Partition)
 	// First delete all imported data.
 	// By deleting all imported nodes we also delete all services and checks registered on them.
-	if err := s.deleteAllNodes(ctx, limiter, *structs.NodeEnterpriseMetaInPartition(partition), peer); err != nil {
+	if err := s.deleteAllNodes(ctx, limiter, entMeta, peer.Name); err != nil {
 		logger.Error("Failed to remove Nodes for peer", "error", err)
 		return
 	}
-	if err := s.deleteTrustBundleFromPeer(ctx, limiter, *structs.NodeEnterpriseMetaInPartition(partition), peer); err != nil {
+	if err := s.deleteTrustBundleFromPeer(ctx, limiter, entMeta, peer.Name); err != nil {
 		logger.Error("Failed to remove trust bundle for peer", "error", err)
 		return
 	}
@@ -358,12 +359,18 @@ func (s *Server) removePeeringAndData(ctx context.Context, logger hclog.Logger,
 		return
 	}
+	if peer.State == pbpeering.PeeringState_TERMINATED {
+		// For peerings terminated by our peer we only clean up the local data, we do not delete the peering itself.
+		// This is to avoid a situation where the peering disappears without the local operator's knowledge.
+		return
+	}
 	// Once all imported data is deleted, the peering itself is also deleted.
-	req := pbpeering.PeeringDeleteRequest{
-		Name:      peer,
-		Partition: partition,
+	req := &pbpeering.PeeringDeleteRequest{
+		Name:      peer.Name,
+		Partition: acl.PartitionOrDefault(peer.Partition),
 	}
-	_, err := s.raftApplyProtobuf(structs.PeeringDeleteType, &req)
+	_, err := s.raftApplyProtobuf(structs.PeeringDeleteType, req)
 	if err != nil {
 		logger.Error("failed to apply full peering deletion", "error", err)
 		return
@@ -402,7 +409,7 @@ func (s *Server) deleteAllNodes(ctx context.Context, limiter *rate.Limiter, entM
 		ops = append(ops, &op)
 		// Add entries to the transaction until it reaches the max batch size
-		batchSize += len(entry.Node) + len(entry.Partition)
+		batchSize += len(entry.Node) + len(entry.Partition) + len(entry.PeerName)
 	}
 	// Send each batch as a TXN Req to avoid sending one at a time
@@ -441,10 +448,10 @@ func (s *Server) deleteTrustBundleFromPeer(ctx context.Context, limiter *rate.Li
 		return err
 	}
-	req := pbpeering.PeeringTrustBundleDeleteRequest{
+	req := &pbpeering.PeeringTrustBundleDeleteRequest{
 		Name:      peerName,
 		Partition: entMeta.PartitionOrDefault(),
 	}
-	_, err = s.raftApplyProtobuf(structs.PeeringTrustBundleDeleteType, &req)
+	_, err = s.raftApplyProtobuf(structs.PeeringTrustBundleDeleteType, req)
 	return err
 }
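
As a side note on the deleteAllNodes hunk above, the per-entry size estimate now also counts the peer name so that each deletion transaction stays within its batch limit. A hedged sketch of that size-bounded batching pattern follows; entry, maxBatchSize, and sendTxn are hypothetical stand-ins, not Consul's actual types or limits.

package main

import "fmt"

// entry is a hypothetical stand-in for a node registration to delete.
type entry struct {
	Node      string
	Partition string
	PeerName  string
}

// batchBySize groups entries into transactions whose estimated size stays
// under maxBatchSize, mirroring the "send each batch as a TXN" idea above.
func batchBySize(entries []entry, maxBatchSize int, sendTxn func([]entry) error) error {
	var batch []entry
	batchSize := 0
	for _, e := range entries {
		// Estimate the entry's contribution, now including the peer name.
		size := len(e.Node) + len(e.Partition) + len(e.PeerName)
		if len(batch) > 0 && batchSize+size > maxBatchSize {
			if err := sendTxn(batch); err != nil {
				return err
			}
			batch, batchSize = nil, 0
		}
		batch = append(batch, e)
		batchSize += size
	}
	if len(batch) > 0 {
		return sendTxn(batch)
	}
	return nil
}

func main() {
	entries := []entry{
		{Node: "node-1", Partition: "default", PeerName: "my-peer-s2"},
		{Node: "node-2", Partition: "default", PeerName: "my-peer-s2"},
	}
	_ = batchBySize(entries, 64, func(b []entry) error {
		fmt.Printf("sending TXN with %d entries\n", len(b))
		return nil
	})
}

Accumulating an approximate byte size per entry and flushing before the limit keeps each raft-applied transaction bounded instead of sending one operation at a time.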

agent/consul/leader_peering_test.go (4 changes)

@@ -109,7 +109,7 @@ func TestLeader_PeeringSync_Lifecycle_ClientDeletion(t *testing.T) {
 			Value: "my-peer-s2",
 		})
 		require.NoError(r, err)
-		require.Nil(r, peering)
+		require.Equal(r, pbpeering.PeeringState_TERMINATED, peering.State)
 	})
 }
@@ -198,7 +198,7 @@ func TestLeader_PeeringSync_Lifecycle_ServerDeletion(t *testing.T) {
 			Value: "my-peer-s1",
 		})
 		require.NoError(r, err)
-		require.Nil(r, peering)
+		require.Equal(r, pbpeering.PeeringState_TERMINATED, peering.State)
 	})
 }

agent/peering_endpoint_test.go (5 changes)

@@ -12,12 +12,13 @@ import (
 	"testing"
 	"time"
+	"github.com/stretchr/testify/require"
 	"github.com/hashicorp/consul/agent/structs"
 	"github.com/hashicorp/consul/api"
 	"github.com/hashicorp/consul/proto/pbpeering"
 	"github.com/hashicorp/consul/sdk/testutil/retry"
 	"github.com/hashicorp/consul/testrpc"
-	"github.com/stretchr/testify/require"
 )

 var validCA = `
@@ -346,7 +347,7 @@ func TestHTTP_Peering_Delete(t *testing.T) {
 	t.Run("now the token is deleted and reads should yield a 404", func(t *testing.T) {
 		retry.Run(t, func(r *retry.R) {
 			req, err := http.NewRequest("GET", "/v1/peering/foo", nil)
-			require.NoError(t, err)
+			require.NoError(r, err)
 			resp := httptest.NewRecorder()
 			a.srv.h.ServeHTTP(resp, req)
 			require.Equal(r, http.StatusNotFound, resp.Code)
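
Taken together, the test updates distinguish two outcomes: a peering deleted locally eventually disappears and reads return 404, while a peering terminated by the remote peer stays readable in a TERMINATED state. Below is a hedged sketch of how an operator-side check against the HTTP API could tell the two apart; the response field names here are assumptions for illustration, not taken verbatim from the Consul API.

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// peeringView is a minimal, assumed view of the /v1/peering/<name> response;
// only the fields this sketch needs.
type peeringView struct {
	Name  string
	State string
}

// checkPeering reports whether a peering was deleted locally (404) or was
// terminated by the remote peer and kept around (200 with a TERMINATED state).
func checkPeering(agentAddr, name string) error {
	resp, err := http.Get(agentAddr + "/v1/peering/" + name)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	switch resp.StatusCode {
	case http.StatusNotFound:
		fmt.Printf("peering %q was deleted locally and no longer exists\n", name)
		return nil
	case http.StatusOK:
		var p peeringView
		if err := json.NewDecoder(resp.Body).Decode(&p); err != nil {
			return err
		}
		fmt.Printf("peering %q is in state %s\n", p.Name, p.State)
		return nil
	default:
		return fmt.Errorf("unexpected status %d reading peering %q", resp.StatusCode, name)
	}
}

func main() {
	_ = checkPeering("http://127.0.0.1:8500", "foo")
}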

agent/rpc/peering/service_test.go (2 changes)

@@ -11,7 +11,6 @@ import (
 	"testing"
 	"time"
-	"github.com/hashicorp/consul/sdk/testutil/retry"
 	"github.com/hashicorp/go-hclog"
 	"github.com/hashicorp/go-uuid"
 	"github.com/stretchr/testify/require"
@@ -36,6 +35,7 @@ import (
 	"github.com/hashicorp/consul/proto/prototest"
 	"github.com/hashicorp/consul/sdk/freeport"
 	"github.com/hashicorp/consul/sdk/testutil"
+	"github.com/hashicorp/consul/sdk/testutil/retry"
 	"github.com/hashicorp/consul/testrpc"
 	"github.com/hashicorp/consul/tlsutil"
 	"github.com/hashicorp/consul/types"
