Compare commits

..

31 Commits

Author SHA1 Message Date
v2ray
44bf412dfc release all adaptive readers and writers 2016-04-18 19:01:24 +02:00
v2ray
7768674df8 revert unintentional change 2016-04-18 18:59:57 +02:00
v2ray
7407c8d561 use stream instead of raw chan 2016-04-18 18:44:10 +02:00
v2ray
42b8dbe871 release all readers and writers 2016-04-12 21:56:36 +02:00
v2ray
70f803173a simplify reader/writer interface 2016-04-12 21:43:13 +02:00
v2ray
308c40553c split BufferPool from buffer.go 2016-04-12 16:52:57 +02:00
v2ray
1381fd32b4 fix build test for windows 2016-03-27 22:21:44 +08:00
v2ray
e8023f0d92 releasable writer 2016-03-24 23:36:18 +08:00
v2ray
760247a4ab fix log test for windows 2016-03-24 23:36:06 +08:00
Darien Raymond
40945f2967 Merge pull request #109 from korzonek/master
codebeat badge
2016-03-16 00:55:56 +01:00
Tomasz Korzeniowski
3108db1d6a codebeat badge
Is it fine to add codebeat badge to README? 

codebeat is automated code review tool for Swift,Ruby,Go & Python that helps get instant feedback on code quality. 

"Quick wins" suggested by codebeat could be a nice candidate for a pull request and help other developers become contributors.

FYI. To be fully open and honest. I'm co-founder of codebeat.
2016-03-15 15:12:23 +01:00
v2ray
77ecd1649b fix test break 2016-03-13 11:34:09 +01:00
v2ray
ff69ac7a0e Release all references 2016-03-11 23:51:58 +01:00
v2ray
f5960db27e more china sites 2016-03-11 23:51:24 +01:00
v2ray
1b5b599741 agressively cleanup references at release 2016-03-10 16:56:19 +01:00
v2ray
a341cab302 implement releasable interface in v2io 2016-03-10 00:33:14 +01:00
v2ray
c7f6426c88 release interface 2016-03-10 00:33:01 +01:00
v2ray
be90c04deb prevent double release in buffer 2016-03-09 11:34:39 +01:00
v2ray
6f7fff8173 release all references in tcp hub after it is closed 2016-03-09 11:34:27 +01:00
v2ray
50ef8e0465 more china sites 2016-03-08 17:35:33 +01:00
v2ray
486965d1eb more china sites 2016-03-08 15:45:30 +01:00
v2ray
185c4e6eae remove unnecessary ExecStop as systemd does it automatically 2016-03-08 15:45:09 +01:00
v2ray
bd94b8254a Merge branch 'master' of https://github.com/v2ray/v2ray-core 2016-03-06 16:36:18 +01:00
v2ray
3b35df1244 Handler state 2016-03-06 16:36:08 +01:00
v2ray
83e0482d2b Fix dynamic port allocation 2016-03-06 16:36:03 +01:00
v2ray
441a198b86 more china sites 2016-03-05 22:03:28 +01:00
Darien Raymond
d391dcd9c9 Merge pull request #103 from kxjhlele/patch-10
Update README.md
2016-03-05 14:18:35 +01:00
kxjhlele
f575dcc354 Update README.md 2016-03-05 12:23:05 +08:00
v2ray
26925a7d90 more china sites 2016-03-04 00:27:33 +01:00
v2ray
17a6c7ed1b update vscode settings 2016-02-29 23:20:15 +01:00
v2ray
7b74665a64 more china sites 2016-02-29 23:20:08 +01:00
43 changed files with 522 additions and 267 deletions

View File

@@ -1,6 +1,6 @@
// Place your settings in this file to overwrite default and user settings.
{
"go.buildFlags": ["-tags=json"],
"go.lintFlags": ["-tags=json"],
"go.vetFlags": ["-tags=json"]
"go.buildFlags": ["-tags", "json"],
"go.lintFlags": ["-tags", "json"],
"go.vetFlags": ["-tags", "json"]
}

View File

