mirror of https://github.com/XTLS/Xray-core
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
166 lines
4.2 KiB
166 lines
4.2 KiB
package router |
|
|
|
import ( |
|
"context" |
|
sync "sync" |
|
|
|
"github.com/xtls/xray-core/app/observatory" |
|
"github.com/xtls/xray-core/common" |
|
"github.com/xtls/xray-core/core" |
|
"github.com/xtls/xray-core/features/extension" |
|
"github.com/xtls/xray-core/features/outbound" |
|
) |
|
|
|
type BalancingStrategy interface { |
|
PickOutbound([]string) string |
|
} |
|
|
|
type BalancingPrincipleTarget interface { |
|
GetPrincipleTarget([]string) []string |
|
} |
|
|
|
type RoundRobinStrategy struct { |
|
FallbackTag string |
|
|
|
ctx context.Context |
|
observatory extension.Observatory |
|
mu sync.Mutex |
|
index int |
|
} |
|
|
|
func (s *RoundRobinStrategy) InjectContext(ctx context.Context) { |
|
s.ctx = ctx |
|
} |
|
|
|
func (s *RoundRobinStrategy) GetPrincipleTarget(strings []string) []string { |
|
return strings |
|
} |
|
|
|
func (s *RoundRobinStrategy) PickOutbound(tags []string) string { |
|
if len(s.FallbackTag) > 0 && s.observatory == nil { |
|
common.Must(core.RequireFeatures(s.ctx, func(observatory extension.Observatory) error { |
|
s.observatory = observatory |
|
return nil |
|
})) |
|
} |
|
if s.observatory != nil { |
|
observeReport, err := s.observatory.GetObservation(s.ctx) |
|
if err == nil { |
|
aliveTags := make([]string, 0) |
|
if result, ok := observeReport.(*observatory.ObservationResult); ok { |
|
status := result.Status |
|
statusMap := make(map[string]*observatory.OutboundStatus) |
|
for _, outboundStatus := range status { |
|
statusMap[outboundStatus.OutboundTag] = outboundStatus |
|
} |
|
for _, candidate := range tags { |
|
if outboundStatus, found := statusMap[candidate]; found { |
|
if outboundStatus.Alive { |
|
aliveTags = append(aliveTags, candidate) |
|
} |
|
} else { |
|
// unfound candidate is considered alive |
|
aliveTags = append(aliveTags, candidate) |
|
} |
|
} |
|
tags = aliveTags |
|
} |
|
} |
|
} |
|
|
|
n := len(tags) |
|
if n == 0 { |
|
// goes to fallbackTag |
|
return "" |
|
} |
|
|
|
s.mu.Lock() |
|
defer s.mu.Unlock() |
|
tag := tags[s.index%n] |
|
s.index = (s.index + 1) % n |
|
return tag |
|
} |
|
|
|
type Balancer struct { |
|
selectors []string |
|
strategy BalancingStrategy |
|
ohm outbound.Manager |
|
fallbackTag string |
|
|
|
override override |
|
} |
|
|
|
// PickOutbound picks the tag of a outbound |
|
func (b *Balancer) PickOutbound() (string, error) { |
|
candidates, err := b.SelectOutbounds() |
|
if err != nil { |
|
if b.fallbackTag != "" { |
|
newError("fallback to [", b.fallbackTag, "], due to error: ", err).AtInfo().WriteToLog() |
|
return b.fallbackTag, nil |
|
} |
|
return "", err |
|
} |
|
var tag string |
|
if o := b.override.Get(); o != "" { |
|
tag = o |
|
} else { |
|
tag = b.strategy.PickOutbound(candidates) |
|
} |
|
if tag == "" { |
|
if b.fallbackTag != "" { |
|
newError("fallback to [", b.fallbackTag, "], due to empty tag returned").AtInfo().WriteToLog() |
|
return b.fallbackTag, nil |
|
} |
|
// will use default handler |
|
return "", newError("balancing strategy returns empty tag") |
|
} |
|
return tag, nil |
|
} |
|
|
|
func (b *Balancer) InjectContext(ctx context.Context) { |
|
if contextReceiver, ok := b.strategy.(extension.ContextReceiver); ok { |
|
contextReceiver.InjectContext(ctx) |
|
} |
|
} |
|
|
|
// SelectOutbounds select outbounds with selectors of the Balancer |
|
func (b *Balancer) SelectOutbounds() ([]string, error) { |
|
hs, ok := b.ohm.(outbound.HandlerSelector) |
|
if !ok { |
|
return nil, newError("outbound.Manager is not a HandlerSelector") |
|
} |
|
tags := hs.Select(b.selectors) |
|
return tags, nil |
|
} |
|
|
|
// GetPrincipleTarget implements routing.BalancerPrincipleTarget |
|
func (r *Router) GetPrincipleTarget(tag string) ([]string, error) { |
|
if b, ok := r.balancers[tag]; ok { |
|
if s, ok := b.strategy.(BalancingPrincipleTarget); ok { |
|
candidates, err := b.SelectOutbounds() |
|
if err != nil { |
|
return nil, newError("unable to select outbounds").Base(err) |
|
} |
|
return s.GetPrincipleTarget(candidates), nil |
|
} |
|
return nil, newError("unsupported GetPrincipleTarget") |
|
} |
|
return nil, newError("cannot find tag") |
|
} |
|
|
|
// SetOverrideTarget implements routing.BalancerOverrider |
|
func (r *Router) SetOverrideTarget(tag, target string) error { |
|
if b, ok := r.balancers[tag]; ok { |
|
b.override.Put(target) |
|
return nil |
|
} |
|
return newError("cannot find tag") |
|
} |
|
|
|
// GetOverrideTarget implements routing.BalancerOverrider |
|
func (r *Router) GetOverrideTarget(tag string) (string, error) { |
|
if b, ok := r.balancers[tag]; ok { |
|
return b.override.Get(), nil |
|
} |
|
return "", newError("cannot find tag") |
|
}
|
|
|