mirror of https://github.com/XTLS/Xray-core
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
110 lines
2.2 KiB
110 lines
2.2 KiB
package commander |
|
|
|
import ( |
|
"context" |
|
"sync" |
|
|
|
"github.com/xtls/xray-core/common" |
|
"github.com/xtls/xray-core/common/errors" |
|
"github.com/xtls/xray-core/common/net" |
|
"github.com/xtls/xray-core/common/net/cnc" |
|
"github.com/xtls/xray-core/common/signal/done" |
|
"github.com/xtls/xray-core/transport" |
|
) |
|
|
|
// OutboundListener is a net.Listener for listening gRPC connections. |
|
type OutboundListener struct { |
|
buffer chan net.Conn |
|
done *done.Instance |
|
} |
|
|
|
func (l *OutboundListener) add(conn net.Conn) { |
|
select { |
|
case l.buffer <- conn: |
|
case <-l.done.Wait(): |
|
conn.Close() |
|
default: |
|
conn.Close() |
|
} |
|
} |
|
|
|
// Accept implements net.Listener. |
|
func (l *OutboundListener) Accept() (net.Conn, error) { |
|
select { |
|
case <-l.done.Wait(): |
|
return nil, errors.New("listen closed") |
|
case c := <-l.buffer: |
|
return c, nil |
|
} |
|
} |
|
|
|
// Close implements net.Listener. |
|
func (l *OutboundListener) Close() error { |
|
common.Must(l.done.Close()) |
|
L: |
|
for { |
|
select { |
|
case c := <-l.buffer: |
|
c.Close() |
|
default: |
|
break L |
|
} |
|
} |
|
return nil |
|
} |
|
|
|
// Addr implements net.Listener. |
|
func (l *OutboundListener) Addr() net.Addr { |
|
return &net.TCPAddr{ |
|
IP: net.IP{0, 0, 0, 0}, |
|
Port: 0, |
|
} |
|
} |
|
|
|
// Outbound is a outbound.Handler that handles gRPC connections. |
|
type Outbound struct { |
|
tag string |
|
listener *OutboundListener |
|
access sync.RWMutex |
|
closed bool |
|
} |
|
|
|
// Dispatch implements outbound.Handler. |
|
func (co *Outbound) Dispatch(ctx context.Context, link *transport.Link) { |
|
co.access.RLock() |
|
|
|
if co.closed { |
|
common.Interrupt(link.Reader) |
|
common.Interrupt(link.Writer) |
|
co.access.RUnlock() |
|
return |
|
} |
|
|
|
closeSignal := done.New() |
|
c := cnc.NewConnection(cnc.ConnectionInputMulti(link.Writer), cnc.ConnectionOutputMulti(link.Reader), cnc.ConnectionOnClose(closeSignal)) |
|
co.listener.add(c) |
|
co.access.RUnlock() |
|
<-closeSignal.Wait() |
|
} |
|
|
|
// Tag implements outbound.Handler. |
|
func (co *Outbound) Tag() string { |
|
return co.tag |
|
} |
|
|
|
// Start implements common.Runnable. |
|
func (co *Outbound) Start() error { |
|
co.access.Lock() |
|
co.closed = false |
|
co.access.Unlock() |
|
return nil |
|
} |
|
|
|
// Close implements common.Closable. |
|
func (co *Outbound) Close() error { |
|
co.access.Lock() |
|
defer co.access.Unlock() |
|
|
|
co.closed = true |
|
return co.listener.Close() |
|
}
|
|
|