@ -176,28 +176,27 @@ func (e *EventPublisher) Subscribe(req *SubscribeRequest) (*Subscription, error)
}
snapFromCache := e . getCachedSnapshotLocked ( req )
if req . Index == 0 && snapFromCache != nil {
return e . subscriptions . add ( req , snapFromCache . First ) , nil
if snapFromCache == nil {
snap := newEventSnapshot ( )
snap . appendAndSplice ( * req , handler , topicHead )
e . setCachedSnapshotLocked ( req , snap )
snapFromCache = snap
}
snap := newEventSnapshot ( )
// if the request has an Index the client view is stale and must be reset
// with a NewSnapshotToFollow event.
if req . Index > 0 {
snap . buffer . Append ( [ ] Event { {
Topic : req . Topic ,
Payload : newSnapshotToFollow { } ,
} } )
if snapFromCache != nil {
snap . buffer . AppendItem ( snapFromCache . First )
return e . subscriptions . add ( req , snap . First ) , nil
}
// If the request.Index is 0 the client has no view, send a full snapshot.
if req . Index == 0 {
return e . subscriptions . add ( req , snapFromCache . First ) , nil
}
snap . appendAndSplice ( * req , handler , topicHead )
e . setCachedSnapshotLocked ( req , snap )
return e . subscriptions . add ( req , snap . First ) , nil
// otherwise the request has an Index, the client view is stale and must be reset
// with a NewSnapshotToFollow event.
result := newEventSnapshot ( )
result . buffer . Append ( [ ] Event { {
Topic : req . Topic ,
Payload : newSnapshotToFollow { } ,
} } )
result . buffer . AppendItem ( snapFromCache . First )
return e . subscriptions . add ( req , result . First ) , nil
}
func ( s * subscriptions ) add ( req * SubscribeRequest , head * bufferItem ) * Subscription {