Backport of Decouple xds capacity controller and raft-autopilot into release/1.15.x (#20548)

* backport of commit 1668a314c6

* backport of commit d33eda97be

* Backport DeliverLatest func.

---------

Co-authored-by: Derek Menteer <derek.menteer@hashicorp.com>
pull/20590/head
hc-github-team-consul-core 10 months ago committed by GitHub
parent ce4d60998c
commit 7e39aa3760
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -0,0 +1,3 @@
```release-note:bug
connect: Remove code coupling where the xDS capacity controller could negatively affect raft autopilot performance.
```

@ -15,6 +15,7 @@ import (
"golang.org/x/time/rate"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib/channels"
"github.com/hashicorp/consul/lib/retry"
)
@ -72,7 +73,7 @@ type SessionLimiter interface {
func NewController(cfg Config) *Controller {
return &Controller{
cfg: cfg,
serverCh: make(chan uint32),
serverCh: make(chan uint32, 1),
doneCh: make(chan struct{}),
}
}
@ -109,9 +110,9 @@ func (c *Controller) Run(ctx context.Context) {
// SetServerCount updates the number of healthy servers that is used when
// determining capacity. It is called by the autopilot delegate.
func (c *Controller) SetServerCount(count uint32) {
select {
case c.serverCh <- count:
case <-c.doneCh:
if err := channels.DeliverLatest(count, c.serverCh); err != nil {
// This should not be possible since only one producer should ever exist, but log anyway.
c.cfg.Logger.Error("failed to deliver latest server count to xDS capacity controller")
}
}

@ -0,0 +1,35 @@
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package channels
import "fmt"
// DeliverLatest will drain the channel discarding any messages if there are any and sends the current message.
func DeliverLatest[T any](val T, ch chan T) error {
// Send if chan is empty
select {
case ch <- val:
return nil
default:
}
// If it falls through to here, the channel is not empty.
// Drain the channel.
done := false
for !done {
select {
case <-ch:
continue
default:
done = true
}
}
// Attempt to send again. If it is not empty, throw an error
select {
case ch <- val:
return nil
default:
return fmt.Errorf("failed to deliver latest event: chan full again after draining")
}
}
Loading…
Cancel
Save