|
|
|
@ -123,6 +123,7 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event
|
|
|
|
|
|
|
|
|
|
var nodeChanges map[string]changeType
|
|
|
|
|
var serviceChanges map[nodeServiceTuple]serviceChange
|
|
|
|
|
var termGatewayChanges map[structs.ServiceName]map[structs.ServiceName]serviceChange
|
|
|
|
|
|
|
|
|
|
markNode := func(node string, typ changeType) {
|
|
|
|
|
if nodeChanges == nil {
|
|
|
|
@ -201,9 +202,45 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event
|
|
|
|
|
markService(newNodeServiceTupleFromServiceHealthCheck(obj), serviceChangeIndirect)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
case gatewayServicesTableName:
|
|
|
|
|
gs := changeObject(change).(*structs.GatewayService)
|
|
|
|
|
if gs.GatewayKind != structs.ServiceKindTerminatingGateway {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
gsChange := serviceChange{changeType: changeTypeFromChange(change), change: change}
|
|
|
|
|
if termGatewayChanges == nil {
|
|
|
|
|
termGatewayChanges = make(map[structs.ServiceName]map[structs.ServiceName]serviceChange)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
gatewayChanges, ok := termGatewayChanges[gs.Gateway]
|
|
|
|
|
if !ok {
|
|
|
|
|
termGatewayChanges[gs.Gateway] = map[structs.ServiceName]serviceChange{}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
prevChange, ok := gatewayChanges[gs.Service]
|
|
|
|
|
if !ok {
|
|
|
|
|
termGatewayChanges[gs.Gateway][gs.Service] = gsChange
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if changeTypeFromChange(change) == changeDelete {
|
|
|
|
|
termGatewayChanges[gs.Gateway][gs.Service] = gsChange
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
prevGs := changeObject(prevChange.change).(*structs.GatewayService)
|
|
|
|
|
if !gs.IsSame(prevGs) {
|
|
|
|
|
gsChange.changeType = changeUpdate
|
|
|
|
|
termGatewayChanges[gs.Gateway][gs.Service] = gsChange
|
|
|
|
|
} else {
|
|
|
|
|
delete(termGatewayChanges[gs.Gateway], gs.Service)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
//fmt.Printf("term gateway map: %v", termGatewayChanges)
|
|
|
|
|
|
|
|
|
|
// Now act on those marked nodes/services
|
|
|
|
|
for node, changeType := range nodeChanges {
|
|
|
|
|
if changeType == changeDelete {
|
|
|
|
@ -221,9 +258,6 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for tuple, srvChange := range serviceChanges {
|
|
|
|
|
// change may be nil if there was a change that _affected_ the service
|
|
|
|
|
// like a change to checks but it didn't actually change the service
|
|
|
|
|
// record itself.
|
|
|
|
|
if srvChange.changeType == changeDelete {
|
|
|
|
|
sn := srvChange.change.Before.(*structs.ServiceNode)
|
|
|
|
|
e := newServiceHealthEventDeregister(changes.Index, sn)
|
|
|
|
@ -265,6 +299,53 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event
|
|
|
|
|
events = append(events, e)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
for gatewayName, serviceChanges := range termGatewayChanges {
|
|
|
|
|
for serviceName, gsChange := range serviceChanges {
|
|
|
|
|
gs := changeObject(gsChange.change).(*structs.GatewayService)
|
|
|
|
|
|
|
|
|
|
_, nodes, err := serviceGatewayNodes(tx, nil, serviceName.Name, gs.GatewayKind, &gatewayName.EnterpriseMeta)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Always send deregister events for deletes/updates.
|
|
|
|
|
if gsChange.changeType != changeCreate {
|
|
|
|
|
for _, sn := range nodes {
|
|
|
|
|
e := newServiceHealthEventDeregister(changes.Index, sn)
|
|
|
|
|
|
|
|
|
|
e.Topic = topicServiceHealthConnect
|
|
|
|
|
// todo(streaming): make namespace-aware in enterprise
|
|
|
|
|
payload := e.Payload.(EventPayloadCheckServiceNode)
|
|
|
|
|
payload.key = serviceName.Name
|
|
|
|
|
e.Payload = payload
|
|
|
|
|
|
|
|
|
|
events = append(events, e)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if gsChange.changeType == changeDelete {
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Build service events and append them
|
|
|
|
|
for _, sn := range nodes {
|
|
|
|
|
tuple := newNodeServiceTupleFromServiceNode(sn)
|
|
|
|
|
e, err := newServiceHealthEventForService(tx, changes.Index, tuple)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return nil, err
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
e.Topic = topicServiceHealthConnect
|
|
|
|
|
// todo(streaming): make namespace-aware in enterprise
|
|
|
|
|
payload := e.Payload.(EventPayloadCheckServiceNode)
|
|
|
|
|
payload.key = serviceName.Name
|
|
|
|
|
e.Payload = payload
|
|
|
|
|
|
|
|
|
|
events = append(events, e)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Duplicate any events that affected connect-enabled instances (proxies or
|
|
|
|
|
// native apps) to the relevant Connect topic.
|
|
|
|
|
connectEvents, err := serviceHealthToConnectEvents(tx, events...)
|
|
|
|
|