diff --git a/agent/grpc-internal/balancer/balancer.go b/agent/grpc-internal/balancer/balancer.go new file mode 100644 index 0000000000..a477a3e2de --- /dev/null +++ b/agent/grpc-internal/balancer/balancer.go @@ -0,0 +1,183 @@ +/* + * + * Copyright 2017 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// NOTICE: This file is a derivative of grpc's grpc/pickfirst.go implementation. + +package balancer + +import ( + "errors" + "fmt" + + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/grpclog" +) + +// CustomBalancerName is the name of the modified pick_first balancer. +const CustomBalancerName = "pick_first_custom" + +var logger = grpclog.Component("balancer") + +func newPickFirstBuilder() balancer.Builder { + return &pickfirstBuilder{} +} + +type pickfirstBuilder struct{} + +func (*pickfirstBuilder) Build(cc balancer.ClientConn, opt balancer.BuildOptions) balancer.Balancer { + return &pickfirstBalancer{cc: cc} +} + +func (*pickfirstBuilder) Name() string { + return CustomBalancerName +} + +type pickfirstBalancer struct { + state connectivity.State + cc balancer.ClientConn + subConn balancer.SubConn +} + +func (b *pickfirstBalancer) ResolverError(err error) { + if logger.V(2) { + logger.Infof("pickfirstBalancer: ResolverError called with error %v", err) + } + if b.subConn == nil { + b.state = connectivity.TransientFailure + } + + if b.state != connectivity.TransientFailure { + // The picker will not change since the balancer does not currently + // report an error. + return + } + b.cc.UpdateState(balancer.State{ + ConnectivityState: connectivity.TransientFailure, + Picker: &picker{err: fmt.Errorf("name resolver error: %v", err)}, + }) +} + +func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState) error { + if len(state.ResolverState.Addresses) == 0 { + // The resolver reported an empty address list. Treat it like an error by + // calling b.ResolverError. + b.ResolverError(errors.New("produced zero addresses")) + return balancer.ErrBadResolverState + } + + // Unlike the pick_first balancer, always tear down existing subConn + if b.subConn != nil { + b.cc.RemoveSubConn(b.subConn) + b.subConn = nil + } + + subConn, err := b.cc.NewSubConn(state.ResolverState.Addresses, balancer.NewSubConnOptions{}) + if err != nil { + if logger.V(2) { + logger.Errorf("pickfirstBalancer: failed to NewSubConn: %v", err) + } + b.state = connectivity.TransientFailure + b.cc.UpdateState(balancer.State{ + ConnectivityState: connectivity.TransientFailure, + Picker: &picker{err: fmt.Errorf("error creating connection: %v", err)}, + }) + return balancer.ErrBadResolverState + } + b.subConn = subConn + b.state = connectivity.Idle + b.cc.UpdateState(balancer.State{ + ConnectivityState: connectivity.Idle, + Picker: &picker{result: balancer.PickResult{SubConn: b.subConn}}, + }) + b.subConn.Connect() + return nil +} + +func (b *pickfirstBalancer) UpdateSubConnState(subConn balancer.SubConn, state balancer.SubConnState) { + if logger.V(2) { + logger.Infof("pickfirstBalancer: UpdateSubConnState: %p, %v", subConn, state) + } + if b.subConn != subConn { + if logger.V(2) { + logger.Infof("pickfirstBalancer: ignored state change because subConn is not recognized") + } + return + } + b.state = state.ConnectivityState + if state.ConnectivityState == connectivity.Shutdown { + b.subConn = nil + return + } + + switch state.ConnectivityState { + case connectivity.Ready: + b.cc.UpdateState(balancer.State{ + ConnectivityState: state.ConnectivityState, + Picker: &picker{result: balancer.PickResult{SubConn: subConn}}, + }) + case connectivity.Connecting: + b.cc.UpdateState(balancer.State{ + ConnectivityState: state.ConnectivityState, + Picker: &picker{err: balancer.ErrNoSubConnAvailable}, + }) + case connectivity.Idle: + b.cc.UpdateState(balancer.State{ + ConnectivityState: state.ConnectivityState, + Picker: &idlePicker{subConn: subConn}, + }) + case connectivity.TransientFailure: + b.cc.UpdateState(balancer.State{ + ConnectivityState: state.ConnectivityState, + Picker: &picker{err: state.ConnectionError}, + }) + } +} + +func (b *pickfirstBalancer) Close() { +} + +func (b *pickfirstBalancer) ExitIdle() { + if b.subConn != nil && b.state == connectivity.Idle { + b.subConn.Connect() + } +} + +type picker struct { + result balancer.PickResult + err error +} + +func (p *picker) Pick(balancer.PickInfo) (balancer.PickResult, error) { + return p.result, p.err +} + +// idlePicker is used when the SubConn is IDLE and kicks the SubConn into +// CONNECTING when Pick is called. +type idlePicker struct { + subConn balancer.SubConn +} + +func (i *idlePicker) Pick(balancer.PickInfo) (balancer.PickResult, error) { + i.subConn.Connect() + return balancer.PickResult{}, balancer.ErrNoSubConnAvailable +} + +func init() { + balancer.Register(newPickFirstBuilder()) +} diff --git a/agent/grpc-internal/client.go b/agent/grpc-internal/client.go index 18596eec71..c7c8b08a9d 100644 --- a/agent/grpc-internal/client.go +++ b/agent/grpc-internal/client.go @@ -13,6 +13,8 @@ import ( "github.com/armon/go-metrics" + _ "github.com/hashicorp/consul/agent/grpc-internal/balancer" + agentmiddleware "github.com/hashicorp/consul/agent/grpc-middleware" "github.com/hashicorp/consul/agent/metadata" "github.com/hashicorp/consul/agent/pool" @@ -134,6 +136,7 @@ func (c *ClientConnPool) dial(datacenter string, serverType string) (*grpc.Clien grpc.WithContextDialer(c.dialer), grpc.WithDisableRetry(), grpc.WithStatsHandler(agentmiddleware.NewStatsHandler(metrics.Default(), metricsLabels)), + grpc.WithDefaultServiceConfig(`{"loadBalancingPolicy":"custom"}`), // Keep alive parameters are based on the same default ones we used for // Yamux. These are somewhat arbitrary but we did observe in scale testing // that the gRPC defaults (servers send keepalives only every 2 hours, diff --git a/agent/grpc-internal/resolver/resolver.go b/agent/grpc-internal/resolver/resolver.go index cf34c990f1..d1f195cb25 100644 --- a/agent/grpc-internal/resolver/resolver.go +++ b/agent/grpc-internal/resolver/resolver.go @@ -271,16 +271,16 @@ func (r *serverResolver) updateAddrs(addrs []resolver.Address) { // updateAddrsLocked updates this serverResolver's ClientConn to use the given // set of addrs. addrLock must be held by caller. func (r *serverResolver) updateAddrsLocked(addrs []resolver.Address) { - // Only pass the first address initially, which will cause the - // balancer to spin down the connection for its previous first address - // if it is different. If we don't do this, it will keep using the old - // first address as long as it is still in the list, making it impossible to - // rebalance until that address is removed. - var firstAddr []resolver.Address - if len(addrs) > 0 { - firstAddr = []resolver.Address{addrs[0]} - } - r.clientConn.UpdateState(resolver.State{Addresses: firstAddr}) + // // Only pass the first address initially, which will cause the + // // balancer to spin down the connection for its previous first address + // // if it is different. If we don't do this, it will keep using the old + // // first address as long as it is still in the list, making it impossible to + // // rebalance until that address is removed. + // var firstAddr []resolver.Address + // if len(addrs) > 0 { + // firstAddr = []resolver.Address{addrs[0]} + // } + // r.clientConn.UpdateState(resolver.State{Addresses: firstAddr}) // Call UpdateState again with the entire list of addrs in case we need them // for failover.