From 617a5f2dc2648a09d9847ae5a0537718792ad9ad Mon Sep 17 00:00:00 2001 From: malizz Date: Thu, 3 Nov 2022 11:51:22 -0700 Subject: [PATCH] convert stream status time fields to pointers (#15252) --- .../services/peerstream/stream_test.go | 46 ++++++------- .../services/peerstream/stream_tracker.go | 64 +++++++++++-------- .../peerstream/stream_tracker_test.go | 44 ++++++------- agent/rpc/peering/service.go | 15 +++-- api/peering.go | 6 +- proto/pbpeering/peering.go | 12 ++-- 6 files changed, 102 insertions(+), 85 deletions(-) diff --git a/agent/grpc-external/services/peerstream/stream_test.go b/agent/grpc-external/services/peerstream/stream_test.go index e53cb8cc71..512ccd6b63 100644 --- a/agent/grpc-external/services/peerstream/stream_test.go +++ b/agent/grpc-external/services/peerstream/stream_test.go @@ -569,7 +569,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { }) var lastSendAck time.Time - var lastSendSuccess time.Time + var lastSendSuccess *time.Time client.DrainStream(t) @@ -604,7 +604,7 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { expect := Status{ Connected: true, LastSendSuccess: lastSendSuccess, - LastAck: lastSendAck, + LastAck: &lastSendAck, ExportedServices: []string{}, } retry.Run(t, func(r *retry.R) { @@ -641,8 +641,8 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { expect := Status{ Connected: true, LastSendSuccess: lastSendSuccess, - LastAck: lastSendAck, - LastNack: lastNack, + LastAck: &lastSendAck, + LastNack: &lastNack, LastNackMessage: lastNackMsg, ExportedServices: []string{}, } @@ -693,10 +693,10 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { expect := Status{ Connected: true, LastSendSuccess: lastSendSuccess, - LastAck: lastSendAck, - LastNack: lastNack, + LastAck: &lastSendAck, + LastNack: &lastNack, LastNackMessage: lastNackMsg, - LastRecvResourceSuccess: lastRecvResourceSuccess, + LastRecvResourceSuccess: &lastRecvResourceSuccess, ExportedServices: []string{}, } @@ -749,11 +749,11 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { expect := Status{ Connected: true, LastSendSuccess: lastSendSuccess, - LastAck: lastSendAck, - LastNack: lastNack, + LastAck: &lastSendAck, + LastNack: &lastNack, LastNackMessage: lastNackMsg, - LastRecvResourceSuccess: lastRecvResourceSuccess, - LastRecvError: lastRecvError, + LastRecvResourceSuccess: &lastRecvResourceSuccess, + LastRecvError: &lastRecvError, LastRecvErrorMessage: lastRecvErrorMsg, ExportedServices: []string{}, } @@ -779,13 +779,13 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { expect := Status{ Connected: true, LastSendSuccess: lastSendSuccess, - LastAck: lastSendAck, - LastNack: lastNack, + LastAck: &lastSendAck, + LastNack: &lastNack, LastNackMessage: lastNackMsg, - LastRecvResourceSuccess: lastRecvResourceSuccess, - LastRecvError: lastRecvError, + LastRecvResourceSuccess: &lastRecvResourceSuccess, + LastRecvError: &lastRecvError, LastRecvErrorMessage: lastRecvErrorMsg, - LastRecvHeartbeat: lastRecvHeartbeat, + LastRecvHeartbeat: &lastRecvHeartbeat, ExportedServices: []string{}, } @@ -807,14 +807,14 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { Connected: false, DisconnectErrorMessage: lastRecvErrorMsg, LastSendSuccess: lastSendSuccess, - LastAck: lastSendAck, - LastNack: lastNack, + LastAck: &lastSendAck, + LastNack: &lastNack, LastNackMessage: lastNackMsg, - DisconnectTime: disconnectTime, - LastRecvResourceSuccess: lastRecvResourceSuccess, - LastRecvError: lastRecvError, + DisconnectTime: &disconnectTime, + LastRecvResourceSuccess: &lastRecvResourceSuccess, + LastRecvError: &lastRecvError, LastRecvErrorMessage: lastRecvErrorMsg, - LastRecvHeartbeat: lastRecvHeartbeat, + LastRecvHeartbeat: &lastRecvHeartbeat, ExportedServices: []string{}, } @@ -1236,7 +1236,7 @@ func TestStreamResources_Server_DisconnectsOnHeartbeatTimeout(t *testing.T) { }) testutil.RunStep(t, "stream is disconnected due to heartbeat timeout", func(t *testing.T) { - disconnectTime := it.FutureNow(1) + disconnectTime := ptr(it.FutureNow(1)) retry.Run(t, func(r *retry.R) { status, ok := srv.StreamStatus(testPeerID) require.True(r, ok) diff --git a/agent/grpc-external/services/peerstream/stream_tracker.go b/agent/grpc-external/services/peerstream/stream_tracker.go index daf891d38a..f36ca055d1 100644 --- a/agent/grpc-external/services/peerstream/stream_tracker.go +++ b/agent/grpc-external/services/peerstream/stream_tracker.go @@ -148,22 +148,32 @@ func (t *Tracker) DeleteStatus(id string) { func (t *Tracker) IsHealthy(s Status) bool { // If stream is in a disconnected state for longer than the configured // heartbeat timeout, report as unhealthy. - if !s.DisconnectTime.IsZero() && - t.timeNow().Sub(s.DisconnectTime) > t.heartbeatTimeout { + if s.DisconnectTime != nil && + t.timeNow().Sub(*s.DisconnectTime) > t.heartbeatTimeout { return false } // If last Nack is after last Ack, it means the peer is unable to - // handle our replication message. - if s.LastNack.After(s.LastAck) && - t.timeNow().Sub(s.LastAck) > t.heartbeatTimeout { + // handle our replication message + if s.LastAck == nil { + s.LastAck = &time.Time{} + } + + if s.LastNack != nil && + s.LastNack.After(*s.LastAck) && + t.timeNow().Sub(*s.LastAck) > t.heartbeatTimeout { return false } // If last recv error is newer than last recv success, we were unable // to handle the peer's replication message. - if s.LastRecvError.After(s.LastRecvResourceSuccess) && - t.timeNow().Sub(s.LastRecvError) > t.heartbeatTimeout { + if s.LastRecvResourceSuccess == nil { + s.LastRecvResourceSuccess = &time.Time{} + } + + if s.LastRecvError != nil && + s.LastRecvError.After(*s.LastRecvResourceSuccess) && + t.timeNow().Sub(*s.LastRecvError) > t.heartbeatTimeout { return false } @@ -197,36 +207,36 @@ type Status struct { DisconnectErrorMessage string // If the status is not connected, DisconnectTime tracks when the stream was closed. Else it's zero. - DisconnectTime time.Time + DisconnectTime *time.Time // LastAck tracks the time we received the last ACK for a resource replicated TO the peer. - LastAck time.Time + LastAck *time.Time // LastNack tracks the time we received the last NACK for a resource replicated to the peer. - LastNack time.Time + LastNack *time.Time // LastNackMessage tracks the reported error message associated with the last NACK from a peer. LastNackMessage string // LastSendError tracks the time of the last error sending into the stream. - LastSendError time.Time + LastSendError *time.Time // LastSendErrorMessage tracks the last error message when sending into the stream. LastSendErrorMessage string // LastSendSuccess tracks the time we last successfully sent a resource TO the peer. - LastSendSuccess time.Time + LastSendSuccess *time.Time // LastRecvHeartbeat tracks when we last received a heartbeat from our peer. - LastRecvHeartbeat time.Time + LastRecvHeartbeat *time.Time // LastRecvResourceSuccess tracks the time we last successfully stored a resource replicated FROM the peer. - LastRecvResourceSuccess time.Time + LastRecvResourceSuccess *time.Time // LastRecvError tracks either: // - The time we failed to store a resource replicated FROM the peer. // - The time of the last error when receiving from the stream. - LastRecvError time.Time + LastRecvError *time.Time // LastRecvErrorMessage tracks the last error message when receiving from the stream. LastRecvErrorMessage string @@ -263,47 +273,47 @@ func (s *MutableStatus) Done() <-chan struct{} { func (s *MutableStatus) TrackAck() { s.mu.Lock() - s.LastAck = s.timeNow().UTC() + s.LastAck = ptr(s.timeNow().UTC()) s.mu.Unlock() } func (s *MutableStatus) TrackSendError(error string) { s.mu.Lock() - s.LastSendError = s.timeNow().UTC() + s.LastSendError = ptr(s.timeNow().UTC()) s.LastSendErrorMessage = error s.mu.Unlock() } func (s *MutableStatus) TrackSendSuccess() { s.mu.Lock() - s.LastSendSuccess = s.timeNow().UTC() + s.LastSendSuccess = ptr(s.timeNow().UTC()) s.mu.Unlock() } // TrackRecvResourceSuccess tracks receiving a replicated resource. func (s *MutableStatus) TrackRecvResourceSuccess() { s.mu.Lock() - s.LastRecvResourceSuccess = s.timeNow().UTC() + s.LastRecvResourceSuccess = ptr(s.timeNow().UTC()) s.mu.Unlock() } // TrackRecvHeartbeat tracks receiving a heartbeat from our peer. func (s *MutableStatus) TrackRecvHeartbeat() { s.mu.Lock() - s.LastRecvHeartbeat = s.timeNow().UTC() + s.LastRecvHeartbeat = ptr(s.timeNow().UTC()) s.mu.Unlock() } func (s *MutableStatus) TrackRecvError(error string) { s.mu.Lock() - s.LastRecvError = s.timeNow().UTC() + s.LastRecvError = ptr(s.timeNow().UTC()) s.LastRecvErrorMessage = error s.mu.Unlock() } func (s *MutableStatus) TrackNack(msg string) { s.mu.Lock() - s.LastNack = s.timeNow().UTC() + s.LastNack = ptr(s.timeNow().UTC()) s.LastNackMessage = msg s.mu.Unlock() } @@ -311,7 +321,7 @@ func (s *MutableStatus) TrackNack(msg string) { func (s *MutableStatus) TrackConnected() { s.mu.Lock() s.Connected = true - s.DisconnectTime = time.Time{} + s.DisconnectTime = &time.Time{} s.DisconnectErrorMessage = "" s.mu.Unlock() } @@ -321,7 +331,7 @@ func (s *MutableStatus) TrackConnected() { func (s *MutableStatus) TrackDisconnectedGracefully() { s.mu.Lock() s.Connected = false - s.DisconnectTime = s.timeNow().UTC() + s.DisconnectTime = ptr(s.timeNow().UTC()) s.DisconnectErrorMessage = "" s.mu.Unlock() } @@ -331,7 +341,7 @@ func (s *MutableStatus) TrackDisconnectedGracefully() { func (s *MutableStatus) TrackDisconnectedDueToError(error string) { s.mu.Lock() s.Connected = false - s.DisconnectTime = s.timeNow().UTC() + s.DisconnectTime = ptr(s.timeNow().UTC()) s.DisconnectErrorMessage = error s.mu.Unlock() } @@ -389,3 +399,7 @@ func (s *MutableStatus) GetExportedServicesCount() int { return len(s.ExportedServices) } + +func ptr[T any](x T) *T { + return &x +} diff --git a/agent/grpc-external/services/peerstream/stream_tracker_test.go b/agent/grpc-external/services/peerstream/stream_tracker_test.go index bb018b4b46..cfe95d4012 100644 --- a/agent/grpc-external/services/peerstream/stream_tracker_test.go +++ b/agent/grpc-external/services/peerstream/stream_tracker_test.go @@ -29,7 +29,7 @@ func TestTracker_IsHealthy(t *testing.T) { tracker: NewTracker(defaultIncomingHeartbeatTimeout), expectedVal: true, modifierFunc: func(status *MutableStatus) { - status.DisconnectTime = time.Now() + status.DisconnectTime = ptr(time.Now()) }, }, { @@ -37,7 +37,7 @@ func TestTracker_IsHealthy(t *testing.T) { tracker: NewTracker(1 * time.Millisecond), expectedVal: false, modifierFunc: func(status *MutableStatus) { - status.DisconnectTime = time.Now().Add(-1 * time.Minute) + status.DisconnectTime = ptr(time.Now().Add(-1 * time.Minute)) }, }, { @@ -46,8 +46,8 @@ func TestTracker_IsHealthy(t *testing.T) { expectedVal: true, modifierFunc: func(status *MutableStatus) { now := time.Now() - status.LastRecvResourceSuccess = now - status.LastRecvError = now.Add(1 * time.Second) + status.LastRecvResourceSuccess = &now + status.LastRecvError = ptr(now.Add(1 * time.Second)) }, }, { @@ -56,8 +56,8 @@ func TestTracker_IsHealthy(t *testing.T) { expectedVal: true, modifierFunc: func(status *MutableStatus) { now := time.Now() - status.LastRecvResourceSuccess = now - status.LastRecvError = now.Add(1 * time.Second) + status.LastRecvResourceSuccess = &now + status.LastRecvError = ptr(now.Add(1 * time.Second)) }, }, { @@ -66,8 +66,8 @@ func TestTracker_IsHealthy(t *testing.T) { expectedVal: false, modifierFunc: func(status *MutableStatus) { now := time.Now().Add(-2 * time.Second) - status.LastRecvResourceSuccess = now - status.LastRecvError = now.Add(1 * time.Second) + status.LastRecvResourceSuccess = &now + status.LastRecvError = ptr(now.Add(1 * time.Second)) }, }, { @@ -76,8 +76,8 @@ func TestTracker_IsHealthy(t *testing.T) { expectedVal: true, modifierFunc: func(status *MutableStatus) { now := time.Now() - status.LastAck = now - status.LastNack = now.Add(1 * time.Second) + status.LastAck = &now + status.LastNack = ptr(now.Add(1 * time.Second)) }, }, { @@ -86,8 +86,8 @@ func TestTracker_IsHealthy(t *testing.T) { expectedVal: false, modifierFunc: func(status *MutableStatus) { now := time.Now().Add(-2 * time.Second) - status.LastAck = now - status.LastNack = now.Add(1 * time.Second) + status.LastAck = &now + status.LastNack = ptr(now.Add(1 * time.Second)) }, }, { @@ -148,7 +148,7 @@ func TestTracker_EnsureConnectedDisconnected(t *testing.T) { }) var sequence uint64 - var lastSuccess time.Time + var lastSuccess *time.Time testutil.RunStep(t, "stream updated", func(t *testing.T) { statusPtr.TrackAck() @@ -157,7 +157,7 @@ func TestTracker_EnsureConnectedDisconnected(t *testing.T) { status, ok := tracker.StreamStatus(peerID) require.True(t, ok) - lastSuccess = it.base.Add(time.Duration(sequence) * time.Second).UTC() + lastSuccess = ptr(it.base.Add(time.Duration(sequence) * time.Second).UTC()) expect := Status{ Connected: true, LastAck: lastSuccess, @@ -171,7 +171,7 @@ func TestTracker_EnsureConnectedDisconnected(t *testing.T) { expect := Status{ Connected: false, - DisconnectTime: it.base.Add(time.Duration(sequence) * time.Second).UTC(), + DisconnectTime: ptr(it.base.Add(time.Duration(sequence) * time.Second).UTC()), LastAck: lastSuccess, } status, ok := tracker.StreamStatus(peerID) @@ -184,9 +184,9 @@ func TestTracker_EnsureConnectedDisconnected(t *testing.T) { require.NoError(t, err) expect := Status{ - Connected: true, - LastAck: lastSuccess, - + Connected: true, + LastAck: lastSuccess, + DisconnectTime: &time.Time{}, // DisconnectTime gets cleared on re-connect. } @@ -271,7 +271,7 @@ func TestMutableStatus_TrackConnected(t *testing.T) { s := MutableStatus{ Status: Status{ Connected: false, - DisconnectTime: time.Now(), + DisconnectTime: ptr(time.Now()), DisconnectErrorMessage: "disconnected", }, } @@ -279,7 +279,7 @@ func TestMutableStatus_TrackConnected(t *testing.T) { require.True(t, s.IsConnected()) require.True(t, s.Connected) - require.Equal(t, time.Time{}, s.DisconnectTime) + require.Equal(t, &time.Time{}, s.DisconnectTime) require.Empty(t, s.DisconnectErrorMessage) } @@ -287,7 +287,7 @@ func TestMutableStatus_TrackDisconnectedGracefully(t *testing.T) { it := incrementalTime{ base: time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC), } - disconnectTime := it.FutureNow(1) + disconnectTime := ptr(it.FutureNow(1)) s := MutableStatus{ timeNow: it.Now, @@ -308,7 +308,7 @@ func TestMutableStatus_TrackDisconnectedDueToError(t *testing.T) { it := incrementalTime{ base: time.Date(2000, time.January, 1, 0, 0, 0, 0, time.UTC), } - disconnectTime := it.FutureNow(1) + disconnectTime := ptr(it.FutureNow(1)) s := MutableStatus{ timeNow: it.Now, diff --git a/agent/rpc/peering/service.go b/agent/rpc/peering/service.go index b6a8e6cfec..a66205e12f 100644 --- a/agent/rpc/peering/service.go +++ b/agent/rpc/peering/service.go @@ -705,14 +705,17 @@ func (s *Server) reconcilePeering(peering *pbpeering.Peering) *pbpeering.Peering cp.State = pbpeering.PeeringState_FAILING } - latest := func(tt ...time.Time) time.Time { + latest := func(tt ...*time.Time) *time.Time { latest := time.Time{} for _, t := range tt { + if t == nil { + continue + } if t.After(latest) { - latest = t + latest = *t } } - return latest + return &latest } lastRecv := latest(streamState.LastRecvHeartbeat, streamState.LastRecvError, streamState.LastRecvResourceSuccess) @@ -721,9 +724,9 @@ func (s *Server) reconcilePeering(peering *pbpeering.Peering) *pbpeering.Peering cp.StreamStatus = &pbpeering.StreamStatus{ ImportedServices: streamState.ImportedServices, ExportedServices: streamState.ExportedServices, - LastHeartbeat: structs.TimeToProto(streamState.LastRecvHeartbeat), - LastReceive: structs.TimeToProto(lastRecv), - LastSend: structs.TimeToProto(lastSend), + LastHeartbeat: pbpeering.TimePtrToProto(streamState.LastRecvHeartbeat), + LastReceive: pbpeering.TimePtrToProto(lastRecv), + LastSend: pbpeering.TimePtrToProto(lastSend), } return cp diff --git a/api/peering.go b/api/peering.go index e77a74f089..d77f8cd8ca 100644 --- a/api/peering.go +++ b/api/peering.go @@ -85,11 +85,11 @@ type PeeringStreamStatus struct { // ExportedServices is the list of services exported to this peering. ExportedServices []string // LastHeartbeat represents when the last heartbeat message was received. - LastHeartbeat time.Time + LastHeartbeat *time.Time // LastReceive represents when any message was last received, regardless of success or error. - LastReceive time.Time + LastReceive *time.Time // LastSend represents when any message was last sent, regardless of success or error. - LastSend time.Time + LastSend *time.Time } type PeeringReadResponse struct { diff --git a/proto/pbpeering/peering.go b/proto/pbpeering/peering.go index 511de42c57..2133cee2e9 100644 --- a/proto/pbpeering/peering.go +++ b/proto/pbpeering/peering.go @@ -147,9 +147,9 @@ func StreamStatusToAPI(status *StreamStatus) api.PeeringStreamStatus { return api.PeeringStreamStatus{ ImportedServices: status.ImportedServices, ExportedServices: status.ExportedServices, - LastHeartbeat: structs.TimeFromProto(status.LastHeartbeat), - LastReceive: structs.TimeFromProto(status.LastReceive), - LastSend: structs.TimeFromProto(status.LastSend), + LastHeartbeat: TimePtrFromProto(status.LastHeartbeat), + LastReceive: TimePtrFromProto(status.LastReceive), + LastSend: TimePtrFromProto(status.LastSend), } } @@ -157,9 +157,9 @@ func StreamStatusFromAPI(status api.PeeringStreamStatus) *StreamStatus { return &StreamStatus{ ImportedServices: status.ImportedServices, ExportedServices: status.ExportedServices, - LastHeartbeat: structs.TimeToProto(status.LastHeartbeat), - LastReceive: structs.TimeToProto(status.LastReceive), - LastSend: structs.TimeToProto(status.LastSend), + LastHeartbeat: TimePtrToProto(status.LastHeartbeat), + LastReceive: TimePtrToProto(status.LastReceive), + LastSend: TimePtrToProto(status.LastSend), } }