diff --git a/agent/consul/state/event_publisher.go b/agent/consul/state/event_publisher.go index b2ad5b5f36..6a5751f91c 100644 --- a/agent/consul/state/event_publisher.go +++ b/agent/consul/state/event_publisher.go @@ -176,7 +176,7 @@ func (s *subscriptions) handleACLUpdate(tx ReadTxn, event stream.Event) error { case stream.Topic_ACLTokens: token := event.Payload.(*structs.ACLToken) for _, sub := range s.byToken[token.SecretID] { - sub.ForceReload() + sub.Close() } case stream.Topic_ACLPolicies: @@ -220,7 +220,7 @@ func (s *subscriptions) closeSubscriptionsForTokens(tokens memdb.ResultIterator) token := token.(*structs.ACLToken) if subs, ok := s.byToken[token.SecretID]; ok { for _, sub := range subs { - sub.ForceReload() + sub.Close() } } } diff --git a/agent/consul/stream/subscription.go b/agent/consul/stream/subscription.go index a129ddef15..36e4bc5e0f 100644 --- a/agent/consul/stream/subscription.go +++ b/agent/consul/stream/subscription.go @@ -1,18 +1,20 @@ package stream import ( - context "context" + "context" "errors" "sync/atomic" ) const ( - // SubscriptionStateOpen is the default state of a subscription - SubscriptionStateOpen uint32 = 0 + // subscriptionStateOpen is the default state of a subscription. An open + // subscription may receive new events. + subscriptionStateOpen uint32 = 0 - // SubscriptionStateCloseReload signals that the subscription was closed by - // server and client should retry. - SubscriptionStateCloseReload uint32 = 1 + // subscriptionStateClosed indicates that the subscription was closed, possibly + // as a result of a change to an ACL token, and will not receive new events. + // The subscriber must issue a new Subscribe request. + subscriptionStateClosed uint32 = 1 ) var ( @@ -48,6 +50,8 @@ type Subscription struct { Unsubscribe func() } +// SubscribeRequest identifies the types of events the subscriber would like to +// receiver. Topic and Token are required. type SubscribeRequest struct { Topic Topic Key string @@ -55,7 +59,8 @@ type SubscribeRequest struct { Index uint64 } -// NewSubscription return a new subscription. +// NewSubscription return a new subscription. The caller is responsible for +// calling Unsubscribe when it is done with the subscription, to free resources. func NewSubscription(ctx context.Context, req *SubscribeRequest, item *BufferItem) *Subscription { subCtx, cancel := context.WithCancel(ctx) return &Subscription{ @@ -69,8 +74,7 @@ func NewSubscription(ctx context.Context, req *SubscribeRequest, item *BufferIte // Next returns the next set of events to deliver. It must only be called from a // single goroutine concurrently as it mutates the Subscription. func (s *Subscription) Next() ([]Event, error) { - state := atomic.LoadUint32(&s.state) - if state == SubscriptionStateCloseReload { + if atomic.LoadUint32(&s.state) == subscriptionStateClosed { return nil, ErrSubscriptionReload } @@ -78,8 +82,7 @@ func (s *Subscription) Next() ([]Event, error) { next, err := s.currentItem.Next(s.ctx) if err != nil { // Check we didn't return because of a state change cancelling the context - state := atomic.LoadUint32(&s.state) - if state == SubscriptionStateCloseReload { + if atomic.LoadUint32(&s.state) == subscriptionStateClosed { return nil, ErrSubscriptionReload } return nil, err @@ -120,12 +123,11 @@ func (s *Subscription) Next() ([]Event, error) { } } -// ForceReload closes the stream and signals that the subscriber should reload. +// Close the subscription. Subscribers will receive an error when they call Next, +// and will need to perform a new Subscribe request. // It is safe to call from any goroutine. -func (s *Subscription) ForceReload() { - swapped := atomic.CompareAndSwapUint32(&s.state, SubscriptionStateOpen, - SubscriptionStateCloseReload) - +func (s *Subscription) Close() { + swapped := atomic.CompareAndSwapUint32(&s.state, subscriptionStateOpen, subscriptionStateClosed) if swapped { s.cancelFn() } diff --git a/agent/consul/stream/subscription_test.go b/agent/consul/stream/subscription_test.go index cd55785917..615f7f30cb 100644 --- a/agent/consul/stream/subscription_test.go +++ b/agent/consul/stream/subscription_test.go @@ -88,7 +88,7 @@ func TestSubscription(t *testing.T) { "Event should have been delivered after short time, took %s", elapsed) } -func TestSubscriptionCloseReload(t *testing.T) { +func TestSubscription_Close(t *testing.T) { eb := NewEventBuffer() index := uint64(100) @@ -118,11 +118,11 @@ func TestSubscriptionCloseReload(t *testing.T) { require.Len(t, got, 1) require.Equal(t, index, got[0].Index) - // Schedule a ForceReload simulating the server deciding this subscroption + // Schedule a Close simulating the server deciding this subscroption // needs to reset (e.g. on ACL perm change). start = time.Now() time.AfterFunc(200*time.Millisecond, func() { - sub.ForceReload() + sub.Close() }) _, err = sub.Next()