Compare commits

..

26 Commits

Author SHA1 Message Date
Darien Raymond
e9d1e8d1d4 update license 2017-01-01 21:19:40 +01:00
Darien Raymond
58ae7e4967 refine proto settings 2017-01-01 21:19:12 +01:00
Darien Raymond
2a52ae9e96 default next protos 2016-12-31 23:22:26 +01:00
Darien Raymond
c74ca91bd3 use go 1.7.4 2016-12-30 23:38:29 +01:00
Darien Raymond
3cc0783d9c fix test break 2016-12-30 23:30:35 +01:00
Darien Raymond
a44d556667 test case for tls connection 2016-12-30 23:12:00 +01:00
Darien Raymond
f6aa7a0053 fix usage of ray stream. 2016-12-30 00:51:39 +01:00
Darien Raymond
609dbc1f13 task engine for all proxies 2016-12-30 00:32:20 +01:00
Darien Raymond
690d71b16e go style task engine 2016-12-29 22:17:12 +01:00
Darien Raymond
bdfb879963 comments 2016-12-29 01:05:16 +01:00
Darien Raymond
e6214b7a87 fix lint warnings 2016-12-29 00:58:00 +01:00
Darien Raymond
7bc98503a8 remove buggy CloseWrite() 2016-12-29 00:56:17 +01:00
Darien Raymond
5ff2b3453a task engine 2016-12-28 23:42:32 +01:00
Darien Raymond
7d31c0641b fix lint warnings 2016-12-28 00:58:53 +01:00
Darien Raymond
7c751fcca0 common.Must 2016-12-28 00:53:29 +01:00
Darien Raymond
22fa151391 comments 2016-12-27 23:09:08 +01:00
Darien Raymond
c347e50c28 rename cached to buffered 2016-12-27 21:41:44 +01:00
Darien Raymond
7d1426ff7f fix lint warnings 2016-12-27 21:34:14 +01:00
Darien Raymond
c68da6a0e8 unified release 2016-12-27 21:33:34 +01:00
Darien Raymond
5769df496b check for double release 2016-12-27 21:15:32 +01:00
Darien Raymond
538b0720d5 simplified Release() 2016-12-27 21:06:55 +01:00
Darien Raymond
ea33b7691b remove lock on bytes reader and writer 2016-12-27 20:53:29 +01:00
Darien Raymond
f195f15536 remove lock on buffered reader and writer 2016-12-27 01:01:47 +01:00
Darien Raymond
f7379bc1c3 skip empty payload in ray 2016-12-27 00:44:11 +01:00
Darien Raymond
38e4cad8d1 remove unused variable 2016-12-26 16:49:36 +01:00
Darien Raymond
29d62185cf notify read and write on remote close 2016-12-26 08:22:25 +01:00
54 changed files with 831 additions and 481 deletions

View File

@@ -1,6 +1,6 @@
language: go
go:
- 1.7.3
- 1.7.4
go_import_path: v2ray.com/core
git:
depth: 5

View File

@@ -1,6 +1,6 @@
The MIT License (MIT)
Copyright (c) 2015-2016 V2Ray
Copyright (c) 2015-2017 V2Ray
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal

View File

