|
|
|
@ -6,16 +6,16 @@ package hcp
|
|
|
|
|
import (
|
|
|
|
|
"context"
|
|
|
|
|
"fmt"
|
|
|
|
|
"time"
|
|
|
|
|
|
|
|
|
|
"github.com/hashicorp/go-hclog"
|
|
|
|
|
|
|
|
|
|
"github.com/hashicorp/consul/internal/hcp/internal/types"
|
|
|
|
|
"github.com/hashicorp/consul/lib/retry"
|
|
|
|
|
pbhcp "github.com/hashicorp/consul/proto-public/pbhcp/v2"
|
|
|
|
|
"github.com/hashicorp/consul/proto-public/pbresource"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
var linkWatchRetryCount = 10
|
|
|
|
|
|
|
|
|
|
func NewLinkWatch(ctx context.Context, logger hclog.Logger, client pbresource.ResourceServiceClient) (chan *pbresource.WatchEvent, error) {
|
|
|
|
|
watchClient, err := client.WatchList(
|
|
|
|
|
ctx, &pbresource.WatchListRequest{
|
|
|
|
@ -29,7 +29,11 @@ func NewLinkWatch(ctx context.Context, logger hclog.Logger, client pbresource.Re
|
|
|
|
|
|
|
|
|
|
eventCh := make(chan *pbresource.WatchEvent)
|
|
|
|
|
go func() {
|
|
|
|
|
errorCounter := 0
|
|
|
|
|
errorBackoff := &retry.Waiter{
|
|
|
|
|
MinFailures: 1,
|
|
|
|
|
MinWait: 1 * time.Second,
|
|
|
|
|
MaxWait: 1 * time.Minute,
|
|
|
|
|
}
|
|
|
|
|
for {
|
|
|
|
|
select {
|
|
|
|
|
case <-ctx.Done():
|
|
|
|
@ -38,19 +42,14 @@ func NewLinkWatch(ctx context.Context, logger hclog.Logger, client pbresource.Re
|
|
|
|
|
return
|
|
|
|
|
default:
|
|
|
|
|
watchEvent, err := watchClient.Recv()
|
|
|
|
|
|
|
|
|
|
if err != nil {
|
|
|
|
|
logger.Error("error receiving link watch event", "error", err)
|
|
|
|
|
|
|
|
|
|
errorCounter++
|
|
|
|
|
if errorCounter >= linkWatchRetryCount {
|
|
|
|
|
logger.Error("received multiple consecutive errors from link watch client, will stop watching link")
|
|
|
|
|
close(eventCh)
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
errorBackoff.Wait(ctx)
|
|
|
|
|
continue
|
|
|
|
|
}
|
|
|
|
|
errorCounter = 0
|
|
|
|
|
|
|
|
|
|
errorBackoff.Reset()
|
|
|
|
|
eventCh <- watchEvent
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|