mirror of https://github.com/hashicorp/consul
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
197 lines
5.2 KiB
197 lines
5.2 KiB
/* |
|
* |
|
* Copyright 2017 gRPC authors. |
|
* |
|
* Licensed under the Apache License, Version 2.0 (the "License"); |
|
* you may not use this file except in compliance with the License. |
|
* You may obtain a copy of the License at |
|
* |
|
* http://www.apache.org/licenses/LICENSE-2.0 |
|
* |
|
* Unless required by applicable law or agreed to in writing, software |
|
* distributed under the License is distributed on an "AS IS" BASIS, |
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
* See the License for the specific language governing permissions and |
|
* limitations under the License. |
|
* |
|
*/ |
|
|
|
package grpc |
|
|
|
import ( |
|
"context" |
|
"io" |
|
"sync" |
|
|
|
"google.golang.org/grpc/balancer" |
|
"google.golang.org/grpc/codes" |
|
"google.golang.org/grpc/grpclog" |
|
"google.golang.org/grpc/internal/channelz" |
|
"google.golang.org/grpc/internal/transport" |
|
"google.golang.org/grpc/status" |
|
) |
|
|
|
// pickerWrapper is a wrapper of balancer.Picker. It blocks on certain pick |
|
// actions and unblock when there's a picker update. |
|
type pickerWrapper struct { |
|
mu sync.Mutex |
|
done bool |
|
blockingCh chan struct{} |
|
picker balancer.Picker |
|
|
|
// The latest connection happened. |
|
connErrMu sync.Mutex |
|
connErr error |
|
} |
|
|
|
func newPickerWrapper() *pickerWrapper { |
|
bp := &pickerWrapper{blockingCh: make(chan struct{})} |
|
return bp |
|
} |
|
|
|
func (bp *pickerWrapper) updateConnectionError(err error) { |
|
bp.connErrMu.Lock() |
|
bp.connErr = err |
|
bp.connErrMu.Unlock() |
|
} |
|
|
|
func (bp *pickerWrapper) connectionError() error { |
|
bp.connErrMu.Lock() |
|
err := bp.connErr |
|
bp.connErrMu.Unlock() |
|
return err |
|
} |
|
|
|
// updatePicker is called by UpdateBalancerState. It unblocks all blocked pick. |
|
func (bp *pickerWrapper) updatePicker(p balancer.Picker) { |
|
bp.mu.Lock() |
|
if bp.done { |
|
bp.mu.Unlock() |
|
return |
|
} |
|
bp.picker = p |
|
// bp.blockingCh should never be nil. |
|
close(bp.blockingCh) |
|
bp.blockingCh = make(chan struct{}) |
|
bp.mu.Unlock() |
|
} |
|
|
|
func doneChannelzWrapper(acw *acBalancerWrapper, done func(balancer.DoneInfo)) func(balancer.DoneInfo) { |
|
acw.mu.Lock() |
|
ac := acw.ac |
|
acw.mu.Unlock() |
|
ac.incrCallsStarted() |
|
return func(b balancer.DoneInfo) { |
|
if b.Err != nil && b.Err != io.EOF { |
|
ac.incrCallsFailed() |
|
} else { |
|
ac.incrCallsSucceeded() |
|
} |
|
if done != nil { |
|
done(b) |
|
} |
|
} |
|
} |
|
|
|
// pick returns the transport that will be used for the RPC. |
|
// It may block in the following cases: |
|
// - there's no picker |
|
// - the current picker returns ErrNoSubConnAvailable |
|
// - the current picker returns other errors and failfast is false. |
|
// - the subConn returned by the current picker is not READY |
|
// When one of these situations happens, pick blocks until the picker gets updated. |
|
func (bp *pickerWrapper) pick(ctx context.Context, failfast bool, opts balancer.PickOptions) (transport.ClientTransport, func(balancer.DoneInfo), error) { |
|
var ch chan struct{} |
|
|
|
for { |
|
bp.mu.Lock() |
|
if bp.done { |
|
bp.mu.Unlock() |
|
return nil, nil, ErrClientConnClosing |
|
} |
|
|
|
if bp.picker == nil { |
|
ch = bp.blockingCh |
|
} |
|
if ch == bp.blockingCh { |
|
// This could happen when either: |
|
// - bp.picker is nil (the previous if condition), or |
|
// - has called pick on the current picker. |
|
bp.mu.Unlock() |
|
select { |
|
case <-ctx.Done(): |
|
if connectionErr := bp.connectionError(); connectionErr != nil { |
|
switch ctx.Err() { |
|
case context.DeadlineExceeded: |
|
return nil, nil, status.Errorf(codes.DeadlineExceeded, "latest connection error: %v", connectionErr) |
|
case context.Canceled: |
|
return nil, nil, status.Errorf(codes.Canceled, "latest connection error: %v", connectionErr) |
|
} |
|
} |
|
return nil, nil, ctx.Err() |
|
case <-ch: |
|
} |
|
continue |
|
} |
|
|
|
ch = bp.blockingCh |
|
p := bp.picker |
|
bp.mu.Unlock() |
|
|
|
subConn, done, err := p.Pick(ctx, opts) |
|
|
|
if err != nil { |
|
switch err { |
|
case balancer.ErrNoSubConnAvailable: |
|
continue |
|
case balancer.ErrTransientFailure: |
|
if !failfast { |
|
continue |
|
} |
|
return nil, nil, status.Errorf(codes.Unavailable, "%v, latest connection error: %v", err, bp.connectionError()) |
|
case context.DeadlineExceeded: |
|
return nil, nil, status.Error(codes.DeadlineExceeded, err.Error()) |
|
case context.Canceled: |
|
return nil, nil, status.Error(codes.Canceled, err.Error()) |
|
default: |
|
if _, ok := status.FromError(err); ok { |
|
return nil, nil, err |
|
} |
|
// err is some other error. |
|
return nil, nil, status.Error(codes.Unknown, err.Error()) |
|
} |
|
} |
|
|
|
acw, ok := subConn.(*acBalancerWrapper) |
|
if !ok { |
|
grpclog.Error("subconn returned from pick is not *acBalancerWrapper") |
|
continue |
|
} |
|
if t, ok := acw.getAddrConn().getReadyTransport(); ok { |
|
if channelz.IsOn() { |
|
return t, doneChannelzWrapper(acw, done), nil |
|
} |
|
return t, done, nil |
|
} |
|
if done != nil { |
|
// Calling done with nil error, no bytes sent and no bytes received. |
|
// DoneInfo with default value works. |
|
done(balancer.DoneInfo{}) |
|
} |
|
grpclog.Infof("blockingPicker: the picked transport is not ready, loop back to repick") |
|
// If ok == false, ac.state is not READY. |
|
// A valid picker always returns READY subConn. This means the state of ac |
|
// just changed, and picker will be updated shortly. |
|
// continue back to the beginning of the for loop to repick. |
|
} |
|
} |
|
|
|
func (bp *pickerWrapper) close() { |
|
bp.mu.Lock() |
|
defer bp.mu.Unlock() |
|
if bp.done { |
|
return |
|
} |
|
bp.done = true |
|
close(bp.blockingCh) |
|
}
|
|
|