@@ -14,3 +14,6 @@ const (
type PacketDispatcher interface {
DispatchToOutbound(session *proxy.SessionInfo) ray.InboundRay
}
type Inspector interface {
}

View File

@@ -39,6 +39,7 @@ func (v *OutboundProxy) RegisterDialer() {
internet.ProxyDialer = v.Dial
}
// Dial implements internet.Dialer.
func (v *OutboundProxy) Dial(src v2net.Address, dest v2net.Destination, options internet.DialerOptions) (internet.Connection, error) {
handler := v.outboundManager.GetHandler(options.Proxy.Tag)
if handler == nil {
@@ -50,14 +51,15 @@ func (v *OutboundProxy) Dial(src v2net.Address, dest v2net.Destination, options
log.Info("Proxy: Dialing to ", dest)
stream := ray.NewRay()
go handler.Dispatch(dest, nil, stream)
return NewProxyConnection(src, dest, stream), nil
return NewConnection(src, dest, stream), nil
}
// Release implements common.Releasable.Release().
func (v *OutboundProxy) Release() {
}
type ProxyConnection struct {
type Connection struct {
stream ray.Ray
closed bool
localAddr net.Addr
@@ -67,8 +69,8 @@ type ProxyConnection struct {
writer *buf.BytesToBufferWriter
}
func NewProxyConnection(src v2net.Address, dest v2net.Destination, stream ray.Ray) *ProxyConnection {
return &ProxyConnection{
func NewConnection(src v2net.Address, dest v2net.Destination, stream ray.Ray) *Connection {
return &Connection{
stream: stream,
localAddr: &net.TCPAddr{
IP: []byte{0, 0, 0, 0},
@@ -83,21 +85,24 @@ func NewProxyConnection(src v2net.Address, dest v2net.Destination, stream ray.Ra
}
}
func (v *ProxyConnection) Read(b []byte) (int, error) {
// Read implements net.Conn.Read().
func (v *Connection) Read(b []byte) (int, error) {
if v.closed {
return 0, io.EOF
}
return v.reader.Read(b)
}
func (v *ProxyConnection) Write(b []byte) (int, error) {
// Write implements net.Conn.Write().
func (v *Connection) Write(b []byte) (int, error) {
if v.closed {
return 0, io.ErrClosedPipe
}
return v.writer.Write(b)
}
func (v *ProxyConnection) Close() error {
// Close implements net.Conn.Close().
func (v *Connection) Close() error {
v.closed = true
v.stream.InboundInput().Close()
v.stream.InboundOutput().Release()
@@ -106,30 +111,32 @@ func (v *ProxyConnection) Close() error {
return nil
}
func (v *ProxyConnection) LocalAddr() net.Addr {
// LocalAddr implements net.Conn.LocalAddr().
func (v *Connection) LocalAddr() net.Addr {
return v.localAddr
}
func (v *ProxyConnection) RemoteAddr() net.Addr {
// RemoteAddr implements net.Conn.RemoteAddr().
func (v *Connection) RemoteAddr() net.Addr {
return v.remoteAddr
}
func (v *ProxyConnection) SetDeadline(t time.Time) error {
func (v *Connection) SetDeadline(t time.Time) error {
return nil
}
func (v *ProxyConnection) SetReadDeadline(t time.Time) error {
func (v *Connection) SetReadDeadline(t time.Time) error {
return nil
}
func (v *ProxyConnection) SetWriteDeadline(t time.Time) error {
func (v *Connection) SetWriteDeadline(t time.Time) error {
return nil
}
func (v *ProxyConnection) Reusable() bool {
func (v *Connection) Reusable() bool {
return false
}
func (v *ProxyConnection) SetReusable(bool) {
func (v *Connection) SetReusable(bool) {
}

View File

@@ -7,12 +7,14 @@ import (
. "v2ray.com/core/app/proxy"
"v2ray.com/core/app/proxyman"
"v2ray.com/core/app/proxyman/outbound"
"v2ray.com/core/common"
v2net "v2ray.com/core/common/net"
"v2ray.com/core/proxy"
"v2ray.com/core/proxy/freedom"
"v2ray.com/core/testing/assert"
"v2ray.com/core/testing/servers/tcp"
"v2ray.com/core/transport/internet"
_ "v2ray.com/core/transport/internet/tcp"
)
func TestProxyDial(t *testing.T) {
@@ -20,12 +22,12 @@ func TestProxyDial(t *testing.T) {
space := app.NewSpace()
outboundManager := outbound.New()
outboundManager.SetHandler("tag", freedom.New(&freedom.Config{}, space, &proxy.OutboundHandlerMeta{
common.Must(outboundManager.SetHandler("tag", freedom.New(&freedom.Config{}, space, &proxy.OutboundHandlerMeta{
Tag: "tag",
StreamSettings: &internet.StreamConfig{
Network: v2net.Network_RawTCP,
},
}))
})))
space.BindApp(proxyman.APP_ID_OUTBOUND_MANAGER, outboundManager)
proxy := NewOutboundProxy(space)
@@ -64,6 +66,6 @@ func TestProxyDial(t *testing.T) {
assert.Bytes(xor(b[:nBytes])).Equals([]byte{'a', 'b', 'c', 'd'})
conn.Close()
common.Must(conn.Close())
tcpServer.Close()
}

View File

@@ -29,6 +29,8 @@ func (b *Buffer) Release() {
}
b.v = nil
b.pool = nil
b.start = 0
b.end = 0
}
// Clear clears the content of the buffer, results an empty buffer with

View File

@@ -1,9 +1,7 @@
package buf
import (
"io"
"sync"
)
import "io"
import "v2ray.com/core/common"
// BytesToBufferReader is a Reader that adjusts its reading speed automatically.
type BytesToBufferReader struct {
@@ -48,11 +46,10 @@ func (v *BytesToBufferReader) Read() (*Buffer, error) {
// Release implements Releasable.Release().
func (v *BytesToBufferReader) Release() {
v.reader = nil
common.Release(v.reader)
}
type BufferToBytesReader struct {
sync.Mutex
stream Reader
current *Buffer
eof bool
@@ -74,8 +71,6 @@ func (v *BufferToBytesReader) Read(b []byte) (int, error) {
return 0, io.EOF
}
v.Lock()
defer v.Unlock()
if v.current == nil {
v.Fill()
if v.eof {
@@ -92,11 +87,7 @@ func (v *BufferToBytesReader) Read(b []byte) (int, error) {
// Release implements Releasable.Release().
func (v *BufferToBytesReader) Release() {
v.Lock()
defer v.Unlock()
v.eof = true
v.current.Release()
v.current = nil
v.stream = nil
}

View File

@@ -1,9 +1,7 @@
package buf
import (
"io"
"sync"
)
import "io"
import "v2ray.com/core/common"
// BufferToBytesWriter is a Writer that writes alloc.Buffer into underlying writer.
type BufferToBytesWriter struct {
@@ -28,21 +26,14 @@ func (v *BufferToBytesWriter) Write(buffer *Buffer) error {
// Release implements Releasable.Release().
func (v *BufferToBytesWriter) Release() {
v.writer = nil
common.Release(v.writer)
}
type BytesToBufferWriter struct {
sync.Mutex
writer Writer
}
func (v *BytesToBufferWriter) Write(payload []byte) (int, error) {
v.Lock()
defer v.Unlock()
if v.writer == nil {
return 0, io.ErrClosedPipe
}
bytesWritten := 0
size := len(payload)
for size > 0 {
@@ -62,8 +53,5 @@ func (v *BytesToBufferWriter) Write(payload []byte) (int, error) {
// Release implements Releasable.Release()
func (v *BytesToBufferWriter) Release() {
v.Lock()
v.writer.Release()
v.writer = nil
v.Unlock()
}

View File

@@ -2,7 +2,6 @@ package bufio
import (
"io"
"sync"
"v2ray.com/core/common"
"v2ray.com/core/common/buf"
@@ -10,55 +9,44 @@ import (
// BufferedReader is a reader with internal cache.
type BufferedReader struct {
sync.Mutex
reader io.Reader
buffer *buf.Buffer
cached bool
reader io.Reader
buffer *buf.Buffer
buffered bool
}
// NewReader creates a new BufferedReader based on an io.Reader.
func NewReader(rawReader io.Reader) *BufferedReader {
return &BufferedReader{
reader: rawReader,
buffer: buf.New(),
cached: true,
reader: rawReader,
buffer: buf.New(),
buffered: true,
}
}
// Release implements Releasable.Release().
func (v *BufferedReader) Release() {
v.Lock()
defer v.Unlock()
v.buffer.Release()
v.buffer = nil
if releasable, ok := v.reader.(common.Releasable); ok {
releasable.Release()
if v.buffer != nil {
v.buffer.Release()
v.buffer = nil
}
v.reader = nil
common.Release(v.reader)
}
// Cached returns true if the internal cache is effective.
func (v *BufferedReader) Cached() bool {
return v.cached
// IsBuffered returns true if the internal cache is effective.
func (v *BufferedReader) IsBuffered() bool {
return v.buffered
}
// SetCached is to enable or disable internal cache. If cache is disabled,
// Read() and Write() calls will be delegated to the underlying io.Reader directly.
func (v *BufferedReader) SetCached(cached bool) {
v.cached = cached
// SetBuffered is to enable or disable internal cache. If cache is disabled,
// Read() calls will be delegated to the underlying io.Reader directly.
func (v *BufferedReader) SetBuffered(cached bool) {
v.buffered = cached
}
// Read implements io.Reader.Read().
func (v *BufferedReader) Read(b []byte) (int, error) {
v.Lock()
defer v.Unlock()
if v.reader == nil {
return 0, io.EOF
}
if !v.cached {
if !v.buffered || v.buffer == nil {
if !v.buffer.IsEmpty() {
return v.buffer.Read(b)
}

View File

@@ -18,7 +18,7 @@ func TestBufferedReader(t *testing.T) {
len := content.Len()
reader := NewReader(content)
assert.Bool(reader.Cached()).IsTrue()
assert.Bool(reader.IsBuffered()).IsTrue()
payload := make([]byte, 16)

View File

@@ -2,36 +2,31 @@ package bufio
import (
"io"
"sync"
"v2ray.com/core/common"
"v2ray.com/core/common/buf"
"v2ray.com/core/common/errors"
)
// BufferedWriter is an io.Writer with internal buffer. It writes to underlying writer when buffer is full or on demand.
// This type is not thread safe.
type BufferedWriter struct {
sync.Mutex
writer io.Writer
buffer *buf.Buffer
cached bool
writer io.Writer
buffer *buf.Buffer
buffered bool
}
// NewWriter creates a new BufferedWriter.
func NewWriter(rawWriter io.Writer) *BufferedWriter {
return &BufferedWriter{
writer: rawWriter,
buffer: buf.NewSmall(),
cached: true,
writer: rawWriter,
buffer: buf.NewSmall(),
buffered: true,
}
}
// ReadFrom implements io.ReaderFrom.ReadFrom().
func (v *BufferedWriter) ReadFrom(reader io.Reader) (int64, error) {
v.Lock()
defer v.Unlock()
if v.writer == nil {
return 0, io.ErrClosedPipe
}
totalBytes := int64(0)
for {
oriSize := v.buffer.Len()
@@ -43,19 +38,14 @@ func (v *BufferedWriter) ReadFrom(reader io.Reader) (int64, error) {
}
return totalBytes, err
}
v.FlushWithoutLock()
if err := v.Flush(); err != nil {
return totalBytes, err
}
}
}
func (v *BufferedWriter) Write(b []byte) (int, error) {
v.Lock()
defer v.Unlock()
if v.writer == nil {
return 0, io.ErrClosedPipe
}
if !v.cached {
if !v.buffered || v.buffer == nil {
return v.writer.Write(b)
}
nBytes, err := v.buffer.Write(b)
@@ -63,7 +53,7 @@ func (v *BufferedWriter) Write(b []byte) (int, error) {
return 0, err
}
if v.buffer.IsFull() {
err := v.FlushWithoutLock()
err := v.Flush()
if err != nil {
return 0, err
}
@@ -76,18 +66,8 @@ func (v *BufferedWriter) Write(b []byte) (int, error) {
return len(b), nil
}
// Flush writes all buffered content into underlying writer, if any.
func (v *BufferedWriter) Flush() error {
v.Lock()
defer v.Unlock()
if v.writer == nil {
return io.ErrClosedPipe
}
return v.FlushWithoutLock()
}
func (v *BufferedWriter) FlushWithoutLock() error {
defer v.buffer.Clear()
for !v.buffer.IsEmpty() {
nBytes, err := v.writer.Write(v.buffer.Bytes())
@@ -99,28 +79,28 @@ func (v *BufferedWriter) FlushWithoutLock() error {
return nil
}
func (v *BufferedWriter) Cached() bool {
return v.cached
// IsBuffered returns true if this BufferedWriter holds a buffer.
func (v *BufferedWriter) IsBuffered() bool {
return v.buffered
}
func (v *BufferedWriter) SetCached(cached bool) {
v.cached = cached
// SetBuffered controls whether the BufferedWriter holds a buffer for writing. If not buffered, any write() calls into underlying writer directly.
func (v *BufferedWriter) SetBuffered(cached bool) {
v.buffered = cached
if !cached && !v.buffer.IsEmpty() {
v.Flush()
}
}
// Release implements common.Releasable.Release().
func (v *BufferedWriter) Release() {
v.Flush()
v.Lock()
defer v.Unlock()
v.buffer.Release()
v.buffer = nil
if releasable, ok := v.writer.(common.Releasable); ok {
releasable.Release()
if !v.buffer.IsEmpty() {
v.Flush()
}
v.writer = nil
if v.buffer != nil {
v.buffer.Release()
v.buffer = nil
}
common.Release(v.writer)
}

View File

@@ -15,7 +15,7 @@ func TestBufferedWriter(t *testing.T) {
content := buf.New()
writer := NewWriter(content)
assert.Bool(writer.Cached()).IsTrue()
assert.Bool(writer.IsBuffered()).IsTrue()
payload := make([]byte, 16)
@@ -25,7 +25,7 @@ func TestBufferedWriter(t *testing.T) {
assert.Bool(content.IsEmpty()).IsTrue()
writer.SetCached(false)
writer.SetBuffered(false)
assert.Int(content.Len()).Equals(16)
}
@@ -35,7 +35,7 @@ func TestBufferedWriterLargePayload(t *testing.T) {
content := buf.NewLocal(128 * 1024)
writer := NewWriter(content)
assert.Bool(writer.Cached()).IsTrue()
assert.Bool(writer.IsBuffered()).IsTrue()
payload := make([]byte, 64*1024)
rand.Read(payload)

View File

@@ -18,3 +18,17 @@ type Releasable interface {
// Release releases all references to accelerate garbage collection.
Release()
}
// Release tries to release the given object.
func Release(v interface{}) {
if releasable, ok := v.(Releasable); ok {
releasable.Release()
}
}
// Must panics if err is not nil.
func Must(err error) {
if err != nil {
panic(err)
}
}

View File

@@ -5,6 +5,7 @@ import (
"errors"
"io"
"v2ray.com/core/common"
"v2ray.com/core/common/buf"
"v2ray.com/core/common/serial"
)
@@ -135,9 +136,9 @@ func (v *AuthenticationReader) EnsureChunk() error {
v.buffer.Clear()
} else {
leftover := v.buffer.Bytes()
v.buffer.Reset(func(b []byte) (int, error) {
common.Must(v.buffer.Reset(func(b []byte) (int, error) {
return copy(b, leftover), nil
})
}))
}
err = v.buffer.AppendSupplier(buf.ReadFrom(v.reader))
if err == nil {

View File

@@ -1,3 +1,2 @@
// Provides common crypto libraries for V2Ray.
// Package crypto provides common crypto libraries for V2Ray.
package crypto

View File

@@ -20,9 +20,6 @@ func NewCryptionReader(stream cipher.Stream, reader io.Reader) *CryptionReader {
}
func (v *CryptionReader) Read(data []byte) (int, error) {
if v.reader == nil {
return 0, common.ErrObjectReleased
}
nBytes, err := v.reader.Read(data)
if nBytes > 0 {
v.stream.XORKeyStream(data[:nBytes], data[:nBytes])
@@ -31,8 +28,8 @@ func (v *CryptionReader) Read(data []byte) (int, error) {
}
func (v *CryptionReader) Release() {
v.reader = nil
v.stream = nil
common.Release(v.reader)
common.Release(v.stream)
}
type CryptionWriter struct {
@@ -40,6 +37,7 @@ type CryptionWriter struct {
writer io.Writer
}
// NewCryptionWriter creates a new CryptionWriter.
func NewCryptionWriter(stream cipher.Stream, writer io.Writer) *CryptionWriter {
return &CryptionWriter{
stream: stream,
@@ -47,15 +45,14 @@ func NewCryptionWriter(stream cipher.Stream, writer io.Writer) *CryptionWriter {
}
}
// Write implements io.Writer.Write().
func (v *CryptionWriter) Write(data []byte) (int, error) {
if v.writer == nil {
return 0, common.ErrObjectReleased
}
v.stream.XORKeyStream(data, data)
return v.writer.Write(data)
}
// Release implements common.Releasable.Release().
func (v *CryptionWriter) Release() {
v.writer = nil
v.stream = nil
common.Release(v.writer)
common.Release(v.stream)
}

View File

@@ -19,10 +19,9 @@ type ErrorLog struct {
}
func (v *ErrorLog) Release() {
for index := range v.Values {
v.Values[index] = nil
for _, val := range v.Values {
common.Release(val)
}
v.Values = nil
}
func (v *ErrorLog) String() string {
@@ -37,9 +36,9 @@ type AccessLog struct {
}
func (v *AccessLog) Release() {
v.From = nil
v.To = nil
v.Reason = nil
common.Release(v.From)
common.Release(v.To)
common.Release(v.Reason)
}
func (v *AccessLog) String() string {

View File

@@ -4,6 +4,8 @@ import (
"io"
"net"
"time"
"v2ray.com/core/common"
)
var (
@@ -51,8 +53,8 @@ func (reader *TimeOutReader) SetTimeOut(value uint32) {
}
func (reader *TimeOutReader) Release() {
reader.connection = nil
reader.worker = nil
common.Release(reader.connection)
common.Release(reader.worker)
}
type timedReaderWorker struct {

View File

@@ -23,11 +23,11 @@ func GetMessageType(message proto.Message) string {
}
func GetInstance(messageType string) (interface{}, error) {
mType := proto.MessageType(messageType).Elem()
if mType == nil {
mType := proto.MessageType(messageType)
if mType == nil || mType.Elem() == nil {
return nil, errors.New("Unknown type: " + messageType)
}
return reflect.New(mType).Interface(), nil
return reflect.New(mType.Elem()).Interface(), nil
}
func (v *TypedMessage) GetInstance() (interface{}, error) {

30
common/signal/exec.go Normal file
View File

@@ -0,0 +1,30 @@
package signal
func executeAndFulfill(f func() error, done chan<- error) {
err := f()
if err != nil {
done <- err
}
close(done)
}
func ExecuteAsync(f func() error) <-chan error {
done := make(chan error, 1)
go executeAndFulfill(f, done)
return done
}
func ErrorOrFinish2(c1, c2 <-chan error) error {
select {
case err, failed := <-c1:
if failed {
return err
}
return <-c2
case err, failed := <-c2:
if failed {
return err
}
return <-c1
}
}

View File

@@ -0,0 +1,87 @@
package signal_test
import (
"errors"
"testing"
. "v2ray.com/core/common/signal"
"v2ray.com/core/testing/assert"
)
func TestErrorOrFinish2_Error(t *testing.T) {
assert := assert.On(t)
c1 := make(chan error, 1)
c2 := make(chan error, 2)
c := make(chan error, 1)
go func() {
c <- ErrorOrFinish2(c1, c2)
}()
c1 <- errors.New("test")
err := <-c
assert.String(err.Error()).Equals("test")
}
func TestErrorOrFinish2_Error2(t *testing.T) {
assert := assert.On(t)
c1 := make(chan error, 1)
c2 := make(chan error, 2)
c := make(chan error, 1)
go func() {
c <- ErrorOrFinish2(c1, c2)
}()
c2 <- errors.New("test")
err := <-c
assert.String(err.Error()).Equals("test")
}
func TestErrorOrFinish2_NoneError(t *testing.T) {
assert := assert.On(t)
c1 := make(chan error, 1)
c2 := make(chan error, 2)
c := make(chan error, 1)
go func() {
c <- ErrorOrFinish2(c1, c2)
}()
close(c1)
select {
case <-c:
t.Fail()
default:
}
close(c2)
err := <-c
assert.Error(err).IsNil()
}
func TestErrorOrFinish2_NoneError2(t *testing.T) {
assert := assert.On(t)
c1 := make(chan error, 1)
c2 := make(chan error, 2)
c := make(chan error, 1)
go func() {
c <- ErrorOrFinish2(c1, c2)
}()
close(c2)
select {
case <-c:
t.Fail()
default:
}
close(c1)
err := <-c
assert.Error(err).IsNil()
}

View File

@@ -1,11 +1,12 @@
package blackhole
import (
"v2ray.com/core/common"
"v2ray.com/core/common/serial"
"v2ray.com/core/proxy"
)
func init() {
// Must listed after config.pb.go
proxy.MustRegisterOutboundHandlerCreator(serial.GetMessageType(new(Config)), new(Factory))
common.Must(proxy.RegisterOutboundHandlerCreator(serial.GetMessageType(new(Config)), new(Factory)))
}

View File

@@ -5,11 +5,13 @@ import (
"v2ray.com/core/app"
"v2ray.com/core/app/dispatcher"
"v2ray.com/core/common"
"v2ray.com/core/common/buf"
"v2ray.com/core/common/errors"
"v2ray.com/core/common/log"
v2net "v2ray.com/core/common/net"
"v2ray.com/core/common/serial"
"v2ray.com/core/common/signal"
"v2ray.com/core/proxy"
"v2ray.com/core/transport/internet"
"v2ray.com/core/transport/internet/udp"
@@ -165,37 +167,42 @@ func (v *DokodemoDoor) HandleTCPConnection(conn internet.Connection) {
Destination: dest,
Inbound: v.meta,
})
defer ray.InboundOutput().Release()
var wg sync.WaitGroup
output := ray.InboundOutput()
defer output.ForceClose()
reader := v2net.NewTimeOutReader(v.config.Timeout, conn)
defer reader.Release()
wg.Add(1)
go func() {
requestDone := signal.ExecuteAsync(func() error {
defer ray.InboundInput().Close()
v2reader := buf.NewReader(reader)
defer v2reader.Release()
if err := buf.PipeUntilEOF(v2reader, ray.InboundInput()); err != nil {
log.Info("Dokodemo: Failed to transport all TCP request: ", err)
return err
}
wg.Done()
ray.InboundInput().Close()
}()
wg.Add(1)
go func() {
return nil
})
responseDone := signal.ExecuteAsync(func() error {
defer output.ForceClose()
v2writer := buf.NewWriter(conn)
defer v2writer.Release()
if err := buf.PipeUntilEOF(ray.InboundOutput(), v2writer); err != nil {
if err := buf.PipeUntilEOF(output, v2writer); err != nil {
log.Info("Dokodemo: Failed to transport all TCP response: ", err)
return err
}
wg.Done()
}()
return nil
})
wg.Wait()
if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil {
log.Info("Dokodemo: Connection ends with ", err)
}
}
type Factory struct{}
@@ -211,5 +218,5 @@ func (v *Factory) Create(space app.Space, rawConfig interface{}, meta *proxy.Inb
}
func init() {
proxy.MustRegisterInboundHandlerCreator(serial.GetMessageType(new(Config)), new(Factory))
common.Must(proxy.RegisterInboundHandlerCreator(serial.GetMessageType(new(Config)), new(Factory)))
}

View File

@@ -18,6 +18,7 @@ import (
"v2ray.com/core/testing/servers/tcp"
"v2ray.com/core/testing/servers/udp"
"v2ray.com/core/transport/internet"
_ "v2ray.com/core/transport/internet/tcp"
)
func TestDokodemoTCP(t *testing.T) {

View File

@@ -5,6 +5,7 @@ import (
"v2ray.com/core/app"
"v2ray.com/core/app/dns"
"v2ray.com/core/common"
"v2ray.com/core/common/buf"
"v2ray.com/core/common/dice"
"v2ray.com/core/common/errors"
@@ -12,9 +13,9 @@ import (
v2net "v2ray.com/core/common/net"
"v2ray.com/core/common/retry"
"v2ray.com/core/common/serial"
"v2ray.com/core/common/signal"
"v2ray.com/core/proxy"
"v2ray.com/core/transport/internet"
"v2ray.com/core/transport/internet/tcp"
"v2ray.com/core/transport/ray"
)
@@ -70,8 +71,10 @@ func (v *Handler) Dispatch(destination v2net.Destination, payload *buf.Buffer, r
log.Info("Freedom: Opening connection to ", destination)
defer payload.Release()
defer ray.OutboundInput().Release()
defer ray.OutboundOutput().Close()
input := ray.OutboundInput()
output := ray.OutboundOutput()
defer input.ForceClose()
defer output.Close()
var conn internet.Connection
if v.domainStrategy == Config_USE_IP && destination.Address.Family().IsDomain() {
@@ -91,24 +94,24 @@ func (v *Handler) Dispatch(destination v2net.Destination, payload *buf.Buffer, r
}
defer conn.Close()
input := ray.OutboundInput()
output := ray.OutboundOutput()
if !payload.IsEmpty() {
conn.Write(payload.Bytes())
if _, err := conn.Write(payload.Bytes()); err != nil {
log.Warning("Freedom: Failed to write to destination: ", destination, ": ", err)
return
}
}
go func() {
requestDone := signal.ExecuteAsync(func() error {
defer input.ForceClose()
v2writer := buf.NewWriter(conn)
defer v2writer.Release()
if err := buf.PipeUntilEOF(input, v2writer); err != nil {
log.Info("Freedom: Failed to transport all TCP request: ", err)
return err
}
if tcpConn, ok := conn.(*tcp.RawConnection); ok {
tcpConn.CloseWrite()
}
}()
return nil
})
var reader io.Reader = conn
@@ -120,12 +123,21 @@ func (v *Handler) Dispatch(destination v2net.Destination, payload *buf.Buffer, r
reader = v2net.NewTimeOutReader(timeout /* seconds */, conn)
}
v2reader := buf.NewReader(reader)
if err := buf.PipeUntilEOF(v2reader, output); err != nil {
log.Info("Freedom: Failed to transport all TCP response: ", err)
responseDone := signal.ExecuteAsync(func() error {
defer output.Close()
v2reader := buf.NewReader(reader)
defer v2reader.Release()
if err := buf.PipeUntilEOF(v2reader, output); err != nil {
return err
}
return nil
})
if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil {
log.Info("Freedom: Connection ending with ", err)
}
v2reader.Release()
ray.OutboundOutput().Close()
}
type Factory struct{}
@@ -141,5 +153,5 @@ func (v *Factory) Create(space app.Space, config interface{}, meta *proxy.Outbou
}
func init() {
proxy.MustRegisterOutboundHandlerCreator(serial.GetMessageType(new(Config)), new(Factory))
common.Must(proxy.RegisterOutboundHandlerCreator(serial.GetMessageType(new(Config)), new(Factory)))
}

View File

@@ -1,6 +1,7 @@
package freedom_test
import (
"fmt"
"testing"
"v2ray.com/core/app"
@@ -18,6 +19,7 @@ import (
"v2ray.com/core/testing/assert"
"v2ray.com/core/testing/servers/tcp"
"v2ray.com/core/transport/internet"
_ "v2ray.com/core/transport/internet/tcp"
"v2ray.com/core/transport/ray"
)
@@ -32,7 +34,7 @@ func TestSinglePacket(t *testing.T) {
return buffer
},
}
_, err := tcpServer.Start()
tcpServerAddr, err := tcpServer.Start()
assert.Error(err).IsNil()
space := app.NewSpace()
@@ -52,7 +54,8 @@ func TestSinglePacket(t *testing.T) {
payload := buf.NewLocal(2048)
payload.Append([]byte(data2Send))
go freedom.Dispatch(v2net.TCPDestination(v2net.LocalHostIP, tcpServer.Port), payload, traffic)
fmt.Println(tcpServerAddr.Network, tcpServerAddr.Address, tcpServerAddr.Port)
go freedom.Dispatch(tcpServerAddr, payload, traffic)
traffic.InboundInput().Close()
respPayload, err := traffic.InboundOutput().Read()

View File

@@ -4,6 +4,7 @@ import (
"v2ray.com/core/app"
"v2ray.com/core/common"
"v2ray.com/core/common/errors"
v2net "v2ray.com/core/common/net"
"v2ray.com/core/transport/internet"
)
@@ -20,12 +21,6 @@ func RegisterInboundHandlerCreator(name string, creator InboundHandlerFactory) e
return nil
}
func MustRegisterInboundHandlerCreator(name string, creator InboundHandlerFactory) {
if err := RegisterInboundHandlerCreator(name, creator); err != nil {
panic(err)
}
}
func RegisterOutboundHandlerCreator(name string, creator OutboundHandlerFactory) error {
if _, found := outboundFactories[name]; found {
return common.ErrDuplicatedName
@@ -34,12 +29,6 @@ func RegisterOutboundHandlerCreator(name string, creator OutboundHandlerFactory)
return nil
}
func MustRegisterOutboundHandlerCreator(name string, creator OutboundHandlerFactory) {
if err := RegisterOutboundHandlerCreator(name, creator); err != nil {
panic(err)
}
}
func CreateInboundHandler(name string, space app.Space, config interface{}, meta *InboundHandlerMeta) (InboundHandler, error) {
creator, found := inboundFactories[name]
if !found {
@@ -49,6 +38,8 @@ func CreateInboundHandler(name string, space app.Space, config interface{}, meta
meta.StreamSettings = &internet.StreamConfig{
Network: creator.StreamCapability().Get(0),
}
} else if meta.StreamSettings.Network == v2net.Network_Unknown {
meta.StreamSettings.Network = creator.StreamCapability().Get(0)
} else {
if !creator.StreamCapability().HasNetwork(meta.StreamSettings.Network) {
return nil, errors.New("Proxy: Invalid network: " + meta.StreamSettings.Network.String())
@@ -67,6 +58,8 @@ func CreateOutboundHandler(name string, space app.Space, config interface{}, met
meta.StreamSettings = &internet.StreamConfig{
Network: creator.StreamCapability().Get(0),
}
} else if meta.StreamSettings.Network == v2net.Network_Unknown {
meta.StreamSettings.Network = creator.StreamCapability().Get(0)
} else {
if !creator.StreamCapability().HasNetwork(meta.StreamSettings.Network) {
return nil, errors.New("Proxy: Invalid network: " + meta.StreamSettings.Network.String())

View File

@@ -17,9 +17,9 @@ import (
"v2ray.com/core/common/log"
v2net "v2ray.com/core/common/net"
"v2ray.com/core/common/serial"
"v2ray.com/core/common/signal"
"v2ray.com/core/proxy"
"v2ray.com/core/transport/internet"
"v2ray.com/core/transport/ray"
)
// Server is a HTTP proxy server.
@@ -155,35 +155,32 @@ func (v *Server) handleConnect(request *http.Request, session *proxy.SessionInfo
}
ray := v.packetDispatcher.DispatchToOutbound(session)
v.transport(reader, writer, ray)
}
func (v *Server) transport(input io.Reader, output io.Writer, ray ray.InboundRay) {
var wg sync.WaitGroup
wg.Add(2)
defer wg.Wait()
requestDone := signal.ExecuteAsync(func() error {
defer ray.InboundInput().Close()
go func() {
v2reader := buf.NewReader(input)
v2reader := buf.NewReader(reader)
defer v2reader.Release()
if err := buf.PipeUntilEOF(v2reader, ray.InboundInput()); err != nil {
log.Info("HTTP: Failed to transport all TCP request: ", err)
return err
}
ray.InboundInput().Close()
wg.Done()
}()
return nil
})
go func() {
v2writer := buf.NewWriter(output)
responseDone := signal.ExecuteAsync(func() error {
defer ray.InboundOutput().ForceClose()
v2writer := buf.NewWriter(writer)
defer v2writer.Release()
if err := buf.PipeUntilEOF(ray.InboundOutput(), v2writer); err != nil {
log.Info("HTTP: Failed to transport all TCP response: ", err)
return err
}
ray.InboundOutput().Release()
wg.Done()
}()
return nil
})
signal.ErrorOrFinish2(requestDone, responseDone)
}
// @VisibleForTesting
@@ -223,7 +220,7 @@ func (v *Server) GenerateResponse(statusCode int, status string) *http.Response
Header: hdr,
Body: nil,
ContentLength: 0,
Close: false,
Close: true,
}
}
@@ -239,25 +236,31 @@ func (v *Server) handlePlainHTTP(request *http.Request, session *proxy.SessionIn
StripHopByHopHeaders(request)
ray := v.packetDispatcher.DispatchToOutbound(session)
defer ray.InboundInput().Close()
defer ray.InboundOutput().Release()
input := ray.InboundInput()
output := ray.InboundOutput()
defer input.Close()
defer output.ForceClose()
requestDone := signal.ExecuteAsync(func() error {
defer input.Close()
var finish sync.WaitGroup
finish.Add(1)
go func() {
defer finish.Done()
requestWriter := bufio.NewWriter(buf.NewBytesWriter(ray.InboundInput()))
defer requestWriter.Release()
err := request.Write(requestWriter)
if err != nil {
log.Warning("HTTP: Failed to write request: ", err)
return
return err
}
requestWriter.Flush()
}()
if err := requestWriter.Flush(); err != nil {
return err
}
return nil
})
responseDone := signal.ExecuteAsync(func() error {
defer output.ForceClose()
finish.Add(1)
go func() {
defer finish.Done()
responseReader := bufio.OriginalReader(buf.NewBytesReader(ray.InboundOutput()))
response, err := http.ReadResponse(responseReader, request)
if err != nil {
@@ -265,14 +268,19 @@ func (v *Server) handlePlainHTTP(request *http.Request, session *proxy.SessionIn
response = v.GenerateResponse(503, "Service Unavailable")
}
responseWriter := bufio.NewWriter(writer)
err = response.Write(responseWriter)
if err != nil {
log.Warning("HTTP: Failed to write response: ", err)
return
if err := response.Write(responseWriter); err != nil {
return err
}
responseWriter.Flush()
}()
finish.Wait()
if err := responseWriter.Flush(); err != nil {
return err
}
return nil
})
if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil {
log.Info("HTTP|Server: Connecton ending with ", err)
}
}
// ServerFactory is a InboundHandlerFactory.
@@ -297,5 +305,5 @@ func (v *ServerFactory) Create(space app.Space, rawConfig interface{}, meta *pro
}
func init() {
proxy.MustRegisterInboundHandlerCreator(serial.GetMessageType(new(ServerConfig)), new(ServerFactory))
common.Must(proxy.RegisterInboundHandlerCreator(serial.GetMessageType(new(ServerConfig)), new(ServerFactory)))
}

View File

@@ -1,8 +1,6 @@
package shadowsocks
import (
"sync"
"v2ray.com/core/app"
"v2ray.com/core/common/buf"
"v2ray.com/core/common/bufio"
@@ -10,6 +8,7 @@ import (
v2net "v2ray.com/core/common/net"
"v2ray.com/core/common/protocol"
"v2ray.com/core/common/retry"
"v2ray.com/core/common/signal"
"v2ray.com/core/proxy"
"v2ray.com/core/transport/internet"
"v2ray.com/core/transport/ray"
@@ -38,8 +37,6 @@ func NewClient(config *ClientConfig, space app.Space, meta *proxy.OutboundHandle
// Dispatch implements OutboundHandler.Dispatch().
func (v *Client) Dispatch(destination v2net.Destination, payload *buf.Buffer, ray ray.OutboundRay) {
defer payload.Release()
defer ray.OutboundInput().Release()
defer ray.OutboundOutput().Close()
network := destination.Network
@@ -109,48 +106,38 @@ func (v *Client) Dispatch(destination v2net.Destination, payload *buf.Buffer, ra
}
}
var responseMutex sync.Mutex
responseMutex.Lock()
bufferedWriter.SetBuffered(false)
go func() {
defer responseMutex.Unlock()
requestDone := signal.ExecuteAsync(func() error {
defer ray.OutboundInput().ForceClose()
if err := buf.PipeUntilEOF(ray.OutboundInput(), bodyWriter); err != nil {
return err
}
return nil
})
responseDone := signal.ExecuteAsync(func() error {
defer ray.OutboundOutput().Close()
responseReader, err := ReadTCPResponse(user, conn)
if err != nil {
log.Warning("Shadowsocks|Client: Failed to read response: ", err)
return
return err
}
if err := buf.PipeUntilEOF(responseReader, ray.OutboundOutput()); err != nil {
log.Info("Shadowsocks|Client: Failed to transport all TCP response: ", err)
return err
}
}()
bufferedWriter.SetCached(false)
if err := buf.PipeUntilEOF(ray.OutboundInput(), bodyWriter); err != nil {
log.Info("Shadowsocks|Client: Failed to trasnport all TCP request: ", err)
return nil
})
if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil {
log.Info("Shadowsocks|Client: Connection ends with ", err)
}
responseMutex.Lock()
}
if request.Command == protocol.RequestCommandUDP {
timedReader := v2net.NewTimeOutReader(16, conn)
var responseMutex sync.Mutex
responseMutex.Lock()
go func() {
defer responseMutex.Unlock()
reader := &UDPReader{
Reader: timedReader,
User: user,
}
if err := buf.PipeUntilEOF(reader, ray.OutboundOutput()); err != nil {
log.Info("Shadowsocks|Client: Failed to transport all UDP response: ", err)
}
}()
writer := &UDPWriter{
Writer: conn,
@@ -162,11 +149,35 @@ func (v *Client) Dispatch(destination v2net.Destination, payload *buf.Buffer, ra
return
}
}
if err := buf.PipeUntilEOF(ray.OutboundInput(), writer); err != nil {
log.Info("Shadowsocks|Client: Failed to transport all UDP request: ", err)
}
responseMutex.Lock()
requestDone := signal.ExecuteAsync(func() error {
defer ray.OutboundInput().ForceClose()
if err := buf.PipeUntilEOF(ray.OutboundInput(), writer); err != nil {
log.Info("Shadowsocks|Client: Failed to transport all UDP request: ", err)
return err
}
return nil
})
timedReader := v2net.NewTimeOutReader(16, conn)
responseDone := signal.ExecuteAsync(func() error {
defer ray.OutboundOutput().Close()
reader := &UDPReader{
Reader: timedReader,
User: user,
}
if err := buf.PipeUntilEOF(reader, ray.OutboundOutput()); err != nil {
log.Info("Shadowsocks|Client: Failed to transport all UDP response: ", err)
return err
}
return nil
})
signal.ErrorOrFinish2(requestDone, responseDone)
}
}

View File

@@ -1,12 +1,13 @@
package shadowsocks
import (
"v2ray.com/core/common"
"v2ray.com/core/common/serial"
"v2ray.com/core/proxy"
)
func init() {
// Must happen after config is initialized
proxy.MustRegisterOutboundHandlerCreator(serial.GetMessageType(new(ClientConfig)), new(ClientFactory))
proxy.MustRegisterInboundHandlerCreator(serial.GetMessageType(new(ServerConfig)), new(ServerFactory))
common.Must(proxy.RegisterOutboundHandlerCreator(serial.GetMessageType(new(ClientConfig)), new(ClientFactory)))
common.Must(proxy.RegisterInboundHandlerCreator(serial.GetMessageType(new(ServerConfig)), new(ServerFactory)))
}

View File

@@ -6,6 +6,7 @@ import (
"crypto/sha1"
"io"
"v2ray.com/core/common"
"v2ray.com/core/common/buf"
"v2ray.com/core/common/errors"
"v2ray.com/core/common/serial"
@@ -70,8 +71,8 @@ func NewChunkReader(reader io.Reader, auth *Authenticator) *ChunkReader {
}
func (v *ChunkReader) Release() {
v.reader = nil
v.auth = nil
common.Release(v.reader)
common.Release(v.auth)
}
func (v *ChunkReader) Read() (*buf.Buffer, error) {
@@ -124,8 +125,8 @@ func NewChunkWriter(writer io.Writer, auth *Authenticator) *ChunkWriter {
}
func (v *ChunkWriter) Release() {
v.writer = nil
v.auth = nil
common.Release(v.writer)
common.Release(v.auth)
}
func (v *ChunkWriter) Write(payload *buf.Buffer) error {

View File

@@ -1,8 +1,6 @@
package shadowsocks
import (
"sync"
"v2ray.com/core/app"
"v2ray.com/core/app/dispatcher"
"v2ray.com/core/common"
@@ -12,6 +10,7 @@ import (
"v2ray.com/core/common/log"
v2net "v2ray.com/core/common/net"
"v2ray.com/core/common/protocol"
"v2ray.com/core/common/signal"
"v2ray.com/core/proxy"
"v2ray.com/core/transport/internet"
"v2ray.com/core/transport/internet/udp"
@@ -161,7 +160,7 @@ func (v *Server) handleConnection(conn internet.Connection) {
}
defer bodyReader.Release()
bufferedReader.SetCached(false)
bufferedReader.SetBuffered(false)
userSettings := v.user.GetSettings()
timedReader.SetTimeOut(userSettings.PayloadReadTimeout)
@@ -176,12 +175,11 @@ func (v *Server) handleConnection(conn internet.Connection) {
User: request.User,
Inbound: v.meta,
})
defer ray.InboundOutput().Release()
defer ray.InboundOutput().ForceClose()
defer ray.InboundInput().Close()
var writeFinish sync.Mutex
writeFinish.Lock()
go func() {
defer writeFinish.Unlock()
requestDone := signal.ExecuteAsync(func() error {
defer ray.InboundOutput().ForceClose()
bufferedWriter := bufio.NewWriter(conn)
defer bufferedWriter.Release()
@@ -189,26 +187,38 @@ func (v *Server) handleConnection(conn internet.Connection) {
responseWriter, err := WriteTCPResponse(request, bufferedWriter)
if err != nil {
log.Warning("Shadowsocks|Server: Failed to write response: ", err)
return
return err
}
defer responseWriter.Release()
if payload, err := ray.InboundOutput().Read(); err == nil {
responseWriter.Write(payload)
bufferedWriter.SetCached(false)
if err := buf.PipeUntilEOF(ray.InboundOutput(), responseWriter); err != nil {
log.Info("Shadowsocks|Server: Failed to transport all TCP response: ", err)
}
payload, err := ray.InboundOutput().Read()
if err != nil {
return err
}
}()
responseWriter.Write(payload)
bufferedWriter.SetBuffered(false)
if err := buf.PipeUntilEOF(bodyReader, ray.InboundInput()); err != nil {
log.Info("Shadowsocks|Server: Failed to transport all TCP request: ", err)
if err := buf.PipeUntilEOF(ray.InboundOutput(), responseWriter); err != nil {
log.Info("Shadowsocks|Server: Failed to transport all TCP response: ", err)
return err
}
return nil
})
responseDone := signal.ExecuteAsync(func() error {
defer ray.InboundInput().Close()
if err := buf.PipeUntilEOF(bodyReader, ray.InboundInput()); err != nil {
log.Info("Shadowsocks|Server: Failed to transport all TCP request: ", err)
return err
}
return nil
})
if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil {
log.Info("Shadowsocks|Server: Connection ends with ", err)
}
ray.InboundInput().Close()
writeFinish.Lock()
}
type ServerFactory struct{}

View File

@@ -7,6 +7,7 @@ import (
"v2ray.com/core/app"
"v2ray.com/core/app/dispatcher"
"v2ray.com/core/common"
"v2ray.com/core/common/buf"
"v2ray.com/core/common/bufio"
"v2ray.com/core/common/crypto"
@@ -14,6 +15,7 @@ import (
"v2ray.com/core/common/log"
v2net "v2ray.com/core/common/net"
"v2ray.com/core/common/serial"
"v2ray.com/core/common/signal"
"v2ray.com/core/proxy"
"v2ray.com/core/proxy/socks/protocol"
"v2ray.com/core/transport/internet"
@@ -216,8 +218,8 @@ func (v *Server) handleSocks5(clientAddr v2net.Destination, reader *bufio.Buffer
return err
}
reader.SetCached(false)
writer.SetCached(false)
reader.SetBuffered(false)
writer.SetBuffered(false)
dest := request.Destination()
session := &proxy.SessionInfo{
@@ -279,8 +281,8 @@ func (v *Server) handleSocks4(clientAddr v2net.Destination, reader *bufio.Buffer
return ErrUnsupportedSocksCommand
}
reader.SetCached(false)
writer.SetCached(false)
reader.SetBuffered(false)
writer.SetBuffered(false)
dest := v2net.TCPDestination(v2net.IPAddress(auth.IP[:]), auth.Port)
session := &proxy.SessionInfo{
@@ -298,26 +300,36 @@ func (v *Server) transport(reader io.Reader, writer io.Writer, session *proxy.Se
input := ray.InboundInput()
output := ray.InboundOutput()
defer input.Close()
defer output.Release()
requestDone := signal.ExecuteAsync(func() error {
defer input.Close()
go func() {
v2reader := buf.NewReader(reader)
defer v2reader.Release()
if err := buf.PipeUntilEOF(v2reader, input); err != nil {
log.Info("Socks|Server: Failed to transport all TCP request: ", err)
return err
}
input.Close()
}()
return nil
})
v2writer := buf.NewWriter(writer)
defer v2writer.Release()
responseDone := signal.ExecuteAsync(func() error {
defer output.ForceClose()
if err := buf.PipeUntilEOF(output, v2writer); err != nil {
log.Info("Socks|Server: Failed to transport all TCP response: ", err)
v2writer := buf.NewWriter(writer)
defer v2writer.Release()
if err := buf.PipeUntilEOF(output, v2writer); err != nil {
log.Info("Socks|Server: Failed to transport all TCP response: ", err)
return err
}
return nil
})
if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil {
log.Info("Socks|Server: Connection ends with ", err)
}
output.Release()
}
type ServerFactory struct{}
@@ -333,5 +345,5 @@ func (v *ServerFactory) Create(space app.Space, rawConfig interface{}, meta *pro
}
func init() {
proxy.MustRegisterInboundHandlerCreator(serial.GetMessageType(new(ServerConfig)), new(ServerFactory))
common.Must(proxy.RegisterInboundHandlerCreator(serial.GetMessageType(new(ServerConfig)), new(ServerFactory)))
}

View File

@@ -35,10 +35,10 @@ func (v *Account) AsAccount() (protocol.Account, error) {
log.Error("VMess: Failed to parse ID: ", err)
return nil, err
}
protoId := protocol.NewID(id)
protoID := protocol.NewID(id)
return &InternalAccount{
ID: protoId,
AlterIDs: protocol.NewAlterIDs(protoId, uint16(v.AlterId)),
ID: protoID,
AlterIDs: protocol.NewAlterIDs(protoID, uint16(v.AlterId)),
Security: v.SecuritySettings.AsSecurity(),
}, nil
}

View File

@@ -15,11 +15,13 @@ import (
v2net "v2ray.com/core/common/net"
"v2ray.com/core/common/protocol"
"v2ray.com/core/common/serial"
"v2ray.com/core/common/signal"
"v2ray.com/core/common/uuid"
"v2ray.com/core/proxy"
"v2ray.com/core/proxy/vmess"
"v2ray.com/core/proxy/vmess/encoding"
"v2ray.com/core/transport/internet"
"v2ray.com/core/transport/ray"
)
type userByEmail struct {
@@ -128,6 +130,48 @@ func (v *VMessInboundHandler) Start() error {
return nil
}
func transferRequest(session *encoding.ServerSession, request *protocol.RequestHeader, input io.Reader, output ray.OutputStream) error {
defer output.Close()
bodyReader := session.DecodeRequestBody(request, input)
defer bodyReader.Release()
if err := buf.PipeUntilEOF(bodyReader, output); err != nil {
return err
}
return nil
}
func transferResponse(session *encoding.ServerSession, request *protocol.RequestHeader, response *protocol.ResponseHeader, input ray.InputStream, output io.Writer) error {
defer input.ForceClose()
session.EncodeResponseHeader(response, output)
bodyWriter := session.EncodeResponseBody(request, output)
// Optimize for small response packet
if data, err := input.Read(); err == nil {
if err := bodyWriter.Write(data); err != nil {
return err
}
if bufferedWriter, ok := output.(*bufio.BufferedWriter); ok {
bufferedWriter.SetBuffered(false)
}
if err := buf.PipeUntilEOF(input, bodyWriter); err != nil {
return err
}
}
if request.Option.Has(protocol.RequestOptionChunkStream) {
if err := bodyWriter.Write(buf.NewLocal(8)); err != nil {
return err
}
}
return nil
}
func (v *VMessInboundHandler) HandleConnection(connection internet.Connection) {
defer connection.Close()
@@ -155,13 +199,13 @@ func (v *VMessInboundHandler) HandleConnection(connection internet.Connection) {
if err != nil {
if errors.Cause(err) != io.EOF {
log.Access(connection.RemoteAddr(), "", log.AccessRejected, err)
log.Info("VMessIn: Invalid request from ", connection.RemoteAddr(), ": ", err)
log.Info("VMess|Inbound: Invalid request from ", connection.RemoteAddr(), ": ", err)
}
connection.SetReusable(false)
return
}
log.Access(connection.RemoteAddr(), request.Destination(), log.AccessAccepted, "")
log.Info("VMessIn: Received request for ", request.Destination())
log.Info("VMess|Inbound: Received request for ", request.Destination())
connection.SetReusable(request.Option.Has(protocol.RequestOptionConnectionReuse))
@@ -174,26 +218,15 @@ func (v *VMessInboundHandler) HandleConnection(connection internet.Connection) {
input := ray.InboundInput()
output := ray.InboundOutput()
defer input.Close()
defer output.Release()
var readFinish sync.Mutex
readFinish.Lock()
defer output.ForceClose()
userSettings := request.User.GetSettings()
connReader.SetTimeOut(userSettings.PayloadReadTimeout)
reader.SetCached(false)
reader.SetBuffered(false)
go func() {
bodyReader := session.DecodeRequestBody(request, reader)
if err := buf.PipeUntilEOF(bodyReader, input); err != nil {
log.Debug("VMess|Inbound: Error when sending data to outbound: ", err)
connection.SetReusable(false)
}
bodyReader.Release()
input.Close()
readFinish.Unlock()
}()
requestDone := signal.ExecuteAsync(func() error {
return transferRequest(session, request, reader, input)
})
writer := bufio.NewWriter(connection)
defer writer.Release()
@@ -206,34 +239,21 @@ func (v *VMessInboundHandler) HandleConnection(connection internet.Connection) {
response.Option.Set(protocol.ResponseOptionConnectionReuse)
}
session.EncodeResponseHeader(response, writer)
bodyWriter := session.EncodeResponseBody(request, writer)
// Optimize for small response packet
if data, err := output.Read(); err == nil {
if err := bodyWriter.Write(data); err != nil {
connection.SetReusable(false)
}
writer.SetCached(false)
if err := buf.PipeUntilEOF(output, bodyWriter); err != nil {
log.Debug("VMess|Inbound: Error when sending data to downstream: ", err)
connection.SetReusable(false)
}
responseDone := signal.ExecuteAsync(func() error {
return transferResponse(session, request, response, output, writer)
})
if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil {
log.Info("VMess|Inbound: Connection ending with ", err)
connection.SetReusable(false)
return
}
output.Release()
if request.Option.Has(protocol.RequestOptionChunkStream) {
if err := bodyWriter.Write(buf.NewLocal(8)); err != nil {
connection.SetReusable(false)
}
}
writer.Flush()
bodyWriter.Release()
readFinish.Lock()
if err := writer.Flush(); err != nil {
log.Info("VMess|Inbound: Failed to flush remain data: ", err)
connection.SetReusable(false)
return
}
}
type Factory struct{}
@@ -271,5 +291,5 @@ func (v *Factory) Create(space app.Space, rawConfig interface{}, meta *proxy.Inb
}
func init() {
proxy.MustRegisterInboundHandlerCreator(serial.GetMessageType(new(Config)), new(Factory))
common.Must(proxy.RegisterInboundHandlerCreator(serial.GetMessageType(new(Config)), new(Factory)))
}

View File

@@ -1,9 +1,8 @@
package outbound
import (
"sync"
"v2ray.com/core/app"
"v2ray.com/core/common"
"v2ray.com/core/common/buf"
"v2ray.com/core/common/bufio"
"v2ray.com/core/common/log"
@@ -11,6 +10,7 @@ import (
"v2ray.com/core/common/protocol"
"v2ray.com/core/common/retry"
"v2ray.com/core/common/serial"
"v2ray.com/core/common/signal"
"v2ray.com/core/proxy"
"v2ray.com/core/proxy/vmess"
"v2ray.com/core/proxy/vmess/encoding"
@@ -27,7 +27,8 @@ type VMessOutboundHandler struct {
// Dispatch implements OutboundHandler.Dispatch().
func (v *VMessOutboundHandler) Dispatch(target v2net.Destination, payload *buf.Buffer, ray ray.OutboundRay) {
defer ray.OutboundInput().Release()
defer payload.Release()
defer ray.OutboundInput().ForceClose()
defer ray.OutboundOutput().Close()
var rec *protocol.ServerSpec
@@ -79,73 +80,65 @@ func (v *VMessOutboundHandler) Dispatch(target v2net.Destination, payload *buf.B
input := ray.OutboundInput()
output := ray.OutboundOutput()
var requestFinish, responseFinish sync.Mutex
requestFinish.Lock()
responseFinish.Lock()
session := encoding.NewClientSession(protocol.DefaultIDHash)
go v.handleRequest(session, conn, request, payload, input, &requestFinish)
go v.handleResponse(session, conn, request, rec.Destination(), output, &responseFinish)
requestDone := signal.ExecuteAsync(func() error {
defer input.ForceClose()
requestFinish.Lock()
responseFinish.Lock()
return
}
writer := bufio.NewWriter(conn)
defer writer.Release()
func (v *VMessOutboundHandler) handleRequest(session *encoding.ClientSession, conn internet.Connection, request *protocol.RequestHeader, payload *buf.Buffer, input buf.Reader, finish *sync.Mutex) {
defer finish.Unlock()
session.EncodeRequestHeader(request, writer)
writer := bufio.NewWriter(conn)
defer writer.Release()
session.EncodeRequestHeader(request, writer)
bodyWriter := session.EncodeRequestBody(request, writer)
defer bodyWriter.Release()
bodyWriter := session.EncodeRequestBody(request, writer)
defer bodyWriter.Release()
if !payload.IsEmpty() {
if err := bodyWriter.Write(payload); err != nil {
log.Info("VMess|Outbound: Failed to write payload. Disabling connection reuse.", err)
conn.SetReusable(false)
if !payload.IsEmpty() {
if err := bodyWriter.Write(payload); err != nil {
return err
}
}
payload.Release()
}
writer.SetCached(false)
writer.SetBuffered(false)
if err := buf.PipeUntilEOF(input, bodyWriter); err != nil {
conn.SetReusable(false)
}
if err := buf.PipeUntilEOF(input, bodyWriter); err != nil {
return err
}
if request.Option.Has(protocol.RequestOptionChunkStream) {
err := bodyWriter.Write(buf.NewLocal(8))
if request.Option.Has(protocol.RequestOptionChunkStream) {
if err := bodyWriter.Write(buf.NewLocal(8)); err != nil {
return err
}
}
return nil
})
responseDone := signal.ExecuteAsync(func() error {
defer output.Close()
reader := bufio.NewReader(conn)
defer reader.Release()
header, err := session.DecodeResponseHeader(reader)
if err != nil {
conn.SetReusable(false)
return err
}
}
return
}
v.handleCommand(rec.Destination(), header.Command)
func (v *VMessOutboundHandler) handleResponse(session *encoding.ClientSession, conn internet.Connection, request *protocol.RequestHeader, dest v2net.Destination, output buf.Writer, finish *sync.Mutex) {
defer finish.Unlock()
conn.SetReusable(header.Option.Has(protocol.ResponseOptionConnectionReuse))
reader := bufio.NewReader(conn)
defer reader.Release()
reader.SetBuffered(false)
bodyReader := session.DecodeResponseBody(request, reader)
defer bodyReader.Release()
header, err := session.DecodeResponseHeader(reader)
if err != nil {
conn.SetReusable(false)
log.Warning("VMess|Outbound: Failed to read response from ", request.Destination(), ": ", err)
return
}
v.handleCommand(dest, header.Command)
if err := buf.PipeUntilEOF(bodyReader, output); err != nil {
return err
}
conn.SetReusable(header.Option.Has(protocol.ResponseOptionConnectionReuse))
return nil
})
reader.SetCached(false)
bodyReader := session.DecodeResponseBody(request, reader)
defer bodyReader.Release()
if err := buf.PipeUntilEOF(bodyReader, output); err != nil {
if err := signal.ErrorOrFinish2(requestDone, responseDone); err != nil {
log.Info("VMess|Outbound: Connection ending with ", err)
conn.SetReusable(false)
}
@@ -178,5 +171,5 @@ func (v *Factory) Create(space app.Space, rawConfig interface{}, meta *proxy.Out
}
func init() {
proxy.MustRegisterOutboundHandlerCreator(serial.GetMessageType(new(Config)), new(Factory))
common.Must(proxy.RegisterOutboundHandlerCreator(serial.GetMessageType(new(Config)), new(Factory)))
}

View File

@@ -69,11 +69,6 @@ func (v *TimedUserValidator) Release() {
}
v.running = false
v.validUsers = nil
v.userHash = nil
v.ids = nil
v.hasher = nil
v.cancel = nil
}
func (v *TimedUserValidator) generateNewHashes(nowSec protocol.Timestamp, idx int, entry *idEntry) {
@@ -89,10 +84,8 @@ func (v *TimedUserValidator) generateNewHashes(nowSec protocol.Timestamp, idx in
idHash.Sum(hashValueRemoval[:0])
idHash.Reset()
v.Lock()
v.userHash[hashValue] = &indexTimePair{idx, entry.lastSec}
delete(v.userHash, hashValueRemoval)
v.Unlock()
entry.lastSec++
entry.lastSecRemoval++
@@ -107,9 +100,11 @@ func (v *TimedUserValidator) updateUserHash(interval time.Duration) {
select {
case now := <-time.After(interval):
nowSec := protocol.Timestamp(now.Unix() + cacheDurationSec)
v.Lock()
for _, entry := range v.ids {
v.generateNewHashes(nowSec, entry.userIdx, entry)
}
v.Unlock()
case <-v.cancel.WaitForCancel():
return
}
@@ -117,6 +112,9 @@ func (v *TimedUserValidator) updateUserHash(interval time.Duration) {
}
func (v *TimedUserValidator) Add(user *protocol.User) error {
v.Lock()
defer v.Unlock()
idx := len(v.validUsers)
v.validUsers = append(v.validUsers, user)
rawAccount, err := user.GetTypedAccount()

View File

@@ -1,9 +1,12 @@
package scenarios
import (
"github.com/golang/protobuf/proto"
"sync/atomic"
"time"
"net"
"github.com/golang/protobuf/proto"
"v2ray.com/core"
v2net "v2ray.com/core/common/net"
)
@@ -16,6 +19,32 @@ func pickPort() v2net.Port {
return v2net.Port(atomic.AddUint32(&port, 1))
}
func xor(b []byte) []byte {
r := make([]byte, len(b))
for i, v := range b {
r[i] = v ^ 'c'
}
return r
}
func readFrom(conn net.Conn, timeout time.Duration, length int) []byte {
b := make([]byte, 2048)
totalBytes := 0
deadline := time.Now().Add(timeout)
conn.SetReadDeadline(deadline)
for totalBytes < length {
if time.Now().After(deadline) {
break
}
n, err := conn.Read(b[totalBytes:])
if err != nil {
break
}
totalBytes += n
}
return b[:totalBytes]
}
func InitializeServerConfig(config *core.Config) error {
err := BuildV2Ray()
if err != nil {

View File

@@ -1,11 +1,11 @@
package scenarios
import (
"bytes"
"io"
"fmt"
"net"
"testing"
"v2ray.com/core/common/buf"
v2net "v2ray.com/core/common/net"
"v2ray.com/core/testing/assert"
"v2ray.com/core/testing/servers/tcp"
@@ -43,10 +43,25 @@ func TestDynamicVMess(t *testing.T) {
conn.CloseWrite()
response := bytes.NewBuffer(nil)
_, err = io.Copy(response, conn)
assert.Error(err).IsNil()
assert.String("Processed: " + payload).Equals(string(response.Bytes()))
expectedResponse := "Processed: " + payload
finished := false
response := buf.New()
for {
err := response.AppendSupplier(buf.ReadFrom(conn))
assert.Error(err).IsNil()
if err != nil {
break
}
if response.String() == expectedResponse {
finished = true
break
}
if response.Len() > len(expectedResponse) {
fmt.Printf("Unexpected response: %v\n", response.Bytes())
break
}
}
assert.Bool(finished).IsTrue()
conn.Close()
}

View File

@@ -1,15 +1,148 @@
package scenarios
import (
"net"
"path/filepath"
"testing"
"io/ioutil"
"os"
"time"
"v2ray.com/core"
v2net "v2ray.com/core/common/net"
"v2ray.com/core/common/protocol"
"v2ray.com/core/common/serial"
"v2ray.com/core/common/uuid"
"v2ray.com/core/proxy/dokodemo"
"v2ray.com/core/proxy/freedom"
"v2ray.com/core/proxy/vmess"
"v2ray.com/core/proxy/vmess/inbound"
"v2ray.com/core/proxy/vmess/outbound"
"v2ray.com/core/testing/assert"
"v2ray.com/core/testing/servers/tcp"
"v2ray.com/core/transport/internet"
"v2ray.com/core/transport/internet/tls"
)
var clientConfig = &core.Config{
Inbound: []*core.InboundConnectionConfig{
{
PortRange: v2net.SinglePortRange(pickPort()),
ListenOn: v2net.NewIPOrDomain(v2net.LocalHostIP),
},
},
func mustReadFile(name string) []byte {
content, err := ioutil.ReadFile(name)
if err != nil {
panic(err)
}
return content
}
func TestSimpleTLSConnection(t *testing.T) {
assert := assert.On(t)
tcpServer := tcp.Server{
MsgProcessor: xor,
}
dest, err := tcpServer.Start()
assert.Error(err).IsNil()
userID := protocol.NewID(uuid.New())
serverPort := pickPort()
serverConfig := &core.Config{
Inbound: []*core.InboundConnectionConfig{
{
PortRange: v2net.SinglePortRange(serverPort),
ListenOn: v2net.NewIPOrDomain(v2net.LocalHostIP),
Settings: serial.ToTypedMessage(&inbound.Config{
User: []*protocol.User{
{
Account: serial.ToTypedMessage(&vmess.Account{
Id: userID.String(),
}),
},
},
}),
StreamSettings: &internet.StreamConfig{
SecurityType: serial.GetMessageType(&tls.Config{}),
SecuritySettings: []*serial.TypedMessage{
serial.ToTypedMessage(&tls.Config{
Certificate: []*tls.Certificate{
{
Certificate: mustReadFile(filepath.Join(os.Getenv("GOPATH"), "src", "v2ray.com", "core", "testing", "tls", "cert.pem")),
Key: mustReadFile(filepath.Join(os.Getenv("GOPATH"), "src", "v2ray.com", "core", "testing", "tls", "key.pem")),
},
},
}),
},
},
},
},
Outbound: []*core.OutboundConnectionConfig{
{
Settings: serial.ToTypedMessage(&freedom.Config{}),
},
},
}
clientPort := pickPort()
clientConfig := &core.Config{
Inbound: []*core.InboundConnectionConfig{
{
PortRange: v2net.SinglePortRange(clientPort),
ListenOn: v2net.NewIPOrDomain(v2net.LocalHostIP),
Settings: serial.ToTypedMessage(&dokodemo.Config{
Address: v2net.NewIPOrDomain(dest.Address),
Port: uint32(dest.Port),
NetworkList: &v2net.NetworkList{
Network: []v2net.Network{v2net.Network_TCP},
},
}),
},
},
Outbound: []*core.OutboundConnectionConfig{
{
Settings: serial.ToTypedMessage(&outbound.Config{
Receiver: []*protocol.ServerEndpoint{
{
Address: v2net.NewIPOrDomain(v2net.LocalHostIP),
Port: uint32(serverPort),
User: []*protocol.User{
{
Account: serial.ToTypedMessage(&vmess.Account{
Id: userID.String(),
}),
},
},
},
},
}),
StreamSettings: &internet.StreamConfig{
SecurityType: serial.GetMessageType(&tls.Config{}),
SecuritySettings: []*serial.TypedMessage{
serial.ToTypedMessage(&tls.Config{
AllowInsecure: true,
}),
},
},
},
},
}
assert.Error(InitializeServerConfig(serverConfig)).IsNil()
assert.Error(InitializeServerConfig(clientConfig)).IsNil()
conn, err := net.DialTCP("tcp", nil, &net.TCPAddr{
IP: []byte{127, 0, 0, 1},
Port: int(clientPort),
})
payload := "dokodemo request."
nBytes, err := conn.Write([]byte(payload))
assert.Error(err).IsNil()
assert.Int(nBytes).Equals(len(payload))
conn.CloseWrite()
response := readFrom(conn, time.Second*2, len(payload))
assert.Bytes(response).Equals(xor([]byte(payload)))
conn.Close()
CloseAllServers()
}

View File

@@ -39,10 +39,6 @@ const (
StateTerminated State = 5
)
const (
headerSize uint32 = 2
)
func nowMillisec() int64 {
now := time.Now()
return now.Unix()*1000 + int64(now.Nanosecond()/1000000)
@@ -517,6 +513,10 @@ func (v *Connection) Input(segments []Segment) {
v.SetState(StateTerminated)
}
}
if seg.Option == SegmentOptionClose || seg.Command() == CommandTerminate {
v.OnDataInput()
v.OnDataOutput()
}
v.sendingWorker.ProcessReceivingNext(seg.ReceivinNext)
v.receivingWorker.ProcessSendingNext(seg.SendingNext)
v.roundTrip.UpdatePeerRTO(seg.PeerRTO, current)

View File

@@ -5,11 +5,11 @@ import (
"v2ray.com/core/transport/internet"
)
func (v *ConnectionReuse) IsEnabled() bool {
if v == nil {
func (v *Config) IsConnectionReuse() bool {
if v == nil || v.ConnectionReuse == nil {
return true
}
return v.Enable
return v.ConnectionReuse.Enable
}
func init() {

View File

@@ -39,7 +39,7 @@ func NewConnection(id internal.ConnectionId, conn net.Conn, manager ConnectionMa
id: id,
conn: conn,
listener: manager,
reusable: config.ConnectionReuse.IsEnabled(),
reusable: config.IsConnectionReuse(),
config: config,
}
}
@@ -97,7 +97,7 @@ func (v *Connection) SetReusable(reusable bool) {
}
func (v *Connection) Reusable() bool {
return v.config.ConnectionReuse.IsEnabled() && v.reusable
return v.config.IsConnectionReuse() && v.reusable
}
func (v *Connection) SysFd() (int, error) {

View File

@@ -3,6 +3,7 @@ package tcp
import (
"crypto/tls"
"net"
"v2ray.com/core/common/errors"
"v2ray.com/core/common/log"
v2net "v2ray.com/core/common/net"
@@ -28,7 +29,7 @@ func Dial(src v2net.Address, dest v2net.Destination, options internet.DialerOpti
id := internal.NewConnectionId(src, dest)
var conn net.Conn
if dest.Network == v2net.Network_TCP && tcpSettings.ConnectionReuse.IsEnabled() {
if dest.Network == v2net.Network_TCP && tcpSettings.IsConnectionReuse() {
conn = globalCache.Get(id)
}
if conn == nil {

View File

@@ -26,6 +26,7 @@ func (v *Config) BuildCertificates() []tls.Certificate {
func (v *Config) GetTLSConfig() *tls.Config {
config := &tls.Config{
ClientSessionCache: globalSessionCache,
NextProtos: []string{"http/1.1"},
}
if v == nil {
return config

View File

@@ -5,11 +5,11 @@ import (
"v2ray.com/core/transport/internet"
)
func (v *ConnectionReuse) IsEnabled() bool {
if v == nil {
func (c *Config) IsConnectionReuse() bool {
if c == nil || c.ConnectionReuse == nil {
return false
}
return v.Enable
return c.ConnectionReuse.Enable
}
func init() {

View File

@@ -29,7 +29,7 @@ func NewConnection(dest string, conn *wsconn, manager ConnectionManager, config
dest: dest,
conn: conn,
listener: manager,
reusable: config.ConnectionReuse.IsEnabled(),
reusable: config.IsConnectionReuse(),
config: config,
}
}
@@ -83,14 +83,11 @@ func (v *Connection) SetWriteDeadline(t time.Time) error {
}
func (v *Connection) SetReusable(reusable bool) {
if !v.config.ConnectionReuse.IsEnabled() {
return
}
v.reusable = reusable
}
func (v *Connection) Reusable() bool {
return v.reusable
return v.config.IsConnectionReuse() && v.reusable
}
func (v *Connection) SysFd() (int, error) {

View File

@@ -28,7 +28,7 @@ func Dial(src v2net.Address, dest v2net.Destination, options internet.DialerOpti
id := src.String() + "-" + dest.NetAddr()
var conn *wsconn
if dest.Network == v2net.Network_TCP && wsSettings.ConnectionReuse.IsEnabled() {
if dest.Network == v2net.Network_TCP && wsSettings.IsConnectionReuse() {
connt := globalCache.Get(id)
if connt != nil {
conn = connt.(*wsconn)

View File

@@ -1,4 +1,4 @@
/*Package ws implements Websocket transport
/*Package websocket implements Websocket transport
Websocket transport implements a HTTP(S) compliable, surveillance proof transport method with plausible deniability.

View File

@@ -2,6 +2,8 @@ package websocket_test
import (
"io/ioutil"
"os"
"path/filepath"
"testing"
"time"
@@ -100,8 +102,8 @@ func Test_listenWSAndDial_TLS(t *testing.T) {
AllowInsecure: true,
Certificate: []*v2tls.Certificate{
{
Certificate: ReadFile("./../../../testing/tls/cert.pem", assert),
Key: ReadFile("./../../../testing/tls/key.pem", assert),
Certificate: ReadFile(filepath.Join(os.Getenv("GOPATH"), "src", "v2ray.com", "core", "testing", "tls", "cert.pem"), assert),
Key: ReadFile(filepath.Join(os.Getenv("GOPATH"), "src", "v2ray.com", "core", "testing", "tls", "key.pem"), assert),
},
},
}

View File

@@ -159,13 +159,10 @@ func (ws *wsconn) setup() {
}
func (ws *wsconn) Reusable() bool {
return ws.reusable && !ws.connClosing
return ws.config.IsConnectionReuse() && ws.reusable && !ws.connClosing
}
func (ws *wsconn) SetReusable(reusable bool) {
if !ws.config.ConnectionReuse.IsEnabled() {
return
}
ws.reusable = reusable
}

View File

@@ -65,11 +65,17 @@ func (v *Stream) Read() (*buf.Buffer, error) {
return b, nil
case <-v.srcClose:
return nil, io.EOF
case <-v.destClose:
return nil, io.ErrClosedPipe
}
}
}
func (v *Stream) Write(data *buf.Buffer) (err error) {
if data.IsEmpty() {
return
}
select {
case <-v.destClose:
return io.ErrClosedPipe
@@ -93,7 +99,7 @@ func (v *Stream) Close() {
close(v.srcClose)
}
func (v *Stream) Release() {
func (v *Stream) ForceClose() {
defer swallowPanic()
close(v.destClose)
@@ -110,6 +116,8 @@ func (v *Stream) Release() {
}
}
func (v *Stream) Release() {}
func swallowPanic() {
recover()
}

View File

@@ -13,7 +13,9 @@ func TestStreamIO(t *testing.T) {
assert := assert.On(t)
stream := NewStream()
assert.Error(stream.Write(buf.New())).IsNil()
b1 := buf.New()
b1.AppendBytes('a')
assert.Error(stream.Write(b1)).IsNil()
_, err := stream.Read()
assert.Error(err).IsNil()
@@ -22,7 +24,9 @@ func TestStreamIO(t *testing.T) {
_, err = stream.Read()
assert.Error(err).Equals(io.EOF)
err = stream.Write(buf.New())
b2 := buf.New()
b2.AppendBytes('b')
err = stream.Write(b2)
assert.Error(err).Equals(io.ErrClosedPipe)
}
@@ -30,7 +34,9 @@ func TestStreamClose(t *testing.T) {
assert := assert.On(t)
stream := NewStream()
assert.Error(stream.Write(buf.New())).IsNil()
b1 := buf.New()
b1.AppendBytes('a')
assert.Error(stream.Write(b1)).IsNil()
stream.Close()

View File

@@ -35,7 +35,7 @@ type Ray interface {
type InputStream interface {
buf.Reader
Close()
ForceClose()
}
type OutputStream interface {