@@ -5,12 +5,13 @@
[![codecov.io](https://codecov.io/github/v2ray/v2ray-core/coverage.svg?branch=master)](https://codecov.io/github/v2ray/v2ray-core?branch=master)
[![Go Report](http://goreportcard.com/badge/v2ray/v2ray-core)](https://goreportcard.com/report/v2ray/v2ray-core)
[![GoDoc](https://godoc.org/github.com/v2ray/v2ray-core?status.svg)](https://godoc.org/github.com/v2ray/v2ray-core)
[![codebeat](https://codebeat.co/badges/f2354ca8-3e24-463d-a2e3-159af73b2477)](https://codebeat.co/projects/github-com-v2ray-v2ray-core)
V2Ray 是一个模块化的代理软件包,它的目标是提供常用的代理软件模块,简化网络代理软件的开发。
[官方网站](https://www.v2ray.com/) | [Wiki](https://github.com/v2ray/v2ray.github.io/wiki)
[官方网站](https://www.v2ray.com/)
V2Ray provides building blocks for network proxy development. Read our [Wiki](https://github.com/v2ray/v2ray.github.io/wiki/en-us:Home) for more information.
V2Ray provides building blocks for network proxy development. Read our [Wiki](https://www.v2ray.com/en/index.html) for more information.
## License
[The MIT License (MIT)](https://raw.githubusercontent.com/v2ray/v2ray-core/master/LICENSE)

View File

@@ -6,28 +6,32 @@ import (
)
type TestPacketDispatcher struct {
LastPacket chan v2net.Packet
Handler func(packet v2net.Packet, traffic ray.OutboundRay)
Destination chan v2net.Destination
Handler func(packet v2net.Packet, traffic ray.OutboundRay)
}
func NewTestPacketDispatcher(handler func(packet v2net.Packet, traffic ray.OutboundRay)) *TestPacketDispatcher {
if handler == nil {
handler = func(packet v2net.Packet, traffic ray.OutboundRay) {
for payload := range traffic.OutboundInput() {
traffic.OutboundOutput() <- payload.Prepend([]byte("Processed: "))
for {
payload, err := traffic.OutboundInput().Read()
if err != nil {
break
}
traffic.OutboundOutput().Write(payload.Prepend([]byte("Processed: ")))
}
close(traffic.OutboundOutput())
traffic.OutboundOutput().Close()
}
}
return &TestPacketDispatcher{
LastPacket: make(chan v2net.Packet, 16),
Handler: handler,
Destination: make(chan v2net.Destination),
Handler: handler,
}
}
func (this *TestPacketDispatcher) DispatchToOutbound(packet v2net.Packet) ray.InboundRay {
traffic := ray.NewRay()
this.LastPacket <- packet
this.Destination <- packet.Destination()
go this.Handler(packet, traffic)
return traffic

View File

@@ -13,6 +13,7 @@ const (
dotCc = "\\.cc$"
dotCn = "\\.cn$"
dotCom = "\\.com$"
dotInfo = "\\.info$"
dotIo = "\\.io$"
dotLa = "\\.la$"
dotMe = "\\.me$"
@@ -42,10 +43,12 @@ func init() {
anySubDomain + "163" + dotCom,
anySubDomain + "17173" + dotCom,
anySubDomain + "17cdn" + dotCom,
anySubDomain + "188" + dotCom,
anySubDomain + "1905" + dotCom,
anySubDomain + "21cn" + dotCom,
anySubDomain + "2288" + dotOrg,
anySubDomain + "2345" + dotCom,
anySubDomain + "263" + dotNet,
anySubDomain + "2cto" + dotCom,
anySubDomain + "3322" + dotOrg,
anySubDomain + "35" + dotCom,
@@ -61,18 +64,24 @@ func init() {
anySubDomain + "500d" + dotMe,
anySubDomain + "50bang" + dotOrg,
anySubDomain + "51" + dotLa,
anySubDomain + "51credit" + dotCom,
anySubDomain + "51cto" + dotCom,
anySubDomain + "51job" + dotCom,
anySubDomain + "51jobcdn" + dotCom,
anySubDomain + "51wendang" + dotCom,
anySubDomain + "55" + dotCom,
anySubDomain + "51yes" + dotCom,
anySubDomain + "55bbs" + dotCom,
anySubDomain + "58" + dotCom,
anySubDomain + "6rooms" + dotCom,
anySubDomain + "71" + dotAm,
anySubDomain + "7k7k" + dotCom,
anySubDomain + "900" + dotLa,
anySubDomain + "9718" + dotCom,
anySubDomain + "9xu" + dotCom,
anySubDomain + "abchina" + dotCom,
anySubDomain + "acfun" + dotTv,
anySubDomain + "agrantsem" + dotCom,
anySubDomain + "aicdn" + dotCom,
anySubDomain + "aixifan" + dotCom,
anySubDomain + "alibaba" + dotCom,
@@ -89,8 +98,10 @@ func init() {
anySubDomain + "anquan" + dotOrg,
anySubDomain + "appinn" + dotCom,
anySubDomain + "babytree" + dotCom,
anySubDomain + "babytreeimg" + dotCom,
anySubDomain + "baidu" + dotCom,
anySubDomain + "baiducontent" + dotCom,
anySubDomain + "baidupcs" + dotCom,
anySubDomain + "baidustatic" + dotCom,
anySubDomain + "baifendian" + dotCom,
anySubDomain + "baifubao" + dotCom,
@@ -104,6 +115,7 @@ func init() {
anySubDomain + "bdimg" + dotCom,
anySubDomain + "bdstatic" + dotCom,
anySubDomain + "bilibili" + dotCom,
"cn\\.bing" + dotCom,
anySubDomain + "bitauto" + dotCom,
anySubDomain + "bitautoimg" + dotCom,
anySubDomain + "bobo" + dotCom,
@@ -113,13 +125,16 @@ func init() {
anySubDomain + "cctv" + dotCom,
anySubDomain + "cctvpic" + dotCom,
anySubDomain + "cdn20" + dotCom,
anySubDomain + "cebbank" + dotCom,
anySubDomain + "ch" + dotCom,
anySubDomain + "chashebao" + dotCom,
anySubDomain + "che168" + dotCom,
anySubDomain + "china" + dotCom,
anySubDomain + "chinacache" + dotCom,
anySubDomain + "chinacache" + dotNet,
anySubDomain + "chinahr" + dotCom,
anySubDomain + "chinamobile" + dotCom,
anySubDomain + "chinatranslation" + dotNet,
anySubDomain + "chinaz" + dotCom,
anySubDomain + "chouti" + dotCom,
anySubDomain + "chuangxin" + dotCom,
@@ -133,6 +148,7 @@ func init() {
anySubDomain + "cnepub" + dotCom,
anySubDomain + "cnzz" + dotCom,
anySubDomain + "coding" + dotNet,
anySubDomain + "cqvip" + dotCom,
anySubDomain + "csbew" + dotCom,
anySubDomain + "csdn" + dotNet,
anySubDomain + "ctrip" + dotCom,
@@ -142,9 +158,11 @@ func init() {
anySubDomain + "dangdang" + dotCom,
anySubDomain + "daocloud" + dotIo,
anySubDomain + "daovoice" + dotIo,
anySubDomain + "dbank" + dotCom,
anySubDomain + "dedecms" + dotCom,
anySubDomain + "diandian" + dotCom,
anySubDomain + "dianping" + dotCom,
anySubDomain + "diopic" + dotNet,
anySubDomain + "docin" + dotCom,
anySubDomain + "dockerone" + dotCom,
anySubDomain + "dockone" + dotIo,
@@ -152,8 +170,10 @@ func init() {
anySubDomain + "douban" + dotCom,
anySubDomain + "doubanio" + dotCom,
anySubDomain + "dpfile" + dotCom,
anySubDomain + "duomai" + dotCom,
anySubDomain + "duoshuo" + dotCom,
anySubDomain + "duowan" + dotCom,
anySubDomain + "dxpmedia" + dotCom,
anySubDomain + "eastday" + dotCom,
anySubDomain + "ecitic" + dotCom,
anySubDomain + "emarbox" + dotCom,
@@ -162,10 +182,14 @@ func init() {
anySubDomain + "fanli" + dotCom,
anySubDomain + "fengniao" + dotCom,
anySubDomain + "fhldns" + dotCom,
anySubDomain + "foxmail" + dotCom,
anySubDomain + "geekpark" + dotNet,
anySubDomain + "geetest" + dotCom,
anySubDomain + "geilicdn" + dotCom,
anySubDomain + "getui" + dotCom,
anySubDomain + "growingio" + dotCom,
anySubDomain + "gtags" + dotNet,
anySubDomain + "gwdang" + dotCom,
anySubDomain + "hao123" + dotCom,
anySubDomain + "hao123img" + dotCom,
anySubDomain + "haosou" + dotCom,
@@ -185,9 +209,12 @@ func init() {
anySubDomain + "ifanr" + dotCom,
anySubDomain + "ifanrusercontent" + dotCom,
anySubDomain + "ifanrx" + dotCom,
anySubDomain + "ifeng" + dotCom,
anySubDomain + "ifengimg" + dotCom,
anySubDomain + "ijinshan" + dotCom,
anySubDomain + "imedao" + dotCom,
anySubDomain + "imgo" + dotTv,
anySubDomain + "imooc" + dotCom,
anySubDomain + "infoq" + dotCom,
anySubDomain + "infoqstatic" + dotCom,
anySubDomain + "ip138" + dotCom,
@@ -205,12 +232,15 @@ func init() {
anySubDomain + "jb51" + dotCom,
anySubDomain + "jia" + dotCom,
anySubDomain + "jianshu" + dotCom,
anySubDomain + "jianshu" + dotIo,
anySubDomain + "jiasuhui" + dotCom,
anySubDomain + "jiathis" + dotCom,
anySubDomain + "jiayuan" + dotCom,
anySubDomain + "jikexueyuan" + dotCom,
anySubDomain + "jisuanke" + dotCom,
anySubDomain + "jmstatic" + dotCom,
anySubDomain + "jstv" + dotCom,
anySubDomain + "jumei" + dotCom,
anySubDomain + "jyimg" + dotCom,
anySubDomain + "kaixin001" + dotCom,
anySubDomain + "kanimg" + dotCom,
@@ -227,6 +257,7 @@ func init() {
anySubDomain + "kuqin" + dotCom,
anySubDomain + "lady8844" + dotCom,
anySubDomain + "lagou" + dotCom,
anySubDomain + "le" + dotCom,
anySubDomain + "leanote" + dotCom,
anySubDomain + "leiphone" + dotCom,
anySubDomain + "leju" + dotCom,
@@ -236,10 +267,14 @@ func init() {
anySubDomain + "letvimg" + dotCom,
anySubDomain + "liantu" + dotMe,
anySubDomain + "liaoxuefeng" + dotCom,
anySubDomain + "liba" + dotCom,
anySubDomain + "libaclub" + dotCom,
anySubDomain + "liepin" + dotCom,
anySubDomain + "lietou" + dotCom,
anySubDomain + "linkvans" + dotCom,
anySubDomain + "lightonus" + dotCom,
anySubDomain + "linkvans" + dotCom,
anySubDomain + "linuxidc" + dotCom,
anySubDomain + "liuxiaoer" + dotCom,
anySubDomain + "lofter" + dotCom,
anySubDomain + "lu" + dotCom,
anySubDomain + "lufax" + dotCom,
@@ -248,8 +283,10 @@ func init() {
anySubDomain + "lxdns" + dotCom,
anySubDomain + "lxway" + dotCom,
anySubDomain + "ly" + dotCom,
anySubDomain + "mayihr" + dotCom,
anySubDomain + "mechina" + dotOrg,
anySubDomain + "mediav" + dotCom,
anySubDomain + "meiqia" + dotCom,
anySubDomain + "meika360" + dotCom,
anySubDomain + "meilishuo" + dotCom,
anySubDomain + "meishij" + dotNet,
@@ -259,17 +296,25 @@ func init() {
anySubDomain + "mi" + dotCom,
anySubDomain + "miaopai" + dotCom,
anySubDomain + "miaozhen" + dotCom,
anySubDomain + "mmbang" + dotCom,
anySubDomain + "mmbang" + dotInfo,
anySubDomain + "mmstat" + dotCom,
anySubDomain + "mogucdn" + dotCom,
anySubDomain + "mogujie" + dotCom,
anySubDomain + "mop" + dotCom,
anySubDomain + "mukewang" + dotCom,
anySubDomain + "mydrivers" + dotCom,
anySubDomain + "myshow360" + dotNet,
anySubDomain + "mzstatic" + dotCom,
anySubDomain + "netease" + dotCom,
anySubDomain + "newbandeng" + dotCom,
anySubDomain + "ngacn" + dotCc,
anySubDomain + "ntalker" + dotCom,
anySubDomain + "nvsheng" + dotCom,
anySubDomain + "oeeee" + dotCom,
anySubDomain + "ol-img" + dotCom,
anySubDomain + "oneapm" + dotCom,
anySubDomain + "onlinedown" + dotNet,
anySubDomain + "onlinesjtu" + dotCom,
anySubDomain + "oschina" + dotNet,
anySubDomain + "paipai" + dotCom,
@@ -277,6 +322,7 @@ func init() {
anySubDomain + "pingan" + dotCom,
anySubDomain + "pingplusplus" + dotCom,
anySubDomain + "pps" + dotTv,
anySubDomain + "psbc" + dotCom,
anySubDomain + "pubyun" + dotCom,
anySubDomain + "qbox" + dotMe,
anySubDomain + "qcloud" + dotCom,
@@ -303,6 +349,7 @@ func init() {
anySubDomain + "sanguosha" + dotCom,
anySubDomain + "sanwen" + dotNet,
anySubDomain + "segmentfault" + dotCom,
anySubDomain + "sf-express" + dotCom,
anySubDomain + "sharejs" + dotCom,
anySubDomain + "shutcm" + dotCom,
anySubDomain + "simei8" + dotCom,
@@ -315,16 +362,21 @@ func init() {
anySubDomain + "smzdm" + dotCom,
anySubDomain + "sohu" + dotCom,
anySubDomain + "sogou" + dotCom,
anySubDomain + "sogoucdn" + dotCom,
anySubDomain + "soso" + dotCom,
anySubDomain + "sspai" + dotCom,
anySubDomain + "starbaby" + dotCc,
anySubDomain + "starbaby" + dotCom,
anySubDomain + "staticfile" + dotOrg,
anySubDomain + "stockstar" + dotCom,
anySubDomain + "suning" + dotCom,
anySubDomain + "szfw" + dotOrg,
anySubDomain + "t1y5" + dotCom,
anySubDomain + "tanx" + dotCom,
anySubDomain + "tao123" + dotCom,
anySubDomain + "taobao" + dotCom,
anySubDomain + "taobaocdn" + dotCom,
anySubDomain + "tbcache" + dotCom,
anySubDomain + "tencent" + dotCom,
anySubDomain + "tenpay" + dotCom,
anySubDomain + "tenxcloud" + dotCom,
@@ -337,6 +389,7 @@ func init() {
anySubDomain + "tudou" + dotCom,
anySubDomain + "tudouui" + dotCom,
anySubDomain + "tuicool" + dotCom,
anySubDomain + "tuniu" + dotCom,
anySubDomain + "u17" + dotCom,
anySubDomain + "useso" + dotCom,
anySubDomain + "unionpay" + dotCom,
@@ -344,9 +397,11 @@ func init() {
anySubDomain + "upyun" + dotCom,
anySubDomain + "upaiyun" + dotCom,
anySubDomain + "v2ex" + dotCom,
anySubDomain + "v5875" + dotCom,
anySubDomain + "vamaker" + dotCom,
anySubDomain + "vancl" + dotCom,
anySubDomain + "vip" + dotCom,
anySubDomain + "wallstreetcn" + dotCom,
anySubDomain + "wandoujia" + dotCom,
anySubDomain + "wdjimg" + dotCom,
anySubDomain + "webterren" + dotCom,
@@ -355,9 +410,11 @@ func init() {
anySubDomain + "weidian" + dotCom,
anySubDomain + "weiyun" + dotCom,
anySubDomain + "wonnder" + dotCom,
anySubDomain + "worktile" + dotCom,
anySubDomain + "wooyun" + dotOrg,
anySubDomain + "wrating" + dotCom,
anySubDomain + "wscdns" + dotCom,
anySubDomain + "wumii" + dotCom,
anySubDomain + "xiachufang" + dotCom,
anySubDomain + "xiami" + dotCom,
anySubDomain + "xiaokaxiu" + dotCom,
@@ -372,21 +429,25 @@ func init() {
anySubDomain + "xywy" + dotCom,
anySubDomain + "yaolan" + dotCom,
anySubDomain + "yccdn" + dotCom,
anySubDomain + "yeepay" + dotCom,
anySubDomain + "yesky" + dotCom,
anySubDomain + "yigao" + dotCom,
anySubDomain + "yihaodian" + dotCom,
anySubDomain + "yihaodianimg" + dotCom,
anySubDomain + "yingjiesheng" + dotCom,
anySubDomain + "yinxiang" + dotCom,
anySubDomain + "yjbys" + dotCom,
anySubDomain + "yhd" + dotCom,
anySubDomain + "youboy" + dotCom,
anySubDomain + "youku" + dotCom,
anySubDomain + "yunba" + dotIo,
anySubDomain + "yundaex" + dotCom,
anySubDomain + "yunshipei" + dotCom,
anySubDomain + "yupoo" + dotCom,
anySubDomain + "yuzua" + dotCom,
anySubDomain + "yy" + dotCom,
anySubDomain + "yytcdn" + dotCom,
anySubDomain + "zampda" + dotNet,
anySubDomain + "zastatic" + dotCom,
anySubDomain + "zbjimg" + dotCom,
anySubDomain + "zhenai" + dotCom,
@@ -397,6 +458,7 @@ func init() {
anySubDomain + "zhiziyun" + dotCom,
anySubDomain + "zjstv" + dotCom,
anySubDomain + "zhubajie" + dotCom,
anySubDomain + "zrblog" + dotNet,
anySubDomain + "zuche" + dotCom,
anySubDomain + "zuchecdn" + dotCom,
}

View File

@@ -2,7 +2,6 @@ package alloc
import (
"io"
"sync"
)
const (
@@ -14,7 +13,7 @@ const (
// quickly.
type Buffer struct {
head []byte
pool *bufferPool
pool *BufferPool
Value []byte
offset int
}
@@ -24,7 +23,7 @@ func (b *Buffer) Release() {
if b == nil {
return
}
b.pool.free(b)
b.pool.Free(b)
b.head = nil
b.Value = nil
b.pool = nil
@@ -132,62 +131,17 @@ func (b *Buffer) FillFrom(reader io.Reader) (int, error) {
return nBytes, err
}
type bufferPool struct {
chain chan []byte
allocator *sync.Pool
}
func newBufferPool(bufferSize, poolSize int) *bufferPool {
pool := &bufferPool{
chain: make(chan []byte, poolSize),
allocator: &sync.Pool{
New: func() interface{} { return make([]byte, bufferSize) },
},
}
for i := 0; i < poolSize/2; i++ {
pool.chain <- make([]byte, bufferSize)
}
return pool
}
func (p *bufferPool) allocate() *Buffer {
var b []byte
select {
case b = <-p.chain:
default:
b = p.allocator.Get().([]byte)
}
return &Buffer{
head: b,
pool: p,
Value: b[defaultOffset:],
offset: defaultOffset,
}
}
func (p *bufferPool) free(buffer *Buffer) {
select {
case p.chain <- buffer.head:
default:
p.allocator.Put(buffer.head)
}
}
var smallPool = newBufferPool(1024, 64)
var mediumPool = newBufferPool(8*1024, 128)
var largePool = newBufferPool(64*1024, 64)
// NewSmallBuffer creates a Buffer with 1K bytes of arbitrary content.
func NewSmallBuffer() *Buffer {
return smallPool.allocate()
return smallPool.Allocate()
}
// NewBuffer creates a Buffer with 8K bytes of arbitrary content.
func NewBuffer() *Buffer {
return mediumPool.allocate()
return mediumPool.Allocate()
}
// NewLargeBuffer creates a Buffer with 64K bytes of arbitrary content.
func NewLargeBuffer() *Buffer {
return largePool.allocate()
return largePool.Allocate()
}

View File

@@ -0,0 +1,54 @@
package alloc
import (
"sync"
)
type BufferPool struct {
chain chan []byte
allocator *sync.Pool
}
func NewBufferPool(bufferSize, poolSize int) *BufferPool {
pool := &BufferPool{
chain: make(chan []byte, poolSize),
allocator: &sync.Pool{
New: func() interface{} { return make([]byte, bufferSize) },
},
}
for i := 0; i < poolSize/2; i++ {
pool.chain <- make([]byte, bufferSize)
}
return pool
}
func (p *BufferPool) Allocate() *Buffer {
var b []byte
select {
case b = <-p.chain:
default:
b = p.allocator.Get().([]byte)
}
return &Buffer{
head: b,
pool: p,
Value: b[defaultOffset:],
offset: defaultOffset,
}
}
func (p *BufferPool) Free(buffer *Buffer) {
rawBuffer := buffer.head
if rawBuffer == nil {
return
}
select {
case p.chain <- rawBuffer:
default:
p.allocator.Put(rawBuffer)
}
}
var smallPool = NewBufferPool(1024, 64)
var mediumPool = NewBufferPool(8*1024, 128)
var largePool = NewBufferPool(64*1024, 64)

View File

@@ -1,3 +1,17 @@
// Package common contains common utilities that are shared among other packages.
// See each sub-package for detail.
package common
import (
"errors"
)
var (
ErrorAlreadyReleased = errors.New("Object already released.")
)
// Releasable interface is for those types that can release its members.
type Releasable interface {
// Release releases all references to accelerate garbage collection.
Release()
}

View File

@@ -3,21 +3,26 @@ package crypto
import (
"crypto/cipher"
"io"
"github.com/v2ray/v2ray-core/common"
)
type cryptionReader struct {
type CryptionReader struct {
stream cipher.Stream
reader io.Reader
}
func NewCryptionReader(stream cipher.Stream, reader io.Reader) io.Reader {
return &cryptionReader{
func NewCryptionReader(stream cipher.Stream, reader io.Reader) *CryptionReader {
return &CryptionReader{
stream: stream,
reader: reader,
}
}
func (this *cryptionReader) Read(data []byte) (int, error) {
func (this *CryptionReader) Read(data []byte) (int, error) {
if this.reader == nil {
return 0, common.ErrorAlreadyReleased
}
nBytes, err := this.reader.Read(data)
if nBytes > 0 {
this.stream.XORKeyStream(data[:nBytes], data[:nBytes])
@@ -25,19 +30,32 @@ func (this *cryptionReader) Read(data []byte) (int, error) {
return nBytes, err
}
type cryptionWriter struct {
func (this *CryptionReader) Release() {
this.reader = nil
this.stream = nil
}
type CryptionWriter struct {
stream cipher.Stream
writer io.Writer
}
func NewCryptionWriter(stream cipher.Stream, writer io.Writer) io.Writer {
return &cryptionWriter{
func NewCryptionWriter(stream cipher.Stream, writer io.Writer) *CryptionWriter {
return &CryptionWriter{
stream: stream,
writer: writer,
}
}
func (this *cryptionWriter) Write(data []byte) (int, error) {
func (this *CryptionWriter) Write(data []byte) (int, error) {
if this.writer == nil {
return 0, common.ErrorAlreadyReleased
}
this.stream.XORKeyStream(data, data)
return this.writer.Write(data)
}
func (this *CryptionWriter) Release() {
this.writer = nil
this.stream = nil
}

View File

@@ -22,6 +22,8 @@ func NewBufferedReader(rawReader io.Reader) *BufferedReader {
func (this *BufferedReader) Release() {
this.buffer.Release()
this.buffer = nil
this.reader = nil
}
func (this *BufferedReader) Cached() bool {

View File

@@ -60,4 +60,6 @@ func (this *BufferedWriter) SetCached(cached bool) {
func (this *BufferedWriter) Release() {
this.buffer.Release()
this.buffer = nil
this.writer = nil
}

View File

@@ -3,6 +3,7 @@ package io // import "github.com/v2ray/v2ray-core/common/io"
import (
"io"
"github.com/v2ray/v2ray-core/common"
"github.com/v2ray/v2ray-core/common/alloc"
)
@@ -19,6 +20,7 @@ func ReadFrom(reader io.Reader, buffer *alloc.Buffer) (*alloc.Buffer, error) {
// Reader extends io.Reader with alloc.Buffer.
type Reader interface {
common.Releasable
// Read reads content from underlying reader, and put it into an alloc.Buffer.
Read() (*alloc.Buffer, error)
}
@@ -57,3 +59,7 @@ func (this *AdaptiveReader) Read() (*alloc.Buffer, error) {
}
return buffer, nil
}
func (this *AdaptiveReader) Release() {
this.reader = nil
}

View File

@@ -1,43 +1,16 @@
package io
import (
"io"
"github.com/v2ray/v2ray-core/common/alloc"
)
func RawReaderToChan(stream chan<- *alloc.Buffer, reader io.Reader) error {
return ReaderToChan(stream, NewAdaptiveReader(reader))
}
// ReaderToChan dumps all content from a given reader to a chan by constantly reading it until EOF.
func ReaderToChan(stream chan<- *alloc.Buffer, reader Reader) error {
func Pipe(reader Reader, writer Writer) error {
for {
buffer, err := reader.Read()
if buffer.Len() > 0 {
stream <- buffer
err = writer.Write(buffer)
} else {
buffer.Release()
}
if err != nil {
return err
return nil
}
}
}
func ChanToRawWriter(writer io.Writer, stream <-chan *alloc.Buffer) error {
return ChanToWriter(NewAdaptiveWriter(writer), stream)
}
// ChanToWriter dumps all content from a given chan to a writer until the chan is closed.
func ChanToWriter(writer Writer, stream <-chan *alloc.Buffer) error {
for buffer := range stream {
err := writer.Write(buffer)
buffer.Release()
if err != nil {
return err
}
}
return nil
}

View File

@@ -1,37 +0,0 @@
package io_test
import (
"bytes"
"crypto/rand"
"io"
"testing"
"github.com/v2ray/v2ray-core/common/alloc"
. "github.com/v2ray/v2ray-core/common/io"
v2testing "github.com/v2ray/v2ray-core/testing"
"github.com/v2ray/v2ray-core/testing/assert"
)
func TestReaderAndWrite(t *testing.T) {
v2testing.Current(t)
size := 1024 * 1024
buffer := make([]byte, size)
nBytes, err := rand.Read(buffer)
assert.Int(nBytes).Equals(len(buffer))
assert.Error(err).IsNil()
readerBuffer := bytes.NewReader(buffer)
writerBuffer := bytes.NewBuffer(make([]byte, 0, size))
transportChan := make(chan *alloc.Buffer, 1024)
err = ReaderToChan(transportChan, NewAdaptiveReader(readerBuffer))
assert.Error(err).Equals(io.EOF)
close(transportChan)
err = ChanToRawWriter(writerBuffer, transportChan)
assert.Error(err).IsNil()
assert.Bytes(buffer).Equals(writerBuffer.Bytes())
}

View File

@@ -3,11 +3,13 @@ package io
import (
"io"
"github.com/v2ray/v2ray-core/common"
"github.com/v2ray/v2ray-core/common/alloc"
)
// Writer extends io.Writer with alloc.Buffer.
type Writer interface {
common.Releasable
// Write writes an alloc.Buffer into underlying writer.
Write(*alloc.Buffer) error
}
@@ -30,5 +32,10 @@ func (this *AdaptiveWriter) Write(buffer *alloc.Buffer) error {
if nBytes < buffer.Len() {
_, err = this.writer.Write(buffer.Value[nBytes:])
}
buffer.Release()
return err
}
func (this *AdaptiveWriter) Release() {
this.writer = nil
}

View File

@@ -5,6 +5,7 @@ import (
"log"
"testing"
"github.com/v2ray/v2ray-core/common/platform"
"github.com/v2ray/v2ray-core/common/serial"
v2testing "github.com/v2ray/v2ray-core/testing"
"github.com/v2ray/v2ray-core/testing/assert"
@@ -30,10 +31,10 @@ func TestStreamLogger(t *testing.T) {
logger: log.New(buffer, "", 0),
}
Info("Test ", "Stream Logger", " Format")
assert.StringLiteral(string(buffer.Bytes())).Equals("[Info]Test Stream Logger Format\n")
assert.StringLiteral(string(buffer.Bytes())).Equals("[Info]Test Stream Logger Format" + platform.LineSeparator())
buffer.Reset()
errorLogger = infoLogger
Error("Test ", serial.StringLiteral("literal"), " Format")
assert.StringLiteral(string(buffer.Bytes())).Equals("[Error]Test literal Format\n")
assert.StringLiteral(string(buffer.Bytes())).Equals("[Error]Test literal Format" + platform.LineSeparator())
}

View File

@@ -1,11 +1,14 @@
package net
import (
"github.com/v2ray/v2ray-core/common"
"github.com/v2ray/v2ray-core/common/alloc"
)
// Packet is a network packet to be sent to destination.
type Packet interface {
common.Releasable
Destination() Destination
Chunk() *alloc.Buffer // First chunk of this commnunication
MoreChunks() bool
@@ -37,3 +40,8 @@ func (packet *packetImpl) Chunk() *alloc.Buffer {
func (packet *packetImpl) MoreChunks() bool {
return packet.moreData
}
func (packet *packetImpl) Release() {
packet.data.Release()
packet.data = nil
}

View File

@@ -50,6 +50,11 @@ func (reader *TimeOutReader) SetTimeOut(value int) {
}
}
func (reader *TimeOutReader) Release() {
reader.connection = nil
reader.worker = nil
}
type timedReaderWorker struct {
timeout int
connection net.Conn

View File

@@ -57,3 +57,7 @@ func GetUserSettings(level UserLevel) UserSettings {
}
return settings
}
type Account interface {
CryptionKey() []byte
}

View File

@@ -1,10 +1,7 @@
package blackhole
import (
"io/ioutil"
"github.com/v2ray/v2ray-core/app"
v2io "github.com/v2ray/v2ray-core/common/io"
v2net "github.com/v2ray/v2ray-core/common/net"
"github.com/v2ray/v2ray-core/proxy"
"github.com/v2ray/v2ray-core/proxy/internal"
@@ -20,14 +17,14 @@ func NewBlackHole() *BlackHole {
}
func (this *BlackHole) Dispatch(firstPacket v2net.Packet, ray ray.OutboundRay) error {
if chunk := firstPacket.Chunk(); chunk != nil {
chunk.Release()
}
firstPacket.Release()
ray.OutboundOutput().Close()
ray.OutboundOutput().Release()
ray.OutboundInput().Close()
ray.OutboundInput().Release()
close(ray.OutboundOutput())
if firstPacket.MoreChunks() {
v2io.ChanToRawWriter(ioutil.Discard, ray.OutboundInput())
}
return nil
}

View File

@@ -1,7 +1,6 @@
package dokodemo
import (
"io"
"sync"
"github.com/v2ray/v2ray-core/app/dispatcher"
@@ -126,25 +125,29 @@ func (this *DokodemoDoor) HandleTCPConnection(conn *hub.TCPConn) {
packet := v2net.NewPacket(v2net.TCPDestination(this.address, this.port), nil, true)
ray := this.packetDispatcher.DispatchToOutbound(packet)
defer ray.InboundOutput().Release()
var inputFinish, outputFinish sync.Mutex
inputFinish.Lock()
outputFinish.Lock()
reader := v2net.NewTimeOutReader(this.config.Timeout, conn)
go dumpInput(reader, ray.InboundInput(), &inputFinish)
go dumpOutput(conn, ray.InboundOutput(), &outputFinish)
go func() {
v2reader := v2io.NewAdaptiveReader(reader)
defer v2reader.Release()
v2io.Pipe(v2reader, ray.InboundInput())
inputFinish.Unlock()
ray.InboundInput().Close()
}()
go func() {
v2writer := v2io.NewAdaptiveWriter(conn)
defer v2writer.Release()
v2io.Pipe(ray.InboundOutput(), v2writer)
outputFinish.Unlock()
}()
outputFinish.Lock()
}
func dumpInput(reader io.Reader, input chan<- *alloc.Buffer, finish *sync.Mutex) {
v2io.RawReaderToChan(input, reader)
finish.Unlock()
close(input)
}
func dumpOutput(writer io.Writer, output <-chan *alloc.Buffer, finish *sync.Mutex) {
v2io.ChanToRawWriter(writer, output)
finish.Unlock()
}

View File

@@ -43,7 +43,7 @@ func TestDokodemoTCP(t *testing.T) {
tcpClient.Write([]byte(data2Send))
tcpClient.CloseWrite()
lastPacket := <-testPacketDispatcher.LastPacket
destination := <-testPacketDispatcher.Destination
response := make([]byte, 1024)
nBytes, err := tcpClient.Read(response)
@@ -51,9 +51,9 @@ func TestDokodemoTCP(t *testing.T) {
tcpClient.Close()
assert.StringLiteral("Processed: " + data2Send).Equals(string(response[:nBytes]))
assert.Bool(lastPacket.Destination().IsTCP()).IsTrue()
netassert.Address(lastPacket.Destination().Address()).Equals(v2net.IPAddress([]byte{1, 2, 3, 4}))
netassert.Port(lastPacket.Destination().Port()).Equals(128)
assert.Bool(destination.IsTCP()).IsTrue()
netassert.Address(destination.Address()).Equals(v2net.IPAddress([]byte{1, 2, 3, 4}))
netassert.Port(destination.Port()).Equals(128)
}
func TestDokodemoUDP(t *testing.T) {
@@ -86,10 +86,9 @@ func TestDokodemoUDP(t *testing.T) {
udpClient.Write([]byte(data2Send))
udpClient.Close()
lastPacket := <-testPacketDispatcher.LastPacket
destination := <-testPacketDispatcher.Destination
assert.StringLiteral(data2Send).Equals(string(lastPacket.Chunk().Value))
assert.Bool(lastPacket.Destination().IsUDP()).IsTrue()
netassert.Address(lastPacket.Destination().Address()).Equals(v2net.IPAddress([]byte{5, 6, 7, 8}))
netassert.Port(lastPacket.Destination().Port()).Equals(256)
assert.Bool(destination.IsUDP()).IsTrue()
netassert.Address(destination.Address()).Equals(v2net.IPAddress([]byte{5, 6, 7, 8}))
netassert.Port(destination.Port()).Equals(256)
}

View File

@@ -19,6 +19,9 @@ type FreedomConnection struct {
func (this *FreedomConnection) Dispatch(firstPacket v2net.Packet, ray ray.OutboundRay) error {
log.Info("Freedom: Opening connection to ", firstPacket.Destination())
defer firstPacket.Release()
defer ray.OutboundInput().Release()
var conn net.Conn
err := retry.Timed(5, 100).On(func() error {
rawConn, err := dialer.Dial(firstPacket.Destination())
@@ -29,7 +32,6 @@ func (this *FreedomConnection) Dispatch(firstPacket v2net.Packet, ray ray.Outbou
return nil
})
if err != nil {
close(ray.OutboundOutput())
log.Error("Freedom: Failed to open connection to ", firstPacket.Destination(), ": ", err)
return err
}
@@ -43,21 +45,23 @@ func (this *FreedomConnection) Dispatch(firstPacket v2net.Packet, ray ray.Outbou
if chunk := firstPacket.Chunk(); chunk != nil {
conn.Write(chunk.Value)
chunk.Release()
}
if !firstPacket.MoreChunks() {
writeMutex.Unlock()
} else {
go func() {
v2io.ChanToRawWriter(conn, input)
v2writer := v2io.NewAdaptiveWriter(conn)
defer v2writer.Release()
v2io.Pipe(input, v2writer)
writeMutex.Unlock()
}()
}
go func() {
defer readMutex.Unlock()
defer close(output)
defer output.Close()
var reader io.Reader = conn
@@ -65,7 +69,10 @@ func (this *FreedomConnection) Dispatch(firstPacket v2net.Packet, ray ray.Outbou
reader = v2net.NewTimeOutReader(16 /* seconds */, conn)
}
v2io.RawReaderToChan(output, reader)
v2reader := v2io.NewAdaptiveReader(reader)
defer v2reader.Release()
v2io.Pipe(v2reader, output)
}()
writeMutex.Lock()

View File

@@ -37,15 +37,12 @@ func TestSinglePacket(t *testing.T) {
err = freedom.Dispatch(packet, traffic)
assert.Error(err).IsNil()
close(traffic.InboundInput())
traffic.InboundInput().Close()
respPayload := <-traffic.InboundOutput()
defer respPayload.Release()
respPayload, err := traffic.InboundOutput().Read()
assert.Error(err).IsNil()
assert.Bytes(respPayload.Value).Equals([]byte("Processed: Data to be sent to remote"))
_, open := <-traffic.InboundOutput()
assert.Bool(open).IsFalse()
tcpServer.Close()
}
@@ -60,7 +57,4 @@ func TestUnreachableDestination(t *testing.T) {
err := freedom.Dispatch(packet, traffic)
assert.Error(err).IsNotNil()
_, open := <-traffic.InboundOutput()
assert.Bool(open).IsFalse()
}

View File

@@ -4,15 +4,16 @@ import (
"io"
"github.com/v2ray/v2ray-core/common/alloc"
v2io "github.com/v2ray/v2ray-core/common/io"
)
type ChanReader struct {
stream <-chan *alloc.Buffer
stream v2io.Reader
current *alloc.Buffer
eof bool
}
func NewChanReader(stream <-chan *alloc.Buffer) *ChanReader {
func NewChanReader(stream v2io.Reader) *ChanReader {
this := &ChanReader{
stream: stream,
}
@@ -21,9 +22,9 @@ func NewChanReader(stream <-chan *alloc.Buffer) *ChanReader {
}
func (this *ChanReader) fill() {
b, open := <-this.stream
b, err := this.stream.Read()
this.current = b
if !open {
if err != nil {
this.eof = true
this.current = nil
}

View File

@@ -154,13 +154,20 @@ func (this *HttpProxyServer) transport(input io.Reader, output io.Writer, ray ra
defer wg.Wait()
go func() {
v2io.RawReaderToChan(ray.InboundInput(), input)
close(ray.InboundInput())
v2reader := v2io.NewAdaptiveReader(input)
defer v2reader.Release()
v2io.Pipe(v2reader, ray.InboundInput())
ray.InboundInput().Close()
wg.Done()
}()
go func() {
v2io.ChanToRawWriter(output, ray.InboundOutput())
v2writer := v2io.NewAdaptiveWriter(output)
defer v2writer.Release()
v2io.Pipe(ray.InboundOutput(), v2writer)
ray.InboundOutput().Release()
wg.Done()
}()
}
@@ -222,7 +229,7 @@ func (this *HttpProxyServer) handlePlainHTTP(request *http.Request, dest v2net.D
packet := v2net.NewPacket(dest, requestBuffer, true)
ray := this.packetDispatcher.DispatchToOutbound(packet)
defer close(ray.InboundInput())
defer ray.InboundInput().Close()
var wg sync.WaitGroup
wg.Add(1)

View File

@@ -6,6 +6,13 @@ import (
"github.com/v2ray/v2ray-core/transport/ray"
)
type HandlerState int
const (
HandlerStateStopped = HandlerState(0)
HandlerStateRunning = HandlerState(1)
)
// An InboundHandler handles inbound network connections to V2Ray.
type InboundHandler interface {
// Listen starts a InboundHandler by listen on a specific port.

View File

@@ -66,6 +66,11 @@ func NewChunkReader(reader io.Reader, auth *Authenticator) *ChunkReader {
}
}
func (this *ChunkReader) Release() {
this.reader = nil
this.auth = nil
}
func (this *ChunkReader) Read() (*alloc.Buffer, error) {
buffer := alloc.NewLargeBuffer()
if _, err := io.ReadFull(this.reader, buffer.Value[:2]); err != nil {

View File

@@ -204,7 +204,7 @@ func (this *Shadowsocks) handleConnection(conn *hub.TCPConn) {
var writeFinish sync.Mutex
writeFinish.Lock()
go func() {
if payload, ok := <-ray.InboundOutput(); ok {
if payload, err := ray.InboundOutput().Read(); err == nil {
payload.SliceBack(ivLen)
rand.Read(payload.Value[:ivLen])
@@ -219,7 +219,11 @@ func (this *Shadowsocks) handleConnection(conn *hub.TCPConn) {
payload.Release()
writer := crypto.NewCryptionWriter(stream, conn)
v2io.ChanToRawWriter(writer, ray.InboundOutput())
v2writer := v2io.NewAdaptiveWriter(writer)
defer writer.Release()
v2io.Pipe(ray.InboundOutput(), v2writer)
ray.InboundOutput().Release()
}
writeFinish.Unlock()
}()
@@ -232,8 +236,9 @@ func (this *Shadowsocks) handleConnection(conn *hub.TCPConn) {
payloadReader = v2io.NewAdaptiveReader(reader)
}
v2io.ReaderToChan(ray.InboundInput(), payloadReader)
close(ray.InboundInput())
v2io.Pipe(payloadReader, ray.InboundInput())
ray.InboundInput().Close()
payloadReader.Release()
writeFinish.Lock()
}

View File

@@ -95,8 +95,10 @@ func (this *SocksServer) handleConnection(connection *hub.TCPConn) {
timedReader := v2net.NewTimeOutReader(120, connection)
reader := v2io.NewBufferedReader(timedReader)
defer reader.Release()
writer := v2io.NewBufferedWriter(connection)
defer writer.Release()
auth, auth4, err := protocol.ReadAuthentication(reader)
if err != nil && err != protocol.Socks4Downgrade {
@@ -274,14 +276,21 @@ func (this *SocksServer) transport(reader io.Reader, writer io.Writer, firstPack
outputFinish.Lock()
go func() {
v2io.RawReaderToChan(input, reader)
v2reader := v2io.NewAdaptiveReader(reader)
defer v2reader.Release()
v2io.Pipe(v2reader, input)
inputFinish.Unlock()
close(input)
input.Close()
}()
go func() {
v2io.ChanToRawWriter(writer, output)
v2writer := v2io.NewAdaptiveWriter(writer)
defer v2writer.Release()
v2io.Pipe(output, v2writer)
outputFinish.Unlock()
output.Release()
}()
outputFinish.Lock()
}

View File

@@ -42,13 +42,20 @@ func (this *InboundConnectionHandler) Communicate(packet v2net.Packet) error {
writeFinish.Lock()
go func() {
v2io.RawReaderToChan(input, this.ConnInput)
close(input)
v2reader := v2io.NewAdaptiveReader(this.ConnInput)
defer v2reader.Release()
v2io.Pipe(v2reader, input)
input.Close()
readFinish.Unlock()
}()
go func() {
v2io.ChanToRawWriter(this.ConnOutput, output)
v2writer := v2io.NewAdaptiveWriter(this.ConnOutput)
defer v2writer.Release()
v2io.Pipe(output, v2writer)
output.Release()
writeFinish.Unlock()
}()

View File

@@ -33,15 +33,22 @@ func (this *OutboundConnectionHandler) Dispatch(packet v2net.Packet, ray ray.Out
writeFinish.Lock()
go func() {
v2io.ChanToRawWriter(this.ConnOutput, input)
v2writer := v2io.NewAdaptiveWriter(this.ConnOutput)
defer v2writer.Release()
v2io.Pipe(input, v2writer)
writeFinish.Unlock()
input.Release()
}()
writeFinish.Lock()
}
v2io.RawReaderToChan(output, this.ConnInput)
close(output)
v2reader := v2io.NewAdaptiveReader(this.ConnInput)
defer v2reader.Release()
v2io.Pipe(v2reader, output)
output.Close()
return nil
}

View File

@@ -118,8 +118,11 @@ func (this *VMessInboundHandler) HandleConnection(connection *hub.TCPConn) {
defer connection.Close()
connReader := v2net.NewTimeOutReader(16, connection)
defer connReader.Release()
reader := v2io.NewBufferedReader(connReader)
defer reader.Release()
session := raw.NewServerSession(this.clients)
request, err := session.DecodeRequestHeader(reader)
@@ -142,7 +145,7 @@ func (this *VMessInboundHandler) HandleConnection(connection *hub.TCPConn) {
connReader.SetTimeOut(userSettings.PayloadReadTimeout)
reader.SetCached(false)
go func() {
defer close(input)
defer input.Close()
defer readFinish.Unlock()
bodyReader := session.DecodeRequestBody(reader)
var requestReader v2io.Reader
@@ -151,10 +154,12 @@ func (this *VMessInboundHandler) HandleConnection(connection *hub.TCPConn) {
} else {
requestReader = v2io.NewAdaptiveReader(bodyReader)
}
v2io.ReaderToChan(input, requestReader)
v2io.Pipe(requestReader, input)
requestReader.Release()
}()
writer := v2io.NewBufferedWriter(connection)
defer writer.Release()
response := &proto.ResponseHeader{
Command: this.generateCommand(request),
@@ -165,7 +170,7 @@ func (this *VMessInboundHandler) HandleConnection(connection *hub.TCPConn) {
bodyWriter := session.EncodeResponseBody(writer)
// Optimize for small response packet
if data, open := <-output; open {
if data, err := output.Read(); err == nil {
if request.Option.IsChunkStream() {
vmessio.Authenticate(data)
}
@@ -178,7 +183,9 @@ func (this *VMessInboundHandler) HandleConnection(connection *hub.TCPConn) {
if request.Option.IsChunkStream() {
writer = vmessio.NewAuthChunkWriter(writer)
}
v2io.ChanToWriter(writer, output)
v2io.Pipe(output, writer)
output.Release()
writer.Release()
finish.Unlock()
}(&writeFinish)
writeFinish.Lock()

View File

@@ -44,3 +44,7 @@ func (this *AuthChunkReader) Read() (*alloc.Buffer, error) {
buffer.SliceFrom(4)
return buffer, nil
}
func (this *AuthChunkReader) Release() {
this.reader = nil
}

View File

@@ -23,6 +23,11 @@ func (this *AuthChunkWriter) Write(buffer *alloc.Buffer) error {
return this.writer.Write(buffer)
}
func (this *AuthChunkWriter) Release() {
this.writer.Release()
this.writer = nil
}
func Authenticate(buffer *alloc.Buffer) {
fnvHash := fnv.New32a()
fnvHash.Write(buffer.Value)

View File

@@ -5,7 +5,6 @@ import (
"sync"
"github.com/v2ray/v2ray-core/app"
"github.com/v2ray/v2ray-core/common/alloc"
v2io "github.com/v2ray/v2ray-core/common/io"
"github.com/v2ray/v2ray-core/common/log"
v2net "github.com/v2ray/v2ray-core/common/net"
@@ -60,7 +59,7 @@ func (this *VMessOutboundHandler) startCommunicate(request *proto.RequestHeader,
if err != nil {
log.Error("Failed to open ", dest, ": ", err)
if ray != nil {
close(ray.OutboundOutput())
ray.OutboundOutput().Close()
}
return err
}
@@ -83,28 +82,22 @@ func (this *VMessOutboundHandler) startCommunicate(request *proto.RequestHeader,
requestFinish.Lock()
conn.CloseWrite()
responseFinish.Lock()
output.Close()
input.Release()
return nil
}
func (this *VMessOutboundHandler) handleRequest(session *raw.ClientSession, conn net.Conn, request *proto.RequestHeader, firstPacket v2net.Packet, input <-chan *alloc.Buffer, finish *sync.Mutex) {
func (this *VMessOutboundHandler) handleRequest(session *raw.ClientSession, conn net.Conn, request *proto.RequestHeader, firstPacket v2net.Packet, input v2io.Reader, finish *sync.Mutex) {
defer finish.Unlock()
writer := v2io.NewBufferedWriter(conn)
defer writer.Release()
session.EncodeRequestHeader(request, writer)
// Send first packet of payload together with request, in favor of small requests.
firstChunk := firstPacket.Chunk()
moreChunks := firstPacket.MoreChunks()
for firstChunk == nil && moreChunks {
firstChunk, moreChunks = <-input
}
if firstChunk == nil && !moreChunks {
log.Warning("VMessOut: Nothing to send. Existing...")
return
}
if request.Option.IsChunkStream() {
vmessio.Authenticate(firstChunk)
}
@@ -120,16 +113,17 @@ func (this *VMessOutboundHandler) handleRequest(session *raw.ClientSession, conn
if request.Option.IsChunkStream() {
streamWriter = vmessio.NewAuthChunkWriter(streamWriter)
}
v2io.ChanToWriter(streamWriter, input)
v2io.Pipe(input, streamWriter)
streamWriter.Release()
}
return
}
func (this *VMessOutboundHandler) handleResponse(session *raw.ClientSession, conn net.Conn, request *proto.RequestHeader, dest v2net.Destination, output chan<- *alloc.Buffer, finish *sync.Mutex) {
func (this *VMessOutboundHandler) handleResponse(session *raw.ClientSession, conn net.Conn, request *proto.RequestHeader, dest v2net.Destination, output v2io.Writer, finish *sync.Mutex) {
defer finish.Unlock()
defer close(output)
reader := v2io.NewBufferedReader(conn)
defer reader.Release()
header, err := session.DecodeResponseHeader(reader)
if err != nil {
@@ -148,7 +142,8 @@ func (this *VMessOutboundHandler) handleResponse(session *raw.ClientSession, con
bodyReader = v2io.NewAdaptiveReader(decryptReader)
}
v2io.ReaderToChan(output, bodyReader)
v2io.Pipe(bodyReader, output)
bodyReader.Release()
return
}

View File

@@ -7,7 +7,6 @@ Wants=network.target
Type=simple
PIDFile=/var/run/v2ray.pid
ExecStart=/usr/bin/v2ray/v2ray -config /etc/v2ray/config.json
ExecStop=kill $(cat /var/run/v2ray.pid)
Restart=on-abnormal
[Install]

View File

@@ -51,7 +51,6 @@ func (this *InboundDetourHandlerDynamic) pickUnusedPort() v2net.Port {
port := this.config.PortRange.From + v2net.Port(r)
_, used := this.portsInUse[port]
if !used {
this.portsInUse[port] = true
return port
}
}
@@ -84,31 +83,33 @@ func (this *InboundDetourHandlerDynamic) Close() {
}
func (this *InboundDetourHandlerDynamic) refresh() error {
this.Lock()
defer this.Unlock()
this.lastRefresh = time.Now()
this.ich2Recycle, this.ichInUse = this.ichInUse, this.ich2Recycle
for _, ich := range this.ichInUse {
port := this.pickUnusedPort()
for _, ich := range this.ich2Recycle {
port2Delete := ich.Port()
delete(this.portsInUse, ich.Port())
ich.Close()
err := retry.Timed(100 /* times */, 1000 /* ms */).On(func() error {
port := this.pickUnusedPort()
err := ich.Listen(port)
if err != nil {
log.Error("Point: Failed to start inbound detour on port ", port, ": ", err)
return err
}
this.portsInUse[port] = true
return nil
})
if err != nil {
return err
continue
}
delete(this.portsInUse, port2Delete)
}
this.Lock()
this.ich2Recycle, this.ichInUse = this.ichInUse, this.ich2Recycle
this.Unlock()
return nil
}

View File

@@ -199,13 +199,17 @@ func (this *Point) FilterPacketAndDispatch(packet v2net.Packet, link ray.Outboun
chunk := packet.Chunk()
moreChunks := packet.MoreChunks()
changed := false
var err error
for chunk == nil && moreChunks {
changed = true
chunk, moreChunks = <-link.OutboundInput()
chunk, err = link.OutboundInput().Read()
if err != nil {
moreChunks = false
}
}
if chunk == nil && !moreChunks {
log.Info("Point: No payload to dispatch, stopping dispatching now.")
close(link.OutboundOutput())
link.OutboundOutput().Close()
return
}

View File

@@ -2,7 +2,6 @@ package main
import (
"bytes"
"fmt"
"os"
"os/exec"
"path/filepath"
@@ -19,10 +18,12 @@ func TestBuildAndRun(t *testing.T) {
v2testing.Current(t)
gopath := os.Getenv("GOPATH")
target := filepath.Join(gopath, "src", "v2ray_test")
fmt.Println(target)
goOS := parseOS(runtime.GOOS)
goArch := parseArch(runtime.GOARCH)
target := filepath.Join(gopath, "src", "v2ray_test")
if goOS == Windows {
target += ".exe"
}
err := buildV2Ray(target, "v1.0", goOS, goArch)
assert.Error(err).IsNil()

View File

@@ -1,6 +1,7 @@
package hub
import (
"errors"
"net"
"time"
@@ -8,30 +9,45 @@ import (
v2net "github.com/v2ray/v2ray-core/common/net"
)
var (
ErrorClosedConnection = errors.New("Connection already closed.")
)
type TCPConn struct {
conn *net.TCPConn
listener *TCPHub
dirty bool
}
func (this *TCPConn) Read(b []byte) (int, error) {
if this == nil || this.conn == nil {
return 0, ErrorClosedConnection
}
return this.conn.Read(b)
}
func (this *TCPConn) Write(b []byte) (int, error) {
if this == nil || this.conn == nil {
return 0, ErrorClosedConnection
}
return this.conn.Write(b)
}
func (this *TCPConn) Close() error {
return this.conn.Close()
if this == nil || this.conn == nil {
return ErrorClosedConnection
}
err := this.conn.Close()
return err
}
func (this *TCPConn) Release() {
if this.dirty {
this.Close()
if this == nil || this.listener == nil {
return
}
this.listener.recycle(this.conn)
this.Close()
this.conn = nil
this.listener = nil
}
func (this *TCPConn) LocalAddr() net.Addr {
@@ -55,10 +71,16 @@ func (this *TCPConn) SetWriteDeadline(t time.Time) error {
}
func (this *TCPConn) CloseRead() error {
if this == nil || this.conn == nil {
return nil
}
return this.conn.CloseRead()
}
func (this *TCPConn) CloseWrite() error {
if this == nil || this.conn == nil {
return nil
}
return this.conn.CloseWrite()
}
@@ -88,6 +110,7 @@ func ListenTCP(port v2net.Port, callback func(*TCPConn)) (*TCPHub, error) {
func (this *TCPHub) Close() {
this.accepting = false
this.listener.Close()
this.listener = nil
}
func (this *TCPHub) start() {

View File

@@ -32,7 +32,7 @@ func (this *UDPServer) locateExistingAndDispatch(dest string, packet v2net.Packe
this.RLock()
defer this.RUnlock()
if entry, found := this.conns[dest]; found {
entry.inboundRay.InboundInput() <- packet.Chunk()
entry.inboundRay.InboundInput().Write(packet.Chunk())
return true
}
return false
@@ -55,8 +55,12 @@ func (this *UDPServer) Dispatch(source v2net.Destination, packet v2net.Packet, c
}
func (this *UDPServer) handleConnection(destString string, inboundRay ray.InboundRay, source v2net.Destination, callback UDPResponseCallback) {
for buffer := range inboundRay.InboundOutput() {
callback(v2net.NewPacket(source, buffer, false))
for {
data, err := inboundRay.InboundOutput().Read()
if err != nil {
break
}
callback(v2net.NewPacket(source, data, false))
}
this.Lock()
delete(this.conns, destString)

View File

@@ -1,6 +1,9 @@
package ray
import (
"io"
"sync"
"github.com/v2ray/v2ray-core/common/alloc"
)
@@ -11,28 +14,101 @@ const (
// NewRay creates a new Ray for direct traffic transport.
func NewRay() Ray {
return &directRay{
Input: make(chan *alloc.Buffer, bufferSize),
Output: make(chan *alloc.Buffer, bufferSize),
Input: NewStream(),
Output: NewStream(),
}
}
type directRay struct {
Input chan *alloc.Buffer
Output chan *alloc.Buffer
Input *Stream
Output *Stream
}
func (this *directRay) OutboundInput() <-chan *alloc.Buffer {
func (this *directRay) OutboundInput() InputStream {
return this.Input
}
func (this *directRay) OutboundOutput() chan<- *alloc.Buffer {
func (this *directRay) OutboundOutput() OutputStream {
return this.Output
}
func (this *directRay) InboundInput() chan<- *alloc.Buffer {
func (this *directRay) InboundInput() OutputStream {
return this.Input
}
func (this *directRay) InboundOutput() <-chan *alloc.Buffer {
func (this *directRay) InboundOutput() InputStream {
return this.Output
}
type Stream struct {
access sync.RWMutex
closed bool
buffer chan *alloc.Buffer
}
func NewStream() *Stream {
return &Stream{
buffer: make(chan *alloc.Buffer, bufferSize),
}
}
func (this *Stream) Read() (*alloc.Buffer, error) {
if this.buffer == nil {
return nil, io.EOF
}
this.access.RLock()
defer this.access.RUnlock()
if this.buffer == nil {
return nil, io.EOF
}
result, open := <-this.buffer
if !open {
return nil, io.EOF
}
return result, nil
}
func (this *Stream) Write(data *alloc.Buffer) error {
if this.closed {
return io.EOF
}
if this.buffer == nil {
return io.EOF
}
this.access.RLock()
defer this.access.RUnlock()
if this.buffer == nil {
return io.EOF
}
this.buffer <- data
return nil
}
func (this *Stream) Close() {
if this.closed {
return
}
this.access.RLock()
defer this.access.RUnlock()
if this.closed {
return
}
this.closed = true
close(this.buffer)
}
func (this *Stream) Release() {
if this.buffer == nil {
return
}
this.Close()
this.access.Lock()
defer this.access.Unlock()
if this.buffer == nil {
return
}
for data := range this.buffer {
data.Release()
}
this.buffer = nil
}

View File

@@ -1,19 +1,19 @@
package ray
import (
"github.com/v2ray/v2ray-core/common/alloc"
v2io "github.com/v2ray/v2ray-core/common/io"
)
// OutboundRay is a transport interface for outbound connections.
type OutboundRay interface {
// OutboundInput provides a stream for the input of the outbound connection.
// The outbound connection shall write all the input until it is closed.
OutboundInput() <-chan *alloc.Buffer
OutboundInput() InputStream
// OutboundOutput provides a stream to retrieve the response from the
// outbound connection. The outbound connection shall close the channel
// after all responses are receivced and put into the channel.
OutboundOutput() chan<- *alloc.Buffer
OutboundOutput() OutputStream
}
// InboundRay is a transport interface for inbound connections.
@@ -21,12 +21,12 @@ type InboundRay interface {
// InboundInput provides a stream to retrieve the request from client.
// The inbound connection shall close the channel after the entire request
// is received and put into the channel.
InboundInput() chan<- *alloc.Buffer
InboundInput() OutputStream
// InboudBound provides a stream of data for the inbound connection to write
// as response. The inbound connection shall write all the data from the
// channel until it is closed.
InboundOutput() <-chan *alloc.Buffer
InboundOutput() InputStream
}
// Ray is an internal tranport channel between inbound and outbound connection.
@@ -34,3 +34,13 @@ type Ray interface {
InboundRay
OutboundRay
}
type InputStream interface {
v2io.Reader
Close()
}
type OutputStream interface {
v2io.Writer
Close()
}