mirror of https://github.com/hashicorp/consul
150 lines
4.7 KiB
Go
150 lines
4.7 KiB
Go
|
package peering
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"errors"
|
||
|
"fmt"
|
||
|
"time"
|
||
|
|
||
|
"github.com/hashicorp/go-hclog"
|
||
|
"github.com/hashicorp/go-memdb"
|
||
|
|
||
|
"github.com/hashicorp/consul/agent/cache"
|
||
|
"github.com/hashicorp/consul/agent/structs"
|
||
|
"github.com/hashicorp/consul/agent/submatview"
|
||
|
"github.com/hashicorp/consul/lib/retry"
|
||
|
"github.com/hashicorp/consul/proto/pbservice"
|
||
|
)
|
||
|
|
||
|
type MaterializedViewStore interface {
|
||
|
Get(ctx context.Context, req submatview.Request) (submatview.Result, error)
|
||
|
Notify(ctx context.Context, req submatview.Request, cID string, ch chan<- cache.UpdateEvent) error
|
||
|
}
|
||
|
|
||
|
type SubscriptionBackend interface {
|
||
|
Subscriber
|
||
|
Store() Store
|
||
|
}
|
||
|
|
||
|
// subscriptionManager handlers requests to subscribe to events from an events publisher.
|
||
|
type subscriptionManager struct {
|
||
|
logger hclog.Logger
|
||
|
viewStore MaterializedViewStore
|
||
|
backend SubscriptionBackend
|
||
|
|
||
|
// watchedServices is a map of exported services to a cancel function for their subscription notifier.
|
||
|
watchedServices map[structs.ServiceName]context.CancelFunc
|
||
|
}
|
||
|
|
||
|
// TODO(peering): Maybe centralize so that there is a single manager per datacenter, rather than per peering.
|
||
|
func newSubscriptionManager(ctx context.Context, logger hclog.Logger, backend SubscriptionBackend) *subscriptionManager {
|
||
|
logger = logger.Named("subscriptions")
|
||
|
store := submatview.NewStore(logger.Named("viewstore"))
|
||
|
go store.Run(ctx)
|
||
|
|
||
|
return &subscriptionManager{
|
||
|
logger: logger,
|
||
|
viewStore: store,
|
||
|
backend: backend,
|
||
|
watchedServices: make(map[structs.ServiceName]context.CancelFunc),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// subscribe returns a channel that will contain updates to exported service instances for a given peer.
|
||
|
func (m *subscriptionManager) subscribe(ctx context.Context, peerID string) <-chan cache.UpdateEvent {
|
||
|
updateCh := make(chan cache.UpdateEvent, 1)
|
||
|
go m.syncSubscriptions(ctx, peerID, updateCh)
|
||
|
|
||
|
return updateCh
|
||
|
}
|
||
|
|
||
|
func (m *subscriptionManager) syncSubscriptions(ctx context.Context, peerID string, updateCh chan<- cache.UpdateEvent) {
|
||
|
waiter := &retry.Waiter{
|
||
|
MinFailures: 1,
|
||
|
Factor: 500 * time.Millisecond,
|
||
|
MaxWait: 60 * time.Second,
|
||
|
Jitter: retry.NewJitter(100),
|
||
|
}
|
||
|
|
||
|
for {
|
||
|
if err := m.syncSubscriptionsAndBlock(ctx, peerID, updateCh); err != nil {
|
||
|
m.logger.Error("failed to sync subscriptions", "error", err)
|
||
|
}
|
||
|
|
||
|
if err := waiter.Wait(ctx); err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded) {
|
||
|
m.logger.Error("failed to wait before re-trying sync", "error", err)
|
||
|
}
|
||
|
|
||
|
select {
|
||
|
case <-ctx.Done():
|
||
|
return
|
||
|
default:
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// syncSubscriptionsAndBlock ensures that the subscriptions to the subscription backend
|
||
|
// match the list of services exported to the peer.
|
||
|
func (m *subscriptionManager) syncSubscriptionsAndBlock(ctx context.Context, peerID string, updateCh chan<- cache.UpdateEvent) error {
|
||
|
store := m.backend.Store()
|
||
|
|
||
|
ws := memdb.NewWatchSet()
|
||
|
ws.Add(store.AbandonCh())
|
||
|
ws.Add(ctx.Done())
|
||
|
|
||
|
// Get exported services for peer id
|
||
|
_, services, err := store.ExportedServicesForPeer(ws, peerID)
|
||
|
if err != nil {
|
||
|
return fmt.Errorf("failed to watch exported services for peer %q: %w", peerID, err)
|
||
|
}
|
||
|
|
||
|
// seen contains the set of exported service names and is used to reconcile the list of watched services.
|
||
|
seen := make(map[structs.ServiceName]struct{})
|
||
|
|
||
|
// Ensure there is a subscription for each service exported to the peer.
|
||
|
for _, svc := range services {
|
||
|
seen[svc] = struct{}{}
|
||
|
|
||
|
if _, ok := m.watchedServices[svc]; ok {
|
||
|
// Exported service is already being watched, nothing to do.
|
||
|
continue
|
||
|
}
|
||
|
|
||
|
notifyCtx, cancel := context.WithCancel(ctx)
|
||
|
m.watchedServices[svc] = cancel
|
||
|
|
||
|
if err := m.Notify(notifyCtx, svc, updateCh); err != nil {
|
||
|
m.logger.Error("failed to subscribe to service", "service", svc.String())
|
||
|
continue
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// For every subscription without an exported service, call the associated cancel fn.
|
||
|
for svc, cancel := range m.watchedServices {
|
||
|
if _, ok := seen[svc]; !ok {
|
||
|
cancel()
|
||
|
|
||
|
// Send an empty event to the stream handler to trigger sending a DELETE message.
|
||
|
// Cancelling the subscription context above is necessary, but does not yield a useful signal on its own.
|
||
|
updateCh <- cache.UpdateEvent{
|
||
|
CorrelationID: subExportedService + svc.String(),
|
||
|
Result: &pbservice.IndexedCheckServiceNodes{},
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Block for any changes to the state store.
|
||
|
ws.WatchCh(ctx)
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
const (
|
||
|
subExportedService = "exported-service:"
|
||
|
)
|
||
|
|
||
|
// Notify the given channel when there are updates to the requested service.
|
||
|
func (m *subscriptionManager) Notify(ctx context.Context, svc structs.ServiceName, updateCh chan<- cache.UpdateEvent) error {
|
||
|
sr := newExportedServiceRequest(m.logger, svc, m.backend)
|
||
|
return m.viewStore.Notify(ctx, sr, subExportedService+svc.String(), updateCh)
|
||
|
}
|