initial updates for all dcs

pull/20908/head
Vijay Srinivas 2024-03-27 14:34:02 +05:30
parent 20210a8d86
commit 2b474be647
2 changed files with 101 additions and 39 deletions

View File

@ -72,36 +72,59 @@ func (p *Plan) RunWithClientAndHclog(client *consulapi.Client, logger hclog.Logg
// Loop until we are canceled
failures := 0
p.mapLastParamVal = make(map[string]BlockingParamVal)
p.mapLastResult = make(map[string]interface{})
OUTER:
for !p.shouldStop() {
// Invoke the handler
blockParamVal, result, err := p.Watcher(p)
// Check if we should terminate since the function
// could have blocked for a while
if p.shouldStop() {
break
dcs := []string{}
blockParamVal := make(map[string]BlockingParamVal)
result := make(map[string]interface{})
var err error
if p.Datacenter == "all" {
catalog := p.client.Catalog()
dcs, err = catalog.Datacenters()
if err != nil || len(dcs) == 0 {
dcs[0] = "" //This will cause to use default DataCenter if err
}
} else {
dcs[0] = p.Datacenter
}
// Handle an error in the watch function
if err != nil {
// Perform an exponential backoff
failures++
if blockParamVal == nil {
p.lastParamVal = nil
} else {
p.lastParamVal = blockParamVal.Next(p.lastParamVal)
for _, dc := range dcs {
p.Datacenter = dc
// Invoke the handler
blockParamVal[dc], result[dc], err = p.Watcher(p)
// Check if we should terminate since the function
// could have blocked for a while
if p.shouldStop() {
break OUTER
}
retry := retryInterval * time.Duration(failures*failures)
if retry > maxBackoffTime {
retry = maxBackoffTime
}
watchLogger.Error("Watch errored", "type", p.Type, "error", err, "retry", retry)
select {
case <-time.After(retry):
continue OUTER
case <-p.stopCh:
return nil
// Handle an error in the watch function
if err != nil {
// Perform an exponential backoff
failures++
if blockParamVal[dc] == nil {
// p.lastParamVal = nil
p.mapLastParamVal[dc] = nil
} else {
// p.lastParamVal = blockParamVal.Next(p.lastParamVal)
p.mapLastParamVal[dc] = blockParamVal[dc].Next(p.mapLastParamVal[dc])
}
retry := retryInterval * time.Duration(failures*failures)
if retry > maxBackoffTime {
retry = maxBackoffTime
}
watchLogger.Error("Watch errored", "type", p.Type, "error", err, "retry", retry)
select {
case <-time.After(retry):
continue OUTER
case <-p.stopCh:
return nil
}
}
}
@ -109,29 +132,66 @@ OUTER:
failures = 0
// If the index is unchanged do nothing
if p.lastParamVal != nil && p.lastParamVal.Equal(blockParamVal) {
continue
// if p.lastParamVal != nil && p.lastParamVal.Equal(blockParamVal) {
// continue
// }
noChange := true
for dc, lastParamVal := range p.mapLastParamVal {
if lastParamVal != nil && !lastParamVal.Equal(blockParamVal[dc]) {
noChange = false
break
}
}
if noChange && len(p.mapLastParamVal) > 0 {
continue OUTER
}
// Update the index, look for change
oldParamVal := p.lastParamVal
p.lastParamVal = blockParamVal.Next(oldParamVal)
if oldParamVal != nil && reflect.DeepEqual(p.lastResult, result) {
continue
// oldParamVal := p.lastParamVal
// p.lastParamVal = blockParamVal.Next(oldParamVal)
// if oldParamVal != nil && reflect.DeepEqual(p.lastResult, result) {
// continue
// }
noChange = true
for _, dc := range dcs {
oldParamVal, ok := p.mapLastParamVal[dc]
if !ok {
oldParamVal = nil
}
p.mapLastParamVal[dc] = blockParamVal[dc].Next(oldParamVal)
if oldParamVal != nil {
if !reflect.DeepEqual(p.mapLastResult[dc], result[dc]) {
noChange = false
}
} else {
noChange = false
}
}
if noChange {
continue OUTER
}
// Handle the updated result
p.lastResult = result
var totResult []interface{}
for dc, rdc := range result {
p.mapLastResult[dc] = rdc
if r, ok := rdc.([]interface{}); ok {
totResult = append(totResult, r...)
} else {
totResult = append(totResult, r)
}
}
// If a hybrid handler exists use that
if p.HybridHandler != nil {
p.HybridHandler(blockParamVal, result)
p.HybridHandler(blockParamVal[dcs[0]], totResult)
} else if p.Handler != nil {
idx, ok := blockParamVal.(WaitIndexVal)
idx, ok := blockParamVal[dcs[0]].(WaitIndexVal)
if !ok {
watchLogger.Error("Handler only supports index-based " +
" watches but non index-based watch run. Skipping Handler.")
}
p.Handler(uint64(idx), result)
p.Handler(uint64(idx), totResult)
}
}
return nil

View File

@ -38,10 +38,12 @@ type Plan struct {
// Deprecated: use Logger
LogOutput io.Writer
address string
client *consulapi.Client
lastParamVal BlockingParamVal
lastResult interface{}
address string
client *consulapi.Client
lastParamVal BlockingParamVal
lastResult interface{}
mapLastParamVal map[string]BlockingParamVal
mapLastResult map[string]interface{}
stop bool
stopCh chan struct{}