@ -942,13 +942,10 @@ func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta s
ctx , cancel := context . WithTimeout ( ctx , timeout )
defer cancel ( )
// instrument blockingQueries
// atomic inc our server's count of in-flight blockingQueries and store the new value
queriesBlocking := atomic . AddUint64 ( & s . queriesBlocking , 1 )
// atomic dec when we return from blockingQuery()
count := atomic . AddUint64 ( & s . queriesBlocking , 1 )
metrics . SetGauge ( [ ] string { "rpc" , "queries_blocking" } , float32 ( count ) )
// decrement the count when the function returns.
defer atomic . AddUint64 ( & s . queriesBlocking , ^ uint64 ( 0 ) )
// set the gauge directly to the new value of s.blockingQueries
metrics . SetGauge ( [ ] string { "rpc" , "queries_blocking" } , float32 ( queriesBlocking ) )
for {
if queryOpts . GetRequireConsistent ( ) {
@ -977,24 +974,17 @@ func (s *Server) blockingQuery(queryOpts structs.QueryOptionsCompat, queryMeta s
return nil
}
// block up to the timeout if we don't see anything fresh.
if err := ws . WatchCtx ( ctx ) ; err == nil {
// a non-nil error only occurs when the context is cancelled
// If a restore may have woken us up then bail out from
// the query immediately. This is slightly race-ey since
// this might have been interrupted for other reasons,
// but it's OK to kick it back to the caller in either
// case.
select {
case <- state . AbandonCh ( ) :
return nil
default :
}
// block until something changes, or the timeout
if err := ws . WatchCtx ( ctx ) ; err != nil {
// exit if we've reached the timeout, or other cancellation
return nil
}
if ctx . Err ( ) != nil {
// exit if the state store has been abandoned
select {
case <- state . AbandonCh ( ) :
return nil
default :
}
}
}