@ -50,26 +50,30 @@ type UpdateEvent struct {
// value that allows them to disambiguate between events in the returned chan
// when sharing a chan between multiple cache entries. If the chan is closed,
// the notify loop will terminate.
func ( c * Cache ) Notify ( ctx context . Context , t string , r Request ,
correlationID string , ch chan <- UpdateEvent ) error {
// Get the type that we're fetching
func ( c * Cache ) Notify (
ctx context . Context ,
t string ,
r Request ,
correlationID string ,
ch chan <- UpdateEvent ,
) error {
c . typesLock . RLock ( )
tEntry , ok := c . types [ t ]
c . typesLock . RUnlock ( )
if ! ok {
return fmt . Errorf ( "unknown type in cache: %s" , t )
}
if tEntry . Type . SupportsBlocking ( ) {
go c . notifyBlockingQuery ( ctx , tEntry , r , correlationID , ch )
} else {
return nil
}
info := r . CacheInfo ( )
if info . MaxAge == 0 {
return fmt . Errorf ( "Cannot use Notify for polling cache types without specifying the MaxAge" )
}
go c . notifyPollingQuery ( ctx , tEntry , r , correlationID , ch , info . MaxAge )
}
return nil
}
@ -107,10 +111,10 @@ func (c *Cache) notifyBlockingQuery(ctx context.Context, tEntry typeEntry, r Req
index = meta . Index
}
var wait time . Duration
// Handle errors with backoff. Badly behaved blocking calls that returned
// a zero index are considered as failures since we need to not get stuck
// in a busy loop.
wait := 0 * time . Second
if err == nil && meta . Index > 0 {
failures = 0
} else {
@ -173,6 +177,7 @@ func (c *Cache) notifyPollingQuery(ctx context.Context, tEntry typeEntry, r Requ
failures ++
}
var wait time . Duration
// Determining how long to wait before the next poll is complicated.
// First off the happy path and the error path waits are handled distinctly
//
@ -194,23 +199,13 @@ func (c *Cache) notifyPollingQuery(ctx context.Context, tEntry typeEntry, r Requ
// as this would eliminate the single-flighting of these requests in the cache and
// the efficiencies gained by it.
if failures > 0 {
errWait := backOffWait ( failures )
select {
case <- time . After ( errWait ) :
case <- ctx . Done ( ) :
return
}
wait = backOffWait ( failures )
} else {
// Default to immediately re-poll. This only will happen if the data
// we just got out of the cache is already too stale
pollWait := 0 * time . Second
// Calculate when the cached data's Age will get too stale and
// need to be re-queried. When the data's Age already exceeds the
// maxAge the pollWait value is left at 0 to immediately re-poll
if meta . Age <= maxAge {
pollW ait = maxAge - meta . Age
wait = maxAge - meta . Age
}
// Add a small amount of random jitter to the polling time. One
@ -222,13 +217,13 @@ func (c *Cache) notifyPollingQuery(ctx context.Context, tEntry typeEntry, r Requ
// and then immediately have to re-fetch again. That wouldn't
// be terrible but it would expend a bunch more cpu cycles when
// we can definitely avoid it.
pollWait += lib . RandomStagger ( maxAge / 16 )
wait += lib . RandomStagger ( maxAge / 16 )
}
select {
case <- time . After ( pollW ait) :
case <- time . After ( w ait) :
case <- ctx . Done ( ) :
return
}
}
}
}