You've already forked v2ray-core
Compare commits
26 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e9d1e8d1d4 | ||
|
|
58ae7e4967 | ||
|
|
2a52ae9e96 | ||
|
|
c74ca91bd3 | ||
|
|
3cc0783d9c | ||
|
|
a44d556667 | ||
|
|
f6aa7a0053 | ||
|
|
609dbc1f13 | ||
|
|
690d71b16e | ||
|
|
bdfb879963 | ||
|
|
e6214b7a87 | ||
|
|
7bc98503a8 | ||
|
|
5ff2b3453a | ||
|
|
7d31c0641b | ||
|
|
7c751fcca0 | ||
|
|
22fa151391 | ||
|
|
c347e50c28 | ||
|
|
7d1426ff7f | ||
|
|
c68da6a0e8 | ||
|
|
5769df496b | ||
|
|
538b0720d5 | ||
|
|
ea33b7691b | ||
|
|
f195f15536 | ||
|
|
f7379bc1c3 | ||
|
|
38e4cad8d1 | ||
|
|
29d62185cf |
@@ -1,6 +1,6 @@
|
||||
language: go
|
||||
go:
|
||||
- 1.7.3
|
||||
- 1.7.4
|
||||
go_import_path: v2ray.com/core
|
||||
git:
|
||||
depth: 5
|
||||
|
||||
2
LICENSE
2
LICENSE
@@ -1,6 +1,6 @@
|
||||
The MIT License (MIT)
|
||||
|
||||
Copyright (c) 2015-2016 V2Ray
|
||||
Copyright (c) 2015-2017 V2Ray
|
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
of this software and associated documentation files (the "Software"), to deal
|
||||
|
||||
@@ -14,3 +14,6 @@ const (
|
||||
type PacketDispatcher interface {
|
||||
DispatchToOutbound(session *proxy.SessionInfo) ray.InboundRay
|
||||
}
|
||||
|
||||
type Inspector interface {
|
||||
}
|
||||
|
||||
@@ -39,6 +39,7 @@ func (v *OutboundProxy) RegisterDialer() {
|
||||
internet.ProxyDialer = v.Dial
|
||||
}
|
||||
|
||||
// Dial implements internet.Dialer.
|
||||
func (v *OutboundProxy) Dial(src v2net.Address, dest v2net.Destination, options internet.DialerOptions) (internet.Connection, error) {
|
||||
handler := v.outboundManager.GetHandler(options.Proxy.Tag)
|
||||
if handler == nil {
|
||||
@@ -50,14 +51,15 @@ func (v *OutboundProxy) Dial(src v2net.Address, dest v2net.Destination, options
|
||||
log.Info("Proxy: Dialing to ", dest)
|
||||
stream := ray.NewRay()
|
||||
go handler.Dispatch(dest, nil, stream)
|
||||
return NewProxyConnection(src, dest, stream), nil
|
||||
return NewConnection(src, dest, stream), nil
|
||||
}
|
||||
|
||||
// Release implements common.Releasable.Release().
|
||||
func (v *OutboundProxy) Release() {
|
||||
|
||||
}
|
||||
|
||||
type ProxyConnection struct {
|
||||
type Connection struct {
|
||||
stream ray.Ray
|
||||
closed bool
|
||||
localAddr net.Addr
|
||||
@@ -67,8 +69,8 @@ type ProxyConnection struct {
|
||||
writer *buf.BytesToBufferWriter
|
||||
}
|
||||
|
||||
func NewProxyConnection(src v2net.Address, dest v2net.Destination, stream ray.Ray) *ProxyConnection {
|
||||
return &ProxyConnection{
|
||||
func NewConnection(src v2net.Address, dest v2net.Destination, stream ray.Ray) *Connection {
|
||||
return &Connection{
|
||||
stream: stream,
|
||||
localAddr: &net.TCPAddr{
|
||||
IP: []byte{0, 0, 0, 0},
|
||||
@@ -83,21 +85,24 @@ func NewProxyConnection(src v2net.Address, dest v2net.Destination, stream ray.Ra
|
||||
}
|
||||
}
|
||||
|
||||
func (v *ProxyConnection) Read(b []byte) (int, error) {
|
||||
// Read implements net.Conn.Read().
|
||||
func (v *Connection) Read(b []byte) (int, error) {
|
||||
if v.closed {
|
||||
return 0, io.EOF
|
||||
}
|
||||
return v.reader.Read(b)
|
||||
}
|
||||
|
||||
func (v *ProxyConnection) Write(b []byte) (int, error) {
|
||||
// Write implements net.Conn.Write().
|
||||
func (v *Connection) Write(b []byte) (int, error) {
|
||||
if v.closed {
|
||||
return 0, io.ErrClosedPipe
|
||||
}
|
||||
return v.writer.Write(b)
|
||||
}
|
||||
|
||||
func (v *ProxyConnection) Close() error {
|
||||
// Close implements net.Conn.Close().
|
||||
func (v *Connection) Close() error {
|
||||
v.closed = true
|
||||
v.stream.InboundInput().Close()
|
||||
v.stream.InboundOutput().Release()
|
||||
@@ -106,30 +111,32 @@ func (v *ProxyConnection) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (v *ProxyConnection) LocalAddr() net.Addr {
|
||||
// LocalAddr implements net.Conn.LocalAddr().
|
||||
func (v *Connection) LocalAddr() net.Addr {
|
||||
return v.localAddr
|
||||
}
|
||||
|
||||
func (v *ProxyConnection) RemoteAddr() net.Addr {
|
||||
// RemoteAddr implements net.Conn.RemoteAddr().
|
||||
func (v *Connection) RemoteAddr() net.Addr {
|
||||
return v.remoteAddr
|
||||
}
|
||||
|
||||
func (v *ProxyConnection) SetDeadline(t time.Time) error {
|
||||
func (v *Connection) SetDeadline(t time.Time) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (v *ProxyConnection) SetReadDeadline(t time.Time) error {
|
||||
func (v *Connection) SetReadDeadline(t time.Time) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (v *ProxyConnection) SetWriteDeadline(t time.Time) error {
|
||||
func (v *Connection) SetWriteDeadline(t time.Time) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (v *ProxyConnection) Reusable() bool {
|
||||
func (v *Connection) Reusable() bool {
|
||||
return false
|
||||
}
|
||||
|
||||
func (v *ProxyConnection) SetReusable(bool) {
|
||||
func (v *Connection) SetReusable(bool) {
|
||||
|
||||
}
|
||||
|
||||
@@ -7,12 +7,14 @@ import (
|
||||
. "v2ray.com/core/app/proxy"
|
||||
"v2ray.com/core/app/proxyman"
|
||||
"v2ray.com/core/app/proxyman/outbound"
|
||||
"v2ray.com/core/common"
|
||||
v2net "v2ray.com/core/common/net"
|
||||
"v2ray.com/core/proxy"
|
||||
"v2ray.com/core/proxy/freedom"
|
||||
"v2ray.com/core/testing/assert"
|
||||
"v2ray.com/core/testing/servers/tcp"
|
||||
"v2ray.com/core/transport/internet"
|
||||
_ "v2ray.com/core/transport/internet/tcp"
|
||||
)
|
||||
|
||||
func TestProxyDial(t *testing.T) {
|
||||
@@ -20,12 +22,12 @@ func TestProxyDial(t *testing.T) {
|
||||
|
||||
space := app.NewSpace()
|
||||
outboundManager := outbound.New()
|
||||
outboundManager.SetHandler("tag", freedom.New(&freedom.Config{}, space, &proxy.OutboundHandlerMeta{
|
||||
common.Must(outboundManager.SetHandler("tag", freedom.New(&freedom.Config{}, space, &proxy.OutboundHandlerMeta{
|
||||
Tag: "tag",
|
||||
StreamSettings: &internet.StreamConfig{
|
||||
Network: v2net.Network_RawTCP,
|
||||
},
|
||||
}))
|
||||
})))
|
||||
space.BindApp(proxyman.APP_ID_OUTBOUND_MANAGER, outboundManager)
|
||||
|
||||
proxy := NewOutboundProxy(space)
|
||||
@@ -64,6 +66,6 @@ func TestProxyDial(t *testing.T) {
|
||||
|
||||
assert.Bytes(xor(b[:nBytes])).Equals([]byte{'a', 'b', 'c', 'd'})
|
||||
|
||||
conn.Close()
|
||||
common.Must(conn.Close())
|
||||
tcpServer.Close()
|
||||
}
|
||||
|
||||
@@ -29,6 +29,8 @@ func (b *Buffer) Release() {
|
||||
}
|
||||
b.v = nil
|
||||
b.pool = nil
|
||||
b.start = 0
|
||||
b.end = 0
|
||||
}
|
||||
|
||||
// Clear clears the content of the buffer, results an empty buffer with
|
||||
|
||||
@@ -1,9 +1,7 @@
|
||||
package buf
|
||||
|
||||
import (
|
||||
"io"
|
||||
"sync"
|
||||
)
|
||||
import "io"
|
||||
import "v2ray.com/core/common"
|
||||
|
||||
// BytesToBufferReader is a Reader that adjusts its reading speed automatically.
|
||||
type BytesToBufferReader struct {
|
||||
@@ -48,11 +46,10 @@ func (v *BytesToBufferReader) Read() (*Buffer, error) {
|
||||
|
||||
// Release implements Releasable.Release().
|
||||
func (v *BytesToBufferReader) Release() {
|
||||
v.reader = nil
|
||||
common.Release(v.reader)
|
||||
}
|
||||
|
||||
type BufferToBytesReader struct {
|
||||
sync.Mutex
|
||||
stream Reader
|
||||
current *Buffer
|
||||
eof bool
|
||||
@@ -74,8 +71,6 @@ func (v *BufferToBytesReader) Read(b []byte) (int, error) {
|
||||
return 0, io.EOF
|
||||
}
|
||||
|
||||
v.Lock()
|
||||
defer v.Unlock()
|
||||
if v.current == nil {
|
||||
v.Fill()
|
||||
if v.eof {
|
||||
@@ -92,11 +87,7 @@ func (v *BufferToBytesReader) Read(b []byte) (int, error) {
|
||||
|
||||
// Release implements Releasable.Release().
|
||||
func (v *BufferToBytesReader) Release() {
|
||||
v.Lock()
|
||||
defer v.Unlock()
|
||||
|
||||
v.eof = true
|
||||
v.current.Release()
|
||||
v.current = nil
|
||||
v.stream = nil
|
||||
}
|
||||
|
||||
@@ -1,9 +1,7 @@
|
||||
package buf
|
||||
|
||||
import (
|
||||
"io"
|
||||
"sync"
|
||||
)
|
||||
import "io"
|
||||
import "v2ray.com/core/common"
|
||||
|
||||
// BufferToBytesWriter is a Writer that writes alloc.Buffer into underlying writer.
|
||||
type BufferToBytesWriter struct {
|
||||
@@ -28,21 +26,14 @@ func (v *BufferToBytesWriter) Write(buffer *Buffer) error {
|
||||
|
||||
// Release implements Releasable.Release().
|
||||
func (v *BufferToBytesWriter) Release() {
|
||||
v.writer = nil
|
||||
common.Release(v.writer)
|
||||
}
|
||||
|
||||
type BytesToBufferWriter struct {
|
||||
sync.Mutex
|
||||
writer Writer
|
||||
}
|
||||
|
||||
func (v *BytesToBufferWriter) Write(payload []byte) (int, error) {
|
||||
v.Lock()
|
||||
defer v.Unlock()
|
||||
if v.writer == nil {
|
||||
return 0, io.ErrClosedPipe
|
||||
}
|
||||
|
||||
bytesWritten := 0
|
||||
size := len(payload)
|
||||
for size > 0 {
|
||||
@@ -62,8 +53,5 @@ func (v *BytesToBufferWriter) Write(payload []byte) (int, error) {
|
||||
|
||||
// Release implements Releasable.Release()
|
||||
func (v *BytesToBufferWriter) Release() {
|
||||
v.Lock()
|
||||
v.writer.Release()
|
||||
v.writer = nil
|
||||
v.Unlock()
|
||||
}
|
||||
|
||||
@@ -2,7 +2,6 @@ package bufio
|
||||
|
||||
import (
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"v2ray.com/core/common"
|
||||
"v2ray.com/core/common/buf"
|
||||
@@ -10,55 +9,44 @@ import (
|
||||
|
||||
// BufferedReader is a reader with internal cache.
|
||||
type BufferedReader struct {
|
||||
sync.Mutex
|
||||
reader io.Reader
|
||||
buffer *buf.Buffer
|
||||
cached bool
|
||||
reader io.Reader
|
||||
buffer *buf.Buffer
|
||||
buffered bool
|
||||
}
|
||||
|
||||
// NewReader creates a new BufferedReader based on an io.Reader.
|
||||
func NewReader(rawReader io.Reader) *BufferedReader {
|
||||
return &BufferedReader{
|
||||
reader: rawReader,
|
||||
buffer: buf.New(),
|
||||
cached: true,
|
||||
reader: rawReader,
|
||||
buffer: buf.New(),
|
||||
buffered: true,
|
||||
}
|
||||
}
|
||||
|
||||
// Release implements Releasable.Release().
|
||||
func (v *BufferedReader) Release() {
|
||||
v.Lock()
|
||||
defer v.Unlock()
|
||||
|
||||
v.buffer.Release()
|
||||
v.buffer = nil
|
||||
if releasable, ok := v.reader.(common.Releasable); ok {
|
||||
releasable.Release()
|
||||
if v.buffer != nil {
|
||||
v.buffer.Release()
|
||||
v.buffer = nil
|
||||
}
|
||||
v.reader = nil
|
||||
|
||||
common.Release(v.reader)
|
||||
}
|
||||
|
||||
// Cached returns true if the internal cache is effective.
|
||||
func (v *BufferedReader) Cached() bool {
|
||||
return v.cached
|
||||
// IsBuffered returns true if the internal cache is effective.
|
||||
func (v *BufferedReader) IsBuffered() bool {
|
||||
return v.buffered
|
||||
}
|
||||
|
||||
// SetCached is to enable or disable internal cache. If cache is disabled,
|
||||
// Read() and Write() calls will be delegated to the underlying io.Reader directly.
|
||||
func (v *BufferedReader) SetCached(cached bool) {
|
||||
v.cached = cached
|
||||
// SetBuffered is to enable or disable internal cache. If cache is disabled,
|
||||
// Read() calls will be delegated to the underlying io.Reader directly.
|
||||
func (v *BufferedReader) SetBuffered(cached bool) {
|
||||
v.buffered = cached
|
||||
}
|
||||
|
||||
// Read implements io.Reader.Read().
|
||||
func (v *BufferedReader) Read(b []byte) (int, error) {
|
||||
v.Lock()
|
||||
defer v.Unlock()
|
||||
|
||||
if v.reader == nil {
|
||||
return 0, io.EOF
|
||||
}
|
||||
|
||||
if !v.cached {
|
||||
if !v.buffered || v.buffer == nil {
|
||||
if !v.buffer.IsEmpty() {
|
||||
return v.buffer.Read(b)
|
||||
}
|
||||
|
||||
@@ -18,7 +18,7 @@ func TestBufferedReader(t *testing.T) {
|
||||
len := content.Len()
|
||||
|
||||
reader := NewReader(content)
|
||||
assert.Bool(reader.Cached()).IsTrue()
|
||||
assert.Bool(reader.IsBuffered()).IsTrue()
|
||||
|
||||
payload := make([]byte, 16)
|
||||
|
||||
|
||||
@@ -2,36 +2,31 @@ package bufio
|
||||
|
||||
import (
|
||||
"io"
|
||||
"sync"
|
||||
|
||||
"v2ray.com/core/common"
|
||||
"v2ray.com/core/common/buf"
|
||||
"v2ray.com/core/common/errors"
|
||||
)
|
||||
|
||||
// BufferedWriter is an io.Writer with internal buffer. It writes to underlying writer when buffer is full or on demand.
|
||||
// This type is not thread safe.
|
||||
type BufferedWriter struct {
|
||||
sync.Mutex
|
||||
writer io.Writer
|
||||
buffer *buf.Buffer
|
||||
cached bool
|
||||
writer io.Writer
|
||||
buffer *buf.Buffer
|
||||
buffered bool
|
||||
}
|
||||
|
||||
// NewWriter creates a new BufferedWriter.
|
||||
func NewWriter(rawWriter io.Writer) *BufferedWriter {
|
||||
return &BufferedWriter{
|
||||
writer: rawWriter,
|
||||
buffer: buf.NewSmall(),
|
||||
cached: true,
|
||||
writer: rawWriter,
|
||||
buffer: buf.NewSmall(),
|
||||
buffered: true,
|
||||
}
|
||||
}
|
||||
|
||||
// ReadFrom implements io.ReaderFrom.ReadFrom().
|
||||
func (v *BufferedWriter) ReadFrom(reader io.Reader) (int64, error) {
|
||||
v.Lock()
|
||||
defer v.Unlock()
|
||||
|
||||
if v.writer == nil {
|
||||
return 0, io.ErrClosedPipe
|
||||
}
|
||||
|
||||
totalBytes := int64(0)
|
||||
for {
|
||||
oriSize := v.buffer.Len()
|
||||
@@ -43,19 +38,14 @@ func (v *BufferedWriter) ReadFrom(reader io.Reader) (int64, error) {
|
||||
}
|
||||
return totalBytes, err
|
||||
}
|
||||
v.FlushWithoutLock()
|
||||
if err := v.Flush(); err != nil {
|
||||
return totalBytes, err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (v *BufferedWriter) Write(b []byte) (int, error) {
|
||||
v.Lock()
|
||||
defer v.Unlock()
|
||||
|
||||
if v.writer == nil {
|
||||
return 0, io.ErrClosedPipe
|
||||
}
|
||||
|
||||
if !v.cached {
|
||||
if !v.buffered || v.buffer == nil {
|
||||
return v.writer.Write(b)
|
||||
}
|
||||
nBytes, err := v.buffer.Write(b)
|
||||
@@ -63,7 +53,7 @@ func (v *BufferedWriter) Write(b []byte) (int, error) {
|
||||
return 0, err
|
||||
}
|
||||
if v.buffer.IsFull() {
|
||||
err := v.FlushWithoutLock()
|
||||
err := v.Flush()
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
@@ -76,18 +66,8 @@ func (v *BufferedWriter) Write(b []byte) (int, error) {
|
||||
return len(b), nil
|
||||
}
|
||||
|
||||
// Flush writes all buffered content into underlying writer, if any.
|
||||
func (v *BufferedWriter) Flush() error {
|
||||
v.Lock()
|
||||
defer v.Unlock()
|
||||
|
||||
if v.writer == nil {
|
||||
return io.ErrClosedPipe
|
||||
}
|
||||
|
||||
return v.FlushWithoutLock()
|
||||
}
|
||||
|
||||
func (v *BufferedWriter) FlushWithoutLock() error {
|
||||
defer v.buffer.Clear()
|
||||
for !v.buffer.IsEmpty() {
|
||||
nBytes, err := v.writer.Write(v.buffer.Bytes())
|
||||
@@ -99,28 +79,28 @@ func (v *BufferedWriter) FlushWithoutLock() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (v *BufferedWriter) Cached() bool {
|
||||
return v.cached
|
||||
// IsBuffered returns true if this BufferedWriter holds a buffer.
|
||||
func (v *BufferedWriter) IsBuffered() bool {
|
||||
return v.buffered
|
||||
}
|
||||
|
||||
func (v *BufferedWriter) SetCached(cached bool) {
|
||||
v.cached = cached
|
||||
// SetBuffered controls whether the BufferedWriter holds a buffer for writing. If not buffered, any write() calls into underlying writer directly.
|
||||
func (v *BufferedWriter) SetBuffered(cached bool) {
|
||||
v.buffered = cached
|
||||
if !cached && !v.buffer.IsEmpty() {
|
||||
v.Flush()
|
||||
}
|
||||
}
|
||||
|
||||
// Release implements common.Releasable.Release().
|
||||
func (v *BufferedWriter) Release() {
|
||||
v.Flush()
|
||||
|
||||
v.Lock()
|
||||
defer v.Unlock()
|
||||
|
||||
v.buffer.Release()
|
||||
v.buffer = nil
|
||||
|
||||
if releasable, ok := v.writer.(common.Releasable); ok {
|
||||
releasable.Release()
|
||||
if !v.buffer.IsEmpty() {
|
||||
v.Flush()
|
||||
}
|
||||
v.writer = nil
|
||||
|
||||
if v.buffer != nil {
|
||||
v.buffer.Release()
|
||||
v.buffer = nil
|
||||
}
|
||||
common.Release(v.writer)
|
||||
}
|
||||
|
||||
@@ -15,7 +15,7 @@ func TestBufferedWriter(t *testing.T) {
|
||||
content := buf.New()
|
||||
|
||||
writer := NewWriter(content)
|
||||
assert.Bool(writer.Cached()).IsTrue()
|
||||
assert.Bool(writer.IsBuffered()).IsTrue()
|
||||
|
||||
payload := make([]byte, 16)
|
||||
|
||||
@@ -25,7 +25,7 @@ func TestBufferedWriter(t *testing.T) {
|
||||
|
||||
assert.Bool(content.IsEmpty()).IsTrue()
|
||||
|
||||
writer.SetCached(false)
|
||||
writer.SetBuffered(false)
|
||||
assert.Int(content.Len()).Equals(16)
|
||||
}
|
||||
|
||||
@@ -35,7 +35,7 @@ func TestBufferedWriterLargePayload(t *testing.T) {
|
||||
content := buf.NewLocal(128 * 1024)
|
||||
|
||||
writer := NewWriter(content)
|
||||
assert.Bool(writer.Cached()).IsTrue()
|
||||
assert.Bool(writer.IsBuffered()).IsTrue()
|
||||
|
||||
payload := make([]byte, 64*1024)
|
||||
rand.Read(payload)
|
||||
|
||||
@@ -18,3 +18,17 @@ type Releasable interface {
|
||||
// Release releases all references to accelerate garbage collection.
|
||||
Release()
|
||||
}
|
||||
|
||||
// Release tries to release the given object.
|
||||
func Release(v interface{}) {
|
||||
if releasable, ok := v.(Releasable); ok {
|
||||
releasable.Release()
|
||||
}
|
||||
}
|
||||
|
||||
// Must panics if err is not nil.
|
||||
func Must(err error) {
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
"errors"
|
||||
"io"
|
||||
|
||||
"v2ray.com/core/common"
|
||||
"v2ray.com/core/common/buf"
|
||||
"v2ray.com/core/common/serial"
|
||||
)
|
||||
@@ -135,9 +136,9 @@ func (v *AuthenticationReader) EnsureChunk() error {
|
||||
v.buffer.Clear()
|
||||
} else {
|
||||
leftover := v.buffer.Bytes()
|
||||
v.buffer.Reset(func(b []byte) (int, error) {
|
||||
common.Must(v.buffer.Reset(func(b []byte) (int, error) {
|
||||
return copy(b, leftover), nil
|
||||
})
|
||||
}))
|
||||
}
|
||||
err = v.buffer.AppendSupplier(buf.ReadFrom(v.reader))
|
||||
if err == nil {
|
||||
|
||||
@@ -1,3 +1,2 @@
|
||||
// Provides common crypto libraries for V2Ray.
|
||||
|
||||
// Package crypto provides common crypto libraries for V2Ray.
|
||||
package crypto
|
||||
|
||||
@@ -20,9 +20,6 @@ func NewCryptionReader(stream cipher.Stream, reader io.Reader) *CryptionReader {
|
||||
}
|
||||
|
||||
func (v *CryptionReader) Read(data []byte) (int, error) {
|
||||
if v.reader == nil {
|
||||
return 0, common.ErrObjectReleased
|
||||
}
|
||||
nBytes, err := v.reader.Read(data)
|
||||
if nBytes > 0 {
|
||||
v.stream.XORKeyStream(data[:nBytes], data[:nBytes])
|
||||
@@ -31,8 +28,8 @@ func (v *CryptionReader) Read(data []byte) (int, error) {
|
||||
}
|
||||
|
||||
func (v *CryptionReader) Release() {
|
||||
v.reader = nil
|
||||
v.stream = nil
|
||||
common.Release(v.reader)
|
||||
common.Release(v.stream)
|
||||
}
|
||||
|
||||
type CryptionWriter struct {
|
||||
@@ -40,6 +37,7 @@ type CryptionWriter struct {
|
||||
writer io.Writer
|
||||
}
|
||||
|
||||
// NewCryptionWriter creates a new CryptionWriter.
|
||||
func NewCryptionWriter(stream cipher.Stream, writer io.Writer) *CryptionWriter {
|
||||
return &CryptionWriter{
|
||||
stream: stream,
|
||||
@@ -47,15 +45,14 @@ func NewCryptionWriter(stream cipher.Stream, writer io.Writer) *CryptionWriter {
|
||||
}
|
||||
}
|
||||
|
||||
// Write implements io.Writer.Write().
|
||||
func (v *CryptionWriter) Write(data []byte) (int, error) {
|
||||
if v.writer == nil {
|
||||
return 0, common.ErrObjectReleased
|
||||
}
|
||||
v.stream.XORKeyStream(data, data)
|
||||
return v.writer.Write(data)
|
||||
}
|
||||
|
||||
// Release implements common.Releasable.Release().
|
||||
func (v *CryptionWriter) Release() {
|
||||
v.writer = nil
|
||||
v.stream = nil
|
||||
common.Release(v.writer)
|
||||
common.Release(v.stream)
|
||||
}
|
||||
|
||||
@@ -19,10 +19,9 @@ type ErrorLog struct {
|
||||
}
|
||||
|
||||
func (v *ErrorLog) Release() {
|
||||
for index := range v.Values {
|
||||
v.Values[index] = nil
|
||||
for _, val := range v.Values {
|
||||
common.Release(val)
|
||||
}
|
||||
v.Values = nil
|
||||
}
|
||||
|
||||
func (v *ErrorLog) String() string {
|
||||
@@ -37,9 +36,9 @@ type AccessLog struct {
|
||||
}
|
||||
|
||||
func (v *AccessLog) Release() {
|
||||
v.From = nil
|
||||
v.To = nil
|
||||
v.Reason = nil
|
||||
common.Release(v.From)
|
||||
common.Release(v.To)
|
||||
common.Release(v.Reason)
|
||||
}
|
||||
|
||||
func (v *AccessLog) String() string {
|
||||
|
||||
@@ -4,6 +4,8 @@ import (
|
||||
"io"
|
||||
"net"
|
||||
"time"
|
||||
|
||||
"v2ray.com/core/common"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -51,8 +53,8 @@ func (reader *TimeOutReader) SetTimeOut(value uint32) {
|
||||
}
|
||||
|
||||
func (reader *TimeOutReader) Release() {
|
||||
reader.connection = nil
|
||||
reader.worker = nil
|
||||
common.Release(reader.connection)
|
||||
common.Release(reader.worker)
|
||||
}
|
||||
|
||||
type timedReaderWorker struct {
|
||||
|
||||
@@ -23,11 +23,11 @@ func GetMessageType(message proto.Message) string {
|
||||
}
|
||||
|
||||
func GetInstance(messageType string) (interface{}, error) {
|
||||
mType := proto.MessageType(messageType).Elem()
|
||||
if mType == nil {
|
||||
mType := proto.MessageType(messageType)
|
||||
if mType == nil || mType.Elem() == nil {
|
||||
return nil, errors.New("Unknown type: " + messageType)
|
||||
}
|
||||
return reflect.New(mType).Interface(), nil
|
||||
return reflect.New(mType.Elem()).Interface(), nil
|
||||
}
|
||||
|
||||
func (v *TypedMessage) GetInstance() (interface{}, error) {
|
||||
|
||||
30
common/signal/exec.go
Normal file
30
common/signal/exec.go
Normal file
@@ -0,0 +1,30 @@
|
||||
package signal
|
||||
|
||||
func executeAndFulfill(f func() error, done chan<- error) {
|
||||
err := f()
|
||||
if err != nil {
|
||||
done <- err
|
||||
}
|
||||
close(done)
|
||||
}
|
||||
|
||||
func ExecuteAsync(f func() error) <-chan error {
|
||||
done := make(chan error, 1)
|
||||
go executeAndFulfill(f, done)
|
||||
return done
|
||||
}
|
||||
|
||||
func ErrorOrFinish2(c1, c2 <-chan error) error {
|
||||
select {
|
||||
case err, failed := <-c1:
|
||||
if failed {
|
||||
return err
|
||||
}
|
||||
return <-c2
|
||||
case err, failed := <-c2:
|
||||
if failed {
|
||||
return err
|
||||
}
|
||||
return <-c1
|
||||
}
|
||||
}
|
||||
87
common/signal/exec_test.go
Normal file
87
common/signal/exec_test.go
Normal file
@@ -0,0 +1,87 @@
|
||||
package signal_test
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"testing"
|
||||
|
||||
. "v2ray.com/core/common/signal"
|
||||
"v2ray.com/core/testing/assert"
|
||||
)
|
||||
|
||||
func TestErrorOrFinish2_Error(t *testing.T) {
|
||||
assert := assert.On(t)
|
||||
|
||||
c1 := make(chan error, 1)
|
||||
c2 := make(chan error, 2)
|
||||
c := make(chan error, 1)
|
||||
|
||||
go func() {
|
||||
c <- ErrorOrFinish2(c1, c2)
|
||||
}()
|
||||
|
||||
c1 <- errors.New("test")
|
||||
err := <-c
|
||||
assert.String(err.Error()).Equals("test")
|
||||
}
|
||||
|
||||
func TestErrorOrFinish2_Error2(t *testing.T) {
|
||||
assert := assert.On(t)
|
||||
|
||||
c1 := make(chan error, 1)
|
||||
c2 := make(chan error, 2)
|
||||
c := make(chan error, 1)
|
||||
|
||||
go func() {
|
||||
c <- ErrorOrFinish2(c1, c2)
|
||||
}()
|
||||
|
||||
c2 <- errors.New("test")
|
||||
err := <-c
|
||||
assert.String(err.Error()).Equals("test")
|
||||
}
|
||||
|
||||
func TestErrorOrFinish2_NoneError(t *testing.T) {
|
||||
assert := assert.On(t)
|
||||
|
||||
c1 := make(chan error, 1)
|
||||
c2 := make(chan error, 2)
|
||||
c := make(chan error, 1)
|
||||
|
||||
go func() {
|
||||
c <- ErrorOrFinish2(c1, c2)
|
||||
}()
|
||||
|
||||
close(c1)
|
||||
select {
|
||||
case <-c:
|
||||
t.Fail()
|
||||
default:
|
||||
}
|
||||
|
||||
close(c2)
|
||||
err := <-c
|
||||
assert.Error(err).IsNil()
|
||||
}
|
||||
|
||||
func TestErrorOrFinish2_NoneError2(t *testing.T) {
|
||||
assert := assert.On(t)
|
||||
|
||||
c1 := make(chan error, 1)
|
||||
c2 := make(chan error, 2)
|
||||
c := make(chan error, 1)
|
||||
|
||||
go func() {
|
||||
c <- ErrorOrFinish2(c1, c2)
|
||||
}()
|
||||
|
||||
close(c2)
|
||||
select {
|
||||
case <-c:
|
||||
t.Fail()
|
||||
default:
|
||||
}
|
||||
|
||||
close(c1)
|
||||
err := <-c
|
||||
assert.Error(err).IsNil()
|
||||
}
|
||||
@@ -1,11 +1,12 @@
|
||||
package blackhole
|
||||
|
||||
import (
|
||||
"v2ray.com/core/common"
|
||||
"v2ray.com/core/common/serial"
|
||||
"v2ray.com/core/proxy"
|
||||
)
|
||||
|
||||
func init() {
|
||||
// Must listed after config.pb.go
|
||||
proxy.MustRegisterOutboundHandlerCreator(serial.GetMessageType(new(Config)), new(Factory))
|
||||
common.Must(proxy.RegisterOutboundHandlerCreator(serial.GetMessageType(new(Config)), new(Factory)))
|
||||
}
|
||||
|
||||
@@ -5,11 +5,13 @@ import (
|
||||
|
||||
"v2ray.com/core/app"
|
||||
"v2ray.com/core/app/dispatcher"
|
||||
"v2ray.com/core/common"
|
||||
"v2ray.com/core/common/buf"
|
||||
"v2ray.com/core/common/errors"
|
||||
"v2ray.com/core/common/log"
|
||||
v2net "v2ray.com/core/common/net"
|
||||
"v2ray.com/core/common/serial"
|
||||
"v2ray.com/core/common/signal"
|
||||
"v2ray.com/core/proxy"
|
||||
"v2ray.com/core/transport/internet"
|
||||
"v2ray.com/core/transport/internet/udp"
|
||||
@@ -165,37 +167,42 @@ func (v *DokodemoDoor) HandleTCPConnection(conn internet.Connection) {
|
||||
Destination: dest,
|
||||
Inbound: v.meta,
|
||||
})
|
||||
defer ray.InboundOutput().Release()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
output := ray.InboundOutput()
|
||||
defer output.ForceClose()
|
||||
|
||||
reader := v2net.NewTimeOutReader(v.config.Timeout, conn)
|
||||
defer reader.Release()
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
requestDone := signal.ExecuteAsync(func() error {
|
||||
defer ray.InboundInput().Close()
|
||||
|
||||
v2reader := buf.NewReader(reader)
|
||||
defer v2reader.Release()
|
||||
|
||||
if err := buf.PipeUntilEOF(v2reader, ray.InboundInput()); err != nil {
|
||||
log.Info("Dokodemo: Failed to transport all TCP request: ", err)
|
||||
return err
|
||||
}
|
||||
wg.Done()
|
||||
ray.InboundInput().Close()
|
||||
}()
|
||||
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
return nil
|
||||
})
|
||||
|
||||
responseDone := signal.ExecuteAsync(func() error {
|
||||
defer output.ForceClose()
|
||||
|
||||
v2writer := buf.NewWriter(conn)
|
||||
defer v2writer.Release()
|
||||
|
||||
if err := buf.PipeUntilEOF(ray.InboundOutput(), v2writer); err != nil {
|
||||
if err := buf.PipeUntilEOF(output, v2writer); err != nil {
|
||||
log.Info("Dokodemo: Failed to transport all TCP response: ", err)
|
||||
return err
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
return nil
|
||||
})
|
||||
|
||||
wg.Wait()
|
||||
if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil {
|
||||
log.Info("Dokodemo: Connection ends with ", err)
|
||||
}
|
||||
}
|
||||
|
||||
type Factory struct{}
|
||||
@@ -211,5 +218,5 @@ func (v *Factory) Create(space app.Space, rawConfig interface{}, meta *proxy.Inb
|
||||
}
|
||||
|
||||
func init() {
|
||||
proxy.MustRegisterInboundHandlerCreator(serial.GetMessageType(new(Config)), new(Factory))
|
||||
common.Must(proxy.RegisterInboundHandlerCreator(serial.GetMessageType(new(Config)), new(Factory)))
|
||||
}
|
||||
|
||||
@@ -18,6 +18,7 @@ import (
|
||||
"v2ray.com/core/testing/servers/tcp"
|
||||
"v2ray.com/core/testing/servers/udp"
|
||||
"v2ray.com/core/transport/internet"
|
||||
_ "v2ray.com/core/transport/internet/tcp"
|
||||
)
|
||||
|
||||
func TestDokodemoTCP(t *testing.T) {
|
||||
|
||||
@@ -5,6 +5,7 @@ import (
|
||||
|
||||
"v2ray.com/core/app"
|
||||
"v2ray.com/core/app/dns"
|
||||
"v2ray.com/core/common"
|
||||
"v2ray.com/core/common/buf"
|
||||
"v2ray.com/core/common/dice"
|
||||
"v2ray.com/core/common/errors"
|
||||
@@ -12,9 +13,9 @@ import (
|
||||
v2net "v2ray.com/core/common/net"
|
||||
"v2ray.com/core/common/retry"
|
||||
"v2ray.com/core/common/serial"
|
||||
"v2ray.com/core/common/signal"
|
||||
"v2ray.com/core/proxy"
|
||||
"v2ray.com/core/transport/internet"
|
||||
"v2ray.com/core/transport/internet/tcp"
|
||||
"v2ray.com/core/transport/ray"
|
||||
)
|
||||
|
||||
@@ -70,8 +71,10 @@ func (v *Handler) Dispatch(destination v2net.Destination, payload *buf.Buffer, r
|
||||
log.Info("Freedom: Opening connection to ", destination)
|
||||
|
||||
defer payload.Release()
|
||||
defer ray.OutboundInput().Release()
|
||||
defer ray.OutboundOutput().Close()
|
||||
input := ray.OutboundInput()
|
||||
output := ray.OutboundOutput()
|
||||
defer input.ForceClose()
|
||||
defer output.Close()
|
||||
|
||||
var conn internet.Connection
|
||||
if v.domainStrategy == Config_USE_IP && destination.Address.Family().IsDomain() {
|
||||
@@ -91,24 +94,24 @@ func (v *Handler) Dispatch(destination v2net.Destination, payload *buf.Buffer, r
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
input := ray.OutboundInput()
|
||||
output := ray.OutboundOutput()
|
||||
|
||||
if !payload.IsEmpty() {
|
||||
conn.Write(payload.Bytes())
|
||||
if _, err := conn.Write(payload.Bytes()); err != nil {
|
||||
log.Warning("Freedom: Failed to write to destination: ", destination, ": ", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
go func() {
|
||||
requestDone := signal.ExecuteAsync(func() error {
|
||||
defer input.ForceClose()
|
||||
|
||||
v2writer := buf.NewWriter(conn)
|
||||
defer v2writer.Release()
|
||||
|
||||
if err := buf.PipeUntilEOF(input, v2writer); err != nil {
|
||||
log.Info("Freedom: Failed to transport all TCP request: ", err)
|
||||
return err
|
||||
}
|
||||
if tcpConn, ok := conn.(*tcp.RawConnection); ok {
|
||||
tcpConn.CloseWrite()
|
||||
}
|
||||
}()
|
||||
return nil
|
||||
})
|
||||
|
||||
var reader io.Reader = conn
|
||||
|
||||
@@ -120,12 +123,21 @@ func (v *Handler) Dispatch(destination v2net.Destination, payload *buf.Buffer, r
|
||||
reader = v2net.NewTimeOutReader(timeout /* seconds */, conn)
|
||||
}
|
||||
|
||||
v2reader := buf.NewReader(reader)
|
||||
if err := buf.PipeUntilEOF(v2reader, output); err != nil {
|
||||
log.Info("Freedom: Failed to transport all TCP response: ", err)
|
||||
responseDone := signal.ExecuteAsync(func() error {
|
||||
defer output.Close()
|
||||
|
||||
v2reader := buf.NewReader(reader)
|
||||
defer v2reader.Release()
|
||||
|
||||
if err := buf.PipeUntilEOF(v2reader, output); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil {
|
||||
log.Info("Freedom: Connection ending with ", err)
|
||||
}
|
||||
v2reader.Release()
|
||||
ray.OutboundOutput().Close()
|
||||
}
|
||||
|
||||
type Factory struct{}
|
||||
@@ -141,5 +153,5 @@ func (v *Factory) Create(space app.Space, config interface{}, meta *proxy.Outbou
|
||||
}
|
||||
|
||||
func init() {
|
||||
proxy.MustRegisterOutboundHandlerCreator(serial.GetMessageType(new(Config)), new(Factory))
|
||||
common.Must(proxy.RegisterOutboundHandlerCreator(serial.GetMessageType(new(Config)), new(Factory)))
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package freedom_test
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"testing"
|
||||
|
||||
"v2ray.com/core/app"
|
||||
@@ -18,6 +19,7 @@ import (
|
||||
"v2ray.com/core/testing/assert"
|
||||
"v2ray.com/core/testing/servers/tcp"
|
||||
"v2ray.com/core/transport/internet"
|
||||
_ "v2ray.com/core/transport/internet/tcp"
|
||||
"v2ray.com/core/transport/ray"
|
||||
)
|
||||
|
||||
@@ -32,7 +34,7 @@ func TestSinglePacket(t *testing.T) {
|
||||
return buffer
|
||||
},
|
||||
}
|
||||
_, err := tcpServer.Start()
|
||||
tcpServerAddr, err := tcpServer.Start()
|
||||
assert.Error(err).IsNil()
|
||||
|
||||
space := app.NewSpace()
|
||||
@@ -52,7 +54,8 @@ func TestSinglePacket(t *testing.T) {
|
||||
payload := buf.NewLocal(2048)
|
||||
payload.Append([]byte(data2Send))
|
||||
|
||||
go freedom.Dispatch(v2net.TCPDestination(v2net.LocalHostIP, tcpServer.Port), payload, traffic)
|
||||
fmt.Println(tcpServerAddr.Network, tcpServerAddr.Address, tcpServerAddr.Port)
|
||||
go freedom.Dispatch(tcpServerAddr, payload, traffic)
|
||||
traffic.InboundInput().Close()
|
||||
|
||||
respPayload, err := traffic.InboundOutput().Read()
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"v2ray.com/core/app"
|
||||
"v2ray.com/core/common"
|
||||
"v2ray.com/core/common/errors"
|
||||
v2net "v2ray.com/core/common/net"
|
||||
"v2ray.com/core/transport/internet"
|
||||
)
|
||||
|
||||
@@ -20,12 +21,6 @@ func RegisterInboundHandlerCreator(name string, creator InboundHandlerFactory) e
|
||||
return nil
|
||||
}
|
||||
|
||||
func MustRegisterInboundHandlerCreator(name string, creator InboundHandlerFactory) {
|
||||
if err := RegisterInboundHandlerCreator(name, creator); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
func RegisterOutboundHandlerCreator(name string, creator OutboundHandlerFactory) error {
|
||||
if _, found := outboundFactories[name]; found {
|
||||
return common.ErrDuplicatedName
|
||||
@@ -34,12 +29,6 @@ func RegisterOutboundHandlerCreator(name string, creator OutboundHandlerFactory)
|
||||
return nil
|
||||
}
|
||||
|
||||
func MustRegisterOutboundHandlerCreator(name string, creator OutboundHandlerFactory) {
|
||||
if err := RegisterOutboundHandlerCreator(name, creator); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
}
|
||||
|
||||
func CreateInboundHandler(name string, space app.Space, config interface{}, meta *InboundHandlerMeta) (InboundHandler, error) {
|
||||
creator, found := inboundFactories[name]
|
||||
if !found {
|
||||
@@ -49,6 +38,8 @@ func CreateInboundHandler(name string, space app.Space, config interface{}, meta
|
||||
meta.StreamSettings = &internet.StreamConfig{
|
||||
Network: creator.StreamCapability().Get(0),
|
||||
}
|
||||
} else if meta.StreamSettings.Network == v2net.Network_Unknown {
|
||||
meta.StreamSettings.Network = creator.StreamCapability().Get(0)
|
||||
} else {
|
||||
if !creator.StreamCapability().HasNetwork(meta.StreamSettings.Network) {
|
||||
return nil, errors.New("Proxy: Invalid network: " + meta.StreamSettings.Network.String())
|
||||
@@ -67,6 +58,8 @@ func CreateOutboundHandler(name string, space app.Space, config interface{}, met
|
||||
meta.StreamSettings = &internet.StreamConfig{
|
||||
Network: creator.StreamCapability().Get(0),
|
||||
}
|
||||
} else if meta.StreamSettings.Network == v2net.Network_Unknown {
|
||||
meta.StreamSettings.Network = creator.StreamCapability().Get(0)
|
||||
} else {
|
||||
if !creator.StreamCapability().HasNetwork(meta.StreamSettings.Network) {
|
||||
return nil, errors.New("Proxy: Invalid network: " + meta.StreamSettings.Network.String())
|
||||
|
||||
@@ -17,9 +17,9 @@ import (
|
||||
"v2ray.com/core/common/log"
|
||||
v2net "v2ray.com/core/common/net"
|
||||
"v2ray.com/core/common/serial"
|
||||
"v2ray.com/core/common/signal"
|
||||
"v2ray.com/core/proxy"
|
||||
"v2ray.com/core/transport/internet"
|
||||
"v2ray.com/core/transport/ray"
|
||||
)
|
||||
|
||||
// Server is a HTTP proxy server.
|
||||
@@ -155,35 +155,32 @@ func (v *Server) handleConnect(request *http.Request, session *proxy.SessionInfo
|
||||
}
|
||||
|
||||
ray := v.packetDispatcher.DispatchToOutbound(session)
|
||||
v.transport(reader, writer, ray)
|
||||
}
|
||||
|
||||
func (v *Server) transport(input io.Reader, output io.Writer, ray ray.InboundRay) {
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(2)
|
||||
defer wg.Wait()
|
||||
requestDone := signal.ExecuteAsync(func() error {
|
||||
defer ray.InboundInput().Close()
|
||||
|
||||
go func() {
|
||||
v2reader := buf.NewReader(input)
|
||||
v2reader := buf.NewReader(reader)
|
||||
defer v2reader.Release()
|
||||
|
||||
if err := buf.PipeUntilEOF(v2reader, ray.InboundInput()); err != nil {
|
||||
log.Info("HTTP: Failed to transport all TCP request: ", err)
|
||||
return err
|
||||
}
|
||||
ray.InboundInput().Close()
|
||||
wg.Done()
|
||||
}()
|
||||
return nil
|
||||
})
|
||||
|
||||
go func() {
|
||||
v2writer := buf.NewWriter(output)
|
||||
responseDone := signal.ExecuteAsync(func() error {
|
||||
defer ray.InboundOutput().ForceClose()
|
||||
|
||||
v2writer := buf.NewWriter(writer)
|
||||
defer v2writer.Release()
|
||||
|
||||
if err := buf.PipeUntilEOF(ray.InboundOutput(), v2writer); err != nil {
|
||||
log.Info("HTTP: Failed to transport all TCP response: ", err)
|
||||
return err
|
||||
}
|
||||
ray.InboundOutput().Release()
|
||||
wg.Done()
|
||||
}()
|
||||
return nil
|
||||
})
|
||||
|
||||
signal.ErrorOrFinish2(requestDone, responseDone)
|
||||
}
|
||||
|
||||
// @VisibleForTesting
|
||||
@@ -223,7 +220,7 @@ func (v *Server) GenerateResponse(statusCode int, status string) *http.Response
|
||||
Header: hdr,
|
||||
Body: nil,
|
||||
ContentLength: 0,
|
||||
Close: false,
|
||||
Close: true,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -239,25 +236,31 @@ func (v *Server) handlePlainHTTP(request *http.Request, session *proxy.SessionIn
|
||||
StripHopByHopHeaders(request)
|
||||
|
||||
ray := v.packetDispatcher.DispatchToOutbound(session)
|
||||
defer ray.InboundInput().Close()
|
||||
defer ray.InboundOutput().Release()
|
||||
input := ray.InboundInput()
|
||||
output := ray.InboundOutput()
|
||||
|
||||
defer input.Close()
|
||||
defer output.ForceClose()
|
||||
|
||||
requestDone := signal.ExecuteAsync(func() error {
|
||||
defer input.Close()
|
||||
|
||||
var finish sync.WaitGroup
|
||||
finish.Add(1)
|
||||
go func() {
|
||||
defer finish.Done()
|
||||
requestWriter := bufio.NewWriter(buf.NewBytesWriter(ray.InboundInput()))
|
||||
defer requestWriter.Release()
|
||||
|
||||
err := request.Write(requestWriter)
|
||||
if err != nil {
|
||||
log.Warning("HTTP: Failed to write request: ", err)
|
||||
return
|
||||
return err
|
||||
}
|
||||
requestWriter.Flush()
|
||||
}()
|
||||
if err := requestWriter.Flush(); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
responseDone := signal.ExecuteAsync(func() error {
|
||||
defer output.ForceClose()
|
||||
|
||||
finish.Add(1)
|
||||
go func() {
|
||||
defer finish.Done()
|
||||
responseReader := bufio.OriginalReader(buf.NewBytesReader(ray.InboundOutput()))
|
||||
response, err := http.ReadResponse(responseReader, request)
|
||||
if err != nil {
|
||||
@@ -265,14 +268,19 @@ func (v *Server) handlePlainHTTP(request *http.Request, session *proxy.SessionIn
|
||||
response = v.GenerateResponse(503, "Service Unavailable")
|
||||
}
|
||||
responseWriter := bufio.NewWriter(writer)
|
||||
err = response.Write(responseWriter)
|
||||
if err != nil {
|
||||
log.Warning("HTTP: Failed to write response: ", err)
|
||||
return
|
||||
if err := response.Write(responseWriter); err != nil {
|
||||
return err
|
||||
}
|
||||
responseWriter.Flush()
|
||||
}()
|
||||
finish.Wait()
|
||||
|
||||
if err := responseWriter.Flush(); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil {
|
||||
log.Info("HTTP|Server: Connecton ending with ", err)
|
||||
}
|
||||
}
|
||||
|
||||
// ServerFactory is a InboundHandlerFactory.
|
||||
@@ -297,5 +305,5 @@ func (v *ServerFactory) Create(space app.Space, rawConfig interface{}, meta *pro
|
||||
}
|
||||
|
||||
func init() {
|
||||
proxy.MustRegisterInboundHandlerCreator(serial.GetMessageType(new(ServerConfig)), new(ServerFactory))
|
||||
common.Must(proxy.RegisterInboundHandlerCreator(serial.GetMessageType(new(ServerConfig)), new(ServerFactory)))
|
||||
}
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
package shadowsocks
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"v2ray.com/core/app"
|
||||
"v2ray.com/core/common/buf"
|
||||
"v2ray.com/core/common/bufio"
|
||||
@@ -10,6 +8,7 @@ import (
|
||||
v2net "v2ray.com/core/common/net"
|
||||
"v2ray.com/core/common/protocol"
|
||||
"v2ray.com/core/common/retry"
|
||||
"v2ray.com/core/common/signal"
|
||||
"v2ray.com/core/proxy"
|
||||
"v2ray.com/core/transport/internet"
|
||||
"v2ray.com/core/transport/ray"
|
||||
@@ -38,8 +37,6 @@ func NewClient(config *ClientConfig, space app.Space, meta *proxy.OutboundHandle
|
||||
// Dispatch implements OutboundHandler.Dispatch().
|
||||
func (v *Client) Dispatch(destination v2net.Destination, payload *buf.Buffer, ray ray.OutboundRay) {
|
||||
defer payload.Release()
|
||||
defer ray.OutboundInput().Release()
|
||||
defer ray.OutboundOutput().Close()
|
||||
|
||||
network := destination.Network
|
||||
|
||||
@@ -109,48 +106,38 @@ func (v *Client) Dispatch(destination v2net.Destination, payload *buf.Buffer, ra
|
||||
}
|
||||
}
|
||||
|
||||
var responseMutex sync.Mutex
|
||||
responseMutex.Lock()
|
||||
bufferedWriter.SetBuffered(false)
|
||||
|
||||
go func() {
|
||||
defer responseMutex.Unlock()
|
||||
requestDone := signal.ExecuteAsync(func() error {
|
||||
defer ray.OutboundInput().ForceClose()
|
||||
|
||||
if err := buf.PipeUntilEOF(ray.OutboundInput(), bodyWriter); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
responseDone := signal.ExecuteAsync(func() error {
|
||||
defer ray.OutboundOutput().Close()
|
||||
|
||||
responseReader, err := ReadTCPResponse(user, conn)
|
||||
if err != nil {
|
||||
log.Warning("Shadowsocks|Client: Failed to read response: ", err)
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
if err := buf.PipeUntilEOF(responseReader, ray.OutboundOutput()); err != nil {
|
||||
log.Info("Shadowsocks|Client: Failed to transport all TCP response: ", err)
|
||||
return err
|
||||
}
|
||||
}()
|
||||
|
||||
bufferedWriter.SetCached(false)
|
||||
if err := buf.PipeUntilEOF(ray.OutboundInput(), bodyWriter); err != nil {
|
||||
log.Info("Shadowsocks|Client: Failed to trasnport all TCP request: ", err)
|
||||
return nil
|
||||
})
|
||||
|
||||
if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil {
|
||||
log.Info("Shadowsocks|Client: Connection ends with ", err)
|
||||
}
|
||||
|
||||
responseMutex.Lock()
|
||||
}
|
||||
|
||||
if request.Command == protocol.RequestCommandUDP {
|
||||
timedReader := v2net.NewTimeOutReader(16, conn)
|
||||
var responseMutex sync.Mutex
|
||||
responseMutex.Lock()
|
||||
|
||||
go func() {
|
||||
defer responseMutex.Unlock()
|
||||
|
||||
reader := &UDPReader{
|
||||
Reader: timedReader,
|
||||
User: user,
|
||||
}
|
||||
|
||||
if err := buf.PipeUntilEOF(reader, ray.OutboundOutput()); err != nil {
|
||||
log.Info("Shadowsocks|Client: Failed to transport all UDP response: ", err)
|
||||
}
|
||||
}()
|
||||
|
||||
writer := &UDPWriter{
|
||||
Writer: conn,
|
||||
@@ -162,11 +149,35 @@ func (v *Client) Dispatch(destination v2net.Destination, payload *buf.Buffer, ra
|
||||
return
|
||||
}
|
||||
}
|
||||
if err := buf.PipeUntilEOF(ray.OutboundInput(), writer); err != nil {
|
||||
log.Info("Shadowsocks|Client: Failed to transport all UDP request: ", err)
|
||||
}
|
||||
|
||||
responseMutex.Lock()
|
||||
requestDone := signal.ExecuteAsync(func() error {
|
||||
defer ray.OutboundInput().ForceClose()
|
||||
|
||||
if err := buf.PipeUntilEOF(ray.OutboundInput(), writer); err != nil {
|
||||
log.Info("Shadowsocks|Client: Failed to transport all UDP request: ", err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
timedReader := v2net.NewTimeOutReader(16, conn)
|
||||
|
||||
responseDone := signal.ExecuteAsync(func() error {
|
||||
defer ray.OutboundOutput().Close()
|
||||
|
||||
reader := &UDPReader{
|
||||
Reader: timedReader,
|
||||
User: user,
|
||||
}
|
||||
|
||||
if err := buf.PipeUntilEOF(reader, ray.OutboundOutput()); err != nil {
|
||||
log.Info("Shadowsocks|Client: Failed to transport all UDP response: ", err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
signal.ErrorOrFinish2(requestDone, responseDone)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,12 +1,13 @@
|
||||
package shadowsocks
|
||||
|
||||
import (
|
||||
"v2ray.com/core/common"
|
||||
"v2ray.com/core/common/serial"
|
||||
"v2ray.com/core/proxy"
|
||||
)
|
||||
|
||||
func init() {
|
||||
// Must happen after config is initialized
|
||||
proxy.MustRegisterOutboundHandlerCreator(serial.GetMessageType(new(ClientConfig)), new(ClientFactory))
|
||||
proxy.MustRegisterInboundHandlerCreator(serial.GetMessageType(new(ServerConfig)), new(ServerFactory))
|
||||
common.Must(proxy.RegisterOutboundHandlerCreator(serial.GetMessageType(new(ClientConfig)), new(ClientFactory)))
|
||||
common.Must(proxy.RegisterInboundHandlerCreator(serial.GetMessageType(new(ServerConfig)), new(ServerFactory)))
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"crypto/sha1"
|
||||
"io"
|
||||
|
||||
"v2ray.com/core/common"
|
||||
"v2ray.com/core/common/buf"
|
||||
"v2ray.com/core/common/errors"
|
||||
"v2ray.com/core/common/serial"
|
||||
@@ -70,8 +71,8 @@ func NewChunkReader(reader io.Reader, auth *Authenticator) *ChunkReader {
|
||||
}
|
||||
|
||||
func (v *ChunkReader) Release() {
|
||||
v.reader = nil
|
||||
v.auth = nil
|
||||
common.Release(v.reader)
|
||||
common.Release(v.auth)
|
||||
}
|
||||
|
||||
func (v *ChunkReader) Read() (*buf.Buffer, error) {
|
||||
@@ -124,8 +125,8 @@ func NewChunkWriter(writer io.Writer, auth *Authenticator) *ChunkWriter {
|
||||
}
|
||||
|
||||
func (v *ChunkWriter) Release() {
|
||||
v.writer = nil
|
||||
v.auth = nil
|
||||
common.Release(v.writer)
|
||||
common.Release(v.auth)
|
||||
}
|
||||
|
||||
func (v *ChunkWriter) Write(payload *buf.Buffer) error {
|
||||
|
||||
@@ -1,8 +1,6 @@
|
||||
package shadowsocks
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"v2ray.com/core/app"
|
||||
"v2ray.com/core/app/dispatcher"
|
||||
"v2ray.com/core/common"
|
||||
@@ -12,6 +10,7 @@ import (
|
||||
"v2ray.com/core/common/log"
|
||||
v2net "v2ray.com/core/common/net"
|
||||
"v2ray.com/core/common/protocol"
|
||||
"v2ray.com/core/common/signal"
|
||||
"v2ray.com/core/proxy"
|
||||
"v2ray.com/core/transport/internet"
|
||||
"v2ray.com/core/transport/internet/udp"
|
||||
@@ -161,7 +160,7 @@ func (v *Server) handleConnection(conn internet.Connection) {
|
||||
}
|
||||
defer bodyReader.Release()
|
||||
|
||||
bufferedReader.SetCached(false)
|
||||
bufferedReader.SetBuffered(false)
|
||||
|
||||
userSettings := v.user.GetSettings()
|
||||
timedReader.SetTimeOut(userSettings.PayloadReadTimeout)
|
||||
@@ -176,12 +175,11 @@ func (v *Server) handleConnection(conn internet.Connection) {
|
||||
User: request.User,
|
||||
Inbound: v.meta,
|
||||
})
|
||||
defer ray.InboundOutput().Release()
|
||||
defer ray.InboundOutput().ForceClose()
|
||||
defer ray.InboundInput().Close()
|
||||
|
||||
var writeFinish sync.Mutex
|
||||
writeFinish.Lock()
|
||||
go func() {
|
||||
defer writeFinish.Unlock()
|
||||
requestDone := signal.ExecuteAsync(func() error {
|
||||
defer ray.InboundOutput().ForceClose()
|
||||
|
||||
bufferedWriter := bufio.NewWriter(conn)
|
||||
defer bufferedWriter.Release()
|
||||
@@ -189,26 +187,38 @@ func (v *Server) handleConnection(conn internet.Connection) {
|
||||
responseWriter, err := WriteTCPResponse(request, bufferedWriter)
|
||||
if err != nil {
|
||||
log.Warning("Shadowsocks|Server: Failed to write response: ", err)
|
||||
return
|
||||
return err
|
||||
}
|
||||
defer responseWriter.Release()
|
||||
|
||||
if payload, err := ray.InboundOutput().Read(); err == nil {
|
||||
responseWriter.Write(payload)
|
||||
bufferedWriter.SetCached(false)
|
||||
|
||||
if err := buf.PipeUntilEOF(ray.InboundOutput(), responseWriter); err != nil {
|
||||
log.Info("Shadowsocks|Server: Failed to transport all TCP response: ", err)
|
||||
}
|
||||
payload, err := ray.InboundOutput().Read()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}()
|
||||
responseWriter.Write(payload)
|
||||
bufferedWriter.SetBuffered(false)
|
||||
|
||||
if err := buf.PipeUntilEOF(bodyReader, ray.InboundInput()); err != nil {
|
||||
log.Info("Shadowsocks|Server: Failed to transport all TCP request: ", err)
|
||||
if err := buf.PipeUntilEOF(ray.InboundOutput(), responseWriter); err != nil {
|
||||
log.Info("Shadowsocks|Server: Failed to transport all TCP response: ", err)
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
})
|
||||
|
||||
responseDone := signal.ExecuteAsync(func() error {
|
||||
defer ray.InboundInput().Close()
|
||||
|
||||
if err := buf.PipeUntilEOF(bodyReader, ray.InboundInput()); err != nil {
|
||||
log.Info("Shadowsocks|Server: Failed to transport all TCP request: ", err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil {
|
||||
log.Info("Shadowsocks|Server: Connection ends with ", err)
|
||||
}
|
||||
ray.InboundInput().Close()
|
||||
|
||||
writeFinish.Lock()
|
||||
}
|
||||
|
||||
type ServerFactory struct{}
|
||||
|
||||
@@ -7,6 +7,7 @@ import (
|
||||
|
||||
"v2ray.com/core/app"
|
||||
"v2ray.com/core/app/dispatcher"
|
||||
"v2ray.com/core/common"
|
||||
"v2ray.com/core/common/buf"
|
||||
"v2ray.com/core/common/bufio"
|
||||
"v2ray.com/core/common/crypto"
|
||||
@@ -14,6 +15,7 @@ import (
|
||||
"v2ray.com/core/common/log"
|
||||
v2net "v2ray.com/core/common/net"
|
||||
"v2ray.com/core/common/serial"
|
||||
"v2ray.com/core/common/signal"
|
||||
"v2ray.com/core/proxy"
|
||||
"v2ray.com/core/proxy/socks/protocol"
|
||||
"v2ray.com/core/transport/internet"
|
||||
@@ -216,8 +218,8 @@ func (v *Server) handleSocks5(clientAddr v2net.Destination, reader *bufio.Buffer
|
||||
return err
|
||||
}
|
||||
|
||||
reader.SetCached(false)
|
||||
writer.SetCached(false)
|
||||
reader.SetBuffered(false)
|
||||
writer.SetBuffered(false)
|
||||
|
||||
dest := request.Destination()
|
||||
session := &proxy.SessionInfo{
|
||||
@@ -279,8 +281,8 @@ func (v *Server) handleSocks4(clientAddr v2net.Destination, reader *bufio.Buffer
|
||||
return ErrUnsupportedSocksCommand
|
||||
}
|
||||
|
||||
reader.SetCached(false)
|
||||
writer.SetCached(false)
|
||||
reader.SetBuffered(false)
|
||||
writer.SetBuffered(false)
|
||||
|
||||
dest := v2net.TCPDestination(v2net.IPAddress(auth.IP[:]), auth.Port)
|
||||
session := &proxy.SessionInfo{
|
||||
@@ -298,26 +300,36 @@ func (v *Server) transport(reader io.Reader, writer io.Writer, session *proxy.Se
|
||||
input := ray.InboundInput()
|
||||
output := ray.InboundOutput()
|
||||
|
||||
defer input.Close()
|
||||
defer output.Release()
|
||||
requestDone := signal.ExecuteAsync(func() error {
|
||||
defer input.Close()
|
||||
|
||||
go func() {
|
||||
v2reader := buf.NewReader(reader)
|
||||
defer v2reader.Release()
|
||||
|
||||
if err := buf.PipeUntilEOF(v2reader, input); err != nil {
|
||||
log.Info("Socks|Server: Failed to transport all TCP request: ", err)
|
||||
return err
|
||||
}
|
||||
input.Close()
|
||||
}()
|
||||
return nil
|
||||
})
|
||||
|
||||
v2writer := buf.NewWriter(writer)
|
||||
defer v2writer.Release()
|
||||
responseDone := signal.ExecuteAsync(func() error {
|
||||
defer output.ForceClose()
|
||||
|
||||
if err := buf.PipeUntilEOF(output, v2writer); err != nil {
|
||||
log.Info("Socks|Server: Failed to transport all TCP response: ", err)
|
||||
v2writer := buf.NewWriter(writer)
|
||||
defer v2writer.Release()
|
||||
|
||||
if err := buf.PipeUntilEOF(output, v2writer); err != nil {
|
||||
log.Info("Socks|Server: Failed to transport all TCP response: ", err)
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
|
||||
})
|
||||
|
||||
if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil {
|
||||
log.Info("Socks|Server: Connection ends with ", err)
|
||||
}
|
||||
output.Release()
|
||||
}
|
||||
|
||||
type ServerFactory struct{}
|
||||
@@ -333,5 +345,5 @@ func (v *ServerFactory) Create(space app.Space, rawConfig interface{}, meta *pro
|
||||
}
|
||||
|
||||
func init() {
|
||||
proxy.MustRegisterInboundHandlerCreator(serial.GetMessageType(new(ServerConfig)), new(ServerFactory))
|
||||
common.Must(proxy.RegisterInboundHandlerCreator(serial.GetMessageType(new(ServerConfig)), new(ServerFactory)))
|
||||
}
|
||||
|
||||
@@ -35,10 +35,10 @@ func (v *Account) AsAccount() (protocol.Account, error) {
|
||||
log.Error("VMess: Failed to parse ID: ", err)
|
||||
return nil, err
|
||||
}
|
||||
protoId := protocol.NewID(id)
|
||||
protoID := protocol.NewID(id)
|
||||
return &InternalAccount{
|
||||
ID: protoId,
|
||||
AlterIDs: protocol.NewAlterIDs(protoId, uint16(v.AlterId)),
|
||||
ID: protoID,
|
||||
AlterIDs: protocol.NewAlterIDs(protoID, uint16(v.AlterId)),
|
||||
Security: v.SecuritySettings.AsSecurity(),
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -15,11 +15,13 @@ import (
|
||||
v2net "v2ray.com/core/common/net"
|
||||
"v2ray.com/core/common/protocol"
|
||||
"v2ray.com/core/common/serial"
|
||||
"v2ray.com/core/common/signal"
|
||||
"v2ray.com/core/common/uuid"
|
||||
"v2ray.com/core/proxy"
|
||||
"v2ray.com/core/proxy/vmess"
|
||||
"v2ray.com/core/proxy/vmess/encoding"
|
||||
"v2ray.com/core/transport/internet"
|
||||
"v2ray.com/core/transport/ray"
|
||||
)
|
||||
|
||||
type userByEmail struct {
|
||||
@@ -128,6 +130,48 @@ func (v *VMessInboundHandler) Start() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func transferRequest(session *encoding.ServerSession, request *protocol.RequestHeader, input io.Reader, output ray.OutputStream) error {
|
||||
defer output.Close()
|
||||
|
||||
bodyReader := session.DecodeRequestBody(request, input)
|
||||
defer bodyReader.Release()
|
||||
|
||||
if err := buf.PipeUntilEOF(bodyReader, output); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func transferResponse(session *encoding.ServerSession, request *protocol.RequestHeader, response *protocol.ResponseHeader, input ray.InputStream, output io.Writer) error {
|
||||
defer input.ForceClose()
|
||||
session.EncodeResponseHeader(response, output)
|
||||
|
||||
bodyWriter := session.EncodeResponseBody(request, output)
|
||||
|
||||
// Optimize for small response packet
|
||||
if data, err := input.Read(); err == nil {
|
||||
if err := bodyWriter.Write(data); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if bufferedWriter, ok := output.(*bufio.BufferedWriter); ok {
|
||||
bufferedWriter.SetBuffered(false)
|
||||
}
|
||||
|
||||
if err := buf.PipeUntilEOF(input, bodyWriter); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
if request.Option.Has(protocol.RequestOptionChunkStream) {
|
||||
if err := bodyWriter.Write(buf.NewLocal(8)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (v *VMessInboundHandler) HandleConnection(connection internet.Connection) {
|
||||
defer connection.Close()
|
||||
|
||||
@@ -155,13 +199,13 @@ func (v *VMessInboundHandler) HandleConnection(connection internet.Connection) {
|
||||
if err != nil {
|
||||
if errors.Cause(err) != io.EOF {
|
||||
log.Access(connection.RemoteAddr(), "", log.AccessRejected, err)
|
||||
log.Info("VMessIn: Invalid request from ", connection.RemoteAddr(), ": ", err)
|
||||
log.Info("VMess|Inbound: Invalid request from ", connection.RemoteAddr(), ": ", err)
|
||||
}
|
||||
connection.SetReusable(false)
|
||||
return
|
||||
}
|
||||
log.Access(connection.RemoteAddr(), request.Destination(), log.AccessAccepted, "")
|
||||
log.Info("VMessIn: Received request for ", request.Destination())
|
||||
log.Info("VMess|Inbound: Received request for ", request.Destination())
|
||||
|
||||
connection.SetReusable(request.Option.Has(protocol.RequestOptionConnectionReuse))
|
||||
|
||||
@@ -174,26 +218,15 @@ func (v *VMessInboundHandler) HandleConnection(connection internet.Connection) {
|
||||
input := ray.InboundInput()
|
||||
output := ray.InboundOutput()
|
||||
defer input.Close()
|
||||
defer output.Release()
|
||||
|
||||
var readFinish sync.Mutex
|
||||
readFinish.Lock()
|
||||
defer output.ForceClose()
|
||||
|
||||
userSettings := request.User.GetSettings()
|
||||
connReader.SetTimeOut(userSettings.PayloadReadTimeout)
|
||||
reader.SetCached(false)
|
||||
reader.SetBuffered(false)
|
||||
|
||||
go func() {
|
||||
bodyReader := session.DecodeRequestBody(request, reader)
|
||||
if err := buf.PipeUntilEOF(bodyReader, input); err != nil {
|
||||
log.Debug("VMess|Inbound: Error when sending data to outbound: ", err)
|
||||
connection.SetReusable(false)
|
||||
}
|
||||
bodyReader.Release()
|
||||
|
||||
input.Close()
|
||||
readFinish.Unlock()
|
||||
}()
|
||||
requestDone := signal.ExecuteAsync(func() error {
|
||||
return transferRequest(session, request, reader, input)
|
||||
})
|
||||
|
||||
writer := bufio.NewWriter(connection)
|
||||
defer writer.Release()
|
||||
@@ -206,34 +239,21 @@ func (v *VMessInboundHandler) HandleConnection(connection internet.Connection) {
|
||||
response.Option.Set(protocol.ResponseOptionConnectionReuse)
|
||||
}
|
||||
|
||||
session.EncodeResponseHeader(response, writer)
|
||||
|
||||
bodyWriter := session.EncodeResponseBody(request, writer)
|
||||
|
||||
// Optimize for small response packet
|
||||
if data, err := output.Read(); err == nil {
|
||||
if err := bodyWriter.Write(data); err != nil {
|
||||
connection.SetReusable(false)
|
||||
}
|
||||
|
||||
writer.SetCached(false)
|
||||
|
||||
if err := buf.PipeUntilEOF(output, bodyWriter); err != nil {
|
||||
log.Debug("VMess|Inbound: Error when sending data to downstream: ", err)
|
||||
connection.SetReusable(false)
|
||||
}
|
||||
responseDone := signal.ExecuteAsync(func() error {
|
||||
return transferResponse(session, request, response, output, writer)
|
||||
})
|
||||
|
||||
if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil {
|
||||
log.Info("VMess|Inbound: Connection ending with ", err)
|
||||
connection.SetReusable(false)
|
||||
return
|
||||
}
|
||||
output.Release()
|
||||
if request.Option.Has(protocol.RequestOptionChunkStream) {
|
||||
if err := bodyWriter.Write(buf.NewLocal(8)); err != nil {
|
||||
connection.SetReusable(false)
|
||||
}
|
||||
}
|
||||
writer.Flush()
|
||||
bodyWriter.Release()
|
||||
|
||||
readFinish.Lock()
|
||||
if err := writer.Flush(); err != nil {
|
||||
log.Info("VMess|Inbound: Failed to flush remain data: ", err)
|
||||
connection.SetReusable(false)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
type Factory struct{}
|
||||
@@ -271,5 +291,5 @@ func (v *Factory) Create(space app.Space, rawConfig interface{}, meta *proxy.Inb
|
||||
}
|
||||
|
||||
func init() {
|
||||
proxy.MustRegisterInboundHandlerCreator(serial.GetMessageType(new(Config)), new(Factory))
|
||||
common.Must(proxy.RegisterInboundHandlerCreator(serial.GetMessageType(new(Config)), new(Factory)))
|
||||
}
|
||||
|
||||
@@ -1,9 +1,8 @@
|
||||
package outbound
|
||||
|
||||
import (
|
||||
"sync"
|
||||
|
||||
"v2ray.com/core/app"
|
||||
"v2ray.com/core/common"
|
||||
"v2ray.com/core/common/buf"
|
||||
"v2ray.com/core/common/bufio"
|
||||
"v2ray.com/core/common/log"
|
||||
@@ -11,6 +10,7 @@ import (
|
||||
"v2ray.com/core/common/protocol"
|
||||
"v2ray.com/core/common/retry"
|
||||
"v2ray.com/core/common/serial"
|
||||
"v2ray.com/core/common/signal"
|
||||
"v2ray.com/core/proxy"
|
||||
"v2ray.com/core/proxy/vmess"
|
||||
"v2ray.com/core/proxy/vmess/encoding"
|
||||
@@ -27,7 +27,8 @@ type VMessOutboundHandler struct {
|
||||
|
||||
// Dispatch implements OutboundHandler.Dispatch().
|
||||
func (v *VMessOutboundHandler) Dispatch(target v2net.Destination, payload *buf.Buffer, ray ray.OutboundRay) {
|
||||
defer ray.OutboundInput().Release()
|
||||
defer payload.Release()
|
||||
defer ray.OutboundInput().ForceClose()
|
||||
defer ray.OutboundOutput().Close()
|
||||
|
||||
var rec *protocol.ServerSpec
|
||||
@@ -79,73 +80,65 @@ func (v *VMessOutboundHandler) Dispatch(target v2net.Destination, payload *buf.B
|
||||
input := ray.OutboundInput()
|
||||
output := ray.OutboundOutput()
|
||||
|
||||
var requestFinish, responseFinish sync.Mutex
|
||||
requestFinish.Lock()
|
||||
responseFinish.Lock()
|
||||
|
||||
session := encoding.NewClientSession(protocol.DefaultIDHash)
|
||||
|
||||
go v.handleRequest(session, conn, request, payload, input, &requestFinish)
|
||||
go v.handleResponse(session, conn, request, rec.Destination(), output, &responseFinish)
|
||||
requestDone := signal.ExecuteAsync(func() error {
|
||||
defer input.ForceClose()
|
||||
|
||||
requestFinish.Lock()
|
||||
responseFinish.Lock()
|
||||
return
|
||||
}
|
||||
writer := bufio.NewWriter(conn)
|
||||
defer writer.Release()
|
||||
|
||||
func (v *VMessOutboundHandler) handleRequest(session *encoding.ClientSession, conn internet.Connection, request *protocol.RequestHeader, payload *buf.Buffer, input buf.Reader, finish *sync.Mutex) {
|
||||
defer finish.Unlock()
|
||||
session.EncodeRequestHeader(request, writer)
|
||||
|
||||
writer := bufio.NewWriter(conn)
|
||||
defer writer.Release()
|
||||
session.EncodeRequestHeader(request, writer)
|
||||
bodyWriter := session.EncodeRequestBody(request, writer)
|
||||
defer bodyWriter.Release()
|
||||
|
||||
bodyWriter := session.EncodeRequestBody(request, writer)
|
||||
defer bodyWriter.Release()
|
||||
|
||||
if !payload.IsEmpty() {
|
||||
if err := bodyWriter.Write(payload); err != nil {
|
||||
log.Info("VMess|Outbound: Failed to write payload. Disabling connection reuse.", err)
|
||||
conn.SetReusable(false)
|
||||
if !payload.IsEmpty() {
|
||||
if err := bodyWriter.Write(payload); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
payload.Release()
|
||||
}
|
||||
writer.SetCached(false)
|
||||
writer.SetBuffered(false)
|
||||
|
||||
if err := buf.PipeUntilEOF(input, bodyWriter); err != nil {
|
||||
conn.SetReusable(false)
|
||||
}
|
||||
if err := buf.PipeUntilEOF(input, bodyWriter); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if request.Option.Has(protocol.RequestOptionChunkStream) {
|
||||
err := bodyWriter.Write(buf.NewLocal(8))
|
||||
if request.Option.Has(protocol.RequestOptionChunkStream) {
|
||||
if err := bodyWriter.Write(buf.NewLocal(8)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
responseDone := signal.ExecuteAsync(func() error {
|
||||
defer output.Close()
|
||||
|
||||
reader := bufio.NewReader(conn)
|
||||
defer reader.Release()
|
||||
|
||||
header, err := session.DecodeResponseHeader(reader)
|
||||
if err != nil {
|
||||
conn.SetReusable(false)
|
||||
return err
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
v.handleCommand(rec.Destination(), header.Command)
|
||||
|
||||
func (v *VMessOutboundHandler) handleResponse(session *encoding.ClientSession, conn internet.Connection, request *protocol.RequestHeader, dest v2net.Destination, output buf.Writer, finish *sync.Mutex) {
|
||||
defer finish.Unlock()
|
||||
conn.SetReusable(header.Option.Has(protocol.ResponseOptionConnectionReuse))
|
||||
|
||||
reader := bufio.NewReader(conn)
|
||||
defer reader.Release()
|
||||
reader.SetBuffered(false)
|
||||
bodyReader := session.DecodeResponseBody(request, reader)
|
||||
defer bodyReader.Release()
|
||||
|
||||
header, err := session.DecodeResponseHeader(reader)
|
||||
if err != nil {
|
||||
conn.SetReusable(false)
|
||||
log.Warning("VMess|Outbound: Failed to read response from ", request.Destination(), ": ", err)
|
||||
return
|
||||
}
|
||||
v.handleCommand(dest, header.Command)
|
||||
if err := buf.PipeUntilEOF(bodyReader, output); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
conn.SetReusable(header.Option.Has(protocol.ResponseOptionConnectionReuse))
|
||||
return nil
|
||||
})
|
||||
|
||||
reader.SetCached(false)
|
||||
bodyReader := session.DecodeResponseBody(request, reader)
|
||||
defer bodyReader.Release()
|
||||
|
||||
if err := buf.PipeUntilEOF(bodyReader, output); err != nil {
|
||||
if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil {
|
||||
log.Info("VMess|Outbound: Connection ending with ", err)
|
||||
conn.SetReusable(false)
|
||||
}
|
||||
|
||||
@@ -178,5 +171,5 @@ func (v *Factory) Create(space app.Space, rawConfig interface{}, meta *proxy.Out
|
||||
}
|
||||
|
||||
func init() {
|
||||
proxy.MustRegisterOutboundHandlerCreator(serial.GetMessageType(new(Config)), new(Factory))
|
||||
common.Must(proxy.RegisterOutboundHandlerCreator(serial.GetMessageType(new(Config)), new(Factory)))
|
||||
}
|
||||
|
||||
@@ -69,11 +69,6 @@ func (v *TimedUserValidator) Release() {
|
||||
}
|
||||
|
||||
v.running = false
|
||||
v.validUsers = nil
|
||||
v.userHash = nil
|
||||
v.ids = nil
|
||||
v.hasher = nil
|
||||
v.cancel = nil
|
||||
}
|
||||
|
||||
func (v *TimedUserValidator) generateNewHashes(nowSec protocol.Timestamp, idx int, entry *idEntry) {
|
||||
@@ -89,10 +84,8 @@ func (v *TimedUserValidator) generateNewHashes(nowSec protocol.Timestamp, idx in
|
||||
idHash.Sum(hashValueRemoval[:0])
|
||||
idHash.Reset()
|
||||
|
||||
v.Lock()
|
||||
v.userHash[hashValue] = &indexTimePair{idx, entry.lastSec}
|
||||
delete(v.userHash, hashValueRemoval)
|
||||
v.Unlock()
|
||||
|
||||
entry.lastSec++
|
||||
entry.lastSecRemoval++
|
||||
@@ -107,9 +100,11 @@ func (v *TimedUserValidator) updateUserHash(interval time.Duration) {
|
||||
select {
|
||||
case now := <-time.After(interval):
|
||||
nowSec := protocol.Timestamp(now.Unix() + cacheDurationSec)
|
||||
v.Lock()
|
||||
for _, entry := range v.ids {
|
||||
v.generateNewHashes(nowSec, entry.userIdx, entry)
|
||||
}
|
||||
v.Unlock()
|
||||
case <-v.cancel.WaitForCancel():
|
||||
return
|
||||
}
|
||||
@@ -117,6 +112,9 @@ func (v *TimedUserValidator) updateUserHash(interval time.Duration) {
|
||||
}
|
||||
|
||||
func (v *TimedUserValidator) Add(user *protocol.User) error {
|
||||
v.Lock()
|
||||
defer v.Unlock()
|
||||
|
||||
idx := len(v.validUsers)
|
||||
v.validUsers = append(v.validUsers, user)
|
||||
rawAccount, err := user.GetTypedAccount()
|
||||
|
||||
@@ -1,9 +1,12 @@
|
||||
package scenarios
|
||||
|
||||
import (
|
||||
"github.com/golang/protobuf/proto"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"net"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
"v2ray.com/core"
|
||||
v2net "v2ray.com/core/common/net"
|
||||
)
|
||||
@@ -16,6 +19,32 @@ func pickPort() v2net.Port {
|
||||
return v2net.Port(atomic.AddUint32(&port, 1))
|
||||
}
|
||||
|
||||
func xor(b []byte) []byte {
|
||||
r := make([]byte, len(b))
|
||||
for i, v := range b {
|
||||
r[i] = v ^ 'c'
|
||||
}
|
||||
return r
|
||||
}
|
||||
|
||||
func readFrom(conn net.Conn, timeout time.Duration, length int) []byte {
|
||||
b := make([]byte, 2048)
|
||||
totalBytes := 0
|
||||
deadline := time.Now().Add(timeout)
|
||||
conn.SetReadDeadline(deadline)
|
||||
for totalBytes < length {
|
||||
if time.Now().After(deadline) {
|
||||
break
|
||||
}
|
||||
n, err := conn.Read(b[totalBytes:])
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
totalBytes += n
|
||||
}
|
||||
return b[:totalBytes]
|
||||
}
|
||||
|
||||
func InitializeServerConfig(config *core.Config) error {
|
||||
err := BuildV2Ray()
|
||||
if err != nil {
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
package scenarios
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"io"
|
||||
"fmt"
|
||||
"net"
|
||||
"testing"
|
||||
|
||||
"v2ray.com/core/common/buf"
|
||||
v2net "v2ray.com/core/common/net"
|
||||
"v2ray.com/core/testing/assert"
|
||||
"v2ray.com/core/testing/servers/tcp"
|
||||
@@ -43,10 +43,25 @@ func TestDynamicVMess(t *testing.T) {
|
||||
|
||||
conn.CloseWrite()
|
||||
|
||||
response := bytes.NewBuffer(nil)
|
||||
_, err = io.Copy(response, conn)
|
||||
assert.Error(err).IsNil()
|
||||
assert.String("Processed: " + payload).Equals(string(response.Bytes()))
|
||||
expectedResponse := "Processed: " + payload
|
||||
finished := false
|
||||
response := buf.New()
|
||||
for {
|
||||
err := response.AppendSupplier(buf.ReadFrom(conn))
|
||||
assert.Error(err).IsNil()
|
||||
if err != nil {
|
||||
break
|
||||
}
|
||||
if response.String() == expectedResponse {
|
||||
finished = true
|
||||
break
|
||||
}
|
||||
if response.Len() > len(expectedResponse) {
|
||||
fmt.Printf("Unexpected response: %v\n", response.Bytes())
|
||||
break
|
||||
}
|
||||
}
|
||||
assert.Bool(finished).IsTrue()
|
||||
|
||||
conn.Close()
|
||||
}
|
||||
|
||||
@@ -1,15 +1,148 @@
|
||||
package scenarios
|
||||
|
||||
import (
|
||||
"net"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
|
||||
"io/ioutil"
|
||||
"os"
|
||||
|
||||
"time"
|
||||
|
||||
"v2ray.com/core"
|
||||
v2net "v2ray.com/core/common/net"
|
||||
"v2ray.com/core/common/protocol"
|
||||
"v2ray.com/core/common/serial"
|
||||
"v2ray.com/core/common/uuid"
|
||||
"v2ray.com/core/proxy/dokodemo"
|
||||
"v2ray.com/core/proxy/freedom"
|
||||
"v2ray.com/core/proxy/vmess"
|
||||
"v2ray.com/core/proxy/vmess/inbound"
|
||||
"v2ray.com/core/proxy/vmess/outbound"
|
||||
"v2ray.com/core/testing/assert"
|
||||
"v2ray.com/core/testing/servers/tcp"
|
||||
"v2ray.com/core/transport/internet"
|
||||
"v2ray.com/core/transport/internet/tls"
|
||||
)
|
||||
|
||||
var clientConfig = &core.Config{
|
||||
Inbound: []*core.InboundConnectionConfig{
|
||||
{
|
||||
PortRange: v2net.SinglePortRange(pickPort()),
|
||||
ListenOn: v2net.NewIPOrDomain(v2net.LocalHostIP),
|
||||
},
|
||||
},
|
||||
func mustReadFile(name string) []byte {
|
||||
content, err := ioutil.ReadFile(name)
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return content
|
||||
}
|
||||
|
||||
func TestSimpleTLSConnection(t *testing.T) {
|
||||
assert := assert.On(t)
|
||||
|
||||
tcpServer := tcp.Server{
|
||||
MsgProcessor: xor,
|
||||
}
|
||||
dest, err := tcpServer.Start()
|
||||
assert.Error(err).IsNil()
|
||||
|
||||
userID := protocol.NewID(uuid.New())
|
||||
serverPort := pickPort()
|
||||
serverConfig := &core.Config{
|
||||
Inbound: []*core.InboundConnectionConfig{
|
||||
{
|
||||
PortRange: v2net.SinglePortRange(serverPort),
|
||||
ListenOn: v2net.NewIPOrDomain(v2net.LocalHostIP),
|
||||
Settings: serial.ToTypedMessage(&inbound.Config{
|
||||
User: []*protocol.User{
|
||||
{
|
||||
Account: serial.ToTypedMessage(&vmess.Account{
|
||||
Id: userID.String(),
|
||||
}),
|
||||
},
|
||||
},
|
||||
}),
|
||||
StreamSettings: &internet.StreamConfig{
|
||||
SecurityType: serial.GetMessageType(&tls.Config{}),
|
||||
SecuritySettings: []*serial.TypedMessage{
|
||||
serial.ToTypedMessage(&tls.Config{
|
||||
Certificate: []*tls.Certificate{
|
||||
{
|
||||
Certificate: mustReadFile(filepath.Join(os.Getenv("GOPATH"), "src", "v2ray.com", "core", "testing", "tls", "cert.pem")),
|
||||
Key: mustReadFile(filepath.Join(os.Getenv("GOPATH"), "src", "v2ray.com", "core", "testing", "tls", "key.pem")),
|
||||
},
|
||||
},
|
||||
}),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
Outbound: []*core.OutboundConnectionConfig{
|
||||
{
|
||||
Settings: serial.ToTypedMessage(&freedom.Config{}),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
clientPort := pickPort()
|
||||
clientConfig := &core.Config{
|
||||
Inbound: []*core.InboundConnectionConfig{
|
||||
{
|
||||
PortRange: v2net.SinglePortRange(clientPort),
|
||||
ListenOn: v2net.NewIPOrDomain(v2net.LocalHostIP),
|
||||
Settings: serial.ToTypedMessage(&dokodemo.Config{
|
||||
Address: v2net.NewIPOrDomain(dest.Address),
|
||||
Port: uint32(dest.Port),
|
||||
NetworkList: &v2net.NetworkList{
|
||||
Network: []v2net.Network{v2net.Network_TCP},
|
||||
},
|
||||
}),
|
||||
},
|
||||
},
|
||||
Outbound: []*core.OutboundConnectionConfig{
|
||||
{
|
||||
Settings: serial.ToTypedMessage(&outbound.Config{
|
||||
Receiver: []*protocol.ServerEndpoint{
|
||||
{
|
||||
Address: v2net.NewIPOrDomain(v2net.LocalHostIP),
|
||||
Port: uint32(serverPort),
|
||||
User: []*protocol.User{
|
||||
{
|
||||
Account: serial.ToTypedMessage(&vmess.Account{
|
||||
Id: userID.String(),
|
||||
}),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}),
|
||||
StreamSettings: &internet.StreamConfig{
|
||||
SecurityType: serial.GetMessageType(&tls.Config{}),
|
||||
SecuritySettings: []*serial.TypedMessage{
|
||||
serial.ToTypedMessage(&tls.Config{
|
||||
AllowInsecure: true,
|
||||
}),
|
||||
},
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
assert.Error(InitializeServerConfig(serverConfig)).IsNil()
|
||||
assert.Error(InitializeServerConfig(clientConfig)).IsNil()
|
||||
|
||||
conn, err := net.DialTCP("tcp", nil, &net.TCPAddr{
|
||||
IP: []byte{127, 0, 0, 1},
|
||||
Port: int(clientPort),
|
||||
})
|
||||
|
||||
payload := "dokodemo request."
|
||||
nBytes, err := conn.Write([]byte(payload))
|
||||
assert.Error(err).IsNil()
|
||||
assert.Int(nBytes).Equals(len(payload))
|
||||
|
||||
conn.CloseWrite()
|
||||
|
||||
response := readFrom(conn, time.Second*2, len(payload))
|
||||
assert.Bytes(response).Equals(xor([]byte(payload)))
|
||||
conn.Close()
|
||||
|
||||
CloseAllServers()
|
||||
}
|
||||
|
||||
@@ -39,10 +39,6 @@ const (
|
||||
StateTerminated State = 5
|
||||
)
|
||||
|
||||
const (
|
||||
headerSize uint32 = 2
|
||||
)
|
||||
|
||||
func nowMillisec() int64 {
|
||||
now := time.Now()
|
||||
return now.Unix()*1000 + int64(now.Nanosecond()/1000000)
|
||||
@@ -517,6 +513,10 @@ func (v *Connection) Input(segments []Segment) {
|
||||
v.SetState(StateTerminated)
|
||||
}
|
||||
}
|
||||
if seg.Option == SegmentOptionClose || seg.Command() == CommandTerminate {
|
||||
v.OnDataInput()
|
||||
v.OnDataOutput()
|
||||
}
|
||||
v.sendingWorker.ProcessReceivingNext(seg.ReceivinNext)
|
||||
v.receivingWorker.ProcessSendingNext(seg.SendingNext)
|
||||
v.roundTrip.UpdatePeerRTO(seg.PeerRTO, current)
|
||||
|
||||
@@ -5,11 +5,11 @@ import (
|
||||
"v2ray.com/core/transport/internet"
|
||||
)
|
||||
|
||||
func (v *ConnectionReuse) IsEnabled() bool {
|
||||
if v == nil {
|
||||
func (v *Config) IsConnectionReuse() bool {
|
||||
if v == nil || v.ConnectionReuse == nil {
|
||||
return true
|
||||
}
|
||||
return v.Enable
|
||||
return v.ConnectionReuse.Enable
|
||||
}
|
||||
|
||||
func init() {
|
||||
|
||||
@@ -39,7 +39,7 @@ func NewConnection(id internal.ConnectionId, conn net.Conn, manager ConnectionMa
|
||||
id: id,
|
||||
conn: conn,
|
||||
listener: manager,
|
||||
reusable: config.ConnectionReuse.IsEnabled(),
|
||||
reusable: config.IsConnectionReuse(),
|
||||
config: config,
|
||||
}
|
||||
}
|
||||
@@ -97,7 +97,7 @@ func (v *Connection) SetReusable(reusable bool) {
|
||||
}
|
||||
|
||||
func (v *Connection) Reusable() bool {
|
||||
return v.config.ConnectionReuse.IsEnabled() && v.reusable
|
||||
return v.config.IsConnectionReuse() && v.reusable
|
||||
}
|
||||
|
||||
func (v *Connection) SysFd() (int, error) {
|
||||
|
||||
@@ -3,6 +3,7 @@ package tcp
|
||||
import (
|
||||
"crypto/tls"
|
||||
"net"
|
||||
|
||||
"v2ray.com/core/common/errors"
|
||||
"v2ray.com/core/common/log"
|
||||
v2net "v2ray.com/core/common/net"
|
||||
@@ -28,7 +29,7 @@ func Dial(src v2net.Address, dest v2net.Destination, options internet.DialerOpti
|
||||
|
||||
id := internal.NewConnectionId(src, dest)
|
||||
var conn net.Conn
|
||||
if dest.Network == v2net.Network_TCP && tcpSettings.ConnectionReuse.IsEnabled() {
|
||||
if dest.Network == v2net.Network_TCP && tcpSettings.IsConnectionReuse() {
|
||||
conn = globalCache.Get(id)
|
||||
}
|
||||
if conn == nil {
|
||||
|
||||
@@ -26,6 +26,7 @@ func (v *Config) BuildCertificates() []tls.Certificate {
|
||||
func (v *Config) GetTLSConfig() *tls.Config {
|
||||
config := &tls.Config{
|
||||
ClientSessionCache: globalSessionCache,
|
||||
NextProtos: []string{"http/1.1"},
|
||||
}
|
||||
if v == nil {
|
||||
return config
|
||||
|
||||
@@ -5,11 +5,11 @@ import (
|
||||
"v2ray.com/core/transport/internet"
|
||||
)
|
||||
|
||||
func (v *ConnectionReuse) IsEnabled() bool {
|
||||
if v == nil {
|
||||
func (c *Config) IsConnectionReuse() bool {
|
||||
if c == nil || c.ConnectionReuse == nil {
|
||||
return false
|
||||
}
|
||||
return v.Enable
|
||||
return c.ConnectionReuse.Enable
|
||||
}
|
||||
|
||||
func init() {
|
||||
|
||||
@@ -29,7 +29,7 @@ func NewConnection(dest string, conn *wsconn, manager ConnectionManager, config
|
||||
dest: dest,
|
||||
conn: conn,
|
||||
listener: manager,
|
||||
reusable: config.ConnectionReuse.IsEnabled(),
|
||||
reusable: config.IsConnectionReuse(),
|
||||
config: config,
|
||||
}
|
||||
}
|
||||
@@ -83,14 +83,11 @@ func (v *Connection) SetWriteDeadline(t time.Time) error {
|
||||
}
|
||||
|
||||
func (v *Connection) SetReusable(reusable bool) {
|
||||
if !v.config.ConnectionReuse.IsEnabled() {
|
||||
return
|
||||
}
|
||||
v.reusable = reusable
|
||||
}
|
||||
|
||||
func (v *Connection) Reusable() bool {
|
||||
return v.reusable
|
||||
return v.config.IsConnectionReuse() && v.reusable
|
||||
}
|
||||
|
||||
func (v *Connection) SysFd() (int, error) {
|
||||
|
||||
@@ -28,7 +28,7 @@ func Dial(src v2net.Address, dest v2net.Destination, options internet.DialerOpti
|
||||
|
||||
id := src.String() + "-" + dest.NetAddr()
|
||||
var conn *wsconn
|
||||
if dest.Network == v2net.Network_TCP && wsSettings.ConnectionReuse.IsEnabled() {
|
||||
if dest.Network == v2net.Network_TCP && wsSettings.IsConnectionReuse() {
|
||||
connt := globalCache.Get(id)
|
||||
if connt != nil {
|
||||
conn = connt.(*wsconn)
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
/*Package ws implements Websocket transport
|
||||
/*Package websocket implements Websocket transport
|
||||
|
||||
Websocket transport implements a HTTP(S) compliable, surveillance proof transport method with plausible deniability.
|
||||
|
||||
|
||||
@@ -2,6 +2,8 @@ package websocket_test
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
@@ -100,8 +102,8 @@ func Test_listenWSAndDial_TLS(t *testing.T) {
|
||||
AllowInsecure: true,
|
||||
Certificate: []*v2tls.Certificate{
|
||||
{
|
||||
Certificate: ReadFile("./../../../testing/tls/cert.pem", assert),
|
||||
Key: ReadFile("./../../../testing/tls/key.pem", assert),
|
||||
Certificate: ReadFile(filepath.Join(os.Getenv("GOPATH"), "src", "v2ray.com", "core", "testing", "tls", "cert.pem"), assert),
|
||||
Key: ReadFile(filepath.Join(os.Getenv("GOPATH"), "src", "v2ray.com", "core", "testing", "tls", "key.pem"), assert),
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
@@ -159,13 +159,10 @@ func (ws *wsconn) setup() {
|
||||
}
|
||||
|
||||
func (ws *wsconn) Reusable() bool {
|
||||
return ws.reusable && !ws.connClosing
|
||||
return ws.config.IsConnectionReuse() && ws.reusable && !ws.connClosing
|
||||
}
|
||||
|
||||
func (ws *wsconn) SetReusable(reusable bool) {
|
||||
if !ws.config.ConnectionReuse.IsEnabled() {
|
||||
return
|
||||
}
|
||||
ws.reusable = reusable
|
||||
}
|
||||
|
||||
|
||||
@@ -65,11 +65,17 @@ func (v *Stream) Read() (*buf.Buffer, error) {
|
||||
return b, nil
|
||||
case <-v.srcClose:
|
||||
return nil, io.EOF
|
||||
case <-v.destClose:
|
||||
return nil, io.ErrClosedPipe
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (v *Stream) Write(data *buf.Buffer) (err error) {
|
||||
if data.IsEmpty() {
|
||||
return
|
||||
}
|
||||
|
||||
select {
|
||||
case <-v.destClose:
|
||||
return io.ErrClosedPipe
|
||||
@@ -93,7 +99,7 @@ func (v *Stream) Close() {
|
||||
close(v.srcClose)
|
||||
}
|
||||
|
||||
func (v *Stream) Release() {
|
||||
func (v *Stream) ForceClose() {
|
||||
defer swallowPanic()
|
||||
|
||||
close(v.destClose)
|
||||
@@ -110,6 +116,8 @@ func (v *Stream) Release() {
|
||||
}
|
||||
}
|
||||
|
||||
func (v *Stream) Release() {}
|
||||
|
||||
func swallowPanic() {
|
||||
recover()
|
||||
}
|
||||
|
||||
@@ -13,7 +13,9 @@ func TestStreamIO(t *testing.T) {
|
||||
assert := assert.On(t)
|
||||
|
||||
stream := NewStream()
|
||||
assert.Error(stream.Write(buf.New())).IsNil()
|
||||
b1 := buf.New()
|
||||
b1.AppendBytes('a')
|
||||
assert.Error(stream.Write(b1)).IsNil()
|
||||
|
||||
_, err := stream.Read()
|
||||
assert.Error(err).IsNil()
|
||||
@@ -22,7 +24,9 @@ func TestStreamIO(t *testing.T) {
|
||||
_, err = stream.Read()
|
||||
assert.Error(err).Equals(io.EOF)
|
||||
|
||||
err = stream.Write(buf.New())
|
||||
b2 := buf.New()
|
||||
b2.AppendBytes('b')
|
||||
err = stream.Write(b2)
|
||||
assert.Error(err).Equals(io.ErrClosedPipe)
|
||||
}
|
||||
|
||||
@@ -30,7 +34,9 @@ func TestStreamClose(t *testing.T) {
|
||||
assert := assert.On(t)
|
||||
|
||||
stream := NewStream()
|
||||
assert.Error(stream.Write(buf.New())).IsNil()
|
||||
b1 := buf.New()
|
||||
b1.AppendBytes('a')
|
||||
assert.Error(stream.Write(b1)).IsNil()
|
||||
|
||||
stream.Close()
|
||||
|
||||
|
||||
@@ -35,7 +35,7 @@ type Ray interface {
|
||||
|
||||
type InputStream interface {
|
||||
buf.Reader
|
||||
Close()
|
||||
ForceClose()
|
||||
}
|
||||
|
||||
type OutputStream interface {
|
||||
|
||||
Reference in New Issue
Block a user