Consul is a distributed, highly available, and data center aware solution to connect and configure applications across dynamic, distributed infrastructure.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

157 lines
4.2 KiB

package routine
import (
"context"
"sync/atomic"
"testing"
"github.com/hashicorp/consul/sdk/testutil"
"github.com/hashicorp/consul/sdk/testutil/retry"
"github.com/stretchr/testify/require"
)
// TestManager walks one named routine through its lifecycle: an initial start
// and stop, a restart under the same name, and finally an exit triggered by
// cancelling the context that was handed to Start.
func TestManager(t *testing.T) {
	t.Parallel()

	const routineName = "run"

	var (
		invocations uint32 // bumped once per execution of the routine
		active      uint32 // 1 while the routine body is executing, else 0
	)

	manager := NewManager(testutil.Logger(t))

	// The routine flags itself as active, records the invocation and then
	// parks until its context is done.
	routine := func(ctx context.Context) error {
		atomic.StoreUint32(&active, 1)
		defer atomic.StoreUint32(&active, 0)
		atomic.AddUint32(&invocations, 1)
		<-ctx.Done()
		return nil
	}

	// A name that was never started must not be reported as running.
	require.False(t, manager.IsRunning("not-found"))

	// First start: the routine is reported as running and eventually
	// executes exactly once.
	require.NoError(t, manager.Start(context.Background(), routineName, routine))
	require.True(t, manager.IsRunning(routineName))
	retry.Run(t, func(r *retry.R) {
		require.Equal(r, uint32(1), atomic.LoadUint32(&invocations))
		require.Equal(r, uint32(1), atomic.LoadUint32(&active))
	})

	firstStop := manager.Stop(routineName)
	require.NotNil(t, firstStop)
	<-firstStop

	// Make sure the background goroutine really was cancelled.
	retry.Run(t, func(r *retry.R) {
		require.Equal(r, uint32(1), atomic.LoadUint32(&invocations))
		require.Equal(r, uint32(0), atomic.LoadUint32(&active))
	})

	// Second start under the same name, followed by another stop.
	require.NoError(t, manager.Start(context.Background(), routineName, routine))
	retry.Run(t, func(r *retry.R) {
		require.Equal(r, uint32(2), atomic.LoadUint32(&invocations))
		require.Equal(r, uint32(1), atomic.LoadUint32(&active))
	})

	secondStop := manager.Stop(routineName)
	require.NotNil(t, secondStop)
	<-secondStop

	retry.Run(t, func(r *retry.R) {
		require.Equal(r, uint32(0), atomic.LoadUint32(&active))
	})

	// Third start, this time with a cancellable parent context that is
	// cancelled right away instead of calling Stop.
	parent, cancelParent := context.WithCancel(context.Background())
	require.NoError(t, manager.Start(parent, routineName, routine))
	cancelParent()

	// With the parent context cancelled, the routine is expected to return
	// on its own.
	retry.Run(t, func(r *retry.R) {
		require.Equal(r, uint32(3), atomic.LoadUint32(&invocations))
		require.Equal(r, uint32(0), atomic.LoadUint32(&active))
		// A routine that exits early should also stop being reported as
		// running.
		require.False(r, manager.IsRunning(routineName))
	})
}
// TestManager_StopAll starts two routines under different names, calls
// StopAll once, and then waits until neither name is reported as running.
func TestManager_StopAll(t *testing.T) {
	t.Parallel()

	var (
		invocations uint32 // bumped once per execution of the routine
		active      uint32 // set to 1 on entry and back to 0 on return
	)

	manager := NewManager(testutil.Logger(t))

	// Shared routine body: flag entry, count the invocation, then wait for
	// the context to be done.
	routine := func(ctx context.Context) error {
		atomic.StoreUint32(&active, 1)
		defer atomic.StoreUint32(&active, 0)
		atomic.AddUint32(&invocations, 1)
		<-ctx.Done()
		return nil
	}

	names := []string{"run1", "run2"}
	for _, name := range names {
		require.NoError(t, manager.Start(context.Background(), name, routine))
	}

	manager.StopAll()

	// Poll until both names are reported as not running.
	retry.Run(t, func(r *retry.R) {
		for _, name := range names {
			require.False(r, manager.IsRunning(name))
		}
	})
}
// TestManager_StopBlocking tests IsRunning when the routine is a blocking
// call that does not immediately return when cancelled. It also checks that
// a new routine can be started under the same name while the old, cancelled
// one is still executing.
func TestManager_StopBlocking(t *testing.T) {
	t.Parallel()

	var runs uint32    // number of times the blocking routine was entered
	var running uint32 // 1 while the blocking routine body is executing

	unblock := make(chan struct{}) // To simulate a blocking call
	mgr := NewManager(testutil.Logger(t))

	// A routine that will be still running for a while after cancelled: it
	// only returns once the context is done AND unblock has been closed.
	run := func(ctx context.Context) error {
		atomic.StoreUint32(&running, 1)
		defer atomic.StoreUint32(&running, 0)
		atomic.AddUint32(&runs, 1)
		<-ctx.Done()
		<-unblock
		return nil
	}

	require.NoError(t, mgr.Start(context.Background(), "blocking", run))
	retry.Run(t, func(r *retry.R) {
		require.True(r, mgr.IsRunning("blocking"))
		require.Equal(r, uint32(1), atomic.LoadUint32(&runs))
		require.Equal(r, uint32(1), atomic.LoadUint32(&running))
	})

	doneCh := mgr.Stop("blocking")
	// Fail fast on a nil channel: the receive from doneCh further down would
	// otherwise block forever and hang the test instead of reporting a
	// failure. This mirrors the check done in TestManager.
	require.NotNil(t, doneCh)

	// IsRunning should return false, however &running is still 1
	retry.Run(t, func(r *retry.R) {
		require.False(r, mgr.IsRunning("blocking"))
		require.Equal(r, uint32(1), atomic.LoadUint32(&running))
	})

	// New routine should be able to replace old "cancelled but running" routine.
	require.NoError(t, mgr.Start(context.Background(), "blocking", func(ctx context.Context) error {
		<-ctx.Done()
		return nil
	}))
	defer mgr.Stop("blocking")

	retry.Run(t, func(r *retry.R) {
		require.True(r, mgr.IsRunning("blocking"))               // New routine
		require.Equal(r, uint32(1), atomic.LoadUint32(&running)) // Old routine
	})

	// Complete the blocking routine and wait for the channel obtained from
	// the earlier Stop call.
	close(unblock)
	<-doneCh

	retry.Run(t, func(r *retry.R) {
		require.Equal(r, uint32(0), atomic.LoadUint32(&running))
	})
}