mirror of https://github.com/hashicorp/consul
state: use changeType in serviceChanges
To be a little more explicit, instead of nil implying an indirect changepull/8357/head
parent
01424ba146
commit
417c5c93a8
|
@ -63,13 +63,36 @@ type nodeServiceTuple struct {
|
|||
EntMeta structs.EnterpriseMeta
|
||||
}
|
||||
|
||||
func newNodeServiceTupleFromServiceNode(sn *structs.ServiceNode) nodeServiceTuple {
|
||||
return nodeServiceTuple{
|
||||
Node: sn.Node,
|
||||
ServiceID: sn.ServiceID,
|
||||
EntMeta: sn.EnterpriseMeta,
|
||||
}
|
||||
}
|
||||
|
||||
func newNodeServiceTupleFromServiceHealthCheck(hc *structs.HealthCheck) nodeServiceTuple {
|
||||
return nodeServiceTuple{
|
||||
Node: hc.Node,
|
||||
ServiceID: hc.ServiceID,
|
||||
EntMeta: hc.EnterpriseMeta,
|
||||
}
|
||||
}
|
||||
|
||||
type serviceChange struct {
|
||||
changeType changeType
|
||||
change memdb.Change
|
||||
}
|
||||
|
||||
var serviceChangeIndirect = serviceChange{changeType: changeIndirect}
|
||||
|
||||
// ServiceHealthEventsFromChanges returns all the service and Connect health
|
||||
// events that should be emitted given a set of changes to the state store.
|
||||
func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event, error) {
|
||||
var events []stream.Event
|
||||
|
||||
var nodeChanges map[string]changeType
|
||||
var serviceChanges map[nodeServiceTuple]*memdb.Change
|
||||
var serviceChanges map[nodeServiceTuple]serviceChange
|
||||
|
||||
markNode := func(node string, typ changeType) {
|
||||
if nodeChanges == nil {
|
||||
|
@ -83,21 +106,16 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event
|
|||
nodeChanges[node] = typ
|
||||
}
|
||||
}
|
||||
markService := func(node, service string, entMeta structs.EnterpriseMeta, svcChange *memdb.Change) {
|
||||
markService := func(key nodeServiceTuple, svcChange serviceChange) {
|
||||
if serviceChanges == nil {
|
||||
serviceChanges = make(map[nodeServiceTuple]*memdb.Change)
|
||||
}
|
||||
k := nodeServiceTuple{
|
||||
Node: node,
|
||||
ServiceID: service,
|
||||
EntMeta: entMeta,
|
||||
serviceChanges = make(map[nodeServiceTuple]serviceChange)
|
||||
}
|
||||
// If the caller has an actual service mutation ensure we store it even if
|
||||
// the service is already marked. If the caller is just marking the service
|
||||
// dirty without an node change, don't overwrite any existing node change we
|
||||
// know about.
|
||||
if serviceChanges[k] == nil {
|
||||
serviceChanges[k] = svcChange
|
||||
if serviceChanges[key].changeType == changeIndirect {
|
||||
serviceChanges[key] = svcChange
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -114,8 +132,8 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event
|
|||
|
||||
case "services":
|
||||
sn := changeObject(change).(*structs.ServiceNode)
|
||||
changeCopy := change // TODO: why does the change need to be copied?
|
||||
markService(sn.Node, sn.ServiceID, sn.EnterpriseMeta, &changeCopy)
|
||||
srvChange := serviceChange{changeType: changeTypeFromChange(change), change: change}
|
||||
markService(newNodeServiceTupleFromServiceNode(sn), srvChange)
|
||||
|
||||
case "checks":
|
||||
// For health we only care about the scope for now to know if it's just
|
||||
|
@ -133,14 +151,14 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event
|
|||
} else {
|
||||
// Check changed which means we just need to emit for the linked
|
||||
// service.
|
||||
markService(after.Node, after.ServiceID, after.EnterpriseMeta, nil)
|
||||
markService(newNodeServiceTupleFromServiceHealthCheck(after), serviceChangeIndirect)
|
||||
|
||||
// Edge case - if the check with same ID was updated to link to a
|
||||
// different service ID but the old service with old ID still exists,
|
||||
// then the old service instance needs updating too as it has one
|
||||
// fewer checks now.
|
||||
if before.ServiceID != after.ServiceID {
|
||||
markService(before.Node, before.ServiceID, before.EnterpriseMeta, nil)
|
||||
markService(newNodeServiceTupleFromServiceHealthCheck(before), serviceChangeIndirect)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -150,7 +168,7 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event
|
|||
// Node level check
|
||||
markNode(obj.Node, changeIndirect)
|
||||
} else {
|
||||
markService(obj.Node, obj.ServiceID, obj.EnterpriseMeta, nil)
|
||||
markService(newNodeServiceTupleFromServiceHealthCheck(obj), serviceChangeIndirect)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -172,12 +190,12 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event
|
|||
events = append(events, es...)
|
||||
}
|
||||
|
||||
for tuple, change := range serviceChanges {
|
||||
for tuple, srvChange := range serviceChanges {
|
||||
// change may be nil if there was a change that _affected_ the service
|
||||
// like a change to checks but it didn't actually change the service
|
||||
// record itself.
|
||||
if change != nil && change.Deleted() {
|
||||
sn := change.Before.(*structs.ServiceNode)
|
||||
if srvChange.changeType == changeDelete {
|
||||
sn := srvChange.change.Before.(*structs.ServiceNode)
|
||||
e := newServiceHealthEventDeregister(changes.Index, sn)
|
||||
events = append(events, e)
|
||||
continue
|
||||
|
@ -186,9 +204,9 @@ func ServiceHealthEventsFromChanges(tx ReadTxn, changes Changes) ([]stream.Event
|
|||
// Check if this was a service mutation that changed it's name which
|
||||
// requires special handling even if node changed and new events were
|
||||
// already published.
|
||||
if change != nil && change.Updated() {
|
||||
before := change.Before.(*structs.ServiceNode)
|
||||
after := change.After.(*structs.ServiceNode)
|
||||
if srvChange.changeType == changeUpdate {
|
||||
before := srvChange.change.Before.(*structs.ServiceNode)
|
||||
after := srvChange.change.After.(*structs.ServiceNode)
|
||||
|
||||
if before.ServiceName != after.ServiceName {
|
||||
// Service was renamed, the code below will ensure the new registrations
|
||||
|
|
Loading…
Reference in New Issue