mirror of https://github.com/hashicorp/consul
stream: close all subs when EventProcessor is shutdown.
parent
a99a4103bd
commit
ed69feca6d
|
@ -103,8 +103,7 @@ func (e *EventPublisher) handleUpdates(ctx context.Context) {
|
|||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
// TODO: also close all subscriptions so the subscribers are moved
|
||||
// to the new publisher?
|
||||
e.subscriptions.closeAll()
|
||||
return
|
||||
case update := <-e.publishCh:
|
||||
e.sendEvents(update)
|
||||
|
@ -249,6 +248,17 @@ func (s *subscriptions) unsubscribe(req *SubscribeRequest) func() {
|
|||
}
|
||||
}
|
||||
|
||||
func (s *subscriptions) closeAll() {
|
||||
s.lock.Lock()
|
||||
defer s.lock.Unlock()
|
||||
|
||||
for _, byRequest := range s.byToken {
|
||||
for _, sub := range byRequest {
|
||||
sub.forceClose()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (e *EventPublisher) getSnapshotLocked(req *SubscribeRequest, topicHead *bufferItem) (*eventSnapshot, error) {
|
||||
topicSnaps, ok := e.snapCache[req.Topic]
|
||||
if !ok {
|
||||
|
|
|
@ -111,3 +111,45 @@ func assertNoResult(t *testing.T, eventCh <-chan subNextResult) {
|
|||
case <-time.After(100 * time.Millisecond):
|
||||
}
|
||||
}
|
||||
|
||||
func TestEventPublisher_ShutdownClosesSubscriptions(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
t.Cleanup(cancel)
|
||||
|
||||
handlers := newTestSnapshotHandlers()
|
||||
fn := func(req *SubscribeRequest, buf SnapshotAppender) (uint64, error) {
|
||||
return 0, nil
|
||||
}
|
||||
handlers[intTopic(22)] = fn
|
||||
handlers[intTopic(33)] = fn
|
||||
|
||||
publisher := NewEventPublisher(ctx, handlers, time.Second)
|
||||
|
||||
sub1, err := publisher.Subscribe(&SubscribeRequest{Topic: intTopic(22)})
|
||||
require.NoError(t, err)
|
||||
defer sub1.Unsubscribe()
|
||||
|
||||
sub2, err := publisher.Subscribe(&SubscribeRequest{Topic: intTopic(33)})
|
||||
require.NoError(t, err)
|
||||
defer sub2.Unsubscribe()
|
||||
|
||||
cancel() // Shutdown
|
||||
|
||||
err = consumeSub(context.Background(), sub1)
|
||||
require.Equal(t, err, ErrSubscriptionClosed)
|
||||
|
||||
_, err = sub2.Next(context.Background())
|
||||
require.Equal(t, err, ErrSubscriptionClosed)
|
||||
}
|
||||
|
||||
func consumeSub(ctx context.Context, sub *Subscription) error {
|
||||
for {
|
||||
events, err := sub.Next(ctx)
|
||||
switch {
|
||||
case err != nil:
|
||||
return err
|
||||
case len(events) == 1 && events[0].IsEndOfSnapshot():
|
||||
continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue