mirror of https://github.com/hashicorp/consul
parent
a1df5ae9b7
commit
8c5b70d227
|
@ -265,11 +265,11 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error {
|
||||||
|
|
||||||
if err == io.EOF {
|
if err == io.EOF {
|
||||||
logger.Info("stream ended by peer")
|
logger.Info("stream ended by peer")
|
||||||
status.TrackReceiveError(err.Error())
|
status.TrackRecvError(err.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
logger.Error("failed to receive from stream", "error", err)
|
logger.Error("failed to receive from stream", "error", err)
|
||||||
status.TrackReceiveError(err.Error())
|
status.TrackRecvError(err.Error())
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
@ -459,9 +459,9 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error {
|
||||||
reply, err := s.processResponse(streamReq.PeerName, streamReq.Partition, status, resp, logger)
|
reply, err := s.processResponse(streamReq.PeerName, streamReq.Partition, status, resp, logger)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
logger.Error("failed to persist resource", "resourceURL", resp.ResourceURL, "resourceID", resp.ResourceID)
|
logger.Error("failed to persist resource", "resourceURL", resp.ResourceURL, "resourceID", resp.ResourceID)
|
||||||
status.TrackReceiveError(err.Error())
|
status.TrackRecvError(err.Error())
|
||||||
} else {
|
} else {
|
||||||
status.TrackReceiveResourceSuccess()
|
status.TrackRecvResourceSuccess()
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := streamSend(reply); err != nil {
|
if err := streamSend(reply); err != nil {
|
||||||
|
@ -482,7 +482,7 @@ func (s *Server) realHandleStream(streamReq HandleStreamRequest) error {
|
||||||
}
|
}
|
||||||
|
|
||||||
if msg.GetHeartbeat() != nil {
|
if msg.GetHeartbeat() != nil {
|
||||||
status.TrackReceiveHeartbeat()
|
status.TrackRecvHeartbeat()
|
||||||
|
|
||||||
// Reset the heartbeat timeout by creating a new context.
|
// Reset the heartbeat timeout by creating a new context.
|
||||||
// We first must cancel the old context so there's no leaks. This is safe to do because we're only
|
// We first must cancel the old context so there's no leaks. This is safe to do because we're only
|
||||||
|
|
|
@ -475,11 +475,11 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
|
||||||
api := structs.NewServiceName("api", nil)
|
api := structs.NewServiceName("api", nil)
|
||||||
|
|
||||||
expect := Status{
|
expect := Status{
|
||||||
Connected: true,
|
Connected: true,
|
||||||
LastAck: lastSendSuccess,
|
LastAck: lastSendSuccess,
|
||||||
LastNack: lastNack,
|
LastNack: lastNack,
|
||||||
LastNackMessage: lastNackMsg,
|
LastNackMessage: lastNackMsg,
|
||||||
LastReceiveResourceSuccess: lastRecvResourceSuccess,
|
LastRecvResourceSuccess: lastRecvResourceSuccess,
|
||||||
ImportedServices: map[string]struct{}{
|
ImportedServices: map[string]struct{}{
|
||||||
api.String(): {},
|
api.String(): {},
|
||||||
},
|
},
|
||||||
|
@ -534,13 +534,13 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
|
||||||
api := structs.NewServiceName("api", nil)
|
api := structs.NewServiceName("api", nil)
|
||||||
|
|
||||||
expect := Status{
|
expect := Status{
|
||||||
Connected: true,
|
Connected: true,
|
||||||
LastAck: lastSendSuccess,
|
LastAck: lastSendSuccess,
|
||||||
LastNack: lastNack,
|
LastNack: lastNack,
|
||||||
LastNackMessage: lastNackMsg,
|
LastNackMessage: lastNackMsg,
|
||||||
LastReceiveResourceSuccess: lastRecvResourceSuccess,
|
LastRecvResourceSuccess: lastRecvResourceSuccess,
|
||||||
LastReceiveError: lastRecvError,
|
LastRecvError: lastRecvError,
|
||||||
LastReceiveErrorMessage: lastRecvErrorMsg,
|
LastRecvErrorMessage: lastRecvErrorMsg,
|
||||||
ImportedServices: map[string]struct{}{
|
ImportedServices: map[string]struct{}{
|
||||||
api.String(): {},
|
api.String(): {},
|
||||||
},
|
},
|
||||||
|
@ -553,27 +553,27 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
var lastReceiveHeartbeat time.Time
|
var lastRecvHeartbeat time.Time
|
||||||
testutil.RunStep(t, "receives heartbeat", func(t *testing.T) {
|
testutil.RunStep(t, "receives heartbeat", func(t *testing.T) {
|
||||||
resp := &pbpeerstream.ReplicationMessage{
|
resp := &pbpeerstream.ReplicationMessage{
|
||||||
Payload: &pbpeerstream.ReplicationMessage_Heartbeat_{
|
Payload: &pbpeerstream.ReplicationMessage_Heartbeat_{
|
||||||
Heartbeat: &pbpeerstream.ReplicationMessage_Heartbeat{},
|
Heartbeat: &pbpeerstream.ReplicationMessage_Heartbeat{},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
lastReceiveHeartbeat = it.FutureNow(1)
|
lastRecvHeartbeat = it.FutureNow(1)
|
||||||
err := client.Send(resp)
|
err := client.Send(resp)
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
api := structs.NewServiceName("api", nil)
|
api := structs.NewServiceName("api", nil)
|
||||||
|
|
||||||
expect := Status{
|
expect := Status{
|
||||||
Connected: true,
|
Connected: true,
|
||||||
LastAck: lastSendSuccess,
|
LastAck: lastSendSuccess,
|
||||||
LastNack: lastNack,
|
LastNack: lastNack,
|
||||||
LastNackMessage: lastNackMsg,
|
LastNackMessage: lastNackMsg,
|
||||||
LastReceiveResourceSuccess: lastRecvResourceSuccess,
|
LastRecvResourceSuccess: lastRecvResourceSuccess,
|
||||||
LastReceiveError: lastRecvError,
|
LastRecvError: lastRecvError,
|
||||||
LastReceiveErrorMessage: lastRecvErrorMsg,
|
LastRecvErrorMessage: lastRecvErrorMsg,
|
||||||
LastReceiveHeartbeat: lastReceiveHeartbeat,
|
LastRecvHeartbeat: lastRecvHeartbeat,
|
||||||
ImportedServices: map[string]struct{}{
|
ImportedServices: map[string]struct{}{
|
||||||
api.String(): {},
|
api.String(): {},
|
||||||
},
|
},
|
||||||
|
@ -596,16 +596,16 @@ func TestStreamResources_Server_StreamTracker(t *testing.T) {
|
||||||
api := structs.NewServiceName("api", nil)
|
api := structs.NewServiceName("api", nil)
|
||||||
|
|
||||||
expect := Status{
|
expect := Status{
|
||||||
Connected: false,
|
Connected: false,
|
||||||
DisconnectErrorMessage: "stream ended unexpectedly",
|
DisconnectErrorMessage: "stream ended unexpectedly",
|
||||||
LastAck: lastSendSuccess,
|
LastAck: lastSendSuccess,
|
||||||
LastNack: lastNack,
|
LastNack: lastNack,
|
||||||
LastNackMessage: lastNackMsg,
|
LastNackMessage: lastNackMsg,
|
||||||
DisconnectTime: disconnectTime,
|
DisconnectTime: disconnectTime,
|
||||||
LastReceiveResourceSuccess: lastRecvResourceSuccess,
|
LastRecvResourceSuccess: lastRecvResourceSuccess,
|
||||||
LastReceiveError: lastRecvError,
|
LastRecvError: lastRecvError,
|
||||||
LastReceiveErrorMessage: lastRecvErrorMsg,
|
LastRecvErrorMessage: lastRecvErrorMsg,
|
||||||
LastReceiveHeartbeat: lastReceiveHeartbeat,
|
LastRecvHeartbeat: lastRecvHeartbeat,
|
||||||
ImportedServices: map[string]struct{}{
|
ImportedServices: map[string]struct{}{
|
||||||
api.String(): {},
|
api.String(): {},
|
||||||
},
|
},
|
||||||
|
|
|
@ -167,21 +167,19 @@ type Status struct {
|
||||||
// LastSendErrorMessage tracks the last error message when sending into the stream.
|
// LastSendErrorMessage tracks the last error message when sending into the stream.
|
||||||
LastSendErrorMessage string
|
LastSendErrorMessage string
|
||||||
|
|
||||||
// LastReceiveHeartbeat tracks when we last received a heartbeat from our peer.
|
// LastRecvHeartbeat tracks when we last received a heartbeat from our peer.
|
||||||
LastReceiveHeartbeat time.Time
|
LastRecvHeartbeat time.Time
|
||||||
|
|
||||||
// LastReceiveResourceSuccess tracks the time we last successfully stored a resource replicated FROM the peer.
|
// LastRecvResourceSuccess tracks the time we last successfully stored a resource replicated FROM the peer.
|
||||||
LastReceiveResourceSuccess time.Time
|
LastRecvResourceSuccess time.Time
|
||||||
|
|
||||||
// LastReceiveError tracks either:
|
// LastRecvError tracks either:
|
||||||
// - The time we failed to store a resource replicated FROM the peer.
|
// - The time we failed to store a resource replicated FROM the peer.
|
||||||
// - The time of the last error when receiving from the stream.
|
// - The time of the last error when receiving from the stream.
|
||||||
LastReceiveError time.Time
|
LastRecvError time.Time
|
||||||
|
|
||||||
// LastReceiveError tracks either:
|
// LastRecvErrorMessage tracks the last error message when receiving from the stream.
|
||||||
// - The error message when we failed to store a resource replicated FROM the peer.
|
LastRecvErrorMessage string
|
||||||
// - The last error message when receiving from the stream.
|
|
||||||
LastReceiveErrorMessage string
|
|
||||||
|
|
||||||
// TODO(peering): consider keeping track of imported and exported services thru raft
|
// TODO(peering): consider keeping track of imported and exported services thru raft
|
||||||
// ImportedServices keeps track of which service names are imported for the peer
|
// ImportedServices keeps track of which service names are imported for the peer
|
||||||
|
@ -225,24 +223,24 @@ func (s *MutableStatus) TrackSendError(error string) {
|
||||||
s.mu.Unlock()
|
s.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
// TrackReceiveResourceSuccess tracks receiving a replicated resource.
|
// TrackRecvResourceSuccess tracks receiving a replicated resource.
|
||||||
func (s *MutableStatus) TrackReceiveResourceSuccess() {
|
func (s *MutableStatus) TrackRecvResourceSuccess() {
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
s.LastReceiveResourceSuccess = s.timeNow().UTC()
|
s.LastRecvResourceSuccess = s.timeNow().UTC()
|
||||||
s.mu.Unlock()
|
s.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
// TrackReceiveHeartbeat tracks receiving a heartbeat from our peer.
|
// TrackRecvHeartbeat tracks receiving a heartbeat from our peer.
|
||||||
func (s *MutableStatus) TrackReceiveHeartbeat() {
|
func (s *MutableStatus) TrackRecvHeartbeat() {
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
s.LastReceiveHeartbeat = s.timeNow().UTC()
|
s.LastRecvHeartbeat = s.timeNow().UTC()
|
||||||
s.mu.Unlock()
|
s.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *MutableStatus) TrackReceiveError(error string) {
|
func (s *MutableStatus) TrackRecvError(error string) {
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
s.LastReceiveError = s.timeNow().UTC()
|
s.LastRecvError = s.timeNow().UTC()
|
||||||
s.LastReceiveErrorMessage = error
|
s.LastRecvErrorMessage = error
|
||||||
s.mu.Unlock()
|
s.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue