mirror of https://github.com/hashicorp/consul
state: handle terminating gateways in service health events
parent
3775392fb5
commit
e573e64d58
|
@ -218,17 +218,8 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event
|
||||||
events = append(events, e)
|
events = append(events, e)
|
||||||
}
|
}
|
||||||
|
|
||||||
if before.ServiceKind == structs.ServiceKindConnectProxy &&
|
if e, ok := isConnectProxyDestinationServiceChange(changes.Index, before, after); ok {
|
||||||
before.ServiceProxy.DestinationServiceName != after.ServiceProxy.DestinationServiceName {
|
events = append(events, e)
|
||||||
// Connect proxy changed the service it is representing, need to issue a
|
|
||||||
// dereg for the old service on the Connect topic. We don't actually need
|
|
||||||
// to deregister this sidecar service though as it still exists and
|
|
||||||
// didn't change its name (or if it did that was caught just above). But
|
|
||||||
// our mechanism for connect events is to convert them so we generate
|
|
||||||
// the regular one, convert it to Connect topic and then discard the
|
|
||||||
// original.
|
|
||||||
e := newServiceHealthEventDeregister(changes.Index, before)
|
|
||||||
events = append(events, serviceHealthToConnectEvents(e)...)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -252,6 +243,22 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event
|
||||||
return events, nil
|
return events, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// isConnectProxyDestinationServiceChange handles the case where a Connect proxy changed
|
||||||
|
// the service it is proxying. We need to issue a de-registration for the old
|
||||||
|
// service on the Connect topic. We don't actually need to deregister this sidecar
|
||||||
|
// service though as it still exists and didn't change its name.
|
||||||
|
func isConnectProxyDestinationServiceChange(idx uint64, before, after *structs.ServiceNode) (stream.Event, bool) {
|
||||||
|
if before.ServiceKind != structs.ServiceKindConnectProxy ||
|
||||||
|
before.ServiceProxy.DestinationServiceName == after.ServiceProxy.DestinationServiceName {
|
||||||
|
return stream.Event{}, false
|
||||||
|
}
|
||||||
|
|
||||||
|
e := newServiceHealthEventDeregister(idx, before)
|
||||||
|
e.Topic = TopicServiceHealthConnect
|
||||||
|
e.Key = getPayloadCheckServiceNode(e.Payload).Service.Proxy.DestinationServiceName
|
||||||
|
return e, true
|
||||||
|
}
|
||||||
|
|
||||||
type changeType uint8
|
type changeType uint8
|
||||||
|
|
||||||
const (
|
const (
|
||||||
|
@ -281,33 +288,35 @@ func changeTypeFromChange(change memdb.Change) changeType {
|
||||||
// switching connection details to be the proxy instead of the actual instance
|
// switching connection details to be the proxy instead of the actual instance
|
||||||
// in case of a sidecar.
|
// in case of a sidecar.
|
||||||
func serviceHealthToConnectEvents(events ...stream.Event) []stream.Event {
|
func serviceHealthToConnectEvents(events ...stream.Event) []stream.Event {
|
||||||
var serviceHealthConnectEvents []stream.Event
|
var result []stream.Event
|
||||||
for _, event := range events {
|
for _, event := range events {
|
||||||
if event.Topic != TopicServiceHealth {
|
if event.Topic != TopicServiceHealth {
|
||||||
// Skip non-health or any events already emitted to Connect topic
|
// Skip non-health or any events already emitted to Connect topic
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
node := getPayloadCheckServiceNode(event.Payload)
|
node := getPayloadCheckServiceNode(event.Payload)
|
||||||
// TODO: do we need to handle gateways here as well?
|
if node.Service == nil {
|
||||||
if node.Service == nil ||
|
|
||||||
(node.Service.Kind != structs.ServiceKindConnectProxy && !node.Service.Connect.Native) {
|
|
||||||
// Event is not a service instance (i.e. just a node registration)
|
|
||||||
// or is not a service that is not connect-enabled in some way.
|
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
connectEvent := event
|
connectEvent := event
|
||||||
connectEvent.Topic = TopicServiceHealthConnect
|
connectEvent.Topic = TopicServiceHealthConnect
|
||||||
|
|
||||||
// If this is a proxy, set the key to the destination service name.
|
switch {
|
||||||
if node.Service.Kind == structs.ServiceKindConnectProxy {
|
case node.Service.Connect.Native:
|
||||||
connectEvent.Key = node.Service.Proxy.DestinationServiceName
|
result = append(result, connectEvent)
|
||||||
}
|
|
||||||
|
|
||||||
serviceHealthConnectEvents = append(serviceHealthConnectEvents, connectEvent)
|
case node.Service.Kind == structs.ServiceKindConnectProxy:
|
||||||
|
connectEvent.Key = node.Service.Proxy.DestinationServiceName
|
||||||
|
result = append(result, connectEvent)
|
||||||
|
|
||||||
|
default:
|
||||||
|
// ServiceKindTerminatingGateway changes are handled separately.
|
||||||
|
// All other cases are not relevant to the connect topic
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return serviceHealthConnectEvents
|
return result
|
||||||
}
|
}
|
||||||
|
|
||||||
func getPayloadCheckServiceNode(payload interface{}) *structs.CheckServiceNode {
|
func getPayloadCheckServiceNode(payload interface{}) *structs.CheckServiceNode {
|
||||||
|
|
Loading…
Reference in New Issue