diff --git a/agent/grpc-internal/balancer/custombalancer.go b/agent/grpc-internal/balancer/custombalancer.go new file mode 100644 index 0000000000..d689f14b5d --- /dev/null +++ b/agent/grpc-internal/balancer/custombalancer.go @@ -0,0 +1,91 @@ +package balancer + +import ( + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/resolver" +) + +func init() { + balancer.Register(newCustomPickfirstBuilder()) +} + +// CustomBalancerName is the name of the modified pick_first balancer. +const CustomBalancerName = "pick_first_custom" + +var logger = grpclog.Component("balancer") + +func newCustomPickfirstBuilder() balancer.Builder { + return &customPickfirstBuilder{} +} + +type customPickfirstBuilder struct{} + +func (*customPickfirstBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer { + return &customPickfirstBalancer{ + pickfirstBalancer: pickfirstBalancer{cc: cc}, + } +} + +func (*customPickfirstBuilder) Name() string { + return CustomBalancerName +} + +type customPickfirstBalancer struct { + pickfirstBalancer + + activeAddr resolver.Address +} + +func (b *customPickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState) error { + for _, a := range state.ResolverState.Addresses { + // This hack preserves an existing behavior in our client-side + // load balancing where if the first address in a shuffled list + // of addresses matched the currently connected address, it would + // be an effective no-op. + if a.Equal(b.activeAddr) { + return nil + } + + // Attempt to make a new SubConn with a single address so we can + // track a successful connection explicitly. If we were to pass + // a list of addresses, we cannot assume the first address was + // successful and there is no way to extract the connected address. + sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{}) + if err != nil { + logger.Warningf("balancer.customPickfirstBalancer: failed to create new SubConn: %v", err) + continue + } + + if b.subConn != nil { + b.cc.RemoveSubConn(b.subConn) + } + + // Copy-pasted from pickfirstBalancer.UpdateClientConnState. + { + b.subConn = sc + b.state = connectivity.Idle + b.cc.UpdateState(balancer.State{ + ConnectivityState: connectivity.Idle, + Picker: &picker{result: balancer.PickResult{SubConn: b.subConn}}, + }) + b.subConn.Connect() + } + + b.activeAddr = a + + // We now have a new subConn with one address. + // Break the loop and call UpdateClientConnState + // with the full set of addresses. + break + } + + // This will load the full set of addresses but leave the + // newly created subConn alone. + return b.pickfirstBalancer.UpdateClientConnState(state) +} + +func init() { + balancer.Register(newCustomPickfirstBuilder()) +} diff --git a/agent/grpc-internal/balancer/balancer.go b/agent/grpc-internal/balancer/pickfirst.go similarity index 90% rename from agent/grpc-internal/balancer/balancer.go rename to agent/grpc-internal/balancer/pickfirst.go index a477a3e2de..82f18337c0 100644 --- a/agent/grpc-internal/balancer/balancer.go +++ b/agent/grpc-internal/balancer/pickfirst.go @@ -17,6 +17,7 @@ */ // NOTICE: This file is a derivative of grpc's grpc/pickfirst.go implementation. +// It is preserved as-is with the init() removed for easier updating. package balancer @@ -26,15 +27,12 @@ import ( "google.golang.org/grpc/balancer" "google.golang.org/grpc/connectivity" - "google.golang.org/grpc/grpclog" ) -// CustomBalancerName is the name of the modified pick_first balancer. -const CustomBalancerName = "pick_first_custom" +// PickFirstBalancerName is the name of the pick_first balancer. +const PickFirstBalancerName = "pick_first" -var logger = grpclog.Component("balancer") - -func newPickFirstBuilder() balancer.Builder { +func newPickfirstBuilder() balancer.Builder { return &pickfirstBuilder{} } @@ -45,7 +43,7 @@ func (*pickfirstBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions } func (*pickfirstBuilder) Name() string { - return CustomBalancerName + return PickFirstBalancerName } type pickfirstBalancer struct { @@ -77,14 +75,19 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState if len(state.ResolverState.Addresses) == 0 { // The resolver reported an empty address list. Treat it like an error by // calling b.ResolverError. + if b.subConn != nil { + // Remove the old subConn. All addresses were removed, so it is no longer + // valid. + b.cc.RemoveSubConn(b.subConn) + b.subConn = nil + } b.ResolverError(errors.New("produced zero addresses")) return balancer.ErrBadResolverState } - // Unlike the pick_first balancer, always tear down existing subConn if b.subConn != nil { - b.cc.RemoveSubConn(b.subConn) - b.subConn = nil + b.cc.UpdateAddresses(b.subConn, state.ResolverState.Addresses) + return nil } subConn, err := b.cc.NewSubConn(state.ResolverState.Addresses, balancer.NewSubConnOptions{}) @@ -177,7 +180,3 @@ func (i *idlePicker) Pick(balancer.PickInfo) (balancer.PickResult, error) { i.subConn.Connect() return balancer.PickResult{}, balancer.ErrNoSubConnAvailable } - -func init() { - balancer.Register(newPickFirstBuilder()) -} diff --git a/agent/grpc-internal/client.go b/agent/grpc-internal/client.go index c7c8b08a9d..9a1e8402a7 100644 --- a/agent/grpc-internal/client.go +++ b/agent/grpc-internal/client.go @@ -136,7 +136,7 @@ func (c *ClientConnPool) dial(datacenter string, serverType string) (*grpc.Clien grpc.WithContextDialer(c.dialer), grpc.WithDisableRetry(), grpc.WithStatsHandler(agentmiddleware.NewStatsHandler(metrics.Default(), metricsLabels)), - grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"custom"}`), + grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"pick_first_custom"}`), // Keep alive parameters are based on the same default ones we used for // Yamux. These are somewhat arbitrary but we did observe in scale testing // that the gRPC defaults (servers send keepalives only every 2 hours,