You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
consul/agent/rpc/peering/stream_tracker.go

213 lines
5.0 KiB

package peering
import (
"fmt"
"sync"
"time"
)
// streamTracker contains a map of (PeerID -> StreamStatus).
// As streams are opened and closed we track details about their status.
type streamTracker struct {
mu sync.RWMutex
streams map[string]*lockableStreamStatus
// timeNow is a shim for testing.
timeNow func() time.Time
}
func newStreamTracker() *streamTracker {
return &streamTracker{
streams: make(map[string]*lockableStreamStatus),
timeNow: time.Now,
}
}
// connected registers a stream for a given peer, and marks it as connected.
// It also enforces that there is only one active stream for a peer.
func (t *streamTracker) connected(id string) (*lockableStreamStatus, error) {
t.mu.Lock()
defer t.mu.Unlock()
status, ok := t.streams[id]
if !ok {
status = newLockableStreamStatus(t.timeNow)
t.streams[id] = status
return status, nil
}
if status.connected() {
return nil, fmt.Errorf("there is an active stream for the given PeerID %q", id)
}
status.trackConnected()
return status, nil
}
// disconnected ensures that if a peer id's stream status is tracked, it is marked as disconnected.
func (t *streamTracker) disconnected(id string) {
t.mu.Lock()
defer t.mu.Unlock()
if status, ok := t.streams[id]; ok {
status.trackDisconnected()
}
}
func (t *streamTracker) streamStatus(id string) (resp StreamStatus, found bool) {
t.mu.RLock()
defer t.mu.RUnlock()
s, ok := t.streams[id]
if !ok {
return StreamStatus{}, false
}
return s.status(), true
}
func (t *streamTracker) connectedStreams() map[string]chan struct{} {
t.mu.RLock()
defer t.mu.RUnlock()
resp := make(map[string]chan struct{})
for peer, status := range t.streams {
if status.connected() {
resp[peer] = status.doneCh
}
}
return resp
}
func (t *streamTracker) deleteStatus(id string) {
t.mu.Lock()
defer t.mu.Unlock()
delete(t.streams, id)
}
type lockableStreamStatus struct {
mu sync.RWMutex
// timeNow is a shim for testing.
timeNow func() time.Time
// doneCh allows for shutting down a stream gracefully by sending a termination message
// to the peer before the stream's context is cancelled.
doneCh chan struct{}
StreamStatus
}
// StreamStatus contains information about the replication stream to a peer cluster.
// TODO(peering): There's a lot of fields here...
type StreamStatus struct {
// Connected is true when there is an open stream for the peer.
Connected bool
// If the status is not connected, DisconnectTime tracks when the stream was closed. Else it's zero.
DisconnectTime time.Time
// LastAck tracks the time we received the last ACK for a resource replicated TO the peer.
LastAck time.Time
// LastNack tracks the time we received the last NACK for a resource replicated to the peer.
LastNack time.Time
// LastNackMessage tracks the reported error message associated with the last NACK from a peer.
LastNackMessage string
// LastSendError tracks the time of the last error sending into the stream.
LastSendError time.Time
// LastSendErrorMessage tracks the last error message when sending into the stream.
LastSendErrorMessage string
// LastReceiveSuccess tracks the time we last successfully stored a resource replicated FROM the peer.
LastReceiveSuccess time.Time
// LastReceiveError tracks either:
// - The time we failed to store a resource replicated FROM the peer.
// - The time of the last error when receiving from the stream.
LastReceiveError time.Time
// LastReceiveError tracks either:
// - The error message when we failed to store a resource replicated FROM the peer.
// - The last error message when receiving from the stream.
LastReceiveErrorMessage string
}
func newLockableStreamStatus(now func() time.Time) *lockableStreamStatus {
return &lockableStreamStatus{
StreamStatus: StreamStatus{
Connected: true,
},
timeNow: now,
doneCh: make(chan struct{}),
}
}
func (s *lockableStreamStatus) trackAck() {
s.mu.Lock()
s.LastAck = s.timeNow().UTC()
s.mu.Unlock()
}
func (s *lockableStreamStatus) trackSendError(error string) {
s.mu.Lock()
s.LastSendError = s.timeNow().UTC()
s.LastSendErrorMessage = error
s.mu.Unlock()
}
func (s *lockableStreamStatus) trackReceiveSuccess() {
s.mu.Lock()
s.LastReceiveSuccess = s.timeNow().UTC()
s.mu.Unlock()
}
func (s *lockableStreamStatus) trackReceiveError(error string) {
s.mu.Lock()
s.LastReceiveError = s.timeNow().UTC()
s.LastReceiveErrorMessage = error
s.mu.Unlock()
}
func (s *lockableStreamStatus) trackNack(msg string) {
s.mu.Lock()
s.LastNack = s.timeNow().UTC()
s.LastNackMessage = msg
s.mu.Unlock()
}
func (s *lockableStreamStatus) trackConnected() {
s.mu.Lock()
s.Connected = true
s.DisconnectTime = time.Time{}
s.mu.Unlock()
}
func (s *lockableStreamStatus) trackDisconnected() {
s.mu.Lock()
s.Connected = false
s.DisconnectTime = s.timeNow().UTC()
s.mu.Unlock()
}
func (s *lockableStreamStatus) connected() bool {
var resp bool
s.mu.RLock()
resp = s.Connected
s.mu.RUnlock()
return resp
}
func (s *lockableStreamStatus) status() StreamStatus {
s.mu.RLock()
copy := s.StreamStatus
s.mu.RUnlock()
return copy
}