From 9fb7375615b9b4ccc36ec59088b3293d6d4b10ea Mon Sep 17 00:00:00 2001 From: RPRX <63339210+RPRX@users.noreply.github.com> Date: Tue, 9 Sep 2025 11:47:30 +0000 Subject: [PATCH] Refine code --- app/dispatcher/default.go | 1 + app/reverse/bridge.go | 8 +-- app/reverse/reverse.go | 2 +- proxy/vless/inbound/inbound.go | 107 +++++++++++++++++-------------- proxy/vless/outbound/outbound.go | 57 ++++++++-------- 5 files changed, 89 insertions(+), 86 deletions(-) diff --git a/app/dispatcher/default.go b/app/dispatcher/default.go index e3524350..c17bfcd1 100644 --- a/app/dispatcher/default.go +++ b/app/dispatcher/default.go @@ -483,6 +483,7 @@ func (d *DefaultDispatcher) routedDispatch(ctx context.Context, link *transport. handler = h } else { errors.LogWarning(ctx, "non existing outTag: ", outTag) + return } } else { errors.LogInfo(ctx, "default route for ", destination) diff --git a/app/reverse/bridge.go b/app/reverse/bridge.go index 25f039be..b86d153b 100644 --- a/app/reverse/bridge.go +++ b/app/reverse/bridge.go @@ -97,7 +97,7 @@ type BridgeWorker struct { Tag string Worker *mux.ServerWorker Dispatcher routing.Dispatcher - state Control_State + State Control_State } func NewBridgeWorker(domain string, tag string, d routing.Dispatcher) (*BridgeWorker, error) { @@ -141,7 +141,7 @@ func (w *BridgeWorker) Close() error { } func (w *BridgeWorker) IsActive() bool { - return w.state == Control_ACTIVE && !w.Worker.Closed() + return w.State == Control_ACTIVE && !w.Worker.Closed() } func (w *BridgeWorker) Connections() uint32 { @@ -161,8 +161,8 @@ func (w *BridgeWorker) handleInternalConn(link *transport.Link) { errors.LogInfoInner(context.Background(), err, "failed to parse proto message") break } - if ctl.State != w.state { - w.state = ctl.State + if ctl.State != w.State { + w.State = ctl.State } } } diff --git a/app/reverse/reverse.go b/app/reverse/reverse.go index f550a23f..dcd24c57 100644 --- a/app/reverse/reverse.go +++ b/app/reverse/reverse.go @@ -12,7 +12,7 @@ import ( ) const ( - internalDomain = "reverse.internal.v2fly.org" // make reverse proxy compatible with v2fly + internalDomain = "reverse" ) func isDomain(dest net.Destination, domain string) bool { diff --git a/proxy/vless/inbound/inbound.go b/proxy/vless/inbound/inbound.go index 0ab9df71..77f96bf3 100644 --- a/proxy/vless/inbound/inbound.go +++ b/proxy/vless/inbound/inbound.go @@ -12,7 +12,7 @@ import ( "time" "unsafe" - app_dispatcher "github.com/xtls/xray-core/app/dispatcher" + "github.com/xtls/xray-core/app/dispatcher" "github.com/xtls/xray-core/app/reverse" "github.com/xtls/xray-core/common" "github.com/xtls/xray-core/common/buf" @@ -71,12 +71,14 @@ func init() { // Handler is an inbound connection handler that handles messages in VLess protocol. type Handler struct { - inboundHandlerManager feature_inbound.Manager - policyManager policy.Manager - validator vless.Validator - dns dns.Client - decryption *encryption.ServerInstance - fallbacks map[string]map[string]map[string]*Fallback // or nil + inboundHandlerManager feature_inbound.Manager + policyManager policy.Manager + validator vless.Validator + decryption *encryption.ServerInstance + outboundHandlerManager outbound.Manager + defaultDispatcher *dispatcher.DefaultDispatcher + ctx context.Context + fallbacks map[string]map[string]map[string]*Fallback // or nil // regexps map[string]*regexp.Regexp // or nil } @@ -84,10 +86,12 @@ type Handler struct { func New(ctx context.Context, config *Config, dc dns.Client, validator vless.Validator) (*Handler, error) { v := core.MustFromContext(ctx) handler := &Handler{ - inboundHandlerManager: v.GetFeature(feature_inbound.ManagerType()).(feature_inbound.Manager), - policyManager: v.GetFeature(policy.ManagerType()).(policy.Manager), - dns: dc, - validator: validator, + inboundHandlerManager: v.GetFeature(feature_inbound.ManagerType()).(feature_inbound.Manager), + policyManager: v.GetFeature(policy.ManagerType()).(policy.Manager), + validator: validator, + outboundHandlerManager: v.GetFeature(outbound.ManagerType()).(outbound.Manager), + defaultDispatcher: v.GetFeature(routing.DispatcherType()).(*dispatcher.DefaultDispatcher), + ctx: ctx, } if config.Decryption != "" && config.Decryption != "none" { @@ -179,12 +183,46 @@ func isMuxAndNotXUDP(request *protocol.RequestHeader, first *buf.Buffer) bool { firstBytes[6] == 2) // Network type: UDP } +func (h *Handler) GetReverse(a *vless.MemoryAccount) (*Reverse, error) { + u := h.validator.Get(a.ID.UUID()) + if u == nil { + return nil, errors.New("reverse: user " + a.ID.String() + " doesn't exist anymore") + } + a = u.Account.(*vless.MemoryAccount) + if a.Reverse == nil || a.Reverse.Tag == "" { + return nil, errors.New("reverse: user " + a.ID.String() + " is not allowed to create reverse proxy") + } + r := h.outboundHandlerManager.GetHandler(a.Reverse.Tag) + if r == nil { + picker, _ := reverse.NewStaticMuxPicker() + r = &Reverse{tag: a.Reverse.Tag, picker: picker, client: &mux.ClientManager{Picker: picker}} + if err := h.outboundHandlerManager.AddHandler(h.ctx, r); err != nil { + return nil, err + } + } + if r, ok := r.(*Reverse); ok { + return r, nil + } + return nil, errors.New("reverse: outbound " + a.Reverse.Tag + " is not type Reverse") +} + +func (h *Handler) RemoveReverse(u *protocol.MemoryUser) { + if u != nil { + a := u.Account.(*vless.MemoryAccount) + if a.Reverse != nil && a.Reverse.Tag != "" { + h.outboundHandlerManager.RemoveHandler(h.ctx, a.Reverse.Tag) + } + } +} + // Close implements common.Closable.Close(). func (h *Handler) Close() error { if h.decryption != nil { h.decryption.Close() } - // TODO: remove reverse's handlers (needs ctx) + for _, u := range h.validator.GetAll() { + h.RemoveReverse(u) + } return errors.Combine(common.Close(h.validator)) } @@ -195,19 +233,7 @@ func (h *Handler) AddUser(ctx context.Context, u *protocol.MemoryUser) error { // RemoveUser implements proxy.UserManager.RemoveUser(). func (h *Handler) RemoveUser(ctx context.Context, e string) error { - u := h.validator.GetByEmail(e) - if u != nil { - a := u.Account.(*vless.MemoryAccount) - if a.Reverse != nil && a.Reverse.Tag != "" { - core.RequireFeatures(ctx, func(d routing.Dispatcher, om outbound.Manager) error { // not sure whether it works or not - go func() { - time.Sleep(time.Minute) // TODO: check firstLen - om.RemoveHandler(ctx, a.Reverse.Tag) - }() - return nil - }) - } - } + h.RemoveReverse(h.validator.GetByEmail(e)) return h.validator.Del(e) } @@ -519,6 +545,8 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection s switch request.Command { case protocol.RequestCommandUDP: return errors.New(requestAddons.Flow + " doesn't support UDP").AtWarning() + case protocol.RequestCommandRvs: + inbound.CanSpliceCopy = 3 case protocol.RequestCommandMux: fallthrough // we will break Mux connections that contain TCP requests case protocol.RequestCommandTCP: @@ -585,30 +613,11 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection s bufferWriter.SetFlushNext() if request.Command == protocol.RequestCommandRvs { - if account.Reverse == nil || account.Reverse.Tag == "" { - return errors.New("account " + account.ID.String() + " can not use reverse proxy") - } - var rd routing.Dispatcher - var obm outbound.Manager - if err := core.RequireFeatures(ctx, func(d routing.Dispatcher, om outbound.Manager) error { - rd = d - obm = om - return nil - }); err != nil { + r, err := h.GetReverse(account) + if err != nil { return err } - r := obm.GetHandler(account.Reverse.Tag) - if r == nil { - picker, _ := reverse.NewStaticMuxPicker() - r = &Reverse{tag: account.Reverse.Tag, picker: picker, client: &mux.ClientManager{Picker: picker}} - if err := obm.AddHandler(ctx, r); err != nil { - return err - } - } - if r, ok := r.(*Reverse); ok { - return r.NewMux(ctx, rd.(*app_dispatcher.DefaultDispatcher).WrapLink(ctx, &transport.Link{Reader: clientReader, Writer: clientWriter})) - } - return errors.New("mismatched reverse tag") + return r.NewMux(ctx, h.defaultDispatcher.WrapLink(ctx, &transport.Link{Reader: clientReader, Writer: clientWriter})) } if err := dispatcher.DispatchLink(ctx, request.Destination(), &transport.Link{ @@ -630,14 +639,14 @@ func (r *Reverse) Tag() string { return r.tag } -func (r *Reverse) NewMux(ctx context.Context, link *transport.Link) error { +func (r *Reverse) NewMux(ctx context.Context, link *transport.Link) error { // XTLS? vnext? users? muxClient, err := mux.NewClientWorker(*link, mux.ClientStrategy{}) if err != nil { return errors.New("failed to create mux client worker").Base(err).AtWarning() } worker, err := reverse.NewPortalWorker(muxClient) if err != nil { - return errors.New("failed to create portal worker").Base(err) + return errors.New("failed to create portal worker").Base(err).AtWarning() } r.picker.AddWorker(worker) select { diff --git a/proxy/vless/outbound/outbound.go b/proxy/vless/outbound/outbound.go index 08902932..30b9dcf9 100644 --- a/proxy/vless/outbound/outbound.go +++ b/proxy/vless/outbound/outbound.go @@ -89,28 +89,20 @@ func New(ctx context.Context, config *Config) (*Handler, error) { } if a.Reverse != nil { - if err := core.RequireFeatures(ctx, func(d routing.Dispatcher) error { - ctx = session.ContextWithInbound(ctx, &session.Inbound{ - Tag: a.Reverse.Tag, - }) - ctx = session.ContextWithOutbounds(ctx, []*session.Outbound{{ - Target: net.Destination{Address: net.DomainAddress("v1.rvs.cool")}, - }}) - handler.reverse = &Reverse{ - tag: a.Reverse.Tag, - dispatcher: d, - ctx: ctx, - handler: handler, - } - handler.reverse.monitorTask = &task.Periodic{ - Execute: handler.reverse.monitor, - Interval: time.Second * 2, - } - handler.reverse.Start() - return nil - }); err != nil { - return nil, err + handler.reverse = &Reverse{ + tag: a.Reverse.Tag, + dispatcher: v.GetFeature(routing.DispatcherType()).(routing.Dispatcher), + ctx: ctx, + handler: handler, } + handler.reverse.monitorTask = &task.Periodic{ + Execute: handler.reverse.monitor, + Interval: time.Second * 2, + } + go func() { + time.Sleep(2 * time.Second) + handler.reverse.Start() + }() } return handler, nil @@ -128,7 +120,7 @@ func (h *Handler) Close() error { func (h *Handler) Process(ctx context.Context, link *transport.Link, dialer internet.Dialer) error { outbounds := session.OutboundsFromContext(ctx) ob := outbounds[len(outbounds)-1] - if !ob.Target.IsValid() { + if !ob.Target.IsValid() && ob.Target.Address.String() != "v1.rvs.cool" { return errors.New("target not specified").AtError() } ob.Name = "vless" @@ -400,26 +392,27 @@ func (r *Reverse) monitor() error { if numWorker == 0 || numConnections/numWorker > 16 { reader1, writer1 := pipe.New(pipe.WithSizeLimit(2 * buf.Size)) reader2, writer2 := pipe.New(pipe.WithSizeLimit(2 * buf.Size)) - link1 := &transport.Link{ - Reader: reader1, - Writer: writer2, - } - link2 := &transport.Link{ - Reader: reader2, - Writer: writer1, - } + link1 := &transport.Link{Reader: reader1, Writer: writer2} + link2 := &transport.Link{Reader: reader2, Writer: writer1} w := &reverse.BridgeWorker{ Tag: r.tag, Dispatcher: r.dispatcher, } worker, err := mux.NewServerWorker(r.ctx, w, link1) if err != nil { - errors.LogWarningInner(context.Background(), err, "failed to create bridge worker") + errors.LogWarningInner(r.ctx, err, "failed to create mux server worker") return nil } w.Worker = worker r.workers = append(r.workers, w) - go r.handler.Process(r.ctx, link2, session.HandlerFromContext(r.ctx).(*proxyman.Handler)) + go func() { + ctx := session.ContextWithOutbounds(r.ctx, []*session.Outbound{{ + Target: net.Destination{Address: net.DomainAddress("v1.rvs.cool")}, + }}) + r.handler.Process(ctx, link2, session.HandlerFromContext(ctx).(*proxyman.Handler)) + common.Interrupt(reader1) + common.Interrupt(reader2) + }() } return nil }