From 8c5b70d2274b8bdebdb957c7ecc162e7019a8b5a Mon Sep 17 00:00:00 2001 From: Luke Kysow <1034429+lkysow@users.noreply.github.com> Date: Mon, 25 Jul 2022 16:08:03 -0700 Subject: [PATCH] Rename receive to recv in tracker (#13896) Because it's shorter --- .../services/peerstream/stream_resources.go | 10 +-- .../services/peerstream/stream_test.go | 64 +++++++++---------- .../services/peerstream/stream_tracker.go | 36 +++++------ 3 files changed, 54 insertions(+), 56 deletions(-) diff --git a/agent/grpc-external/services/peerstream/stream_resources.go b/agent/grpc-external/services/peerstream/stream_resources.go index 000e26ca9d..1feb7f01d6 100644 --- a/agent/grpc-external/services/peerstream/stream_resources.go +++ b/agent/grpc-external/services/peerstream/stream_resources.go @@ -265,11 +265,11 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error { if err == io.EOF { logger.Info("stream ended by peer") - status.TrackReceiveError(err.Error()) + status.TrackRecvError(err.Error()) return } logger.Error("failed to receive from stream", "error", err) - status.TrackReceiveError(err.Error()) + status.TrackRecvError(err.Error()) return } }() @@ -459,9 +459,9 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error { reply, err := s.processResponse(streamReq.PeerName, streamReq.Partition, status, resp, logger) if err != nil { logger.Error("failed to persist resource", "resourceURL", resp.ResourceURL, "resourceID", resp.ResourceID) - status.TrackReceiveError(err.Error()) + status.TrackRecvError(err.Error()) } else { - status.TrackReceiveResourceSuccess() + status.TrackRecvResourceSuccess() } if err := streamSend(reply); err != nil { @@ -482,7 +482,7 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error { } if msg.GetHeartbeat() != nil { - status.TrackReceiveHeartbeat() + status.TrackRecvHeartbeat() // Reset the heartbeat timeout by creating a new context. // We first must cancel the old context so there's no leaks. This is safe to do because we're only diff --git a/agent/grpc-external/services/peerstream/stream_test.go b/agent/grpc-external/services/peerstream/stream_test.go index d9e18a1e62..174ecf59f3 100644 --- a/agent/grpc-external/services/peerstream/stream_test.go +++ b/agent/grpc-external/services/peerstream/stream_test.go @@ -475,11 +475,11 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { api := structs.NewServiceName("api", nil) expect := Status{ - Connected: true, - LastAck: lastSendSuccess, - LastNack: lastNack, - LastNackMessage: lastNackMsg, - LastReceiveResourceSuccess: lastRecvResourceSuccess, + Connected: true, + LastAck: lastSendSuccess, + LastNack: lastNack, + LastNackMessage: lastNackMsg, + LastRecvResourceSuccess: lastRecvResourceSuccess, ImportedServices: map[string]struct{}{ api.String(): {}, }, @@ -534,13 +534,13 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { api := structs.NewServiceName("api", nil) expect := Status{ - Connected: true, - LastAck: lastSendSuccess, - LastNack: lastNack, - LastNackMessage: lastNackMsg, - LastReceiveResourceSuccess: lastRecvResourceSuccess, - LastReceiveError: lastRecvError, - LastReceiveErrorMessage: lastRecvErrorMsg, + Connected: true, + LastAck: lastSendSuccess, + LastNack: lastNack, + LastNackMessage: lastNackMsg, + LastRecvResourceSuccess: lastRecvResourceSuccess, + LastRecvError: lastRecvError, + LastRecvErrorMessage: lastRecvErrorMsg, ImportedServices: map[string]struct{}{ api.String(): {}, }, @@ -553,27 +553,27 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { }) }) - var lastReceiveHeartbeat time.Time + var lastRecvHeartbeat time.Time testutil.RunStep(t, "receives heartbeat", func(t *testing.T) { resp := &pbpeerstream.ReplicationMessage{ Payload: &pbpeerstream.ReplicationMessage_Heartbeat_{ Heartbeat: &pbpeerstream.ReplicationMessage_Heartbeat{}, }, } - lastReceiveHeartbeat = it.FutureNow(1) + lastRecvHeartbeat = it.FutureNow(1) err := client.Send(resp) require.NoError(t, err) api := structs.NewServiceName("api", nil) expect := Status{ - Connected: true, - LastAck: lastSendSuccess, - LastNack: lastNack, - LastNackMessage: lastNackMsg, - LastReceiveResourceSuccess: lastRecvResourceSuccess, - LastReceiveError: lastRecvError, - LastReceiveErrorMessage: lastRecvErrorMsg, - LastReceiveHeartbeat: lastReceiveHeartbeat, + Connected: true, + LastAck: lastSendSuccess, + LastNack: lastNack, + LastNackMessage: lastNackMsg, + LastRecvResourceSuccess: lastRecvResourceSuccess, + LastRecvError: lastRecvError, + LastRecvErrorMessage: lastRecvErrorMsg, + LastRecvHeartbeat: lastRecvHeartbeat, ImportedServices: map[string]struct{}{ api.String(): {}, }, @@ -596,16 +596,16 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) { api := structs.NewServiceName("api", nil) expect := Status{ - Connected: false, - DisconnectErrorMessage: "stream ended unexpectedly", - LastAck: lastSendSuccess, - LastNack: lastNack, - LastNackMessage: lastNackMsg, - DisconnectTime: disconnectTime, - LastReceiveResourceSuccess: lastRecvResourceSuccess, - LastReceiveError: lastRecvError, - LastReceiveErrorMessage: lastRecvErrorMsg, - LastReceiveHeartbeat: lastReceiveHeartbeat, + Connected: false, + DisconnectErrorMessage: "stream ended unexpectedly", + LastAck: lastSendSuccess, + LastNack: lastNack, + LastNackMessage: lastNackMsg, + DisconnectTime: disconnectTime, + LastRecvResourceSuccess: lastRecvResourceSuccess, + LastRecvError: lastRecvError, + LastRecvErrorMessage: lastRecvErrorMsg, + LastRecvHeartbeat: lastRecvHeartbeat, ImportedServices: map[string]struct{}{ api.String(): {}, }, diff --git a/agent/grpc-external/services/peerstream/stream_tracker.go b/agent/grpc-external/services/peerstream/stream_tracker.go index 8632f36eca..f7a451595d 100644 --- a/agent/grpc-external/services/peerstream/stream_tracker.go +++ b/agent/grpc-external/services/peerstream/stream_tracker.go @@ -167,21 +167,19 @@ type Status struct { // LastSendErrorMessage tracks the last error message when sending into the stream. LastSendErrorMessage string - // LastReceiveHeartbeat tracks when we last received a heartbeat from our peer. - LastReceiveHeartbeat time.Time + // LastRecvHeartbeat tracks when we last received a heartbeat from our peer. + LastRecvHeartbeat time.Time - // LastReceiveResourceSuccess tracks the time we last successfully stored a resource replicated FROM the peer. - LastReceiveResourceSuccess time.Time + // LastRecvResourceSuccess tracks the time we last successfully stored a resource replicated FROM the peer. + LastRecvResourceSuccess time.Time - // LastReceiveError tracks either: + // LastRecvError tracks either: // - The time we failed to store a resource replicated FROM the peer. // - The time of the last error when receiving from the stream. - LastReceiveError time.Time + LastRecvError time.Time - // LastReceiveError tracks either: - // - The error message when we failed to store a resource replicated FROM the peer. - // - The last error message when receiving from the stream. - LastReceiveErrorMessage string + // LastRecvErrorMessage tracks the last error message when receiving from the stream. + LastRecvErrorMessage string // TODO(peering): consider keeping track of imported and exported services thru raft // ImportedServices keeps track of which service names are imported for the peer @@ -225,24 +223,24 @@ func (s *MutableStatus) TrackSendError(error string) { s.mu.Unlock() } -// TrackReceiveResourceSuccess tracks receiving a replicated resource. -func (s *MutableStatus) TrackReceiveResourceSuccess() { +// TrackRecvResourceSuccess tracks receiving a replicated resource. +func (s *MutableStatus) TrackRecvResourceSuccess() { s.mu.Lock() - s.LastReceiveResourceSuccess = s.timeNow().UTC() + s.LastRecvResourceSuccess = s.timeNow().UTC() s.mu.Unlock() } -// TrackReceiveHeartbeat tracks receiving a heartbeat from our peer. -func (s *MutableStatus) TrackReceiveHeartbeat() { +// TrackRecvHeartbeat tracks receiving a heartbeat from our peer. +func (s *MutableStatus) TrackRecvHeartbeat() { s.mu.Lock() - s.LastReceiveHeartbeat = s.timeNow().UTC() + s.LastRecvHeartbeat = s.timeNow().UTC() s.mu.Unlock() } -func (s *MutableStatus) TrackReceiveError(error string) { +func (s *MutableStatus) TrackRecvError(error string) { s.mu.Lock() - s.LastReceiveError = s.timeNow().UTC() - s.LastReceiveErrorMessage = error + s.LastRecvError = s.timeNow().UTC() + s.LastRecvErrorMessage = error s.mu.Unlock() }