You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
v2ray-core/app/commander/outbound.go

110 lines
2.1 KiB

package commander
import (
"context"
"sync"
"v2ray.com/core"
7 years ago
"v2ray.com/core/common"
"v2ray.com/core/common/net"
"v2ray.com/core/common/signal/done"
"v2ray.com/core/transport/pipe"
)
// OutboundListener is a net.Listener for gRPC connections. Connections are
// handed to it in-process through add and retrieved by callers via Accept.
type OutboundListener struct {
	// buffer queues connections passed to add until Accept receives them.
	buffer chan net.Conn
	// done is closed by Close; add and Accept observe it through done.Wait().
	done *done.Instance
}
// add offers conn to the listener without blocking.
//
// The connection is queued for Accept when the listener has not been closed
// and the buffer has room; in every other case conn is closed immediately.
func (l *OutboundListener) add(conn net.Conn) {
	// Check for shutdown on its own first. When several select cases are
	// ready Go picks one at random, so testing done together with the send
	// could queue conn on an already-closed listener, where it may never be
	// accepted or closed.
	select {
	case <-l.done.Wait():
		conn.Close() // nolint: errcheck
		return
	default:
	}

	select {
	case l.buffer <- conn:
	default:
		// No room in the buffer: drop the connection rather than block.
		conn.Close() // nolint: errcheck
	}
}
7 years ago
// Accept implements net.Listener.
func (l *OutboundListener) Accept() (net.Conn, error) {
select {
case <-l.done.Wait():
7 years ago
return nil, newError("listen closed")
case c := <-l.buffer:
return c, nil
}
}
7 years ago
// Close implement net.Listener.
func (l *OutboundListener) Close() error {
7 years ago
common.Must(l.done.Close())
L:
for {
select {
case c := <-l.buffer:
c.Close() // nolint: errcheck
default:
break L
}
}
return nil
}
7 years ago
// Addr implements net.Listener.
//
// It always reports the same placeholder TCP address: 0.0.0.0 with port 0.
func (l *OutboundListener) Addr() net.Addr {
	placeholder := net.TCPAddr{
		IP:   net.IP{0, 0, 0, 0},
		Port: 0,
	}
	return &placeholder
}
7 years ago
// Outbound is a core.OutboundHandler for gRPC connections. Each dispatched
// link is wrapped in a connection and offered to its listener.
type Outbound struct {
	// tag is the value reported by Tag.
	tag string
	// listener receives the connections created in Dispatch.
	listener *OutboundListener
	// access guards closed.
	access sync.RWMutex
	// closed is set by Close and cleared by Start; Dispatch rejects links
	// while it is true.
	closed bool
}
7 years ago
// Dispatch implements core.OutboundHandler.
//
// It wraps link in a connection, offers that connection to the listener and
// then blocks until the connection's on-close signal fires. When the handler
// is already closed, pipe.CloseError is applied to both ends of link and
// Dispatch returns right away.
func (co *Outbound) Dispatch(ctx context.Context, link *core.Link) {
	co.access.RLock()

	if co.closed {
		pipe.CloseError(link.Reader)
		pipe.CloseError(link.Writer)
		co.access.RUnlock()
		return
	}

	finished := done.New()
	conn := net.NewConnection(
		net.ConnectionInputMulti(link.Writer),
		net.ConnectionOutputMulti(link.Reader),
		net.ConnectionOnClose(finished),
	)
	co.listener.add(conn)

	// The read lock is released before waiting, so it is not held for the
	// lifetime of the connection.
	co.access.RUnlock()

	<-finished.Wait()
}
7 years ago
// Tag implements core.OutboundHandler.
7 years ago
func (co *Outbound) Tag() string {
return co.tag
}
7 years ago
// Start implements common.Runnable.
7 years ago
func (co *Outbound) Start() error {
co.access.Lock()
co.closed = false
co.access.Unlock()
return nil
}
7 years ago
// Close implements common.Closable.
7 years ago
func (co *Outbound) Close() error {
co.access.Lock()
7 years ago
defer co.access.Unlock()
7 years ago
co.closed = true
return co.listener.Close()
}