package api
import (
"log"
"net/http"
"net/http/httptest"
"net/http/httputil"
"strings"
"sync"
"testing"
"time"
)
func createTestSemaphore ( t * testing . T , c * Client , prefix string , limit int ) ( * Semaphore , * Session ) {
t . Helper ( )
session := c . Session ( )
se := & SessionEntry {
Name : DefaultSemaphoreSessionName ,
TTL : DefaultSemaphoreSessionTTL ,
Behavior : SessionBehaviorDelete ,
}
id , _ , err := session . CreateNoChecks ( se , nil )
if err != nil {
t . Fatalf ( "err: %v" , err )
}
opts := & SemaphoreOptions {
Prefix : prefix ,
Limit : limit ,
Session : id ,
SessionName : se . Name ,
SessionTTL : se . TTL ,
}
sema , err := c . SemaphoreOpts ( opts )
if err != nil {
t . Fatalf ( "err: %v" , err )
}
return sema , session
}
func TestAPI_SemaphoreAcquireRelease ( t * testing . T ) {
t . Parallel ( )
c , s := makeClient ( t )
defer s . Stop ( )
sema , session := createTestSemaphore ( t , c , "test/semaphore" , 2 )
defer session . Destroy ( sema . opts . Session , nil )
// Initial release should fail
err := sema . Release ( )
if err != ErrSemaphoreNotHeld {
t . Fatalf ( "err: %v" , err )
}
// Should work
lockCh , err := sema . Acquire ( nil )
if err != nil {
t . Fatalf ( "err: %v" , err )
}
if lockCh == nil {
t . Fatalf ( "not hold" )
}
// Double lock should fail
_ , err = sema . Acquire ( nil )
if err != ErrSemaphoreHeld {
t . Fatalf ( "err: %v" , err )
}
// Should be held
select {
case <- lockCh :
t . Fatalf ( "should be held" )
default :
}
// Initial release should work
err = sema . Release ( )
if err != nil {
t . Fatalf ( "err: %v" , err )
}
// Double unlock should fail
err = sema . Release ( )
if err != ErrSemaphoreNotHeld {
t . Fatalf ( "err: %v" , err )
}
// Should lose resource
select {
case <- lockCh :
case <- time . After ( time . Second ) :
t . Fatalf ( "should not be held" )
}
}
func TestAPI_SemaphoreForceInvalidate ( t * testing . T ) {
t . Parallel ( )
c , s := makeClient ( t )
defer s . Stop ( )
s . WaitForSerfCheck ( t )
sema , session := createTestSemaphore ( t , c , "test/semaphore" , 2 )
defer session . Destroy ( sema . opts . Session , nil )
// Should work
lockCh , err := sema . Acquire ( nil )
if err != nil {
t . Fatalf ( "err: %v" , err )
}
if lockCh == nil {
t . Fatalf ( "not acquired" )
}
defer sema . Release ( )
go func ( ) {
// Nuke the session, simulator an operator invalidation
// or a health check failure
session := c . Session ( )
session . Destroy ( sema . lockSession , nil )
} ( )
// Should loose slot
select {
case <- lockCh :
case <- time . After ( time . Second ) :
t . Fatalf ( "should not be locked" )
}
}
func TestAPI_SemaphoreDeleteKey ( t * testing . T ) {
t . Parallel ( )
c , s := makeClient ( t )
defer s . Stop ( )
s . WaitForSerfCheck ( t )
sema , session := createTestSemaphore ( t , c , "test/semaphore" , 2 )
defer session . Destroy ( sema . opts . Session , nil )
// Should work
lockCh , err := sema . Acquire ( nil )
if err != nil {
t . Fatalf ( "err: %v" , err )
}
if lockCh == nil {
t . Fatalf ( "not locked" )
}
defer sema . Release ( )
go func ( ) {
// Nuke the key, simulate an operator intervention
kv := c . KV ( )
kv . DeleteTree ( "test/semaphore" , nil )
} ( )
// Should loose leadership
select {
case <- lockCh :
case <- time . After ( time . Second ) :
t . Fatalf ( "should not be locked" )
}
}
func TestAPI_SemaphoreContend ( t * testing . T ) {
t . Parallel ( )
c , s := makeClient ( t )
defer s . Stop ( )
s . WaitForSerfCheck ( t )
wg := & sync . WaitGroup { }
acquired := make ( [ ] bool , 4 )
for idx := range acquired {
wg . Add ( 1 )
go func ( idx int ) {
defer wg . Done ( )
sema , session := createTestSemaphore ( t , c , "test/semaphore" , 2 )
defer session . Destroy ( sema . opts . Session , nil )
// Should work eventually, will contend
lockCh , err := sema . Acquire ( nil )
if err != nil {
t . Errorf ( "err: %v" , err )
return
}
if lockCh == nil {
t . Errorf ( "not locked" )
return
}
defer sema . Release ( )
log . Printf ( "Contender %d acquired" , idx )
// Set acquired and then leave
acquired [ idx ] = true
} ( idx )
}
// Wait for termination
doneCh := make ( chan struct { } )
go func ( ) {
wg . Wait ( )
close ( doneCh )
} ( )
// Wait for everybody to get a turn
select {
case <- doneCh :
case <- time . After ( 3 * DefaultLockRetryTime ) :
t . Fatalf ( "timeout" )
}
for idx , did := range acquired {
if ! did {
t . Fatalf ( "contender %d never acquired" , idx )
}
}
}
func TestAPI_SemaphoreBadLimit ( t * testing . T ) {
t . Parallel ( )
c , s := makeClient ( t )
defer s . Stop ( )
s . WaitForSerfCheck ( t )
_ , err := c . SemaphorePrefix ( "test/semaphore" , 0 )
if err == nil {
t . Fatalf ( "should error, limit must be positive" )
}
sema , session := createTestSemaphore ( t , c , "test/semaphore" , 1 )
defer session . Destroy ( sema . opts . Session , nil )
_ , err = sema . Acquire ( nil )
if err != nil {
t . Fatalf ( "err: %v" , err )
}
sema2 , session := createTestSemaphore ( t , c , "test/semaphore" , 2 )
defer session . Destroy ( sema . opts . Session , nil )
_ , err = sema2 . Acquire ( nil )
if err . Error ( ) != "semaphore limit conflict (lock: 1, local: 2)" {
t . Fatalf ( "err: %v" , err )
}
}
func TestAPI_SemaphoreDestroy ( t * testing . T ) {
t . Parallel ( )
c , s := makeClient ( t )
defer s . Stop ( )
s . WaitForSerfCheck ( t )
sema , session := createTestSemaphore ( t , c , "test/semaphore" , 2 )
defer session . Destroy ( sema . opts . Session , nil )
sema2 , session := createTestSemaphore ( t , c , "test/semaphore" , 2 )
defer session . Destroy ( sema . opts . Session , nil )
_ , err := sema . Acquire ( nil )
if err != nil {
t . Fatalf ( "err: %v" , err )
}
_ , err = sema2 . Acquire ( nil )
if err != nil {
t . Fatalf ( "err: %v" , err )
}
// Destroy should fail, still held
if err := sema . Destroy ( ) ; err != ErrSemaphoreHeld {
t . Fatalf ( "err: %v" , err )
}
err = sema . Release ( )
if err != nil {
t . Fatalf ( "err: %v" , err )
}
// Destroy should fail, still in use
if err := sema . Destroy ( ) ; err != ErrSemaphoreInUse {
t . Fatalf ( "err: %v" , err )
}
err = sema2 . Release ( )
if err != nil {
t . Fatalf ( "err: %v" , err )
}
// Destroy should work
if err := sema . Destroy ( ) ; err != nil {
t . Fatalf ( "err: %v" , err )
}
// Destroy should work
if err := sema2 . Destroy ( ) ; err != nil {
t . Fatalf ( "err: %v" , err )
}
}
func TestAPI_SemaphoreConflict ( t * testing . T ) {
t . Parallel ( )
c , s := makeClient ( t )
defer s . Stop ( )
s . WaitForSerfCheck ( t )
lock , session := createTestLock ( t , c , "test/sema/.lock" )
defer session . Destroy ( lock . opts . Session , nil )
// Should work
leaderCh , err := lock . Lock ( nil )
if err != nil {
t . Fatalf ( "err: %v" , err )
}
if leaderCh == nil {
t . Fatalf ( "not leader" )
}
defer lock . Unlock ( )
sema , session := createTestSemaphore ( t , c , "test/sema/" , 2 )
defer session . Destroy ( sema . opts . Session , nil )
// Should conflict with lock
_ , err = sema . Acquire ( nil )
if err != ErrSemaphoreConflict {
t . Fatalf ( "err: %v" , err )
}
// Should conflict with lock
err = sema . Destroy ( )
if err != ErrSemaphoreConflict {
t . Fatalf ( "err: %v" , err )
}
}
func TestAPI_SemaphoreMonitorRetry ( t * testing . T ) {
t . Parallel ( )
raw , s := makeClient ( t )
defer s . Stop ( )
s . WaitForSerfCheck ( t )
// Set up a server that always responds with 500 errors.
failer := func ( w http . ResponseWriter , req * http . Request ) {
w . WriteHeader ( 500 )
}
outage := httptest . NewServer ( http . HandlerFunc ( failer ) )
defer outage . Close ( )
// Set up a reverse proxy that will send some requests to the
// 500 server and pass everything else through to the real Consul
// server.
var mutex sync . Mutex
errors := 0
director := func ( req * http . Request ) {
mutex . Lock ( )
defer mutex . Unlock ( )
req . URL . Scheme = "http"
if errors > 0 && req . Method == "GET" && strings . Contains ( req . URL . Path , "/v1/kv/test/sema/.lock" ) {
req . URL . Host = outage . URL [ 7 : ] // Strip off "http://".
errors --
} else {
req . URL . Host = raw . config . Address
}
}
proxy := httptest . NewServer ( & httputil . ReverseProxy { Director : director } )
defer proxy . Close ( )
// Make another client that points at the proxy instead of the real
// Consul server.
config := raw . config
config . Address = proxy . URL [ 7 : ] // Strip off "http://".
c , err := NewClient ( & config )
if err != nil {
t . Fatalf ( "err: %v" , err )
}
// Set up a lock with retries enabled.
opts := & SemaphoreOptions {
Prefix : "test/sema/.lock" ,
Limit : 2 ,
SessionTTL : "60s" ,
MonitorRetries : 3 ,
}
sema , err := c . SemaphoreOpts ( opts )
if err != nil {
t . Fatalf ( "err: %v" , err )
}
// Make sure the default got set.
if sema . opts . MonitorRetryTime != DefaultMonitorRetryTime {
t . Fatalf ( "bad: %d" , sema . opts . MonitorRetryTime )
}
// Now set a custom time for the test.
opts . MonitorRetryTime = 250 * time . Millisecond
sema , err = c . SemaphoreOpts ( opts )
if err != nil {
t . Fatalf ( "err: %v" , err )
}
if sema . opts . MonitorRetryTime != 250 * time . Millisecond {
t . Fatalf ( "bad: %d" , sema . opts . MonitorRetryTime )
}
// Should get the lock.
ch , err := sema . Acquire ( nil )
if err != nil {
t . Fatalf ( "err: %v" , err )
}
if ch == nil {
t . Fatalf ( "didn't acquire" )
}
// Take the semaphore using the raw client to force the monitor to wake
// up and check the lock again. This time we will return errors for some
// of the responses.
mutex . Lock ( )
errors = 2
mutex . Unlock ( )
another , err := raw . SemaphoreOpts ( opts )
if err != nil {
t . Fatalf ( "err: %v" , err )
}
if _ , err := another . Acquire ( nil ) ; err != nil {
t . Fatalf ( "err: %v" , err )
}
time . Sleep ( 5 * opts . MonitorRetryTime )
// Should still have the semaphore.
select {
case <- ch :
t . Fatalf ( "lost the semaphore" )
default :
}
// Now return an overwhelming number of errors, using the raw client to
// poke the key and get the monitor to run again.
mutex . Lock ( )
errors = 10
mutex . Unlock ( )
if err := another . Release ( ) ; err != nil {
t . Fatalf ( "err: %v" , err )
}
time . Sleep ( 5 * opts . MonitorRetryTime )
// Should lose the semaphore.
select {
case <- ch :
case <- time . After ( time . Second ) :
t . Fatalf ( "should not have the semaphore" )
}
}
func TestAPI_SemaphoreOneShot ( t * testing . T ) {
t . Parallel ( )
c , s := makeClient ( t )
defer s . Stop ( )
s . WaitForSerfCheck ( t )
// Set up a semaphore as a one-shot.
opts := & SemaphoreOptions {
Prefix : "test/sema/.lock" ,
Limit : 2 ,
SemaphoreTryOnce : true ,
}
sema , err := c . SemaphoreOpts ( opts )
if err != nil {
t . Fatalf ( "err: %v" , err )
}
// Make sure the default got set.
if sema . opts . SemaphoreWaitTime != DefaultSemaphoreWaitTime {
t . Fatalf ( "bad: %d" , sema . opts . SemaphoreWaitTime )
}
// Now set a custom time for the test.
opts . SemaphoreWaitTime = 250 * time . Millisecond
sema , err = c . SemaphoreOpts ( opts )
if err != nil {
t . Fatalf ( "err: %v" , err )
}
if sema . opts . SemaphoreWaitTime != 250 * time . Millisecond {
t . Fatalf ( "bad: %d" , sema . opts . SemaphoreWaitTime )
}
// Should acquire the semaphore.
ch , err := sema . Acquire ( nil )
if err != nil {
t . Fatalf ( "err: %v" , err )
}
if ch == nil {
t . Fatalf ( "should have acquired the semaphore" )
}
// Try with another session.
another , err := c . SemaphoreOpts ( opts )
if err != nil {
t . Fatalf ( "err: %v" , err )
}
ch , err = another . Acquire ( nil )
if err != nil {
t . Fatalf ( "err: %v" , err )
}
if ch == nil {
t . Fatalf ( "should have acquired the semaphore" )
}
// Try with a third one that shouldn't get it.
contender , err := c . SemaphoreOpts ( opts )
if err != nil {
t . Fatalf ( "err: %v" , err )
}
start := time . Now ( )
ch , err = contender . Acquire ( nil )
if err != nil {
t . Fatalf ( "err: %v" , err )
}
if ch != nil {
t . Fatalf ( "should not have acquired the semaphore" )
}
diff := time . Since ( start )
if diff < contender . opts . SemaphoreWaitTime {
t . Fatalf ( "time out of bounds: %9.6f" , diff . Seconds ( ) )
}
// Give up a slot and make sure the third one can get it.
if err := another . Release ( ) ; err != nil {
t . Fatalf ( "err: %v" , err )
}
ch , err = contender . Acquire ( nil )
if err != nil {
t . Fatalf ( "err: %v" , err )
}
if ch == nil {
t . Fatalf ( "should have acquired the semaphore" )
}
}