mirror of https://github.com/v2ray/v2ray-core
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
266 lines
5.3 KiB
266 lines
5.3 KiB
// +build !confonly |
|
|
|
package reverse |
|
|
|
import ( |
|
"context" |
|
"sync" |
|
"time" |
|
|
|
"github.com/golang/protobuf/proto" |
|
"v2ray.com/core/common" |
|
"v2ray.com/core/common/buf" |
|
"v2ray.com/core/common/mux" |
|
"v2ray.com/core/common/net" |
|
"v2ray.com/core/common/session" |
|
"v2ray.com/core/common/task" |
|
"v2ray.com/core/features/outbound" |
|
"v2ray.com/core/transport" |
|
"v2ray.com/core/transport/pipe" |
|
) |
|
|
|
type Portal struct { |
|
ohm outbound.Manager |
|
tag string |
|
domain string |
|
picker *StaticMuxPicker |
|
client *mux.ClientManager |
|
} |
|
|
|
func NewPortal(config *PortalConfig, ohm outbound.Manager) (*Portal, error) { |
|
if config.Tag == "" { |
|
return nil, newError("portal tag is empty") |
|
} |
|
|
|
if config.Domain == "" { |
|
return nil, newError("portal domain is empty") |
|
} |
|
|
|
picker, err := NewStaticMuxPicker() |
|
if err != nil { |
|
return nil, err |
|
} |
|
|
|
return &Portal{ |
|
ohm: ohm, |
|
tag: config.Tag, |
|
domain: config.Domain, |
|
picker: picker, |
|
client: &mux.ClientManager{ |
|
Picker: picker, |
|
}, |
|
}, nil |
|
} |
|
|
|
func (p *Portal) Start() error { |
|
return p.ohm.AddHandler(context.Background(), &Outbound{ |
|
portal: p, |
|
tag: p.tag, |
|
}) |
|
} |
|
|
|
func (p *Portal) Close() error { |
|
return p.ohm.RemoveHandler(context.Background(), p.tag) |
|
} |
|
|
|
func (p *Portal) HandleConnection(ctx context.Context, link *transport.Link) error { |
|
outboundMeta := session.OutboundFromContext(ctx) |
|
if outboundMeta == nil { |
|
return newError("outbound metadata not found").AtError() |
|
} |
|
|
|
if isDomain(outboundMeta.Target, p.domain) { |
|
muxClient, err := mux.NewClientWorker(*link, mux.ClientStrategy{}) |
|
if err != nil { |
|
return newError("failed to create mux client worker").Base(err).AtWarning() |
|
} |
|
|
|
worker, err := NewPortalWorker(muxClient) |
|
if err != nil { |
|
return newError("failed to create portal worker").Base(err) |
|
} |
|
|
|
p.picker.AddWorker(worker) |
|
return nil |
|
} |
|
|
|
return p.client.Dispatch(ctx, link) |
|
} |
|
|
|
type Outbound struct { |
|
portal *Portal |
|
tag string |
|
} |
|
|
|
func (o *Outbound) Tag() string { |
|
return o.tag |
|
} |
|
|
|
func (o *Outbound) Dispatch(ctx context.Context, link *transport.Link) { |
|
if err := o.portal.HandleConnection(ctx, link); err != nil { |
|
newError("failed to process reverse connection").Base(err).WriteToLog(session.ExportIDToError(ctx)) |
|
common.Interrupt(link.Writer) |
|
} |
|
} |
|
|
|
func (o *Outbound) Start() error { |
|
return nil |
|
} |
|
|
|
func (o *Outbound) Close() error { |
|
return nil |
|
} |
|
|
|
type StaticMuxPicker struct { |
|
access sync.Mutex |
|
workers []*PortalWorker |
|
cTask *task.Periodic |
|
} |
|
|
|
func NewStaticMuxPicker() (*StaticMuxPicker, error) { |
|
p := &StaticMuxPicker{} |
|
p.cTask = &task.Periodic{ |
|
Execute: p.cleanup, |
|
Interval: time.Second * 30, |
|
} |
|
p.cTask.Start() |
|
return p, nil |
|
} |
|
|
|
func (p *StaticMuxPicker) cleanup() error { |
|
p.access.Lock() |
|
defer p.access.Unlock() |
|
|
|
var activeWorkers []*PortalWorker |
|
for _, w := range p.workers { |
|
if !w.Closed() { |
|
activeWorkers = append(activeWorkers, w) |
|
} |
|
} |
|
|
|
if len(activeWorkers) != len(p.workers) { |
|
p.workers = activeWorkers |
|
} |
|
|
|
return nil |
|
} |
|
|
|
func (p *StaticMuxPicker) PickAvailable() (*mux.ClientWorker, error) { |
|
p.access.Lock() |
|
defer p.access.Unlock() |
|
|
|
if len(p.workers) == 0 { |
|
return nil, newError("empty worker list") |
|
} |
|
|
|
var minIdx int = -1 |
|
var minConn uint32 = 9999 |
|
for i, w := range p.workers { |
|
if w.draining { |
|
continue |
|
} |
|
if w.client.ActiveConnections() < minConn { |
|
minConn = w.client.ActiveConnections() |
|
minIdx = i |
|
} |
|
} |
|
|
|
if minIdx == -1 { |
|
for i, w := range p.workers { |
|
if w.IsFull() { |
|
continue |
|
} |
|
if w.client.ActiveConnections() < minConn { |
|
minConn = w.client.ActiveConnections() |
|
minIdx = i |
|
} |
|
} |
|
} |
|
|
|
if minIdx != -1 { |
|
return p.workers[minIdx].client, nil |
|
} |
|
|
|
return nil, newError("no mux client worker available") |
|
} |
|
|
|
func (p *StaticMuxPicker) AddWorker(worker *PortalWorker) { |
|
p.access.Lock() |
|
defer p.access.Unlock() |
|
|
|
p.workers = append(p.workers, worker) |
|
} |
|
|
|
type PortalWorker struct { |
|
client *mux.ClientWorker |
|
control *task.Periodic |
|
writer buf.Writer |
|
reader buf.Reader |
|
draining bool |
|
} |
|
|
|
func NewPortalWorker(client *mux.ClientWorker) (*PortalWorker, error) { |
|
opt := []pipe.Option{pipe.WithSizeLimit(16 * 1024)} |
|
uplinkReader, uplinkWriter := pipe.New(opt...) |
|
downlinkReader, downlinkWriter := pipe.New(opt...) |
|
|
|
ctx := context.Background() |
|
ctx = session.ContextWithOutbound(ctx, &session.Outbound{ |
|
Target: net.UDPDestination(net.DomainAddress(internalDomain), 0), |
|
}) |
|
f := client.Dispatch(ctx, &transport.Link{ |
|
Reader: uplinkReader, |
|
Writer: downlinkWriter, |
|
}) |
|
if !f { |
|
return nil, newError("unable to dispatch control connection") |
|
} |
|
w := &PortalWorker{ |
|
client: client, |
|
reader: downlinkReader, |
|
writer: uplinkWriter, |
|
} |
|
w.control = &task.Periodic{ |
|
Execute: w.heartbeat, |
|
Interval: time.Second * 2, |
|
} |
|
w.control.Start() |
|
return w, nil |
|
} |
|
|
|
func (w *PortalWorker) heartbeat() error { |
|
if w.client.Closed() { |
|
return newError("client worker stopped") |
|
} |
|
|
|
if w.draining || w.writer == nil { |
|
return newError("already disposed") |
|
} |
|
|
|
msg := &Control{} |
|
msg.FillInRandom() |
|
|
|
if w.client.TotalConnections() > 256 { |
|
w.draining = true |
|
msg.State = Control_DRAIN |
|
|
|
defer func() { |
|
common.Close(w.writer) |
|
common.Interrupt(w.reader) |
|
w.writer = nil |
|
}() |
|
} |
|
|
|
b, err := proto.Marshal(msg) |
|
common.Must(err) |
|
mb := buf.MergeBytes(nil, b) |
|
return w.writer.WriteMultiBuffer(mb) |
|
} |
|
|
|
func (w *PortalWorker) IsFull() bool { |
|
return w.client.IsFull() |
|
} |
|
|
|
func (w *PortalWorker) Closed() bool { |
|
return w.client.Closed() |
|
}
|
|
|