/*
 *
 * Copyright 2017 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

package grpc

import (
	"sync"
	"sync/atomic"

	"golang.org/x/net/context"
	"google.golang.org/grpc/balancer"
	"google.golang.org/grpc/codes"
	lbpb "google.golang.org/grpc/grpclb/grpc_lb_v1/messages"
	"google.golang.org/grpc/status"
)

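// rpcStats tracks the per-call counters that grpclb reports to the remote
// balancer as lbpb.ClientStats. All fields must be accessed atomically.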
type rpcStats struct {
	NumCallsStarted                          int64
	NumCallsFinished                         int64
	NumCallsFinishedWithDropForRateLimiting  int64
	NumCallsFinishedWithDropForLoadBalancing int64
	NumCallsFinishedWithClientFailedToSend   int64
	NumCallsFinishedKnownReceived            int64
}

// toClientStats converts rpcStats to lbpb.ClientStats, and clears rpcStats.
func (s *rpcStats) toClientStats() *lbpb.ClientStats {
	stats := &lbpb.ClientStats{
		NumCallsStarted:                          atomic.SwapInt64(&s.NumCallsStarted, 0),
		NumCallsFinished:                         atomic.SwapInt64(&s.NumCallsFinished, 0),
		NumCallsFinishedWithDropForRateLimiting:  atomic.SwapInt64(&s.NumCallsFinishedWithDropForRateLimiting, 0),
		NumCallsFinishedWithDropForLoadBalancing: atomic.SwapInt64(&s.NumCallsFinishedWithDropForLoadBalancing, 0),
		NumCallsFinishedWithClientFailedToSend:   atomic.SwapInt64(&s.NumCallsFinishedWithClientFailedToSend, 0),
		NumCallsFinishedKnownReceived:            atomic.SwapInt64(&s.NumCallsFinishedKnownReceived, 0),
	}
	return stats
}

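// dropForRateLimiting counts one call as started and immediately finished
// because the remote balancer dropped it for rate limiting.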
func (s *rpcStats) dropForRateLimiting() {
	atomic.AddInt64(&s.NumCallsStarted, 1)
	atomic.AddInt64(&s.NumCallsFinishedWithDropForRateLimiting, 1)
	atomic.AddInt64(&s.NumCallsFinished, 1)
}

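// dropForLoadBalancing counts one call as started and immediately finished
// because the remote balancer dropped it for load balancing.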
func (s *rpcStats) dropForLoadBalancing() {
	atomic.AddInt64(&s.NumCallsStarted, 1)
	atomic.AddInt64(&s.NumCallsFinishedWithDropForLoadBalancing, 1)
	atomic.AddInt64(&s.NumCallsFinished, 1)
}

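// failedToSend counts one call as started and finished with the client
// failing to send the request.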
func (s *rpcStats) failedToSend() {
	atomic.AddInt64(&s.NumCallsStarted, 1)
	atomic.AddInt64(&s.NumCallsFinishedWithClientFailedToSend, 1)
	atomic.AddInt64(&s.NumCallsFinished, 1)
}

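// knownReceived counts one call as started and finished with the server
// known to have received it.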
func (s *rpcStats) knownReceived() {
	atomic.AddInt64(&s.NumCallsStarted, 1)
	atomic.AddInt64(&s.NumCallsFinishedKnownReceived, 1)
	atomic.AddInt64(&s.NumCallsFinished, 1)
}

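// errPicker is a picker that always fails Pick with the same error.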
type errPicker struct {
	// Pick always returns this err.
	err error
}

func (p *errPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
	return nil, nil, p.err
}

// rrPicker does roundrobin on subConns. It's typically used when there's no
// response from the remote balancer, and grpclb falls back to the resolved
// backends.
//
// It's guaranteed that len(subConns) > 0.
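//
// For example, with subConns = [a, b, c], successive picks return
// a, b, c, a, b, c, ...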
type rrPicker struct {
	mu           sync.Mutex
	subConns     []balancer.SubConn // The subConns that were READY when taking the snapshot.
	subConnsNext int
}

func (p *rrPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
	p.mu.Lock()
	defer p.mu.Unlock()
	sc := p.subConns[p.subConnsNext]
	p.subConnsNext = (p.subConnsNext + 1) % len(p.subConns)
	return sc, nil, nil
}

// lbPicker does two layers of picks:
//
// First layer: roundrobin on all servers in serverList, including drops and backends.
// - If it picks a drop, the RPC will fail as being dropped.
// - If it picks a backend, do a second layer pick to pick the real backend.
//
// Second layer: roundrobin on all READY backends.
//
// It's guaranteed that len(serverList) > 0.
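//
// For example (hypothetical snapshot): with serverList = [drop, backend, backend]
// and two READY subConns sc1 and sc2, successive picks yield
// drop (the RPC fails), sc1, sc2, drop, sc1, sc2, ...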
type lbPicker struct {
	mu             sync.Mutex
	serverList     []*lbpb.Server
	serverListNext int
	subConns       []balancer.SubConn // The subConns that were READY when taking the snapshot.
	subConnsNext   int

	stats *rpcStats
}

func (p *lbPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) {
	p.mu.Lock()
	defer p.mu.Unlock()

	// Layer one roundrobin on serverList.
	s := p.serverList[p.serverListNext]
	p.serverListNext = (p.serverListNext + 1) % len(p.serverList)

	// If it's a drop, return an error and fail the RPC.
	if s.DropForRateLimiting {
		p.stats.dropForRateLimiting()
		return nil, nil, status.Errorf(codes.Unavailable, "request dropped by grpclb")
	}
	if s.DropForLoadBalancing {
		p.stats.dropForLoadBalancing()
		return nil, nil, status.Errorf(codes.Unavailable, "request dropped by grpclb")
	}

	// If it's not a drop but there are no ready subConns, fail the pick.
	if len(p.subConns) == 0 {
		return nil, nil, balancer.ErrNoSubConnAvailable
	}

	// Return the next ready subConn in the list; the done callback collects
	// RPC stats.
	sc := p.subConns[p.subConnsNext]
	p.subConnsNext = (p.subConnsNext + 1) % len(p.subConns)
	done := func(info balancer.DoneInfo) {
		if !info.BytesSent {
			p.stats.failedToSend()
		} else if info.BytesReceived {
			p.stats.knownReceived()
		}
	}
	return sc, done, nil
}
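
// Note: gRPC calls a Picker once per RPC and invokes the returned done
// callback when the RPC completes. An illustrative sketch of that contract
// (picker here is hypothetical, not part of the upstream file):
//
//	sc, done, err := picker.Pick(ctx, balancer.PickOptions{})
//	if err != nil {
//		// e.g. a grpclb drop, or balancer.ErrNoSubConnAvailable.
//	}
//	// ... the RPC is sent on the connection behind sc ...
//	if done != nil {
//		done(balancer.DoneInfo{BytesSent: true, BytesReceived: true})
//	}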