mirror of https://github.com/hashicorp/consul
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
258 lines
5.9 KiB
258 lines
5.9 KiB
// Copyright (c) HashiCorp, Inc. |
|
// SPDX-License-Identifier: MPL-2.0 |
|
|
|
package watch |
|
|
|
import ( |
|
"context" |
|
"fmt" |
|
"io" |
|
"log" |
|
"reflect" |
|
"time" |
|
|
|
"github.com/hashicorp/go-hclog" |
|
|
|
consulapi "github.com/hashicorp/consul/api" |
|
) |
|
|
|
const (
	// retryInterval is the base unit used to compute the delay before a
	// failed watch is retried.
	retryInterval = 5 * time.Second

	// maxBackoffTime is the upper bound applied to the computed retry delay
	// so that repeated failures cannot grow the wait without limit.
	maxBackoffTime = 3 * time.Minute

	// watchLoggerName is the name given to the hclog Logger used by watches.
	// It lives here rather than in the logging package so that the root
	// consul module does not have to be pulled in.
	watchLoggerName = "watch"
)
|
|
|
func (p *Plan) Run(address string) error { |
|
return p.RunWithConfig(address, nil) |
|
} |
|
|
|
// Run is used to run a watch plan |
|
func (p *Plan) RunWithConfig(address string, conf *consulapi.Config) error { |
|
logger := p.Logger |
|
if logger == nil { |
|
logger = newWatchLogger(p.LogOutput) |
|
} |
|
|
|
// Setup the client |
|
p.address = address |
|
if conf == nil { |
|
conf = consulapi.DefaultConfigWithLogger(logger) |
|
} |
|
conf.Address = address |
|
conf.Datacenter = p.Datacenter |
|
conf.Token = p.Token |
|
client, err := consulapi.NewClient(conf) |
|
if err != nil { |
|
return fmt.Errorf("Failed to connect to agent: %v", err) |
|
} |
|
|
|
return p.RunWithClientAndHclog(client, logger) |
|
} |
|
|
|
// RunWithClientAndLogger runs a watch plan using an external client and |
|
// hclog.Logger instance. Using this, the plan's Datacenter, Token and LogOutput |
|
// fields are ignored and the passed client is expected to be configured as |
|
// needed. |
|
func (p *Plan) RunWithClientAndHclog(client *consulapi.Client, logger hclog.Logger) error { |
|
var watchLogger hclog.Logger |
|
if logger == nil { |
|
watchLogger = newWatchLogger(nil) |
|
} else { |
|
watchLogger = logger.Named(watchLoggerName) |
|
} |
|
|
|
p.client = client |
|
|
|
// Loop until we are canceled |
|
failures := 0 |
|
OUTER: |
|
for !p.shouldStop() { |
|
// Invoke the handler |
|
blockParamVal, result, err := p.Watcher(p) |
|
|
|
// Check if we should terminate since the function |
|
// could have blocked for a while |
|
if p.shouldStop() { |
|
break |
|
} |
|
|
|
// Handle an error in the watch function |
|
if err != nil { |
|
// Perform an exponential backoff |
|
failures++ |
|
if blockParamVal == nil { |
|
p.lastParamVal = nil |
|
} else { |
|
p.lastParamVal = blockParamVal.Next(p.lastParamVal) |
|
} |
|
retry := retryInterval * time.Duration(failures*failures) |
|
if retry > maxBackoffTime { |
|
retry = maxBackoffTime |
|
} |
|
watchLogger.Error("Watch errored", "type", p.Type, "error", err, "retry", retry) |
|
select { |
|
case <-time.After(retry): |
|
continue OUTER |
|
case <-p.stopCh: |
|
return nil |
|
} |
|
} |
|
|
|
// Clear the failures |
|
failures = 0 |
|
|
|
// If the index is unchanged do nothing |
|
if p.lastParamVal != nil && p.lastParamVal.Equal(blockParamVal) { |
|
continue |
|
} |
|
|
|
// Update the index, look for change |
|
oldParamVal := p.lastParamVal |
|
p.lastParamVal = blockParamVal.Next(oldParamVal) |
|
if oldParamVal != nil && reflect.DeepEqual(p.lastResult, result) { |
|
continue |
|
} |
|
|
|
// Handle the updated result |
|
p.lastResult = result |
|
// If a hybrid handler exists use that |
|
if p.HybridHandler != nil { |
|
p.HybridHandler(blockParamVal, result) |
|
} else if p.Handler != nil { |
|
idx, ok := blockParamVal.(WaitIndexVal) |
|
if !ok { |
|
watchLogger.Error("Handler only supports index-based " + |
|
" watches but non index-based watch run. Skipping Handler.") |
|
} |
|
p.Handler(uint64(idx), result) |
|
} |
|
} |
|
return nil |
|
} |
|
|
|
// Deprecated: Use RunwithClientAndHclog |
|
func (p *Plan) RunWithClientAndLogger(client *consulapi.Client, logger *log.Logger) error { |
|
|
|
p.client = client |
|
|
|
// Loop until we are canceled |
|
failures := 0 |
|
OUTER: |
|
for !p.shouldStop() { |
|
// Invoke the handler |
|
blockParamVal, result, err := p.Watcher(p) |
|
|
|
// Check if we should terminate since the function |
|
// could have blocked for a while |
|
if p.shouldStop() { |
|
break |
|
} |
|
|
|
// Handle an error in the watch function |
|
if err != nil { |
|
// Perform an exponential backoff |
|
failures++ |
|
if blockParamVal == nil { |
|
p.lastParamVal = nil |
|
} else { |
|
p.lastParamVal = blockParamVal.Next(p.lastParamVal) |
|
} |
|
retry := retryInterval * time.Duration(failures*failures) |
|
if retry > maxBackoffTime { |
|
retry = maxBackoffTime |
|
} |
|
logger.Printf("[ERR] consul.watch: Watch (type: %s) errored: %v, retry in %v", |
|
p.Type, err, retry) |
|
select { |
|
case <-time.After(retry): |
|
continue OUTER |
|
case <-p.stopCh: |
|
return nil |
|
} |
|
} |
|
|
|
// Clear the failures |
|
failures = 0 |
|
|
|
// If the index is unchanged do nothing |
|
if p.lastParamVal != nil && p.lastParamVal.Equal(blockParamVal) { |
|
continue |
|
} |
|
|
|
// Update the index, look for change |
|
oldParamVal := p.lastParamVal |
|
p.lastParamVal = blockParamVal.Next(oldParamVal) |
|
if oldParamVal != nil && reflect.DeepEqual(p.lastResult, result) { |
|
continue |
|
} |
|
|
|
// Handle the updated result |
|
p.lastResult = result |
|
// If a hybrid handler exists use that |
|
if p.HybridHandler != nil { |
|
p.HybridHandler(blockParamVal, result) |
|
} else if p.Handler != nil { |
|
idx, ok := blockParamVal.(WaitIndexVal) |
|
if !ok { |
|
logger.Printf("[ERR] consul.watch: Handler only supports index-based " + |
|
" watches but non index-based watch run. Skipping Handler.") |
|
} |
|
p.Handler(uint64(idx), result) |
|
} |
|
} |
|
return nil |
|
} |
|
|
|
// Stop is used to stop running the watch plan |
|
func (p *Plan) Stop() { |
|
p.stopLock.Lock() |
|
defer p.stopLock.Unlock() |
|
if p.stop { |
|
return |
|
} |
|
p.stop = true |
|
if p.cancelFunc != nil { |
|
p.cancelFunc() |
|
} |
|
close(p.stopCh) |
|
} |
|
|
|
func (p *Plan) shouldStop() bool { |
|
select { |
|
case <-p.stopCh: |
|
return true |
|
default: |
|
return false |
|
} |
|
} |
|
|
|
func (p *Plan) setCancelFunc(cancel context.CancelFunc) { |
|
p.stopLock.Lock() |
|
defer p.stopLock.Unlock() |
|
if p.shouldStop() { |
|
// The watch is stopped and execute the new cancel func to stop watchFactory |
|
cancel() |
|
return |
|
} |
|
p.cancelFunc = cancel |
|
} |
|
|
|
func (p *Plan) IsStopped() bool { |
|
p.stopLock.Lock() |
|
defer p.stopLock.Unlock() |
|
return p.stop |
|
} |
|
|
|
func newWatchLogger(output io.Writer) hclog.Logger { |
|
return hclog.New(&hclog.LoggerOptions{ |
|
Name: watchLoggerName, |
|
Output: output, |
|
}) |
|
}
|
|
|