Refactor to embed pickfirstBalancer

pull/15701/head
Chris S. Kim 2022-12-07 12:25:31 -05:00
parent c37343dfcc
commit 27a0ffb6e8
3 changed files with 105 additions and 15 deletions

View File

@ -0,0 +1,91 @@
package balancer
import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/resolver"
)
func init() {
balancer.Register(newCustomPickfirstBuilder())
}
// CustomBalancerName is the name of the modified pick_first balancer.
const CustomBalancerName = "pick_first_custom"
var logger = grpclog.Component("balancer")
func newCustomPickfirstBuilder() balancer.Builder {
return &customPickfirstBuilder{}
}
type customPickfirstBuilder struct{}
func (*customPickfirstBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer {
return &customPickfirstBalancer{
pickfirstBalancer: pickfirstBalancer{cc: cc},
}
}
func (*customPickfirstBuilder) Name() string {
return CustomBalancerName
}
type customPickfirstBalancer struct {
pickfirstBalancer
activeAddr resolver.Address
}
func (b *customPickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState) error {
for _, a := range state.ResolverState.Addresses {
// This hack preserves an existing behavior in our client-side
// load balancing where if the first address in a shuffled list
// of addresses matched the currently connected address, it would
// be an effective no-op.
if a.Equal(b.activeAddr) {
return nil
}
// Attempt to make a new SubConn with a single address so we can
// track a successful connection explicitly. If we were to pass
// a list of addresses, we cannot assume the first address was
// successful and there is no way to extract the connected address.
sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{})
if err != nil {
logger.Warningf("balancer.customPickfirstBalancer: failed to create new SubConn: %v", err)
continue
}
if b.subConn != nil {
b.cc.RemoveSubConn(b.subConn)
}
// Copy-pasted from pickfirstBalancer.UpdateClientConnState.
{
b.subConn = sc
b.state = connectivity.Idle
b.cc.UpdateState(balancer.State{
ConnectivityState: connectivity.Idle,
Picker: &picker{result: balancer.PickResult{SubConn: b.subConn}},
})
b.subConn.Connect()
}
b.activeAddr = a
// We now have a new subConn with one address.
// Break the loop and call UpdateClientConnState
// with the full set of addresses.
break
}
// This will load the full set of addresses but leave the
// newly created subConn alone.
return b.pickfirstBalancer.UpdateClientConnState(state)
}
func init() {
balancer.Register(newCustomPickfirstBuilder())
}

View File

@ -17,6 +17,7 @@
*/ */
// NOTICE: This file is a derivative of grpc's grpc/pickfirst.go implementation. // NOTICE: This file is a derivative of grpc's grpc/pickfirst.go implementation.
// It is preserved as-is with the init() removed for easier updating.
package balancer package balancer
@ -26,15 +27,12 @@ import (
"google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity" "google.golang.org/grpc/connectivity"
"google.golang.org/grpc/grpclog"
) )
// CustomBalancerName is the name of the modified pick_first balancer. // PickFirstBalancerName is the name of the pick_first balancer.
const CustomBalancerName = "pick_first_custom" const PickFirstBalancerName = "pick_first"
var logger = grpclog.Component("balancer") func newPickfirstBuilder() balancer.Builder {
func newPickFirstBuilder() balancer.Builder {
return &pickfirstBuilder{} return &pickfirstBuilder{}
} }
@ -45,7 +43,7 @@ func (*pickfirstBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions
} }
func (*pickfirstBuilder) Name() string { func (*pickfirstBuilder) Name() string {
return CustomBalancerName return PickFirstBalancerName
} }
type pickfirstBalancer struct { type pickfirstBalancer struct {
@ -77,14 +75,19 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState
if len(state.ResolverState.Addresses) == 0 { if len(state.ResolverState.Addresses) == 0 {
// The resolver reported an empty address list. Treat it like an error by // The resolver reported an empty address list. Treat it like an error by
// calling b.ResolverError. // calling b.ResolverError.
if b.subConn != nil {
// Remove the old subConn. All addresses were removed, so it is no longer
// valid.
b.cc.RemoveSubConn(b.subConn)
b.subConn = nil
}
b.ResolverError(errors.New("produced zero addresses")) b.ResolverError(errors.New("produced zero addresses"))
return balancer.ErrBadResolverState return balancer.ErrBadResolverState
} }
// Unlike the pick_first balancer, always tear down existing subConn
if b.subConn != nil { if b.subConn != nil {
b.cc.RemoveSubConn(b.subConn) b.cc.UpdateAddresses(b.subConn, state.ResolverState.Addresses)
b.subConn = nil return nil
} }
subConn, err := b.cc.NewSubConn(state.ResolverState.Addresses, balancer.NewSubConnOptions{}) subConn, err := b.cc.NewSubConn(state.ResolverState.Addresses, balancer.NewSubConnOptions{})
@ -177,7 +180,3 @@ func (i *idlePicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
i.subConn.Connect() i.subConn.Connect()
return balancer.PickResult{}, balancer.ErrNoSubConnAvailable return balancer.PickResult{}, balancer.ErrNoSubConnAvailable
} }
func init() {
balancer.Register(newPickFirstBuilder())
}

View File

@ -136,7 +136,7 @@ func (c *ClientConnPool) dial(datacenter string, serverType string) (*grpc.Clien
grpc.WithContextDialer(c.dialer), grpc.WithContextDialer(c.dialer),
grpc.WithDisableRetry(), grpc.WithDisableRetry(),
grpc.WithStatsHandler(agentmiddleware.NewStatsHandler(metrics.Default(), metricsLabels)), grpc.WithStatsHandler(agentmiddleware.NewStatsHandler(metrics.Default(), metricsLabels)),
grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"custom"}`), grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"pick_first_custom"}`),
// Keep alive parameters are based on the same default ones we used for // Keep alive parameters are based on the same default ones we used for
// Yamux. These are somewhat arbitrary but we did observe in scale testing // Yamux. These are somewhat arbitrary but we did observe in scale testing
// that the gRPC defaults (servers send keepalives only every 2 hours, // that the gRPC defaults (servers send keepalives only every 2 hours,