You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
v2ray-core/app/reverse/portal.go

267 lines
5.3 KiB

// +build !confonly
package reverse
import (
"context"
"sync"
"time"
"github.com/golang/protobuf/proto"
"v2ray.com/core/common"
"v2ray.com/core/common/buf"
"v2ray.com/core/common/mux"
"v2ray.com/core/common/net"
"v2ray.com/core/common/session"
"v2ray.com/core/common/task"
"v2ray.com/core/features/outbound"
"v2ray.com/core/transport"
"v2ray.com/core/transport/pipe"
)
// Portal is the portal side of a reverse connection. It registers an
// Outbound handler under its tag and either turns an incoming link into a
// new mux worker or dispatches the link through the existing workers.
type Portal struct {
// ohm is the outbound manager the portal's handler is added to and removed from.
ohm outbound.Manager
// tag is the handler tag used when registering with ohm.
tag string
// domain is compared against a connection's target (via isDomain) to decide
// whether the link should become a new worker.
domain string
// picker holds the portal workers and selects one for dispatching.
picker *StaticMuxPicker
// client dispatches regular connections, using picker to choose a worker.
client *mux.ClientManager
}
// NewPortal builds a Portal from config, bound to the outbound manager ohm.
//
// It returns an error when the configured tag or domain is empty, or when
// the worker picker cannot be created.
func NewPortal(config *PortalConfig, ohm outbound.Manager) (*Portal, error) {
	switch {
	case config.Tag == "":
		return nil, newError("portal tag is empty")
	case config.Domain == "":
		return nil, newError("portal domain is empty")
	}

	picker, err := NewStaticMuxPicker()
	if err != nil {
		return nil, err
	}

	// The client manager and the portal share the same picker, so workers
	// added to the portal become available for dispatching.
	portal := &Portal{
		ohm:    ohm,
		tag:    config.Tag,
		domain: config.Domain,
		picker: picker,
		client: &mux.ClientManager{Picker: picker},
	}
	return portal, nil
}
// Start registers an Outbound handler for this portal, under the portal's
// tag, with the outbound manager. It returns whatever AddHandler returns.
func (p *Portal) Start() error {
	handler := &Outbound{
		tag:    p.tag,
		portal: p,
	}
	return p.ohm.AddHandler(context.Background(), handler)
}
// Close removes the handler registered under the portal's tag from the
// outbound manager and returns the result of that removal.
func (p *Portal) Close() error {
return p.ohm.RemoveHandler(context.Background(), p.tag)
}
// HandleConnection processes a link handed to the portal's outbound handler.
//
// When the outbound target in ctx matches the portal domain, the link is
// wrapped in a new mux client worker and registered with the picker.
// Otherwise the link is dispatched through the existing workers.
//
// It returns an error if ctx carries no outbound metadata, if a worker
// cannot be created, or if dispatching fails.
func (p *Portal) HandleConnection(ctx context.Context, link *transport.Link) error {
	outboundMeta := session.OutboundFromContext(ctx)
	if outboundMeta == nil {
		return newError("outbound metadata not found").AtError()
	}

	// Regular traffic: hand it to one of the registered workers.
	if !isDomain(outboundMeta.Target, p.domain) {
		return p.client.Dispatch(ctx, link)
	}

	// The target is the portal domain: this link becomes a new worker.
	muxClient, err := mux.NewClientWorker(*link, mux.ClientStrategy{})
	if err != nil {
		return newError("failed to create mux client worker").Base(err).AtWarning()
	}

	worker, err := NewPortalWorker(muxClient)
	if err != nil {
		return newError("failed to create portal worker").Base(err)
	}

	p.picker.AddWorker(worker)
	return nil
}
// Outbound is the handler a Portal registers with the outbound manager. It
// forwards dispatched links to its portal.
type Outbound struct {
// portal is the Portal that processes links dispatched to this handler.
portal *Portal
// tag is the value reported by Tag.
tag string
}
// Tag returns the tag this handler was created with.
func (o *Outbound) Tag() string {
return o.tag
}
// Dispatch passes the link to the portal. If the portal reports an error,
// the error is logged with the session ID from ctx and the link's writer is
// interrupted.
func (o *Outbound) Dispatch(ctx context.Context, link *transport.Link) {
	err := o.portal.HandleConnection(ctx, link)
	if err == nil {
		return
	}

	newError("failed to process reverse connection").Base(err).WriteToLog(session.ExportIDToError(ctx))
	common.Interrupt(link.Writer)
}
// Start is a no-op and always returns nil.
func (o *Outbound) Start() error {
return nil
}
// Close is a no-op and always returns nil.
func (o *Outbound) Close() error {
return nil
}
// StaticMuxPicker keeps the list of portal workers and picks one of them
// for dispatching a connection.
type StaticMuxPicker struct {
// access guards workers.
access sync.Mutex
// workers is the list of registered portal workers.
workers []*PortalWorker
// cTask periodically runs cleanup to drop closed workers.
cTask *task.Periodic
}
// NewStaticMuxPicker creates an empty picker and starts its periodic
// cleanup task, which runs with a 30-second interval. The returned error is
// always nil.
func NewStaticMuxPicker() (*StaticMuxPicker, error) {
	picker := new(StaticMuxPicker)
	picker.cTask = &task.Periodic{
		Interval: 30 * time.Second,
		Execute:  picker.cleanup,
	}
	picker.cTask.Start()
	return picker, nil
}
// cleanup drops closed workers from the worker list. The list is replaced
// only when at least one worker was removed. It always returns nil.
func (p *StaticMuxPicker) cleanup() error {
	p.access.Lock()
	defer p.access.Unlock()

	var alive []*PortalWorker
	for i := range p.workers {
		if worker := p.workers[i]; !worker.Closed() {
			alive = append(alive, worker)
		}
	}

	// alive is a subset of p.workers, so a shorter length means something
	// was filtered out.
	if len(alive) < len(p.workers) {
		p.workers = alive
	}
	return nil
}
// PickAvailable returns the mux client of the worker that currently has the
// fewest active connections.
//
// Selection happens in two passes over the open workers:
//  1. workers that are not draining;
//  2. if none qualified, workers that are not full (draining ones included).
//
// Closed workers are never returned. They stay in the list until the
// periodic cleanup removes them, so they have to be filtered out here.
// Ties are resolved in favor of the worker added earliest.
//
// It returns an error when the worker list is empty or when no worker
// qualifies in either pass.
func (p *StaticMuxPicker) PickAvailable() (*mux.ClientWorker, error) {
	p.access.Lock()
	defer p.access.Unlock()

	if len(p.workers) == 0 {
		return nil, newError("empty worker list")
	}

	// leastLoaded returns the index of the open worker with the fewest
	// active connections among those not rejected by skip, or -1 if there
	// is none. The connection count is read once per worker so the compared
	// and remembered values cannot diverge.
	leastLoaded := func(skip func(*PortalWorker) bool) int {
		best := -1
		var bestConn uint32
		for i, w := range p.workers {
			if w.Closed() || skip(w) {
				continue
			}
			if conns := w.client.ActiveConnections(); best == -1 || conns < bestConn {
				best, bestConn = i, conns
			}
		}
		return best
	}

	idx := leastLoaded(func(w *PortalWorker) bool { return w.draining })
	if idx == -1 {
		// Every open worker is draining; fall back to any that still has
		// capacity.
		idx = leastLoaded((*PortalWorker).IsFull)
	}
	if idx == -1 {
		return nil, newError("no mux client worker available")
	}
	return p.workers[idx].client, nil
}
// AddWorker appends worker to the picker's worker list while holding the
// picker's lock.
func (p *StaticMuxPicker) AddWorker(worker *PortalWorker) {
	p.access.Lock()
	p.workers = append(p.workers, worker)
	p.access.Unlock()
}
// PortalWorker wraps a mux client worker together with the control
// connection used to send periodic heartbeat messages over it.
type PortalWorker struct {
// client is the underlying mux client worker.
client *mux.ClientWorker
// control is the periodic task that runs heartbeat.
control *task.Periodic
// writer is the write end of the control connection's uplink pipe;
// heartbeat sets it to nil after closing it.
writer buf.Writer
// reader is the read end of the control connection's downlink pipe.
reader buf.Reader
// draining is set by heartbeat once the client's total connection count
// exceeds 256; PickAvailable skips draining workers in its first pass.
draining bool
}
// NewPortalWorker wraps client in a PortalWorker.
//
// It opens a control connection through client, targeting internalDomain
// and backed by a pair of pipes limited to 16 KiB each. It then starts a
// heartbeat task with a 2-second interval. An error is returned if the
// client refuses to dispatch the control connection.
func NewPortalWorker(client *mux.ClientWorker) (*PortalWorker, error) {
	pipeOpts := []pipe.Option{pipe.WithSizeLimit(16 * 1024)}
	uplinkReader, uplinkWriter := pipe.New(pipeOpts...)
	downlinkReader, downlinkWriter := pipe.New(pipeOpts...)

	target := net.UDPDestination(net.DomainAddress(internalDomain), 0)
	ctx := session.ContextWithOutbound(context.Background(), &session.Outbound{
		Target: target,
	})

	// The mux client reads what the worker writes (uplink) and writes what
	// the worker reads (downlink).
	controlLink := &transport.Link{
		Reader: uplinkReader,
		Writer: downlinkWriter,
	}
	if ok := client.Dispatch(ctx, controlLink); !ok {
		return nil, newError("unable to dispatch control connection")
	}

	worker := &PortalWorker{
		client: client,
		reader: downlinkReader,
		writer: uplinkWriter,
	}
	worker.control = &task.Periodic{
		Interval: 2 * time.Second,
		Execute:  worker.heartbeat,
	}
	worker.control.Start()
	return worker, nil
}
// heartbeat sends one Control message over the control connection.
//
// It returns an error without sending when the mux client is closed, or
// when the worker is already draining or its writer has been disposed.
// Once the client's total connection count exceeds 256, the worker is
// marked as draining and the message carries Control_DRAIN. After that
// final message is written, the control connection is torn down.
func (w *PortalWorker) heartbeat() error {
	switch {
	case w.client.Closed():
		return newError("client worker stopped")
	case w.draining, w.writer == nil:
		return newError("already disposed")
	}

	ctrl := new(Control)
	ctrl.FillInRandom()

	if w.client.TotalConnections() > 256 {
		w.draining = true
		ctrl.State = Control_DRAIN

		// Deferred so the drain message below is written before the
		// control connection is closed.
		defer func() {
			common.Close(w.writer)
			common.Interrupt(w.reader)
			w.writer = nil
		}()
	}

	payload, err := proto.Marshal(ctrl)
	common.Must(err)
	return w.writer.WriteMultiBuffer(buf.MergeBytes(nil, payload))
}
// IsFull reports whether the underlying mux client considers itself full.
func (w *PortalWorker) IsFull() bool {
return w.client.IsFull()
}
// Closed reports whether the underlying mux client is closed.
func (w *PortalWorker) Closed() bool {
return w.client.Closed()
}