Refine code

pull/5101/head
RPRX 2025-09-09 11:47:30 +00:00 committed by GitHub
parent 8b6824104e
commit 9fb7375615
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 89 additions and 86 deletions

View File

@ -483,6 +483,7 @@ func (d *DefaultDispatcher) routedDispatch(ctx context.Context, link *transport.
handler = h handler = h
} else { } else {
errors.LogWarning(ctx, "non existing outTag: ", outTag) errors.LogWarning(ctx, "non existing outTag: ", outTag)
return
} }
} else { } else {
errors.LogInfo(ctx, "default route for ", destination) errors.LogInfo(ctx, "default route for ", destination)

View File

@ -97,7 +97,7 @@ type BridgeWorker struct {
Tag string Tag string
Worker *mux.ServerWorker Worker *mux.ServerWorker
Dispatcher routing.Dispatcher Dispatcher routing.Dispatcher
state Control_State State Control_State
} }
func NewBridgeWorker(domain string, tag string, d routing.Dispatcher) (*BridgeWorker, error) { func NewBridgeWorker(domain string, tag string, d routing.Dispatcher) (*BridgeWorker, error) {
@ -141,7 +141,7 @@ func (w *BridgeWorker) Close() error {
} }
func (w *BridgeWorker) IsActive() bool { func (w *BridgeWorker) IsActive() bool {
return w.state == Control_ACTIVE && !w.Worker.Closed() return w.State == Control_ACTIVE && !w.Worker.Closed()
} }
func (w *BridgeWorker) Connections() uint32 { func (w *BridgeWorker) Connections() uint32 {
@ -161,8 +161,8 @@ func (w *BridgeWorker) handleInternalConn(link *transport.Link) {
errors.LogInfoInner(context.Background(), err, "failed to parse proto message") errors.LogInfoInner(context.Background(), err, "failed to parse proto message")
break break
} }
if ctl.State != w.state { if ctl.State != w.State {
w.state = ctl.State w.State = ctl.State
} }
} }
} }

View File

