diff --git a/app/dispatcher/impl/default.go b/app/dispatcher/impl/default.go index 87c43d89..4aa20bc9 100644 --- a/app/dispatcher/impl/default.go +++ b/app/dispatcher/impl/default.go @@ -2,7 +2,6 @@ package impl import ( "context" - "time" "v2ray.com/core/app" "v2ray.com/core/app/dispatcher" @@ -10,7 +9,6 @@ import ( "v2ray.com/core/app/proxyman" "v2ray.com/core/app/router" "v2ray.com/core/common" - "v2ray.com/core/common/buf" "v2ray.com/core/common/errors" "v2ray.com/core/common/net" "v2ray.com/core/proxy" @@ -71,70 +69,13 @@ func (v *DefaultDispatcher) Dispatch(ctx context.Context, destination net.Destin } direct := ray.NewRay(ctx) - var waitFunc func() error - if allowPassiveConnection, ok := proxy.AllowPassiveConnectionFromContext(ctx); ok && allowPassiveConnection { - waitFunc = noOpWait() - } else { - wdi := &waitDataInspector{ - hasData: make(chan bool, 1), - } - direct.AddInspector(wdi) - waitFunc = waitForData(wdi) - } - - go v.waitAndDispatch(ctx, waitFunc, direct, dispatcher) + go dispatcher.Dispatch(ctx, direct) return direct, nil } -func (v *DefaultDispatcher) waitAndDispatch(ctx context.Context, wait func() error, link ray.OutboundRay, dispatcher proxyman.OutboundHandler) { - if err := wait(); err != nil { - log.Info("DefaultDispatcher: Failed precondition: ", err) - link.OutboundInput().CloseError() - link.OutboundOutput().CloseError() - return - } - - dispatcher.Dispatch(ctx, link) -} - func init() { common.Must(common.RegisterConfig((*dispatcher.Config)(nil), func(ctx context.Context, config interface{}) (interface{}, error) { return NewDefaultDispatcher(ctx, config.(*dispatcher.Config)) })) } - -type waitDataInspector struct { - hasData chan bool -} - -func (wdi *waitDataInspector) Input(*buf.Buffer) { - select { - case wdi.hasData <- true: - default: - } -} - -func (wdi *waitDataInspector) WaitForData() bool { - select { - case <-wdi.hasData: - return true - case <-time.After(time.Minute): - return false - } -} - -func waitForData(wdi *waitDataInspector) func() error { - return func() error { - if wdi.WaitForData() { - return nil - } - return errors.New("DefaultDispatcher: No data.") - } -} - -func noOpWait() func() error { - return func() error { - return nil - } -} diff --git a/app/proxyman/config.proto b/app/proxyman/config.proto index 4dadb25b..defbd4a7 100644 --- a/app/proxyman/config.proto +++ b/app/proxyman/config.proto @@ -53,7 +53,7 @@ message ReceiverConfig { AllocationStrategy allocation_strategy = 3; v2ray.core.transport.internet.StreamConfig stream_settings = 4; bool receive_original_destination = 5; - bool allow_passive_connection = 6; + bool allow_passive_connection = 6 [deprecated=true]; } message InboundHandlerConfig { diff --git a/app/proxyman/inbound/always.go b/app/proxyman/inbound/always.go index 473dd290..f2826c85 100644 --- a/app/proxyman/inbound/always.go +++ b/app/proxyman/inbound/always.go @@ -37,14 +37,13 @@ func NewAlwaysOnInboundHandler(ctx context.Context, tag string, receiverConfig * if nl.HasNetwork(net.Network_TCP) { log.Debug("Proxyman|DefaultInboundHandler: creating tcp worker on ", address, ":", port) worker := &tcpWorker{ - address: address, - port: net.Port(port), - proxy: p, - stream: receiverConfig.StreamSettings, - recvOrigDest: receiverConfig.ReceiveOriginalDestination, - tag: tag, - allowPassiveConn: receiverConfig.AllowPassiveConnection, - dispatcher: h.mux, + address: address, + port: net.Port(port), + proxy: p, + stream: receiverConfig.StreamSettings, + recvOrigDest: receiverConfig.ReceiveOriginalDestination, + tag: tag, + dispatcher: h.mux, } h.workers = append(h.workers, worker) } diff --git a/app/proxyman/inbound/dynamic.go b/app/proxyman/inbound/dynamic.go index 6d7824e9..2c9469ce 100644 --- a/app/proxyman/inbound/dynamic.go +++ b/app/proxyman/inbound/dynamic.go @@ -97,14 +97,13 @@ func (h *DynamicInboundHandler) refresh() error { nl := p.Network() if nl.HasNetwork(v2net.Network_TCP) { worker := &tcpWorker{ - tag: h.tag, - address: address, - port: port, - proxy: p, - stream: h.receiverConfig.StreamSettings, - recvOrigDest: h.receiverConfig.ReceiveOriginalDestination, - allowPassiveConn: h.receiverConfig.AllowPassiveConnection, - dispatcher: h.mux, + tag: h.tag, + address: address, + port: port, + proxy: p, + stream: h.receiverConfig.StreamSettings, + recvOrigDest: h.receiverConfig.ReceiveOriginalDestination, + dispatcher: h.mux, } if err := worker.Start(); err != nil { log.Warning("Proxyman:InboundHandler: Failed to create TCP worker: ", err) diff --git a/app/proxyman/inbound/worker.go b/app/proxyman/inbound/worker.go index 6ff80926..cbea174e 100644 --- a/app/proxyman/inbound/worker.go +++ b/app/proxyman/inbound/worker.go @@ -26,14 +26,13 @@ type worker interface { } type tcpWorker struct { - address v2net.Address - port v2net.Port - proxy proxy.Inbound - stream *internet.StreamConfig - recvOrigDest bool - tag string - allowPassiveConn bool - dispatcher dispatcher.Interface + address v2net.Address + port v2net.Port + proxy proxy.Inbound + stream *internet.StreamConfig + recvOrigDest bool + tag string + dispatcher dispatcher.Interface ctx context.Context cancel context.CancelFunc @@ -51,7 +50,6 @@ func (w *tcpWorker) callback(conn internet.Connection) { if len(w.tag) > 0 { ctx = proxy.ContextWithInboundTag(ctx, w.tag) } - ctx = proxy.ContextWithAllowPassiveConnection(ctx, w.allowPassiveConn) ctx = proxy.ContextWithInboundDestination(ctx, v2net.TCPDestination(w.address, w.port)) ctx = proxy.ContextWithSource(ctx, v2net.DestinationFromAddr(conn.RemoteAddr())) if err := w.proxy.Process(ctx, v2net.Network_TCP, conn, w.dispatcher); err != nil { diff --git a/transport/ray/direct.go b/transport/ray/direct.go index 21628d46..df773d5f 100644 --- a/transport/ray/direct.go +++ b/transport/ray/direct.go @@ -1,13 +1,11 @@ package ray import ( + "context" "errors" "io" - "time" - "context" - "v2ray.com/core/common/buf" ) @@ -46,29 +44,19 @@ func (v *directRay) InboundOutput() InputStream { return v.Output } -func (v *directRay) AddInspector(inspector Inspector) { - if inspector == nil { - return - } - v.Input.inspector.AddInspector(inspector) - v.Output.inspector.AddInspector(inspector) -} - type Stream struct { - buffer chan *buf.Buffer - ctx context.Context - close chan bool - err chan bool - inspector *InspectorChain + buffer chan *buf.Buffer + ctx context.Context + close chan bool + err chan bool } func NewStream(ctx context.Context) *Stream { return &Stream{ - ctx: ctx, - buffer: make(chan *buf.Buffer, bufferSize), - close: make(chan bool), - err: make(chan bool), - inspector: &InspectorChain{}, + ctx: ctx, + buffer: make(chan *buf.Buffer, bufferSize), + close: make(chan bool), + err: make(chan bool), } } @@ -139,7 +127,6 @@ func (v *Stream) Write(data *buf.Buffer) (err error) { case <-v.close: return io.ErrClosedPipe case v.buffer <- data: - v.inspector.Input(data) return nil } } diff --git a/transport/ray/inspector.go b/transport/ray/inspector.go deleted file mode 100644 index 6989f52d..00000000 --- a/transport/ray/inspector.go +++ /dev/null @@ -1,36 +0,0 @@ -package ray - -import ( - "sync" - - "v2ray.com/core/common/buf" -) - -type Inspector interface { - Input(*buf.Buffer) -} - -type NoOpInspector struct{} - -func (NoOpInspector) Input(*buf.Buffer) {} - -type InspectorChain struct { - sync.RWMutex - chain []Inspector -} - -func (ic *InspectorChain) AddInspector(inspector Inspector) { - ic.Lock() - defer ic.Unlock() - - ic.chain = append(ic.chain, inspector) -} - -func (ic *InspectorChain) Input(b *buf.Buffer) { - ic.RLock() - defer ic.RUnlock() - - for _, inspector := range ic.chain { - inspector.Input(b) - } -} diff --git a/transport/ray/ray.go b/transport/ray/ray.go index 3d00e938..9172f969 100644 --- a/transport/ray/ray.go +++ b/transport/ray/ray.go @@ -32,7 +32,6 @@ type InboundRay interface { type Ray interface { InboundRay OutboundRay - AddInspector(Inspector) } type RayStream interface {