mirror of https://github.com/hashicorp/consul
peering: track exported services (#13784)
Signed-off-by: acpana <8968914+acpana@users.noreply.github.com>pull/13797/head
parent
d6dcef18c8
commit
a9ae2ff4fa
|
@ -616,7 +616,7 @@ func insertTestPeeringData(t *testing.T, store *state.Store, peer string, lastId
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(peering): once we move away from leader only request for PeeringList, move this test to consul/server_test maybe
|
// TODO(peering): once we move away from leader only request for PeeringList, move this test to consul/server_test maybe
|
||||||
func TestLeader_Peering_ImportedServicesCount(t *testing.T) {
|
func TestLeader_Peering_ImportedExportedServicesCount(t *testing.T) {
|
||||||
if testing.Short() {
|
if testing.Short() {
|
||||||
t.Skip("too slow for testing.Short")
|
t.Skip("too slow for testing.Short")
|
||||||
}
|
}
|
||||||
|
@ -747,10 +747,10 @@ func TestLeader_Peering_ImportedServicesCount(t *testing.T) {
|
||||||
/// finished adding services
|
/// finished adding services
|
||||||
|
|
||||||
type testCase struct {
|
type testCase struct {
|
||||||
name string
|
name string
|
||||||
description string
|
description string
|
||||||
exportedService structs.ExportedServicesConfigEntry
|
exportedService structs.ExportedServicesConfigEntry
|
||||||
expectedImportedServicesCount uint64
|
expectedImportedExportedServicesCount uint64 // same count for a server that imports the services form a server that exports them
|
||||||
}
|
}
|
||||||
|
|
||||||
testCases := []testCase{
|
testCases := []testCase{
|
||||||
|
@ -770,7 +770,7 @@ func TestLeader_Peering_ImportedServicesCount(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedImportedServicesCount: 4, // 3 services from above + the "consul" service
|
expectedImportedExportedServicesCount: 4, // 3 services from above + the "consul" service
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "no sync",
|
name: "no sync",
|
||||||
|
@ -778,7 +778,7 @@ func TestLeader_Peering_ImportedServicesCount(t *testing.T) {
|
||||||
exportedService: structs.ExportedServicesConfigEntry{
|
exportedService: structs.ExportedServicesConfigEntry{
|
||||||
Name: "default",
|
Name: "default",
|
||||||
},
|
},
|
||||||
expectedImportedServicesCount: 0, // we want to see this decremented from 4 --> 0
|
expectedImportedExportedServicesCount: 0, // we want to see this decremented from 4 --> 0
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "just a, b services",
|
name: "just a, b services",
|
||||||
|
@ -804,7 +804,7 @@ func TestLeader_Peering_ImportedServicesCount(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedImportedServicesCount: 2,
|
expectedImportedExportedServicesCount: 2,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "unexport b service",
|
name: "unexport b service",
|
||||||
|
@ -822,7 +822,7 @@ func TestLeader_Peering_ImportedServicesCount(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedImportedServicesCount: 1,
|
expectedImportedExportedServicesCount: 1,
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "export c service",
|
name: "export c service",
|
||||||
|
@ -848,7 +848,7 @@ func TestLeader_Peering_ImportedServicesCount(t *testing.T) {
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
expectedImportedServicesCount: 2,
|
expectedImportedExportedServicesCount: 2,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -866,11 +866,34 @@ func TestLeader_Peering_ImportedServicesCount(t *testing.T) {
|
||||||
lastIdx++
|
lastIdx++
|
||||||
require.NoError(t, s1.fsm.State().EnsureConfigEntry(lastIdx, &tc.exportedService))
|
require.NoError(t, s1.fsm.State().EnsureConfigEntry(lastIdx, &tc.exportedService))
|
||||||
|
|
||||||
|
// Check that imported services count on S2 are what we expect
|
||||||
retry.Run(t, func(r *retry.R) {
|
retry.Run(t, func(r *retry.R) {
|
||||||
resp2, err := peeringClient2.PeeringList(ctx, &pbpeering.PeeringListRequest{})
|
// on Read
|
||||||
|
resp, err := peeringClient2.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: "my-peer-s1"})
|
||||||
require.NoError(r, err)
|
require.NoError(r, err)
|
||||||
|
require.NotNil(r, resp.Peering)
|
||||||
|
require.Equal(r, tc.expectedImportedExportedServicesCount, resp.Peering.ImportedServiceCount)
|
||||||
|
|
||||||
|
// on List
|
||||||
|
resp2, err2 := peeringClient2.PeeringList(ctx, &pbpeering.PeeringListRequest{})
|
||||||
|
require.NoError(r, err2)
|
||||||
require.NotEmpty(r, resp2.Peerings)
|
require.NotEmpty(r, resp2.Peerings)
|
||||||
require.Equal(r, tc.expectedImportedServicesCount, resp2.Peerings[0].ImportedServiceCount)
|
require.Equal(r, tc.expectedImportedExportedServicesCount, resp2.Peerings[0].ImportedServiceCount)
|
||||||
|
})
|
||||||
|
|
||||||
|
// Check that exported services count on S1 are what we expect
|
||||||
|
retry.Run(t, func(r *retry.R) {
|
||||||
|
// on Read
|
||||||
|
resp, err := peeringClient.PeeringRead(ctx, &pbpeering.PeeringReadRequest{Name: "my-peer-s2"})
|
||||||
|
require.NoError(r, err)
|
||||||
|
require.NotNil(r, resp.Peering)
|
||||||
|
require.Equal(r, tc.expectedImportedExportedServicesCount, resp.Peering.ExportedServiceCount)
|
||||||
|
|
||||||
|
// on List
|
||||||
|
resp2, err2 := peeringClient.PeeringList(ctx, &pbpeering.PeeringListRequest{})
|
||||||
|
require.NoError(r, err2)
|
||||||
|
require.NotEmpty(r, resp2.Peerings)
|
||||||
|
require.Equal(r, tc.expectedImportedExportedServicesCount, resp2.Peerings[0].ExportedServiceCount)
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,10 +36,14 @@ import (
|
||||||
// If there are no instances in the event, we consider that to be a de-registration.
|
// If there are no instances in the event, we consider that to be a de-registration.
|
||||||
func makeServiceResponse(
|
func makeServiceResponse(
|
||||||
logger hclog.Logger,
|
logger hclog.Logger,
|
||||||
|
mst *MutableStatus,
|
||||||
update cache.UpdateEvent,
|
update cache.UpdateEvent,
|
||||||
) (*pbpeerstream.ReplicationMessage_Response, error) {
|
) (*pbpeerstream.ReplicationMessage_Response, error) {
|
||||||
|
serviceName := strings.TrimPrefix(update.CorrelationID, subExportedService)
|
||||||
|
sn := structs.ServiceNameFromString(serviceName)
|
||||||
csn, ok := update.Result.(*pbservice.IndexedCheckServiceNodes)
|
csn, ok := update.Result.(*pbservice.IndexedCheckServiceNodes)
|
||||||
if !ok {
|
if !ok {
|
||||||
|
logger.Error("did not increment or decrement exported services count", "service_name", serviceName)
|
||||||
return nil, fmt.Errorf("invalid type for service response: %T", update.Result)
|
return nil, fmt.Errorf("invalid type for service response: %T", update.Result)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -51,9 +55,6 @@ func makeServiceResponse(
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to marshal: %w", err)
|
return nil, fmt.Errorf("failed to marshal: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
serviceName := strings.TrimPrefix(update.CorrelationID, subExportedService)
|
|
||||||
|
|
||||||
// If no nodes are present then it's due to one of:
|
// If no nodes are present then it's due to one of:
|
||||||
// 1. The service is newly registered or exported and yielded a transient empty update.
|
// 1. The service is newly registered or exported and yielded a transient empty update.
|
||||||
// 2. All instances of the service were de-registered.
|
// 2. All instances of the service were de-registered.
|
||||||
|
@ -61,7 +62,10 @@ func makeServiceResponse(
|
||||||
//
|
//
|
||||||
// We don't distinguish when these three things occurred, but it's safe to send a DELETE Op in all cases, so we do that.
|
// We don't distinguish when these three things occurred, but it's safe to send a DELETE Op in all cases, so we do that.
|
||||||
// Case #1 is a no-op for the importing peer.
|
// Case #1 is a no-op for the importing peer.
|
||||||
if len(export.Nodes) == 0 {
|
if len(csn.Nodes) == 0 {
|
||||||
|
logger.Trace("decrementing exported services count", "service_name", sn.String())
|
||||||
|
mst.RemoveExportedService(sn)
|
||||||
|
|
||||||
return &pbpeerstream.ReplicationMessage_Response{
|
return &pbpeerstream.ReplicationMessage_Response{
|
||||||
ResourceURL: pbpeerstream.TypeURLExportedService,
|
ResourceURL: pbpeerstream.TypeURLExportedService,
|
||||||
// TODO(peering): Nonce management
|
// TODO(peering): Nonce management
|
||||||
|
@ -71,6 +75,9 @@ func makeServiceResponse(
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logger.Trace("incrementing exported services count", "service_name", sn.String())
|
||||||
|
mst.TrackExportedService(sn)
|
||||||
|
|
||||||
// If there are nodes in the response, we push them as an UPSERT operation.
|
// If there are nodes in the response, we push them as an UPSERT operation.
|
||||||
return &pbpeerstream.ReplicationMessage_Response{
|
return &pbpeerstream.ReplicationMessage_Response{
|
||||||
ResourceURL: pbpeerstream.TypeURLExportedService,
|
ResourceURL: pbpeerstream.TypeURLExportedService,
|
||||||
|
|
|
@ -424,7 +424,7 @@ func (s *Server) HandleStream(streamReq HandleStreamRequest) error {
|
||||||
var resp *pbpeerstream.ReplicationMessage_Response
|
var resp *pbpeerstream.ReplicationMessage_Response
|
||||||
switch {
|
switch {
|
||||||
case strings.HasPrefix(update.CorrelationID, subExportedService):
|
case strings.HasPrefix(update.CorrelationID, subExportedService):
|
||||||
resp, err = makeServiceResponse(logger, update)
|
resp, err = makeServiceResponse(logger, status, update)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// Log the error and skip this response to avoid locking up peering due to a bad update event.
|
// Log the error and skip this response to avoid locking up peering due to a bad update event.
|
||||||
logger.Error("failed to create service response", "error", err)
|
logger.Error("failed to create service response", "error", err)
|
||||||
|
|
|
@ -22,6 +22,7 @@ import (
|
||||||
"google.golang.org/protobuf/types/known/anypb"
|
"google.golang.org/protobuf/types/known/anypb"
|
||||||
|
|
||||||
"github.com/hashicorp/consul/acl"
|
"github.com/hashicorp/consul/acl"
|
||||||
|
"github.com/hashicorp/consul/agent/cache"
|
||||||
"github.com/hashicorp/consul/agent/connect"
|
"github.com/hashicorp/consul/agent/connect"
|
||||||
"github.com/hashicorp/consul/agent/consul/state"
|
"github.com/hashicorp/consul/agent/consul/state"
|
||||||
"github.com/hashicorp/consul/agent/consul/stream"
|
"github.com/hashicorp/consul/agent/consul/stream"
|
||||||
|
@ -1006,6 +1007,53 @@ func (b *testStreamBackend) CatalogDeregister(req *structs.DeregisterRequest) er
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func Test_makeServiceResponse_ExportedServicesCount(t *testing.T) {
|
||||||
|
peerName := "billing"
|
||||||
|
peerID := "1fabcd52-1d46-49b0-b1d8-71559aee47f5"
|
||||||
|
|
||||||
|
srv, store := newTestServer(t, nil)
|
||||||
|
require.NoError(t, store.PeeringWrite(31, &pbpeering.Peering{
|
||||||
|
ID: peerID,
|
||||||
|
Name: peerName},
|
||||||
|
))
|
||||||
|
|
||||||
|
// connect the stream
|
||||||
|
mst, err := srv.Tracker.Connected(peerID)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
testutil.RunStep(t, "simulate an update to export a service", func(t *testing.T) {
|
||||||
|
update := cache.UpdateEvent{
|
||||||
|
CorrelationID: subExportedService + "api",
|
||||||
|
Result: &pbservice.IndexedCheckServiceNodes{
|
||||||
|
Nodes: []*pbservice.CheckServiceNode{
|
||||||
|
{
|
||||||
|
Service: &pbservice.NodeService{
|
||||||
|
ID: "api-1",
|
||||||
|
Service: "api",
|
||||||
|
PeerName: peerName,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}}
|
||||||
|
_, err := makeServiceResponse(srv.Logger, mst, update)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.Equal(t, 1, mst.GetExportedServicesCount())
|
||||||
|
})
|
||||||
|
|
||||||
|
testutil.RunStep(t, "simulate a delete for an exported service", func(t *testing.T) {
|
||||||
|
update := cache.UpdateEvent{
|
||||||
|
CorrelationID: subExportedService + "api",
|
||||||
|
Result: &pbservice.IndexedCheckServiceNodes{
|
||||||
|
Nodes: []*pbservice.CheckServiceNode{},
|
||||||
|
}}
|
||||||
|
_, err := makeServiceResponse(srv.Logger, mst, update)
|
||||||
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
require.Equal(t, 0, mst.GetExportedServicesCount())
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
func Test_processResponse_Validation(t *testing.T) {
|
func Test_processResponse_Validation(t *testing.T) {
|
||||||
peerName := "billing"
|
peerName := "billing"
|
||||||
peerID := "1fabcd52-1d46-49b0-b1d8-71559aee47f5"
|
peerID := "1fabcd52-1d46-49b0-b1d8-71559aee47f5"
|
||||||
|
|
|
@ -166,9 +166,11 @@ type Status struct {
|
||||||
// - The last error message when receiving from the stream.
|
// - The last error message when receiving from the stream.
|
||||||
LastReceiveErrorMessage string
|
LastReceiveErrorMessage string
|
||||||
|
|
||||||
// TODO(peering): consider keeping track of imported service counts thru raft
|
// TODO(peering): consider keeping track of imported and exported services thru raft
|
||||||
// ImportedServices is set that keeps track of which service names are imported for the peer
|
// ImportedServices keeps track of which service names are imported for the peer
|
||||||
ImportedServices map[string]struct{}
|
ImportedServices map[string]struct{}
|
||||||
|
// ExportedServices keeps track of which service names a peer asks to export
|
||||||
|
ExportedServices map[string]struct{}
|
||||||
}
|
}
|
||||||
|
|
||||||
func newMutableStatus(now func() time.Time, connected bool) *MutableStatus {
|
func newMutableStatus(now func() time.Time, connected bool) *MutableStatus {
|
||||||
|
@ -274,3 +276,28 @@ func (s *MutableStatus) GetImportedServicesCount() int {
|
||||||
|
|
||||||
return len(s.ImportedServices)
|
return len(s.ImportedServices)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (s *MutableStatus) RemoveExportedService(sn structs.ServiceName) {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
|
delete(s.ExportedServices, sn.String())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *MutableStatus) TrackExportedService(sn structs.ServiceName) {
|
||||||
|
s.mu.Lock()
|
||||||
|
defer s.mu.Unlock()
|
||||||
|
|
||||||
|
if s.ExportedServices == nil {
|
||||||
|
s.ExportedServices = make(map[string]struct{})
|
||||||
|
}
|
||||||
|
|
||||||
|
s.ExportedServices[sn.String()] = struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *MutableStatus) GetExportedServicesCount() int {
|
||||||
|
s.mu.RLock()
|
||||||
|
defer s.mu.RUnlock()
|
||||||
|
|
||||||
|
return len(s.ExportedServices)
|
||||||
|
}
|
||||||
|
|
|
@ -346,6 +346,7 @@ func (s *Server) PeeringRead(ctx context.Context, req *pbpeering.PeeringReadRequ
|
||||||
s.Logger.Trace("did not find peer in stream tracker when reading peer", "peerID", peering.ID)
|
s.Logger.Trace("did not find peer in stream tracker when reading peer", "peerID", peering.ID)
|
||||||
} else {
|
} else {
|
||||||
cp.ImportedServiceCount = uint64(len(st.ImportedServices))
|
cp.ImportedServiceCount = uint64(len(st.ImportedServices))
|
||||||
|
cp.ExportedServiceCount = uint64(len(st.ExportedServices))
|
||||||
}
|
}
|
||||||
|
|
||||||
return &pbpeering.PeeringReadResponse{Peering: cp}, nil
|
return &pbpeering.PeeringReadResponse{Peering: cp}, nil
|
||||||
|
@ -386,6 +387,7 @@ func (s *Server) PeeringList(ctx context.Context, req *pbpeering.PeeringListRequ
|
||||||
s.Logger.Trace("did not find peer in stream tracker when listing peers", "peerID", p.ID)
|
s.Logger.Trace("did not find peer in stream tracker when listing peers", "peerID", p.ID)
|
||||||
} else {
|
} else {
|
||||||
cp.ImportedServiceCount = uint64(len(st.ImportedServices))
|
cp.ImportedServiceCount = uint64(len(st.ImportedServices))
|
||||||
|
cp.ExportedServiceCount = uint64(len(st.ExportedServices))
|
||||||
}
|
}
|
||||||
|
|
||||||
cPeerings = append(cPeerings, cp)
|
cPeerings = append(cPeerings, cp)
|
||||||
|
|
Loading…
Reference in New Issue