@ -12,7 +12,7 @@ import (
) )
const ( const (
internalDomain = "reverse.internal.v2fly.org" // make reverse proxy compatible with v2fly internalDomain = "reverse"
) )
func isDomain(dest net.Destination, domain string) bool { func isDomain(dest net.Destination, domain string) bool {

View File

@ -12,7 +12,7 @@ import (
"time" "time"
"unsafe" "unsafe"
app_dispatcher "github.com/xtls/xray-core/app/dispatcher" "github.com/xtls/xray-core/app/dispatcher"
"github.com/xtls/xray-core/app/reverse" "github.com/xtls/xray-core/app/reverse"
"github.com/xtls/xray-core/common" "github.com/xtls/xray-core/common"
"github.com/xtls/xray-core/common/buf" "github.com/xtls/xray-core/common/buf"
@ -74,8 +74,10 @@ type Handler struct {
inboundHandlerManager feature_inbound.Manager inboundHandlerManager feature_inbound.Manager
policyManager policy.Manager policyManager policy.Manager
validator vless.Validator validator vless.Validator
dns dns.Client
decryption *encryption.ServerInstance decryption *encryption.ServerInstance
outboundHandlerManager outbound.Manager
defaultDispatcher *dispatcher.DefaultDispatcher
ctx context.Context
fallbacks map[string]map[string]map[string]*Fallback // or nil fallbacks map[string]map[string]map[string]*Fallback // or nil
// regexps map[string]*regexp.Regexp // or nil // regexps map[string]*regexp.Regexp // or nil
} }
@ -86,8 +88,10 @@ func New(ctx context.Context, config *Config, dc dns.Client, validator vless.Val
handler := &Handler{ handler := &Handler{
inboundHandlerManager: v.GetFeature(feature_inbound.ManagerType()).(feature_inbound.Manager), inboundHandlerManager: v.GetFeature(feature_inbound.ManagerType()).(feature_inbound.Manager),
policyManager: v.GetFeature(policy.ManagerType()).(policy.Manager), policyManager: v.GetFeature(policy.ManagerType()).(policy.Manager),
dns: dc,
validator: validator, validator: validator,
outboundHandlerManager: v.GetFeature(outbound.ManagerType()).(outbound.Manager),
defaultDispatcher: v.GetFeature(routing.DispatcherType()).(*dispatcher.DefaultDispatcher),
ctx: ctx,
} }
if config.Decryption != "" && config.Decryption != "none" { if config.Decryption != "" && config.Decryption != "none" {
@ -179,12 +183,46 @@ func isMuxAndNotXUDP(request *protocol.RequestHeader, first *buf.Buffer) bool {
firstBytes[6] == 2) // Network type: UDP firstBytes[6] == 2) // Network type: UDP
} }
func (h *Handler) GetReverse(a *vless.MemoryAccount) (*Reverse, error) {
u := h.validator.Get(a.ID.UUID())
if u == nil {
return nil, errors.New("reverse: user " + a.ID.String() + " doesn't exist anymore")
}
a = u.Account.(*vless.MemoryAccount)
if a.Reverse == nil || a.Reverse.Tag == "" {
return nil, errors.New("reverse: user " + a.ID.String() + " is not allowed to create reverse proxy")
}
r := h.outboundHandlerManager.GetHandler(a.Reverse.Tag)
if r == nil {
picker, _ := reverse.NewStaticMuxPicker()
r = &Reverse{tag: a.Reverse.Tag, picker: picker, client: &mux.ClientManager{Picker: picker}}
if err := h.outboundHandlerManager.AddHandler(h.ctx, r); err != nil {
return nil, err
}
}
if r, ok := r.(*Reverse); ok {
return r, nil
}
return nil, errors.New("reverse: outbound " + a.Reverse.Tag + " is not type Reverse")
}
func (h *Handler) RemoveReverse(u *protocol.MemoryUser) {
if u != nil {
a := u.Account.(*vless.MemoryAccount)
if a.Reverse != nil && a.Reverse.Tag != "" {
h.outboundHandlerManager.RemoveHandler(h.ctx, a.Reverse.Tag)
}
}
}
// Close implements common.Closable.Close(). // Close implements common.Closable.Close().
func (h *Handler) Close() error { func (h *Handler) Close() error {
if h.decryption != nil { if h.decryption != nil {
h.decryption.Close() h.decryption.Close()
} }
// TODO: remove reverse's handlers (needs ctx) for _, u := range h.validator.GetAll() {
h.RemoveReverse(u)
}
return errors.Combine(common.Close(h.validator)) return errors.Combine(common.Close(h.validator))
} }
@ -195,19 +233,7 @@ func (h *Handler) AddUser(ctx context.Context, u *protocol.MemoryUser) error {
// RemoveUser implements proxy.UserManager.RemoveUser(). // RemoveUser implements proxy.UserManager.RemoveUser().
func (h *Handler) RemoveUser(ctx context.Context, e string) error { func (h *Handler) RemoveUser(ctx context.Context, e string) error {
u := h.validator.GetByEmail(e) h.RemoveReverse(h.validator.GetByEmail(e))
if u != nil {
a := u.Account.(*vless.MemoryAccount)
if a.Reverse != nil && a.Reverse.Tag != "" {
core.RequireFeatures(ctx, func(d routing.Dispatcher, om outbound.Manager) error { // not sure whether it works or not
go func() {
time.Sleep(time.Minute) // TODO: check firstLen
om.RemoveHandler(ctx, a.Reverse.Tag)
}()
return nil
})
}
}
return h.validator.Del(e) return h.validator.Del(e)
} }
@ -519,6 +545,8 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection s
switch request.Command { switch request.Command {
case protocol.RequestCommandUDP: case protocol.RequestCommandUDP:
return errors.New(requestAddons.Flow + " doesn't support UDP").AtWarning() return errors.New(requestAddons.Flow + " doesn't support UDP").AtWarning()
case protocol.RequestCommandRvs:
inbound.CanSpliceCopy = 3
case protocol.RequestCommandMux: case protocol.RequestCommandMux:
fallthrough // we will break Mux connections that contain TCP requests fallthrough // we will break Mux connections that contain TCP requests
case protocol.RequestCommandTCP: case protocol.RequestCommandTCP:
@ -585,30 +613,11 @@ func (h *Handler) Process(ctx context.Context, network net.Network, connection s
bufferWriter.SetFlushNext() bufferWriter.SetFlushNext()
if request.Command == protocol.RequestCommandRvs { if request.Command == protocol.RequestCommandRvs {
if account.Reverse == nil || account.Reverse.Tag == "" { r, err := h.GetReverse(account)
return errors.New("account " + account.ID.String() + " can not use reverse proxy") if err != nil {
}
var rd routing.Dispatcher
var obm outbound.Manager
if err := core.RequireFeatures(ctx, func(d routing.Dispatcher, om outbound.Manager) error {
rd = d
obm = om
return nil
}); err != nil {
return err return err
} }
r := obm.GetHandler(account.Reverse.Tag) return r.NewMux(ctx, h.defaultDispatcher.WrapLink(ctx, &transport.Link{Reader: clientReader, Writer: clientWriter}))
if r == nil {
picker, _ := reverse.NewStaticMuxPicker()
r = &Reverse{tag: account.Reverse.Tag, picker: picker, client: &mux.ClientManager{Picker: picker}}
if err := obm.AddHandler(ctx, r); err != nil {
return err
}
}
if r, ok := r.(*Reverse); ok {
return r.NewMux(ctx, rd.(*app_dispatcher.DefaultDispatcher).WrapLink(ctx, &transport.Link{Reader: clientReader, Writer: clientWriter}))
}
return errors.New("mismatched reverse tag")
} }
if err := dispatcher.DispatchLink(ctx, request.Destination(), &transport.Link{ if err := dispatcher.DispatchLink(ctx, request.Destination(), &transport.Link{
@ -630,14 +639,14 @@ func (r *Reverse) Tag() string {
return r.tag return r.tag
} }
func (r *Reverse) NewMux(ctx context.Context, link *transport.Link) error { func (r *Reverse) NewMux(ctx context.Context, link *transport.Link) error { // XTLS? vnext? users?
muxClient, err := mux.NewClientWorker(*link, mux.ClientStrategy{}) muxClient, err := mux.NewClientWorker(*link, mux.ClientStrategy{})
if err != nil { if err != nil {
return errors.New("failed to create mux client worker").Base(err).AtWarning() return errors.New("failed to create mux client worker").Base(err).AtWarning()
} }
worker, err := reverse.NewPortalWorker(muxClient) worker, err := reverse.NewPortalWorker(muxClient)
if err != nil { if err != nil {
return errors.New("failed to create portal worker").Base(err) return errors.New("failed to create portal worker").Base(err).AtWarning()
} }
r.picker.AddWorker(worker) r.picker.AddWorker(worker)
select { select {

View File

@ -89,16 +89,9 @@ func New(ctx context.Context, config *Config) (*Handler, error) {
} }
if a.Reverse != nil { if a.Reverse != nil {
if err := core.RequireFeatures(ctx, func(d routing.Dispatcher) error {
ctx = session.ContextWithInbound(ctx, &session.Inbound{
Tag: a.Reverse.Tag,
})
ctx = session.ContextWithOutbounds(ctx, []*session.Outbound{{
Target: net.Destination{Address: net.DomainAddress("v1.rvs.cool")},
}})
handler.reverse = &Reverse{ handler.reverse = &Reverse{
tag: a.Reverse.Tag, tag: a.Reverse.Tag,
dispatcher: d, dispatcher: v.GetFeature(routing.DispatcherType()).(routing.Dispatcher),
ctx: ctx, ctx: ctx,
handler: handler, handler: handler,
} }
@ -106,11 +99,10 @@ func New(ctx context.Context, config *Config) (*Handler, error) {
Execute: handler.reverse.monitor, Execute: handler.reverse.monitor,
Interval: time.Second * 2, Interval: time.Second * 2,
} }
go func() {
time.Sleep(2 * time.Second)
handler.reverse.Start() handler.reverse.Start()
return nil }()
}); err != nil {
return nil, err
}
} }
return handler, nil return handler, nil
@ -128,7 +120,7 @@ func (h *Handler) Close() error {
func (h *Handler) Process(ctx context.Context, link *transport.Link, dialer internet.Dialer) error { func (h *Handler) Process(ctx context.Context, link *transport.Link, dialer internet.Dialer) error {
outbounds := session.OutboundsFromContext(ctx) outbounds := session.OutboundsFromContext(ctx)
ob := outbounds[len(outbounds)-1] ob := outbounds[len(outbounds)-1]
if !ob.Target.IsValid() { if !ob.Target.IsValid() && ob.Target.Address.String() != "v1.rvs.cool" {
return errors.New("target not specified").AtError() return errors.New("target not specified").AtError()
} }
ob.Name = "vless" ob.Name = "vless"
@ -400,26 +392,27 @@ func (r *Reverse) monitor() error {
if numWorker == 0 || numConnections/numWorker > 16 { if numWorker == 0 || numConnections/numWorker > 16 {
reader1, writer1 := pipe.New(pipe.WithSizeLimit(2 * buf.Size)) reader1, writer1 := pipe.New(pipe.WithSizeLimit(2 * buf.Size))
reader2, writer2 := pipe.New(pipe.WithSizeLimit(2 * buf.Size)) reader2, writer2 := pipe.New(pipe.WithSizeLimit(2 * buf.Size))
link1 := &transport.Link{ link1 := &transport.Link{Reader: reader1, Writer: writer2}
Reader: reader1, link2 := &transport.Link{Reader: reader2, Writer: writer1}
Writer: writer2,
}
link2 := &transport.Link{
Reader: reader2,
Writer: writer1,
}
w := &reverse.BridgeWorker{ w := &reverse.BridgeWorker{
Tag: r.tag, Tag: r.tag,
Dispatcher: r.dispatcher, Dispatcher: r.dispatcher,
} }
worker, err := mux.NewServerWorker(r.ctx, w, link1) worker, err := mux.NewServerWorker(r.ctx, w, link1)
if err != nil { if err != nil {
errors.LogWarningInner(context.Background(), err, "failed to create bridge worker") errors.LogWarningInner(r.ctx, err, "failed to create mux server worker")
return nil return nil
} }
w.Worker = worker w.Worker = worker
r.workers = append(r.workers, w) r.workers = append(r.workers, w)
go r.handler.Process(r.ctx, link2, session.HandlerFromContext(r.ctx).(*proxyman.Handler)) go func() {
ctx := session.ContextWithOutbounds(r.ctx, []*session.Outbound{{
Target: net.Destination{Address: net.DomainAddress("v1.rvs.cool")},
}})
r.handler.Process(ctx, link2, session.HandlerFromContext(ctx).(*proxyman.Handler))
common.Interrupt(reader1)
common.Interrupt(reader2)
}()
} }
return nil return nil
} }