Compare commits

...

4 Commits

Author SHA1 Message Date
Darien Raymond
19e0cb40e9 locker protected connection 2017-01-02 07:43:02 +01:00
Darien Raymond
5857aea881 remove inspector 2017-01-01 23:08:39 +01:00
Darien Raymond
48bddb25d7 fix lint warnings 2017-01-01 22:12:44 +01:00
Darien Raymond
4d6ca7efe0 Update version 2017-01-01 21:19:58 +01:00
16 changed files with 134 additions and 146 deletions

View File

@@ -28,32 +28,7 @@ var _ = math.Inf
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package
type Config_FixDestination int32
const (
Config_Auto Config_FixDestination = 0
Config_Enabled Config_FixDestination = 1
Config_Disabled Config_FixDestination = 2
)
var Config_FixDestination_name = map[int32]string{
0: "Auto",
1: "Enabled",
2: "Disabled",
}
var Config_FixDestination_value = map[string]int32{
"Auto": 0,
"Enabled": 1,
"Disabled": 2,
}
func (x Config_FixDestination) String() string {
return proto.EnumName(Config_FixDestination_name, int32(x))
}
func (Config_FixDestination) EnumDescriptor() ([]byte, []int) { return fileDescriptor0, []int{0, 0} }
type Config struct {
FixDestination Config_FixDestination `protobuf:"varint,1,opt,name=fix_destination,json=fixDestination,enum=v2ray.core.app.dispatcher.Config_FixDestination" json:"fix_destination,omitempty"`
}
func (m *Config) Reset() { *m = Config{} }
@@ -61,34 +36,21 @@ func (m *Config) String() string { return proto.CompactTextString(m)
func (*Config) ProtoMessage() {}
func (*Config) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
func (m *Config) GetFixDestination() Config_FixDestination {
if m != nil {
return m.FixDestination
}
return Config_Auto
}
func init() {
proto.RegisterType((*Config)(nil), "v2ray.core.app.dispatcher.Config")
proto.RegisterEnum("v2ray.core.app.dispatcher.Config_FixDestination", Config_FixDestination_name, Config_FixDestination_value)
}
func init() { proto.RegisterFile("v2ray.com/core/app/dispatcher/config.proto", fileDescriptor0) }
var fileDescriptor0 = []byte{
// 218 bytes of a gzipped FileDescriptorProto
// 134 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xd2, 0x2a, 0x33, 0x2a, 0x4a,
0xac, 0xd4, 0x4b, 0xce, 0xcf, 0xd5, 0x4f, 0xce, 0x2f, 0x4a, 0xd5, 0x4f, 0x2c, 0x28, 0xd0, 0x4f,
0xc9, 0x2c, 0x2e, 0x48, 0x2c, 0x49, 0xce, 0x48, 0x2d, 0xd2, 0x4f, 0xce, 0xcf, 0x4b, 0xcb, 0x4c,
0xd7, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x92, 0x84, 0xa9, 0x2d, 0x4a, 0xd5, 0x4b, 0x2c, 0x28,
0xd0, 0x43, 0xa8, 0x53, 0x9a, 0xc5, 0xc8, 0xc5, 0xe6, 0x0c, 0x56, 0x2b, 0x14, 0xc9, 0xc5, 0x9f,
0x96, 0x59, 0x11, 0x9f, 0x92, 0x5a, 0x5c, 0x92, 0x99, 0x97, 0x58, 0x92, 0x99, 0x9f, 0x27, 0xc1,
0xa8, 0xc0, 0xa8, 0xc1, 0x67, 0x64, 0xa0, 0x87, 0x53, 0xbf, 0x1e, 0x44, 0xaf, 0x9e, 0x5b, 0x66,
0x85, 0x0b, 0x42, 0x5f, 0x10, 0x5f, 0x1a, 0x0a, 0x5f, 0xc9, 0x94, 0x8b, 0x0f, 0x55, 0x85, 0x10,
0x07, 0x17, 0x8b, 0x63, 0x69, 0x49, 0xbe, 0x00, 0x83, 0x10, 0x37, 0x17, 0xbb, 0x6b, 0x5e, 0x62,
0x52, 0x4e, 0x6a, 0x8a, 0x00, 0xa3, 0x10, 0x0f, 0x17, 0x87, 0x4b, 0x66, 0x31, 0x84, 0xc7, 0xe4,
0x14, 0xc2, 0x25, 0x9b, 0x9c, 0x9f, 0x8b, 0xdb, 0x76, 0x27, 0x6e, 0x88, 0xf5, 0x01, 0x20, 0x5f,
0x46, 0x71, 0x21, 0x24, 0x56, 0x31, 0x49, 0x86, 0x19, 0x05, 0x25, 0x56, 0xea, 0x39, 0x83, 0x34,
0x39, 0x16, 0x14, 0xe8, 0xb9, 0xc0, 0xe5, 0x92, 0xd8, 0xc0, 0x81, 0x62, 0x0c, 0x08, 0x00, 0x00,
0xff, 0xff, 0x63, 0xfd, 0xa7, 0xdb, 0x42, 0x01, 0x00, 0x00,
0xd0, 0x43, 0xa8, 0x53, 0xe2, 0xe0, 0x62, 0x73, 0x06, 0x2b, 0x75, 0x0a, 0xe1, 0x92, 0x4d, 0xce,
0xcf, 0xd5, 0xc3, 0xa9, 0xd4, 0x89, 0x1b, 0xa2, 0x30, 0x00, 0x64, 0x64, 0x14, 0x17, 0x42, 0x62,
0x15, 0x93, 0x64, 0x98, 0x51, 0x50, 0x62, 0xa5, 0x9e, 0x33, 0x48, 0x93, 0x63, 0x41, 0x81, 0x9e,
0x0b, 0x5c, 0x2e, 0x89, 0x0d, 0xec, 0x02, 0x63, 0x40, 0x00, 0x00, 0x00, 0xff, 0xff, 0x60, 0xf8,
0x2f, 0x3b, 0xaf, 0x00, 0x00, 0x00,
}

View File

@@ -7,11 +7,4 @@ option java_package = "com.v2ray.core.app.dispatcher";
option java_outer_classname = "ConfigProto";
message Config {
enum FixDestination {
Auto = 0;
Enabled = 1;
Disabled = 2;
}
FixDestination fix_destination = 1;
}

View File

@@ -14,6 +14,3 @@ const (
type PacketDispatcher interface {
DispatchToOutbound(session *proxy.SessionInfo) ray.InboundRay
}
type Inspector interface {
}

View File

@@ -27,6 +27,7 @@ func NewDefaultDispatcher(space app.Space) *DefaultDispatcher {
return d
}
// Initialize initializes the dispatcher.
// Private: Used by app.Space only.
func (v *DefaultDispatcher) Initialize(space app.Space) error {
if !space.HasApp(proxyman.APP_ID_OUTBOUND_MANAGER) {
@@ -41,6 +42,7 @@ func (v *DefaultDispatcher) Initialize(space app.Space) error {
return nil
}
// Release implements common.Releasable.Release().
func (v *DefaultDispatcher) Release() {
}
@@ -85,21 +87,6 @@ func (v *DefaultDispatcher) FilterPacketAndDispatch(destination v2net.Destinatio
dispatcher.Dispatch(destination, payload, link)
}
func (v *DefaultDispatcher) FixDestination(payload *buf.Buffer, dest v2net.Destination) v2net.Destination {
if dest.Address.Family().IsDomain() || dest.Network != v2net.Network_TCP {
return dest
}
switch dest.Port {
case 80:
addr := DetectHTTPHost(payload)
if addr != nil {
dest.Address = addr
}
}
return dest
}
type DefaultDispatcherFactory struct{}
func (v DefaultDispatcherFactory) Create(space app.Space, config interface{}) (app.Application, error) {

View File

@@ -1,10 +0,0 @@
package impl
import (
"v2ray.com/core/common/buf"
v2net "v2ray.com/core/common/net"
)
func DetectHTTPHost(payload *buf.Buffer) v2net.Address {
return nil
}

View File

@@ -8,7 +8,7 @@ import (
)
var (
version = "2.13"
version = "2.14"
build = "Custom"
codename = "One for all"
intro = "An unified platform for anti-censorship."

View File

@@ -9,50 +9,59 @@ import (
"v2ray.com/core/common/signal"
)
// ConnectionRecyler is the interface for recycling connections.
type ConnectionRecyler interface {
Put(ConnectionId, net.Conn)
// Put returns a connection back to a connection pool.
Put(ConnectionID, net.Conn)
}
type ConnectionId struct {
// ConnectionID is the ID of a connection.
type ConnectionID struct {
Local v2net.Address
Remote v2net.Address
RemotePort v2net.Port
}
func NewConnectionId(source v2net.Address, dest v2net.Destination) ConnectionId {
return ConnectionId{
// NewConnectionID creates a new ConnectionId.
func NewConnectionID(source v2net.Address, dest v2net.Destination) ConnectionID {
return ConnectionID{
Local: source,
Remote: dest.Address,
RemotePort: dest.Port,
}
}
// ExpiringConnection is a connection that will expire in certain time.
type ExpiringConnection struct {
conn net.Conn
expire time.Time
}
func (o *ExpiringConnection) Expired() bool {
return o.expire.Before(time.Now())
// Expired returns true if the connection has expired.
func (ec *ExpiringConnection) Expired() bool {
return ec.expire.Before(time.Now())
}
// Pool is a connection pool.
type Pool struct {
sync.Mutex
connsByDest map[ConnectionId][]*ExpiringConnection
connsByDest map[ConnectionID][]*ExpiringConnection
cleanupOnce signal.Once
}
// NewConnectionPool creates a new Pool.
func NewConnectionPool() *Pool {
return &Pool{
connsByDest: make(map[ConnectionId][]*ExpiringConnection),
connsByDest: make(map[ConnectionID][]*ExpiringConnection),
}
}
func (o *Pool) Get(id ConnectionId) net.Conn {
o.Lock()
defer o.Unlock()
// Get returns a connection with matching connection ID. Nil if not found.
func (p *Pool) Get(id ConnectionID) net.Conn {
p.Lock()
defer p.Unlock()
list, found := o.connsByDest[id]
list, found := p.connsByDest[id]
if !found {
return nil
}
@@ -72,18 +81,18 @@ func (o *Pool) Get(id ConnectionId) net.Conn {
list[connIdx] = list[listLen-1]
}
list = list[:listLen-1]
o.connsByDest[id] = list
p.connsByDest[id] = list
return conn.conn
}
func (o *Pool) Cleanup() {
defer o.cleanupOnce.Reset()
func (p *Pool) cleanup() {
defer p.cleanupOnce.Reset()
for len(o.connsByDest) > 0 {
for len(p.connsByDest) > 0 {
time.Sleep(time.Second * 5)
expiredConns := make([]net.Conn, 0, 16)
o.Lock()
for dest, list := range o.connsByDest {
p.Lock()
for dest, list := range p.connsByDest {
validConns := make([]*ExpiringConnection, 0, len(list))
for _, conn := range list {
if conn.Expired() {
@@ -93,34 +102,35 @@ func (o *Pool) Cleanup() {
}
}
if len(validConns) != len(list) {
o.connsByDest[dest] = validConns
p.connsByDest[dest] = validConns
}
}
o.Unlock()
p.Unlock()
for _, conn := range expiredConns {
conn.Close()
}
}
}
func (o *Pool) Put(id ConnectionId, conn net.Conn) {
// Put implements ConnectionRecyler.Put().
func (p *Pool) Put(id ConnectionID, conn net.Conn) {
expiringConn := &ExpiringConnection{
conn: conn,
expire: time.Now().Add(time.Second * 4),
}
o.Lock()
defer o.Unlock()
p.Lock()
defer p.Unlock()
list, found := o.connsByDest[id]
list, found := p.connsByDest[id]
if !found {
list = []*ExpiringConnection{expiringConn}
} else {
list = append(list, expiringConn)
}
o.connsByDest[id] = list
p.connsByDest[id] = list
o.cleanupOnce.Do(func() {
go o.Cleanup()
p.cleanupOnce.Do(func() {
go p.cleanup()
})
}

View File

@@ -51,11 +51,11 @@ func TestConnectionCache(t *testing.T) {
assert := assert.On(t)
pool := NewConnectionPool()
conn := pool.Get(NewConnectionId(v2net.LocalHostIP, v2net.TCPDestination(v2net.LocalHostIP, v2net.Port(80))))
conn := pool.Get(NewConnectionID(v2net.LocalHostIP, v2net.TCPDestination(v2net.LocalHostIP, v2net.Port(80))))
assert.Pointer(conn).IsNil()
pool.Put(NewConnectionId(v2net.LocalHostIP, v2net.TCPDestination(v2net.LocalHostIP, v2net.Port(80))), &TestConnection{id: "test"})
conn = pool.Get(NewConnectionId(v2net.LocalHostIP, v2net.TCPDestination(v2net.LocalHostIP, v2net.Port(80))))
pool.Put(NewConnectionID(v2net.LocalHostIP, v2net.TCPDestination(v2net.LocalHostIP, v2net.Port(80))), &TestConnection{id: "test"})
conn = pool.Get(NewConnectionID(v2net.LocalHostIP, v2net.TCPDestination(v2net.LocalHostIP, v2net.Port(80))))
assert.String(conn.(*TestConnection).id).Equals("test")
}
@@ -64,9 +64,9 @@ func TestConnectionRecycle(t *testing.T) {
pool := NewConnectionPool()
c := &TestConnection{id: "test"}
pool.Put(NewConnectionId(v2net.LocalHostIP, v2net.TCPDestination(v2net.LocalHostIP, v2net.Port(80))), c)
pool.Put(NewConnectionID(v2net.LocalHostIP, v2net.TCPDestination(v2net.LocalHostIP, v2net.Port(80))), c)
time.Sleep(6 * time.Second)
assert.Bool(c.closed).IsTrue()
conn := pool.Get(NewConnectionId(v2net.LocalHostIP, v2net.TCPDestination(v2net.LocalHostIP, v2net.Port(80))))
conn := pool.Get(NewConnectionID(v2net.LocalHostIP, v2net.TCPDestination(v2net.LocalHostIP, v2net.Port(80))))
assert.Pointer(conn).IsNil()
}

View File

@@ -3,13 +3,15 @@ package internal
import (
"net"
"reflect"
"v2ray.com/core/common/errors"
)
var (
ErrInvalidConn = errors.New("Invalid Connection.")
errInvalidConn = errors.New("Invalid Connection.")
)
// GetSysFd returns the underlying fd of a connection.
func GetSysFd(conn net.Conn) (int, error) {
cv := reflect.ValueOf(conn)
switch ce := cv.Elem(); ce.Kind() {
@@ -21,5 +23,5 @@ func GetSysFd(conn net.Conn) (int, error) {
return int(fd.Int()), nil
}
}
return 0, ErrInvalidConn
return 0, errInvalidConn
}

View File

@@ -156,7 +156,7 @@ func (v *Updater) Run() {
type SystemConnection interface {
net.Conn
Id() internal.ConnectionId
Id() internal.ConnectionID
Reset(func([]Segment))
Overhead() int
}

View File

@@ -48,15 +48,15 @@ func (o *NoOpConn) SetWriteDeadline(time.Time) error {
return nil
}
func (o *NoOpConn) Id() internal.ConnectionId {
return internal.ConnectionId{}
func (o *NoOpConn) Id() internal.ConnectionID {
return internal.ConnectionID{}
}
func (o *NoOpConn) Reset(input func([]Segment)) {}
type NoOpRecycler struct{}
func (o *NoOpRecycler) Put(internal.ConnectionId, net.Conn) {}
func (o *NoOpRecycler) Put(internal.ConnectionID, net.Conn) {}
func TestConnectionReadTimeout(t *testing.T) {
assert := assert.On(t)

View File

@@ -26,7 +26,7 @@ var (
type ClientConnection struct {
sync.RWMutex
net.Conn
id internal.ConnectionId
id internal.ConnectionID
input func([]Segment)
reader PacketReader
writer PacketWriter
@@ -56,7 +56,7 @@ func (o *ClientConnection) Read([]byte) (int, error) {
panic("KCP|ClientConnection: Read should not be called.")
}
func (o *ClientConnection) Id() internal.ConnectionId {
func (o *ClientConnection) Id() internal.ConnectionID {
return o.id
}
@@ -112,7 +112,7 @@ func DialKCP(src v2net.Address, dest v2net.Destination, options internet.DialerO
dest.Network = v2net.Network_UDP
log.Info("KCP|Dialer: Dialing KCP to ", dest)
id := internal.NewConnectionId(src, dest)
id := internal.NewConnectionID(src, dest)
conn := globalPool.Get(id)
if conn == nil {
rawConn, err := internet.DialToDest(src, dest)

View File

@@ -27,7 +27,7 @@ type ConnectionID struct {
}
type ServerConnection struct {
id internal.ConnectionId
id internal.ConnectionID
local net.Addr
remote net.Addr
writer PacketWriter
@@ -73,7 +73,7 @@ func (o *ServerConnection) SetWriteDeadline(time.Time) error {
return nil
}
func (o *ServerConnection) Id() internal.ConnectionId {
func (o *ServerConnection) Id() internal.ConnectionID {
return o.id
}
@@ -188,7 +188,7 @@ func (v *Listener) OnReceive(payload *buf.Buffer, session *proxy.SessionInfo) {
}
localAddr := v.hub.Addr()
sConn := &ServerConnection{
id: internal.NewConnectionId(v2net.LocalHostIP, src),
id: internal.NewConnectionID(v2net.LocalHostIP, src),
local: localAddr,
remote: remoteAddr,
writer: &KCPPacketWriter{
@@ -274,7 +274,7 @@ func (v *Listener) Addr() net.Addr {
return v.hub.Addr()
}
func (v *Listener) Put(internal.ConnectionId, net.Conn) {}
func (v *Listener) Put(internal.ConnectionID, net.Conn) {}
type Writer struct {
id ConnectionID

View File

@@ -3,15 +3,12 @@ package tcp
import (
"io"
"net"
"sync"
"time"
"v2ray.com/core/transport/internet/internal"
)
type ConnectionManager interface {
Put(internal.ConnectionId, net.Conn)
}
type RawConnection struct {
net.TCPConn
}
@@ -27,14 +24,15 @@ func (v *RawConnection) SysFd() (int, error) {
}
type Connection struct {
id internal.ConnectionId
sync.RWMutex
id internal.ConnectionID
reusable bool
conn net.Conn
listener ConnectionManager
listener internal.ConnectionRecyler
config *Config
}
func NewConnection(id internal.ConnectionId, conn net.Conn, manager ConnectionManager, config *Config) *Connection {
func NewConnection(id internal.ConnectionID, conn net.Conn, manager internal.ConnectionRecyler, config *Config) *Connection {
return &Connection{
id: id,
conn: conn,
@@ -45,22 +43,30 @@ func NewConnection(id internal.ConnectionId, conn net.Conn, manager ConnectionMa
}
func (v *Connection) Read(b []byte) (int, error) {
if v == nil || v.conn == nil {
conn := v.underlyingConn()
if conn == nil {
return 0, io.EOF
}
return v.conn.Read(b)
return conn.Read(b)
}
func (v *Connection) Write(b []byte) (int, error) {
if v == nil || v.conn == nil {
conn := v.underlyingConn()
if conn == nil {
return 0, io.ErrClosedPipe
}
return v.conn.Write(b)
return conn.Write(b)
}
func (v *Connection) Close() error {
if v == nil || v.conn == nil {
if v == nil {
return io.ErrClosedPipe
}
v.Lock()
defer v.Unlock()
if v.conn == nil {
return io.ErrClosedPipe
}
if v.Reusable() {
@@ -73,33 +79,74 @@ func (v *Connection) Close() error {
}
func (v *Connection) LocalAddr() net.Addr {
return v.conn.LocalAddr()
conn := v.underlyingConn()
if conn == nil {
return nil
}
return conn.LocalAddr()
}
func (v *Connection) RemoteAddr() net.Addr {
return v.conn.RemoteAddr()
conn := v.underlyingConn()
if conn == nil {
return nil
}
return conn.RemoteAddr()
}
func (v *Connection) SetDeadline(t time.Time) error {
return v.conn.SetDeadline(t)
conn := v.underlyingConn()
if conn == nil {
return nil
}
return conn.SetDeadline(t)
}
func (v *Connection) SetReadDeadline(t time.Time) error {
return v.conn.SetReadDeadline(t)
conn := v.underlyingConn()
if conn == nil {
return nil
}
return conn.SetReadDeadline(t)
}
func (v *Connection) SetWriteDeadline(t time.Time) error {
return v.conn.SetWriteDeadline(t)
conn := v.underlyingConn()
if conn == nil {
return nil
}
return conn.SetWriteDeadline(t)
}
func (v *Connection) SetReusable(reusable bool) {
if v == nil {
return
}
v.reusable = reusable
}
func (v *Connection) Reusable() bool {
if v == nil {
return false
}
return v.config.IsConnectionReuse() && v.reusable
}
func (v *Connection) SysFd() (int, error) {
return internal.GetSysFd(v.conn)
conn := v.underlyingConn()
if conn == nil {
return 0, io.ErrClosedPipe
}
return internal.GetSysFd(conn)
}
func (v *Connection) underlyingConn() net.Conn {
if v == nil {
return nil
}
v.RLock()
defer v.RUnlock()
return v.conn
}

View File

@@ -27,7 +27,7 @@ func Dial(src v2net.Address, dest v2net.Destination, options internet.DialerOpti
}
tcpSettings := networkSettings.(*Config)
id := internal.NewConnectionId(src, dest)
id := internal.NewConnectionID(src, dest)
var conn net.Conn
if dest.Network == v2net.Network_TCP && tcpSettings.IsConnectionReuse() {
conn = globalCache.Get(id)

View File

@@ -89,7 +89,7 @@ func (v *TCPListener) Accept() (internet.Connection, error) {
return nil, connErr.err
}
conn := connErr.conn
return NewConnection(internal.ConnectionId{}, conn, v, v.config), nil
return NewConnection(internal.ConnectionID{}, conn, v, v.config), nil
case <-time.After(time.Second * 2):
}
}
@@ -125,7 +125,7 @@ func (v *TCPListener) KeepAccepting() {
}
}
func (v *TCPListener) Put(id internal.ConnectionId, conn net.Conn) {
func (v *TCPListener) Put(id internal.ConnectionID, conn net.Conn) {
v.Lock()
defer v.Unlock()
if !v.acccepting {