From 2b474be647b280226cb4f49dc570f6072541b3b5 Mon Sep 17 00:00:00 2001 From: Vijay Srinivas Date: Wed, 27 Mar 2024 14:34:02 +0530 Subject: [PATCH] initial updates for all dcs --- api/watch/plan.go | 130 +++++++++++++++++++++++++++++++++------------ api/watch/watch.go | 10 ++-- 2 files changed, 101 insertions(+), 39 deletions(-) diff --git a/api/watch/plan.go b/api/watch/plan.go index a3588ff184..07f9243218 100644 --- a/api/watch/plan.go +++ b/api/watch/plan.go @@ -72,36 +72,59 @@ func (p *Plan) RunWithClientAndHclog(client *consulapi.Client, logger hclog.Logg // Loop until we are canceled failures := 0 + + p.mapLastParamVal = make(map[string]BlockingParamVal) + p.mapLastResult = make(map[string]interface{}) OUTER: for !p.shouldStop() { - // Invoke the handler - blockParamVal, result, err := p.Watcher(p) - - // Check if we should terminate since the function - // could have blocked for a while - if p.shouldStop() { - break + dcs := []string{} + blockParamVal := make(map[string]BlockingParamVal) + result := make(map[string]interface{}) + var err error + if p.Datacenter == "all" { + catalog := p.client.Catalog() + dcs, err = catalog.Datacenters() + if err != nil || len(dcs) == 0 { + dcs[0] = "" //This will cause to use default DataCenter if err + } + } else { + dcs[0] = p.Datacenter } - // Handle an error in the watch function - if err != nil { - // Perform an exponential backoff - failures++ - if blockParamVal == nil { - p.lastParamVal = nil - } else { - p.lastParamVal = blockParamVal.Next(p.lastParamVal) + for _, dc := range dcs { + p.Datacenter = dc + + // Invoke the handler + blockParamVal[dc], result[dc], err = p.Watcher(p) + + // Check if we should terminate since the function + // could have blocked for a while + if p.shouldStop() { + break OUTER } - retry := retryInterval * time.Duration(failures*failures) - if retry > maxBackoffTime { - retry = maxBackoffTime - } - watchLogger.Error("Watch errored", "type", p.Type, "error", err, "retry", retry) - select { - case <-time.After(retry): - continue OUTER - case <-p.stopCh: - return nil + + // Handle an error in the watch function + if err != nil { + // Perform an exponential backoff + failures++ + if blockParamVal[dc] == nil { + // p.lastParamVal = nil + p.mapLastParamVal[dc] = nil + } else { + // p.lastParamVal = blockParamVal.Next(p.lastParamVal) + p.mapLastParamVal[dc] = blockParamVal[dc].Next(p.mapLastParamVal[dc]) + } + retry := retryInterval * time.Duration(failures*failures) + if retry > maxBackoffTime { + retry = maxBackoffTime + } + watchLogger.Error("Watch errored", "type", p.Type, "error", err, "retry", retry) + select { + case <-time.After(retry): + continue OUTER + case <-p.stopCh: + return nil + } } } @@ -109,29 +132,66 @@ OUTER: failures = 0 // If the index is unchanged do nothing - if p.lastParamVal != nil && p.lastParamVal.Equal(blockParamVal) { - continue + // if p.lastParamVal != nil && p.lastParamVal.Equal(blockParamVal) { + // continue + // } + noChange := true + for dc, lastParamVal := range p.mapLastParamVal { + if lastParamVal != nil && !lastParamVal.Equal(blockParamVal[dc]) { + noChange = false + break + } + } + if noChange && len(p.mapLastParamVal) > 0 { + continue OUTER } // Update the index, look for change - oldParamVal := p.lastParamVal - p.lastParamVal = blockParamVal.Next(oldParamVal) - if oldParamVal != nil && reflect.DeepEqual(p.lastResult, result) { - continue + // oldParamVal := p.lastParamVal + // p.lastParamVal = blockParamVal.Next(oldParamVal) + // if oldParamVal != nil && reflect.DeepEqual(p.lastResult, result) { + // continue + // } + noChange = true + for _, dc := range dcs { + oldParamVal, ok := p.mapLastParamVal[dc] + if !ok { + oldParamVal = nil + } + p.mapLastParamVal[dc] = blockParamVal[dc].Next(oldParamVal) + if oldParamVal != nil { + if !reflect.DeepEqual(p.mapLastResult[dc], result[dc]) { + noChange = false + } + } else { + noChange = false + } + } + if noChange { + continue OUTER } // Handle the updated result - p.lastResult = result + var totResult []interface{} + for dc, rdc := range result { + p.mapLastResult[dc] = rdc + if r, ok := rdc.([]interface{}); ok { + totResult = append(totResult, r...) + } else { + totResult = append(totResult, r) + } + } + // If a hybrid handler exists use that if p.HybridHandler != nil { - p.HybridHandler(blockParamVal, result) + p.HybridHandler(blockParamVal[dcs[0]], totResult) } else if p.Handler != nil { - idx, ok := blockParamVal.(WaitIndexVal) + idx, ok := blockParamVal[dcs[0]].(WaitIndexVal) if !ok { watchLogger.Error("Handler only supports index-based " + " watches but non index-based watch run. Skipping Handler.") } - p.Handler(uint64(idx), result) + p.Handler(uint64(idx), totResult) } } return nil diff --git a/api/watch/watch.go b/api/watch/watch.go index ea00f8ef0c..17181873be 100644 --- a/api/watch/watch.go +++ b/api/watch/watch.go @@ -38,10 +38,12 @@ type Plan struct { // Deprecated: use Logger LogOutput io.Writer - address string - client *consulapi.Client - lastParamVal BlockingParamVal - lastResult interface{} + address string + client *consulapi.Client + lastParamVal BlockingParamVal + lastResult interface{} + mapLastParamVal map[string]BlockingParamVal + mapLastResult map[string]interface{} stop bool stopCh chan struct{}