Add custom balancer to always remove subConns

pull/15701/head
Chris S. Kim 2 years ago
parent 252a08e903
commit c37343dfcc

@ -0,0 +1,183 @@
/*
*
* Copyright 2017 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
// NOTICE: This file is a derivative of grpc's grpc/pickfirst.go implementation.
package balancer
import (
"errors"
"fmt"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
)
// CustomBalancerName is the name of the modified pick_first balancer.
const CustomBalancerName = "pick_first_custom"
var logger = grpclog.Component("balancer")
func newPickFirstBuilder() balancer.Builder {
return &pickfirstBuilder{}
}
type pickfirstBuilder struct{}
func (*pickfirstBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
return &pickfirstBalancer{cc: cc}
}
func (*pickfirstBuilder) Name() string {
return CustomBalancerName
}
type pickfirstBalancer struct {
state connectivity.State
cc balancer.ClientConn
subConn balancer.SubConn
}
func (b *pickfirstBalancer) ResolverError(err error) {
if logger.V(2) {
logger.Infof("pickfirstBalancer: ResolverError called with error %v", err)
}
if b.subConn == nil {
b.state = connectivity.TransientFailure
}
if b.state != connectivity.TransientFailure {
// The picker will not change since the balancer does not currently
// report an error.
return
}
b.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
Picker: &picker{err: fmt.Errorf("name resolver error: %v", err)},
})
}
func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState) error {
if len(state.ResolverState.Addresses) == 0 {
// The resolver reported an empty address list. Treat it like an error by
// calling b.ResolverError.
b.ResolverError(errors.New("produced zero addresses"))
return balancer.ErrBadResolverState
}
// Unlike the pick_first balancer, always tear down existing subConn
if b.subConn != nil {
b.cc.RemoveSubConn(b.subConn)
b.subConn = nil
}
subConn, err := b.cc.NewSubConn(state.ResolverState.Addresses, balancer.NewSubConnOptions{})
if err != nil {
if logger.V(2) {
logger.Errorf("pickfirstBalancer: failed to NewSubConn: %v", err)
}
b.state = connectivity.TransientFailure
b.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.TransientFailure,
Picker: &picker{err: fmt.Errorf("error creating connection: %v", err)},
})
return balancer.ErrBadResolverState
}
b.subConn = subConn
b.state = connectivity.Idle
b.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.Idle,
Picker: &picker{result: balancer.PickResult{SubConn: b.subConn}},
})
b.subConn.Connect()
return nil
}
func (b *pickfirstBalancer) UpdateSubConnState(subConn balancer.SubConn, state balancer.SubConnState) {
if logger.V(2) {
logger.Infof("pickfirstBalancer: UpdateSubConnState: %p, %v", subConn, state)
}
if b.subConn != subConn {
if logger.V(2) {
logger.Infof("pickfirstBalancer: ignored state change because subConn is not recognized")
}
return
}
b.state = state.ConnectivityState
if state.ConnectivityState == connectivity.Shutdown {
b.subConn = nil
return
}
switch state.ConnectivityState {
case connectivity.Ready:
b.cc.UpdateState(balancer.State{
ConnectivityState: state.ConnectivityState,
Picker: &picker{result: balancer.PickResult{SubConn: subConn}},
})
case connectivity.Connecting:
b.cc.UpdateState(balancer.State{
ConnectivityState: state.ConnectivityState,
Picker: &picker{err: balancer.ErrNoSubConnAvailable},
})
case connectivity.Idle:
b.cc.UpdateState(balancer.State{
ConnectivityState: state.ConnectivityState,
Picker: &idlePicker{subConn: subConn},
})
case connectivity.TransientFailure:
b.cc.UpdateState(balancer.State{
ConnectivityState: state.ConnectivityState,
Picker: &picker{err: state.ConnectionError},
})
}
}
func (b *pickfirstBalancer) Close() {
}
func (b *pickfirstBalancer) ExitIdle() {
if b.subConn != nil && b.state == connectivity.Idle {
b.subConn.Connect()
}
}
type picker struct {
result balancer.PickResult
err error
}
func (p *picker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
return p.result, p.err
}
// idlePicker is used when the SubConn is IDLE and kicks the SubConn into
// CONNECTING when Pick is called.
type idlePicker struct {
subConn balancer.SubConn
}
func (i *idlePicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
i.subConn.Connect()
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
}
func init() {
balancer.Register(newPickFirstBuilder())
}

@ -13,6 +13,8 @@ import (
"github.com/armon/go-metrics"
_ "github.com/hashicorp/consul/agent/grpc-internal/balancer"
agentmiddleware "github.com/hashicorp/consul/agent/grpc-middleware"
"github.com/hashicorp/consul/agent/metadata"
"github.com/hashicorp/consul/agent/pool"
@ -134,6 +136,7 @@ func (c *ClientConnPool) dial(datacenter string, serverType string) (*grpc.Clien
grpc.WithContextDialer(c.dialer),
grpc.WithDisableRetry(),
grpc.WithStatsHandler(agentmiddleware.NewStatsHandler(metrics.Default(), metricsLabels)),
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"custom"}`),
// Keep alive parameters are based on the same default ones we used for
// Yamux. These are somewhat arbitrary but we did observe in scale testing
// that the gRPC defaults (servers send keepalives only every 2 hours,

@ -271,16 +271,16 @@ func (r *serverResolver) updateAddrs(addrs []resolver.Address) {
// updateAddrsLocked updates this serverResolver's ClientConn to use the given
// set of addrs. addrLock must be held by caller.
func (r *serverResolver) updateAddrsLocked(addrs []resolver.Address) {
// Only pass the first address initially, which will cause the
// balancer to spin down the connection for its previous first address
// if it is different. If we don't do this, it will keep using the old
// first address as long as it is still in the list, making it impossible to
// rebalance until that address is removed.
var firstAddr []resolver.Address
if len(addrs) > 0 {
firstAddr = []resolver.Address{addrs[0]}
}
r.clientConn.UpdateState(resolver.State{Addresses: firstAddr})
// // Only pass the first address initially, which will cause the
// // balancer to spin down the connection for its previous first address
// // if it is different. If we don't do this, it will keep using the old
// // first address as long as it is still in the list, making it impossible to
// // rebalance until that address is removed.
// var firstAddr []resolver.Address
// if len(addrs) > 0 {
// firstAddr = []resolver.Address{addrs[0]}
// }
// r.clientConn.UpdateState(resolver.State{Addresses: firstAddr})
// Call UpdateState again with the entire list of addrs in case we need them
// for failover.

Loading…
Cancel
Save