diff --git a/Dockerfile.npc b/Dockerfile.npc new file mode 100755 index 0000000..ae6715a --- /dev/null +++ b/Dockerfile.npc @@ -0,0 +1,10 @@ +FROM golang as builder +WORKDIR /go/src/github.com/cnlh/nps +COPY . . +RUN go get -d -v ./... +RUN CGO_ENABLED=0 go build -ldflags="-w -s -extldflags -static" ./cmd/npc/npc.go + +FROM scratch +COPY --from=builder /go/src/github.com/cnlh/nps/npc / +VOLUME /conf +ENTRYPOINT ["/npc"] diff --git a/Dockerfile.nps b/Dockerfile.nps new file mode 100755 index 0000000..698ced9 --- /dev/null +++ b/Dockerfile.nps @@ -0,0 +1,11 @@ +FROM golang as builder +WORKDIR /go/src/github.com/cnlh/nps +COPY . . +RUN go get -d -v ./... +RUN CGO_ENABLED=0 go build -ldflags="-w -s -extldflags -static" ./cmd/nps/nps.go + +FROM scratch +COPY --from=builder /go/src/github.com/cnlh/nps/nps / +COPY --from=builder /go/src/github.com/cnlh/nps/web /web +VOLUME /conf +CMD ["/nps"] diff --git a/README.md b/README.md index d1b95af..0f78bdb 100644 --- a/README.md +++ b/README.md @@ -26,6 +26,7 @@ nps是一款轻量级、高性能、功能强大的**内网穿透**代理服务 * [安装](#安装) * [编译安装](#源码安装) * [release安装](#release安装) + * [docker安装](#docker安装) * [使用示例(以web主控模式为主)](#使用示例) * [统一准备工作](#统一准备工作(必做)) * [http|https域名解析](#域名解析) @@ -111,6 +112,7 @@ nps是一款轻量级、高性能、功能强大的**内网穿透**代理服务 * [获取用户真实ip](#获取用户真实ip) * [客户端地址显示](#客户端地址显示) * [客户端与服务端版本对比](#客户端与服务端版本对比) + * [Linux系统限制](#Linux系统限制) * [webAPI](#webAPI) * [贡献](#贡献) * [支持nps发展](#捐赠) @@ -120,7 +122,7 @@ nps是一款轻量级、高性能、功能强大的**内网穿透**代理服务 ## 安装 -### releases安装 +### release安装 > [releases](https://github.com/cnlh/nps/releases) 下载对应的系统版本即可,服务端和客户端是单独的 @@ -133,6 +135,10 @@ nps是一款轻量级、高性能、功能强大的**内网穿透**代理服务 > go build cmd/npc/npc.go +### docker安装 +> [server](https://hub.docker.com/r/ffdfgdfg/nps) +> [client](https://hub.docker.com/r/ffdfgdfg/npc) + ## 使用示例 ### 统一准备工作(必做) @@ -144,6 +150,7 @@ nps是一款轻量级、高性能、功能强大的**内网穿透**代理服务 ```shell ./npc -server=1.1.1.1:8284 -vkey=客户端的密钥 ``` +**注意:运行服务端后,请确保能从客户端设备上正常访问配置文件中所配置的`bridge_port`端口,telnet,netcat这类的来检查** ### 域名解析 @@ -197,6 +204,9 @@ nps是一款轻量级、高性能、功能强大的**内网穿透**代理服务 - 在刚才创建的客户端隧道管理中添加一条socks5代理,填写监听的端口(8003),保存。 - 在外网环境的本机配置socks5代理(例如使用proxifier进行全局代理),ip为公网服务器ip(1.1.1.1),端口为填写的监听端口(8003),即可畅享内网了 +**注意** +经过socks5代理,当收到socks5数据包时socket已经是accept状态。表现是扫描端口全open,建立连接后短时间关闭。若想同内网表现一致,建议远程连接一台设备。 + ### http正向代理 **适用范围:** 在外网环境下使用http正向代理访问内网站点 @@ -375,7 +385,13 @@ server { ``` (./nps|nps.exe) install ``` -安装成功后,对于linux,darwin,将会把配置文件和静态文件放置于/etc/nps/,并将可执行文件nps复制到/usr/bin/nps或者/usr/local/bin/nps,安装成功后可在任何位置执行 +安装成功后,对于linux,darwin,将会把配置文件和静态文件放置于/etc/nps/,并将可执行文件nps复制到/usr/bin/nps或者/usr/local/bin/nps,安装成功后可在任何位置执行,同时也会添加systemd配置。 + +``` +sudo systemctl enable|disable|start|stop|restart|status nps +``` +systemd,带有开机自启,自动重启配置,当进程结束后15秒会启动,日志输出至/var/log/nps/nps.log。 +建议采用此方式启动,能够捕获panic信息,便于排查问题。 ``` nps test|start|stop|restart|status @@ -432,6 +448,27 @@ server_ip=xxx ``` ./npc -config=npc配置文件路径 ``` +可自行添加systemd service,例如:`npc.service` +``` +[Unit] +Description=npc - convenient proxy server client +Documentation=https://github.com/cnlh/nps/ +After=network-online.target remote-fs.target nss-lookup.target +Wants=network-online.target + +[Service] +Type=simple +KillMode=process +Restart=always +RestartSec=15s +StandardOutput=append:/var/log/nps/npc.log +ExecStartPre=/bin/echo 'Starting npc' +ExecStopPost=/bin/echo 'Stopping npc' +ExecStart=/absolutely path to/npc -server=ip:port -vkey=web界面中显示的密钥 + +[Install] +WantedBy=multi-user.target +``` #### 配置文件说明 [示例配置文件](https://github.com/cnlh/nps/tree/master/conf/npc.conf) ##### 全局配置 @@ -794,6 +831,19 @@ nps支持对客户端的隧道数量进行限制,该功能默认是关闭的 nps主要通信默认基于多路复用,无需开启。 +多路复用基于TCP滑动窗口原理设计,动态计算延迟以及带宽来算出应该往网络管道中打入的流量。 +由于主要通信大多采用TCP协议,并无法探测其实时丢包情况,对于产生丢包重传的情况,采用较大的宽容度, +5分钟的等待时间,超时将会关闭当前隧道连接并重新建立,这将会抛弃当前所有的连接。 +在Linux上,可以通过调节内核参数来适应不同应用场景。 + +对于需求大带宽又有一定的丢包的场景,可以保持默认参数不变,尽可能少抛弃连接 +高并发下可根据[Linux系统限制](#Linux系统限制) 调整 + +对于延迟敏感而又有一定丢包的场景,可以适当调整TCP重传次数 +`tcp_syn_retries`, `tcp_retries1`, `tcp_retries2` +高并发同上 +nps会在系统主动关闭连接的时候拿到报错,进而重新建立隧道连接 + ### 环境变量渲染 npc支持环境变量渲染以适应在某些特殊场景下的要求。 @@ -902,6 +952,11 @@ LevelInformational->6 LevelDebug->7 ### 客户端与服务端版本对比 为了程序正常运行,客户端与服务端的核心版本必须一致,否则将导致客户端无法成功连接致服务端。 +### Linux系统限制 +默认情况下linux对连接数量有限制,对于性能好的机器完全可以调整内核参数以处理更多的连接。 +`tcp_max_syn_backlog` `somaxconn` +酌情调整参数,增强网络性能 + ## webAPI ### webAPI验证说明 diff --git a/client/client.go b/client/client.go index 52da907..5bd0b01 100755 --- a/client/client.go +++ b/client/client.go @@ -49,6 +49,11 @@ retry: time.Sleep(time.Second * 5) goto retry } + if c == nil { + logs.Error("Error data from server, and will be reconnected in five seconds") + time.Sleep(time.Second * 5) + goto retry + } logs.Info("Successful connection with server %s", s.svrAddr) //monitor the connection go s.ping() diff --git a/client/control.go b/client/control.go index 5673f14..3260113 100644 --- a/client/control.go +++ b/client/control.go @@ -223,8 +223,13 @@ func NewConn(tp string, vkey string, server string, connType string, proxyUrl st if _, err := c.Write([]byte(crypt.Md5(version.GetVersion()))); err != nil { return nil, err } - if b, err := c.GetShortContent(32); err != nil || crypt.Md5(version.GetVersion()) != string(b) { - logs.Error("The client does not match the server version. The current version of the client is", version.GetVersion()) + b, err := c.GetShortContent(32) + if err != nil { + logs.Error(err) + return nil, err + } + if crypt.Md5(version.GetVersion()) != string(b) { + logs.Error("The client does not match the server version. The current core version of the client is", version.GetVersion()) return nil, err } if _, err := c.Write([]byte(common.Getverifyval(vkey))); err != nil { diff --git a/cmd/nps/nps.go b/cmd/nps/nps.go index f66fe66..22835a2 100644 --- a/cmd/nps/nps.go +++ b/cmd/nps/nps.go @@ -61,7 +61,7 @@ func main() { logs.Error("Getting bridge_port error", err) os.Exit(0) } - logs.Info("the version of server is %s ,allow client version to be %s", version.VERSION, version.GetVersion()) + logs.Info("the version of server is %s ,allow client core version to be %s", version.VERSION, version.GetVersion()) connection.InitConnectionService() crypt.InitTls(filepath.Join(common.GetRunPath(), "conf", "server.pem"), filepath.Join(common.GetRunPath(), "conf", "server.key")) tool.InitAllowPort() diff --git a/go.mod b/go.mod index 1f6b753..a540dae 100644 --- a/go.mod +++ b/go.mod @@ -10,18 +10,18 @@ require ( github.com/go-ole/go-ole v1.2.4 // indirect github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db github.com/klauspost/cpuid v1.2.1 // indirect - github.com/klauspost/reedsolomon v1.9.2 + github.com/klauspost/reedsolomon v1.9.2 // indirect github.com/onsi/gomega v1.5.0 // indirect + github.com/panjf2000/ants/v2 v2.2.2 github.com/pkg/errors v0.8.0 github.com/shiena/ansicolor v0.0.0-20151119151921-a422bbe96644 // indirect github.com/shirou/gopsutil v2.18.12+incompatible github.com/stretchr/testify v1.3.0 // indirect github.com/templexxx/cpufeat v0.0.0-20180724012125-cef66df7f161 // indirect - github.com/templexxx/xor v0.0.0-20181023030647-4e92f724b73b - github.com/tjfoc/gmsm v1.0.1 + github.com/templexxx/xor v0.0.0-20181023030647-4e92f724b73b // indirect + github.com/tjfoc/gmsm v1.0.1 // indirect github.com/xtaci/kcp-go v5.4.4+incompatible github.com/xtaci/lossyconn v0.0.0-20190602105132-8df528c0c9ae // indirect - golang.org/x/crypto v0.0.0-20181127143415-eb0de9b17e85 golang.org/x/net v0.0.0-20181114220301-adae6a3d119a golang.org/x/sys v0.0.0-20190804053845-51ab0e2deafa // indirect ) diff --git a/go.sum b/go.sum index d57b283..8a74eb6 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,6 @@ github.com/Knetic/govaluate v3.0.0+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= github.com/OwnLocal/goes v1.0.0/go.mod h1:8rIFjBGTue3lCU0wplczcUgt9Gxgrkkrw7etMIcn8TM= +github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d h1:G0m3OIz70MZUWq3EgK3CesDbo8upS2Vm9/P3FtgI+Jk= github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= github.com/astaxie/beego v1.12.0 h1:MRhVoeeye5N+Flul5PoVfD9CslfdoH+xqC/xvSQ5u2Y= github.com/astaxie/beego v1.12.0/go.mod h1:fysx+LZNZKnvh4GED/xND7jWtjCR6HzydR2Hh2Im57o= @@ -15,14 +16,17 @@ github.com/couchbase/go-couchbase v0.0.0-20181122212707-3e9b6e1258bb/go.mod h1:T github.com/couchbase/gomemcached v0.0.0-20181122193126-5125a94a666c/go.mod h1:srVSlQLB8iXBVXHgnqemxUXqN6FCvClgCMPCsjBDR7c= github.com/couchbase/goutils v0.0.0-20180530154633-e865a1461c8a/go.mod h1:BQwMFlJzDjFDG3DJUdU0KORxn88UlsOULuxLExMh3Hs= github.com/cupcake/rdb v0.0.0-20161107195141-43ba34106c76/go.mod h1:vYwsqCOLxGiisLwp9rITslkFNpZD5rz43tf41QFkTWY= +github.com/davecgh/go-spew v1.1.0 h1:ZDRjVQ15GmhC3fiQ8ni8+OwkZQO4DARzQgrnXU1Liz8= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/edsrzf/mmap-go v0.0.0-20170320065105-0bce6a688712/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M= +github.com/elazarl/go-bindata-assetfs v1.0.0 h1:G/bYguwHIzWq9ZoyUQqrjTmJbbYn3j3CKKpKinvZLFk= github.com/elazarl/go-bindata-assetfs v1.0.0/go.mod h1:v+YaWX3bdea5J/mo8dSETolEo7R71Vk1u8bnjau5yw4= github.com/exfly/beego v1.12.0-export-init h1:VQNYKdXhAwZGUaFmQv8Aj921O3rQJZRIF8xeGrhsjrI= github.com/exfly/beego v1.12.0-export-init/go.mod h1:fysx+LZNZKnvh4GED/xND7jWtjCR6HzydR2Hh2Im57o= github.com/exfly/beego v1.12.0 h1:OXwIwngaAx35Mga+jLiZmArusBxj8/H0jYXzGDAdwOg= github.com/exfly/beego v1.12.0/go.mod h1:fysx+LZNZKnvh4GED/xND7jWtjCR6HzydR2Hh2Im57o= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/go-ole/go-ole v1.2.4 h1:nNBDSCOigTSiarFpYE9J/KtEA1IOW4CNeqT9TQDqCxI= github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM= github.com/go-redis/redis v6.14.2+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= @@ -40,9 +44,12 @@ github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/mattn/go-sqlite3 v1.10.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/panjf2000/ants/v2 v2.2.2 h1:TWzusBjq/IflXhy+/S6u5wmMLCBdJnB9tPIx9Zmhvok= +github.com/panjf2000/ants/v2 v2.2.2/go.mod h1:1GFm8bV8nyCQvU5K4WvBCTG1/YBFOD2VzjffD8fV55A= github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/shiena/ansicolor v0.0.0-20151119151921-a422bbe96644 h1:X+yvsM2yrEktyI+b2qND5gpH8YhURn0k8OCaeRnkINo= github.com/shiena/ansicolor v0.0.0-20151119151921-a422bbe96644/go.mod h1:nkxAfR/5quYxwPZhyDxgasBMnRtBZd0FCEpawpjMUFg= @@ -53,6 +60,7 @@ github.com/siddontang/ledisdb v0.0.0-20181029004158-becf5f38d373/go.mod h1:mF1Dp github.com/siddontang/rdb v0.0.0-20150307021120-fc89ed2e418d/go.mod h1:AMEsy7v5z92TR1JKMkLLoaOQk++LVnOKL3ScbJ8GNGA= github.com/ssdb/gossdb v0.0.0-20180723034631-88f6b59b84ec/go.mod h1:QBvMkMya+gXctz3kmljlUCu/yB3GZ6oee+dUozsezQE= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/syndtr/goleveldb v0.0.0-20181127023241-353a9fca669c/go.mod h1:Z4AUp2Km+PwemOoO/VB5AOx9XSsIItzFjoJlOSiYmn0= github.com/templexxx/cpufeat v0.0.0-20180724012125-cef66df7f161 h1:89CEmDvlq/F7SJEOqkIdNDGJXrQIhuIx9D2DBXjavSU= @@ -64,6 +72,7 @@ github.com/tjfoc/gmsm v1.0.1/go.mod h1:XxO4hdhhrzAd+G4CjDqaOkd0hUzmtPR/d3EiBBMn/ github.com/wendal/errors v0.0.0-20130201093226-f66c77a7882b/go.mod h1:Q12BUT7DqIlHRmgv3RskH+UCM/4eqVMgI0EMmlSpAXc= github.com/xtaci/kcp-go v5.4.4+incompatible h1:QIJ0a0Q0N1G20yLHL2+fpdzyy2v/Cb3PI+xiwx/KK9c= github.com/xtaci/kcp-go v5.4.4+incompatible/go.mod h1:bN6vIwHQbfHaHtFpEssmWsN45a+AZwO7eyRCmEIbtvE= +github.com/xtaci/lossyconn v0.0.0-20190602105132-8df528c0c9ae h1:J0GxkO96kL4WF+AIT3M4mfUVinOCPgf2uUWYFUzN0sM= github.com/xtaci/lossyconn v0.0.0-20190602105132-8df528c0c9ae/go.mod h1:gXtu8J62kEgmN++bm9BVICuT/e8yiLI2KFobd/TRFsE= golang.org/x/crypto v0.0.0-20181127143415-eb0de9b17e85 h1:et7+NAX3lLIk5qUCTA9QelBjGE/NkhzYw/mhnr0s7nI= golang.org/x/crypto v0.0.0-20181127143415-eb0de9b17e85/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= @@ -75,6 +84,7 @@ golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190804053845-51ab0e2deafa h1:KIDDMLT1O0Nr7TSxp8xM5tJcdn8tgyAONntO829og1M= golang.org/x/sys v0.0.0-20190804053845-51ab0e2deafa/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= diff --git a/lib/common/const.go b/lib/common/const.go index ffb2fa6..2fd5bb6 100644 --- a/lib/common/const.go +++ b/lib/common/const.go @@ -36,3 +36,19 @@ WWW-Authenticate: Basic realm="easyProxy" ` ) + +const ( + MUX_PING_FLAG uint8 = iota + MUX_NEW_CONN_OK + MUX_NEW_CONN_Fail + MUX_NEW_MSG + MUX_NEW_MSG_PART + MUX_MSG_SEND_OK + MUX_NEW_CONN + MUX_CONN_CLOSE + MUX_PING_RETURN + MUX_PING int32 = -1 + MAXIMUM_SEGMENT_SIZE = PoolSizeWindow + MAXIMUM_WINDOW_SIZE = 1 << 25 // 1<<31-1 TCP slide window size is very large, + // we use 32M, reduce memory usage +) diff --git a/lib/common/netpackager.go b/lib/common/netpackager.go new file mode 100644 index 0000000..91eeb98 --- /dev/null +++ b/lib/common/netpackager.go @@ -0,0 +1,235 @@ +package common + +import ( + "bytes" + "encoding/binary" + "encoding/json" + "errors" + "io" + "strings" +) + +type NetPackager interface { + Pack(writer io.Writer) (err error) + UnPack(reader io.Reader) (err error) +} + +type BasePackager struct { + Length uint16 + Content []byte +} + +func (Self *BasePackager) NewPac(contents ...interface{}) (err error) { + Self.clean() + for _, content := range contents { + switch content.(type) { + case nil: + Self.Content = Self.Content[:0] + case []byte: + err = Self.appendByte(content.([]byte)) + case string: + err = Self.appendByte([]byte(content.(string))) + if err != nil { + return + } + err = Self.appendByte([]byte(CONN_DATA_SEQ)) + default: + err = Self.marshal(content) + } + } + Self.setLength() + return +} + +func (Self *BasePackager) appendByte(data []byte) (err error) { + m := len(Self.Content) + n := m + len(data) + if n <= cap(Self.Content) { + Self.Content = Self.Content[0:n] // grow the length for copy + copy(Self.Content[m:n], data) + return nil + } else { + return errors.New("pack content too large") + } +} + +//似乎这里涉及到父类作用域问题,当子类调用父类的方法时,其struct仅仅为父类的 +func (Self *BasePackager) Pack(writer io.Writer) (err error) { + err = binary.Write(writer, binary.LittleEndian, Self.Length) + if err != nil { + return + } + err = binary.Write(writer, binary.LittleEndian, Self.Content) + return +} + +//Unpack 会导致传入的数字类型转化成float64!! +//主要原因是json unmarshal并未传入正确的数据类型 +func (Self *BasePackager) UnPack(reader io.Reader) (n uint16, err error) { + Self.clean() + n += 2 // uint16 + err = binary.Read(reader, binary.LittleEndian, &Self.Length) + if err != nil { + return + } + if int(Self.Length) > cap(Self.Content) { + err = errors.New("unpack err, content length too large") + } + Self.Content = Self.Content[:int(Self.Length)] + //n, err := io.ReadFull(reader, Self.Content) + //if n != int(Self.Length) { + // err = io.ErrUnexpectedEOF + //} + err = binary.Read(reader, binary.LittleEndian, Self.Content) + n += Self.Length + return +} + +func (Self *BasePackager) marshal(content interface{}) (err error) { + tmp, err := json.Marshal(content) + if err != nil { + return err + } + err = Self.appendByte(tmp) + return +} + +func (Self *BasePackager) Unmarshal(content interface{}) (err error) { + err = json.Unmarshal(Self.Content, content) + if err != nil { + return err + } + return +} + +func (Self *BasePackager) setLength() { + Self.Length = uint16(len(Self.Content)) + return +} + +func (Self *BasePackager) clean() { + Self.Length = 0 + Self.Content = Self.Content[:0] // reset length +} + +func (Self *BasePackager) Split() (strList []string) { + n := bytes.IndexByte(Self.Content, 0) + strList = strings.Split(string(Self.Content[:n]), CONN_DATA_SEQ) + strList = strList[0 : len(strList)-1] + return +} + +type ConnPackager struct { // Todo + ConnType uint8 + BasePackager +} + +func (Self *ConnPackager) NewPac(connType uint8, content ...interface{}) (err error) { + Self.ConnType = connType + err = Self.BasePackager.NewPac(content...) + return +} + +func (Self *ConnPackager) Pack(writer io.Writer) (err error) { + err = binary.Write(writer, binary.LittleEndian, Self.ConnType) + if err != nil { + return + } + err = Self.BasePackager.Pack(writer) + return +} + +func (Self *ConnPackager) UnPack(reader io.Reader) (n uint16, err error) { + err = binary.Read(reader, binary.LittleEndian, &Self.ConnType) + if err != nil && err != io.EOF { + return + } + n, err = Self.BasePackager.UnPack(reader) + n += 2 + return +} + +type MuxPackager struct { + Flag uint8 + Id int32 + Window uint32 + ReadLength uint32 + BasePackager +} + +func (Self *MuxPackager) NewPac(flag uint8, id int32, content ...interface{}) (err error) { + Self.Flag = flag + Self.Id = id + switch flag { + case MUX_PING_FLAG, MUX_PING_RETURN, MUX_NEW_MSG, MUX_NEW_MSG_PART: + Self.Content = WindowBuff.Get() + err = Self.BasePackager.NewPac(content...) + //logs.Warn(Self.Length, string(Self.Content)) + case MUX_MSG_SEND_OK: + // MUX_MSG_SEND_OK contains two data + switch content[0].(type) { + case int: + Self.Window = uint32(content[0].(int)) + case uint32: + Self.Window = content[0].(uint32) + } + switch content[1].(type) { + case int: + Self.ReadLength = uint32(content[1].(int)) + case uint32: + Self.ReadLength = content[1].(uint32) + } + } + return +} + +func (Self *MuxPackager) Pack(writer io.Writer) (err error) { + err = binary.Write(writer, binary.LittleEndian, Self.Flag) + if err != nil { + return + } + err = binary.Write(writer, binary.LittleEndian, Self.Id) + if err != nil { + return + } + switch Self.Flag { + case MUX_NEW_MSG, MUX_NEW_MSG_PART, MUX_PING_FLAG, MUX_PING_RETURN: + err = Self.BasePackager.Pack(writer) + WindowBuff.Put(Self.Content) + case MUX_MSG_SEND_OK: + err = binary.Write(writer, binary.LittleEndian, Self.Window) + if err != nil { + return + } + err = binary.Write(writer, binary.LittleEndian, Self.ReadLength) + } + return +} + +func (Self *MuxPackager) UnPack(reader io.Reader) (n uint16, err error) { + err = binary.Read(reader, binary.LittleEndian, &Self.Flag) + if err != nil { + return + } + err = binary.Read(reader, binary.LittleEndian, &Self.Id) + if err != nil { + return + } + switch Self.Flag { + case MUX_NEW_MSG, MUX_NEW_MSG_PART, MUX_PING_FLAG, MUX_PING_RETURN: + Self.Content = WindowBuff.Get() // need get a window buf from pool + Self.BasePackager.clean() // also clean the content + n, err = Self.BasePackager.UnPack(reader) + //logs.Warn("unpack", Self.Length, string(Self.Content)) + case MUX_MSG_SEND_OK: + err = binary.Read(reader, binary.LittleEndian, &Self.Window) + if err != nil { + return + } + n += 4 // uint32 + err = binary.Read(reader, binary.LittleEndian, &Self.ReadLength) + n += 4 // uint32 + } + n += 5 //uint8 int32 + return +} diff --git a/lib/common/pool.go b/lib/common/pool.go new file mode 100644 index 0000000..1f7a47e --- /dev/null +++ b/lib/common/pool.go @@ -0,0 +1,199 @@ +package common + +import ( + "bytes" + "sync" +) + +const PoolSize = 64 * 1024 +const PoolSizeSmall = 100 +const PoolSizeUdp = 1472 +const PoolSizeCopy = 32 << 10 +const PoolSizeBuffer = 4096 +const PoolSizeWindow = PoolSizeBuffer - 2 - 4 - 4 - 1 + +var BufPool = sync.Pool{ + New: func() interface{} { + return make([]byte, PoolSize) + }, +} + +var BufPoolUdp = sync.Pool{ + New: func() interface{} { + return make([]byte, PoolSizeUdp) + }, +} +var BufPoolMax = sync.Pool{ + New: func() interface{} { + return make([]byte, PoolSize) + }, +} +var BufPoolSmall = sync.Pool{ + New: func() interface{} { + return make([]byte, PoolSizeSmall) + }, +} +var BufPoolCopy = sync.Pool{ + New: func() interface{} { + return make([]byte, PoolSizeCopy) + }, +} + +func PutBufPoolUdp(buf []byte) { + if cap(buf) == PoolSizeUdp { + BufPoolUdp.Put(buf[:PoolSizeUdp]) + } +} + +func PutBufPoolCopy(buf []byte) { + if cap(buf) == PoolSizeCopy { + BufPoolCopy.Put(buf[:PoolSizeCopy]) + } +} + +func GetBufPoolCopy() []byte { + return (BufPoolCopy.Get().([]byte))[:PoolSizeCopy] +} + +func PutBufPoolMax(buf []byte) { + if cap(buf) == PoolSize { + BufPoolMax.Put(buf[:PoolSize]) + } +} + +type copyBufferPool struct { + pool sync.Pool +} + +func (Self *copyBufferPool) New() { + Self.pool = sync.Pool{ + New: func() interface{} { + return make([]byte, PoolSizeCopy, PoolSizeCopy) + }, + } +} + +func (Self *copyBufferPool) Get() []byte { + buf := Self.pool.Get().([]byte) + return buf[:PoolSizeCopy] // just like make a new slice, but data may not be 0 +} + +func (Self *copyBufferPool) Put(x []byte) { + if len(x) == PoolSizeCopy { + Self.pool.Put(x) + } else { + x = nil // buf is not full, not allowed, New method returns a full buf + } +} + +type windowBufferPool struct { + pool sync.Pool +} + +func (Self *windowBufferPool) New() { + Self.pool = sync.Pool{ + New: func() interface{} { + return make([]byte, PoolSizeWindow, PoolSizeWindow) + }, + } +} + +func (Self *windowBufferPool) Get() (buf []byte) { + buf = Self.pool.Get().([]byte) + return buf[:PoolSizeWindow] +} + +func (Self *windowBufferPool) Put(x []byte) { + Self.pool.Put(x[:PoolSizeWindow]) // make buf to full +} + +type bufferPool struct { + pool sync.Pool +} + +func (Self *bufferPool) New() { + Self.pool = sync.Pool{ + New: func() interface{} { + return bytes.NewBuffer(make([]byte, 0, PoolSizeBuffer)) + }, + } +} + +func (Self *bufferPool) Get() *bytes.Buffer { + return Self.pool.Get().(*bytes.Buffer) +} + +func (Self *bufferPool) Put(x *bytes.Buffer) { + x.Reset() + Self.pool.Put(x) +} + +type muxPackagerPool struct { + pool sync.Pool +} + +func (Self *muxPackagerPool) New() { + Self.pool = sync.Pool{ + New: func() interface{} { + pack := MuxPackager{} + return &pack + }, + } +} + +func (Self *muxPackagerPool) Get() *MuxPackager { + return Self.pool.Get().(*MuxPackager) +} + +func (Self *muxPackagerPool) Put(pack *MuxPackager) { + Self.pool.Put(pack) +} + +type ListElement struct { + Buf []byte + L uint16 + Part bool +} + +type listElementPool struct { + pool sync.Pool +} + +func (Self *listElementPool) New() { + Self.pool = sync.Pool{ + New: func() interface{} { + element := ListElement{} + return &element + }, + } +} + +func (Self *listElementPool) Get() *ListElement { + return Self.pool.Get().(*ListElement) +} + +func (Self *listElementPool) Put(element *ListElement) { + element.L = 0 + element.Buf = nil + element.Part = false + Self.pool.Put(element) +} + +var once = sync.Once{} +var BuffPool = bufferPool{} +var CopyBuff = copyBufferPool{} +var MuxPack = muxPackagerPool{} +var WindowBuff = windowBufferPool{} +var ListElementPool = listElementPool{} + +func newPool() { + BuffPool.New() + CopyBuff.New() + MuxPack.New() + WindowBuff.New() + ListElementPool.New() +} + +func init() { + once.Do(newPool) +} diff --git a/lib/common/util.go b/lib/common/util.go index 41f8b3c..9a60846 100755 --- a/lib/common/util.go +++ b/lib/common/util.go @@ -16,7 +16,6 @@ import ( "sync" "github.com/cnlh/nps/lib/crypt" - "github.com/cnlh/nps/lib/pool" ) //Get the corresponding IP address through domain name @@ -109,6 +108,9 @@ func ChangeHostAndHeader(r *http.Request, host string, header string, addr strin } } addr = strings.Split(addr, ":")[0] + if prior, ok := r.Header["X-Forwarded-For"]; ok { + addr = strings.Join(prior, ", ") + ", " + addr + } r.Header.Set("X-Forwarded-For", addr) r.Header.Set("X-Real-IP", addr) } @@ -264,11 +266,14 @@ func GetPortByAddr(addr string) int { return p } -func CopyBuffer(dst io.Writer, src io.Reader) (written int64, err error) { - buf := pool.GetBufPoolCopy() - defer pool.PutBufPoolCopy(buf) +func CopyBuffer(dst io.Writer, src io.Reader, label ...string) (written int64, err error) { + buf := CopyBuff.Get() + defer CopyBuff.Put(buf) for { nr, er := src.Read(buf) + //if len(pr)>0 && pr[0] && nr > 50 { + // logs.Warn(string(buf[:50])) + //} if nr > 0 { nw, ew := dst.Write(buf[0:nr]) if nw > 0 { @@ -284,9 +289,7 @@ func CopyBuffer(dst io.Writer, src io.Reader) (written int64, err error) { } } if er != nil { - if er != io.EOF { - err = er - } + err = er break } } diff --git a/lib/conn/conn.go b/lib/conn/conn.go index c60941e..9f0c397 100755 --- a/lib/conn/conn.go +++ b/lib/conn/conn.go @@ -6,20 +6,20 @@ import ( "encoding/binary" "encoding/json" "errors" + "github.com/astaxie/beego/logs" + "github.com/cnlh/nps/lib/goroutine" "io" "net" "net/http" "net/url" "strconv" "strings" - "sync" "time" "github.com/cnlh/nps/lib/common" "github.com/cnlh/nps/lib/crypt" "github.com/cnlh/nps/lib/file" "github.com/cnlh/nps/lib/mux" - "github.com/cnlh/nps/lib/pool" "github.com/cnlh/nps/lib/rate" "github.com/xtaci/kcp-go" ) @@ -159,8 +159,8 @@ func (s *Conn) SendHealthInfo(info, status string) (int, error) { //get health info from conn func (s *Conn) GetHealthInfo() (info string, status bool, err error) { var l int - buf := pool.BufPoolMax.Get().([]byte) - defer pool.PutBufPoolMax(buf) + buf := common.BufPoolMax.Get().([]byte) + defer common.PutBufPoolMax(buf) if l, err = s.GetLen(); err != nil { return } else if _, err = s.ReadLen(l, buf); err != nil { @@ -233,8 +233,8 @@ func (s *Conn) SendInfo(t interface{}, flag string) (int, error) { //get task info func (s *Conn) getInfo(t interface{}) (err error) { var l int - buf := pool.BufPoolMax.Get().([]byte) - defer pool.PutBufPoolMax(buf) + buf := common.BufPoolMax.Get().([]byte) + defer common.PutBufPoolMax(buf) if l, err = s.GetLen(); err != nil { return } else if _, err = s.ReadLen(l, buf); err != nil { @@ -351,25 +351,29 @@ func SetUdpSession(sess *kcp.UDPSession) { //conn1 mux conn func CopyWaitGroup(conn1, conn2 net.Conn, crypt bool, snappy bool, rate *rate.Rate, flow *file.Flow, isServer bool, rb []byte) { - var in, out int64 - var wg sync.WaitGroup + //var in, out int64 + //var wg sync.WaitGroup connHandle := GetConn(conn1, crypt, snappy, rate, isServer) if rb != nil { connHandle.Write(rb) } - go func(in *int64) { - wg.Add(1) - *in, _ = common.CopyBuffer(connHandle, conn2) - connHandle.Close() - conn2.Close() - wg.Done() - }(&in) - out, _ = common.CopyBuffer(conn2, connHandle) - connHandle.Close() - conn2.Close() - wg.Wait() - if flow != nil { - flow.Add(in, out) + //go func(in *int64) { + // wg.Add(1) + // *in, _ = common.CopyBuffer(connHandle, conn2) + // connHandle.Close() + // conn2.Close() + // wg.Done() + //}(&in) + //out, _ = common.CopyBuffer(conn2, connHandle) + //connHandle.Close() + //conn2.Close() + //wg.Wait() + //if flow != nil { + // flow.Add(in, out) + //} + err := goroutine.CopyConnsPool.Invoke(goroutine.NewConns(connHandle, conn2, flow)) + if err != nil { + logs.Error(err) } } diff --git a/lib/conn/listener.go b/lib/conn/listener.go index f80e01d..bd8e443 100644 --- a/lib/conn/listener.go +++ b/lib/conn/listener.go @@ -43,9 +43,16 @@ func Accept(l net.Listener, f func(c net.Conn)) { if strings.Contains(err.Error(), "use of closed network connection") { break } + if strings.Contains(err.Error(), "the mux has closed") { + break + } logs.Warn(err) continue } + if c == nil { + logs.Warn("nil connection") + break + } go f(c) } } diff --git a/lib/conn/snappy.go b/lib/conn/snappy.go index a655627..3f22aa0 100644 --- a/lib/conn/snappy.go +++ b/lib/conn/snappy.go @@ -3,7 +3,7 @@ package conn import ( "io" - "github.com/cnlh/nps/lib/pool" + "github.com/cnlh/nps/lib/common" "github.com/golang/snappy" ) @@ -32,8 +32,8 @@ func (s *SnappyConn) Write(b []byte) (n int, err error) { //snappy压缩读 func (s *SnappyConn) Read(b []byte) (n int, err error) { - buf := pool.BufPool.Get().([]byte) - defer pool.BufPool.Put(buf) + buf := common.BufPool.Get().([]byte) + defer common.BufPool.Put(buf) if n, err = s.r.Read(buf); err != nil { return } diff --git a/lib/goroutine/pool.go b/lib/goroutine/pool.go new file mode 100644 index 0000000..287c711 --- /dev/null +++ b/lib/goroutine/pool.go @@ -0,0 +1,73 @@ +package goroutine + +import ( + "github.com/cnlh/nps/lib/common" + "github.com/cnlh/nps/lib/file" + "github.com/panjf2000/ants/v2" + "io" + "net" + "sync" +) + +type connGroup struct { + src io.ReadWriteCloser + dst io.ReadWriteCloser + wg *sync.WaitGroup + n *int64 +} + +func newConnGroup(dst, src io.ReadWriteCloser, wg *sync.WaitGroup, n *int64) connGroup { + return connGroup{ + src: src, + dst: dst, + wg: wg, + n: n, + } +} + +func copyConnGroup(group interface{}) { + cg, ok := group.(connGroup) + if !ok { + return + } + var err error + *cg.n, err = common.CopyBuffer(cg.dst, cg.src) + if err != nil { + cg.src.Close() + cg.dst.Close() + //logs.Warn("close npc by copy from nps", err, c.connId) + } + cg.wg.Done() +} + +type Conns struct { + conn1 io.ReadWriteCloser // mux connection + conn2 net.Conn // outside connection + flow *file.Flow +} + +func NewConns(c1 io.ReadWriteCloser, c2 net.Conn, flow *file.Flow) Conns { + return Conns{ + conn1: c1, + conn2: c2, + flow: flow, + } +} + +func copyConns(group interface{}) { + conns := group.(Conns) + wg := new(sync.WaitGroup) + wg.Add(2) + var in, out int64 + _ = connCopyPool.Invoke(newConnGroup(conns.conn1, conns.conn2, wg, &in)) + // outside to mux : incoming + _ = connCopyPool.Invoke(newConnGroup(conns.conn2, conns.conn1, wg, &out)) + // mux to outside : outgoing + wg.Wait() + if conns.flow != nil { + conns.flow.Add(in, out) + } +} + +var connCopyPool, _ = ants.NewPoolWithFunc(200000, copyConnGroup, ants.WithNonblocking(false)) +var CopyConnsPool, _ = ants.NewPoolWithFunc(100000, copyConns, ants.WithNonblocking(false)) diff --git a/lib/install/install.go b/lib/install/install.go index 56f3cc5..24af9b9 100644 --- a/lib/install/install.go +++ b/lib/install/install.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "io" + "io/ioutil" "log" "os" "path/filepath" @@ -13,6 +14,23 @@ import ( ) func InstallNps() { + unit := `[Unit] +Description=nps - convenient proxy server +Documentation=https://github.com/cnlh/nps/ +After=network-online.target remote-fs.target nss-lookup.target +Wants=network-online.target` + service := `[Service] +Type=simple +KillMode=process +Restart=always +RestartSec=15s +StandardOutput=append:/var/log/nps/nps.log +ExecStartPre=/bin/echo 'Starting nps' +ExecStopPost=/bin/echo 'Stopping nps' +ExecStart=` + install := `[Install] +WantedBy=multi-user.target` + path := common.GetInstallPath() if common.FileExists(path) { log.Fatalf("the path %s has exist, does not support install", path) @@ -35,21 +53,35 @@ func InstallNps() { log.Fatalln(err) } else { os.Chmod("/usr/local/bin/nps", 0755) + service += "/usr/local/bin/nps" log.Println("Executable files have been copied to", "/usr/local/bin/nps") } } else { os.Chmod("/usr/bin/nps", 0755) + service += "/usr/bin/nps" log.Println("Executable files have been copied to", "/usr/bin/nps") } - + systemd := unit + "\n\n" + service + "\n\n" + install + _ = os.Remove("/usr/lib/systemd/system/nps.service") + err := ioutil.WriteFile("/usr/lib/systemd/system/nps.service", []byte(systemd), 0644) + if err != nil { + log.Println("Write systemd service err ", err) + } + _ = os.Mkdir("/var/log/nps", 644) } log.Println("install ok!") log.Println("Static files and configuration files in the current directory will be useless") log.Println("The new configuration file is located in", path, "you can edit them") if !common.IsWindows() { - log.Println("You can start with nps test|start|stop|restart|status anywhere") + log.Println(`You can start with: +sudo systemctl enable|disable|start|stop|restart|status nps +or: +nps test|start|stop|restart|status +anywhere!`) } else { - log.Println("You can copy executable files to any directory and start working with nps.exe test|start|stop|restart|status") + log.Println(`You can copy executable files to any directory and start working with: +nps.exe test|start|stop|restart|status +now!`) } } func MkidrDirAll(path string, v ...string) { diff --git a/lib/mux/bytes.go b/lib/mux/bytes.go index a7e17f7..c44bad4 100644 --- a/lib/mux/bytes.go +++ b/lib/mux/bytes.go @@ -20,7 +20,7 @@ func WriteLenBytes(buf []byte, w io.Writer) (int, error) { //read bytes by length func ReadLenBytes(buf []byte, r io.Reader) (int, error) { - var l int32 + var l uint32 var err error if binary.Read(r, binary.LittleEndian, &l) != nil { return 0, err diff --git a/lib/mux/conn.go b/lib/mux/conn.go index 5016771..f665248 100644 --- a/lib/mux/conn.go +++ b/lib/mux/conn.go @@ -3,11 +3,14 @@ package mux import ( "errors" "io" + "math" "net" + "runtime" "sync" + "sync/atomic" "time" - "github.com/cnlh/nps/lib/pool" + "github.com/cnlh/nps/lib/common" ) type conn struct { @@ -15,34 +18,36 @@ type conn struct { getStatusCh chan struct{} connStatusOkCh chan struct{} connStatusFailCh chan struct{} - readTimeOut time.Time - writeTimeOut time.Time - readBuffer []byte - startRead int //now read position - endRead int //now end read - readFlag bool - readCh chan struct{} - waitQueue *sliceEntry - stopWrite bool connId int32 isClose bool - readWait bool - hasWrite int - mux *Mux + closeFlag bool // close conn flag + receiveWindow *ReceiveWindow + sendWindow *SendWindow + once sync.Once + //label string } -var connPool = sync.Pool{} - -func NewConn(connId int32, mux *Mux) *conn { +func NewConn(connId int32, mux *Mux, label ...string) *conn { c := &conn{ - readCh: make(chan struct{}), getStatusCh: make(chan struct{}), connStatusOkCh: make(chan struct{}), connStatusFailCh: make(chan struct{}), - waitQueue: NewQueue(), connId: connId, - mux: mux, + receiveWindow: new(ReceiveWindow), + sendWindow: new(SendWindow), + once: sync.Once{}, } + //if len(label) > 0 { + // c.label = label[0] + //} + c.receiveWindow.New(mux) + c.sendWindow.New(mux) + //logm := &connLog{ + // startTime: time.Now(), + // isClose: false, + // logs: []string{c.label + "new conn success"}, + //} + //setM(label[0], int(connId), logm) return c } @@ -50,135 +55,581 @@ func (s *conn) Read(buf []byte) (n int, err error) { if s.isClose || buf == nil { return 0, errors.New("the conn has closed") } - if s.endRead-s.startRead == 0 { //read finish or start - if s.waitQueue.Size() == 0 { - s.readWait = true - if t := s.readTimeOut.Sub(time.Now()); t > 0 { - timer := time.NewTimer(t) - defer timer.Stop() - select { - case <-timer.C: - s.readWait = false - return 0, errors.New("read timeout") - case <-s.readCh: - } - } else { - <-s.readCh - } - } - if s.isClose { //If the connection is closed instead of continuing command - return 0, errors.New("the conn has closed") - } - if node, err := s.waitQueue.Pop(); err != nil { - s.Close() - return 0, io.EOF - } else { - pool.PutBufPoolCopy(s.readBuffer) - s.readBuffer = node.val - s.endRead = node.l - s.startRead = 0 - } - } - if len(buf) < s.endRead-s.startRead { - n = copy(buf, s.readBuffer[s.startRead:s.startRead+len(buf)]) - s.startRead += n - } else { - n = copy(buf, s.readBuffer[s.startRead:s.endRead]) - s.startRead += n - s.mux.sendInfo(MUX_MSG_SEND_OK, s.connId, nil) + if len(buf) == 0 { + return 0, nil } + // waiting for takeout from receive window finish or timeout + //now := time.Now() + n, err = s.receiveWindow.Read(buf, s.connId) + //t := time.Now().Sub(now) + //if t.Seconds() > 0.5 { + //logs.Warn("conn read long", n, t.Seconds()) + //} + //var errstr string + //if err == nil { + // errstr = "err:nil" + //} else { + // errstr = err.Error() + //} + //d := getM(s.label, int(s.connId)) + //d.logs = append(d.logs, s.label+"read "+strconv.Itoa(n)+" "+errstr+" "+string(buf[:100])) + //setM(s.label, int(s.connId), d) return } -func (s *conn) Write(buf []byte) (int, error) { +func (s *conn) Write(buf []byte) (n int, err error) { if s.isClose { return 0, errors.New("the conn has closed") } - ch := make(chan struct{}) - go s.write(buf, ch) - if t := s.writeTimeOut.Sub(time.Now()); t > 0 { - timer := time.NewTimer(t) - defer timer.Stop() - select { - case <-timer.C: - return 0, errors.New("write timeout") - case <-ch: - } - } else { - <-ch + if s.closeFlag { + //s.Close() + return 0, errors.New("io: write on closed conn") } - if s.isClose { - return 0, io.EOF + if len(buf) == 0 { + return 0, nil } - return len(buf), nil -} -func (s *conn) write(buf []byte, ch chan struct{}) { - start := 0 - l := len(buf) - for { - if s.hasWrite > 50 { - <-s.getStatusCh - } - s.hasWrite++ - if l-start > pool.PoolSizeCopy { - s.mux.sendInfo(MUX_NEW_MSG, s.connId, buf[start:start+pool.PoolSizeCopy]) - start += pool.PoolSizeCopy - } else { - s.mux.sendInfo(MUX_NEW_MSG, s.connId, buf[start:l]) - break - } - } - ch <- struct{}{} + //logs.Warn("write buf", len(buf)) + //now := time.Now() + n, err = s.sendWindow.WriteFull(buf, s.connId) + //t := time.Now().Sub(now) + //if t.Seconds() > 0.5 { + // logs.Warn("conn write long", n, t.Seconds()) + //} + return } -func (s *conn) Close() error { - if s.isClose { - return errors.New("the conn has closed") - } - times := 0 -retry: - if s.waitQueue.Size() > 0 && times < 600 { - time.Sleep(time.Millisecond * 100) - times++ - goto retry - } - if s.isClose { - return errors.New("the conn has closed") - } +func (s *conn) Close() (err error) { + s.once.Do(s.closeProcess) + return +} + +func (s *conn) closeProcess() { s.isClose = true - pool.PutBufPoolCopy(s.readBuffer) - if s.readWait { - s.readCh <- struct{}{} + s.receiveWindow.mux.connMap.Delete(s.connId) + if !s.receiveWindow.mux.IsClose { + // if server or user close the conn while reading, will get a io.EOF + // and this Close method will be invoke, send this signal to close other side + s.receiveWindow.mux.sendInfo(common.MUX_CONN_CLOSE, s.connId, nil) } - s.waitQueue.Clear() - s.mux.connMap.Delete(s.connId) - if !s.mux.IsClose { - s.mux.sendInfo(MUX_CONN_CLOSE, s.connId, nil) - } - connPool.Put(s) - return nil + s.sendWindow.CloseWindow() + s.receiveWindow.CloseWindow() + //d := getM(s.label, int(s.connId)) + //d.isClose = true + //d.logs = append(d.logs, s.label+"close "+time.Now().String()) + //setM(s.label, int(s.connId), d) + return } func (s *conn) LocalAddr() net.Addr { - return s.mux.conn.LocalAddr() + return s.receiveWindow.mux.conn.LocalAddr() } func (s *conn) RemoteAddr() net.Addr { - return s.mux.conn.RemoteAddr() + return s.receiveWindow.mux.conn.RemoteAddr() } func (s *conn) SetDeadline(t time.Time) error { - s.readTimeOut = t - s.writeTimeOut = t + _ = s.SetReadDeadline(t) + _ = s.SetWriteDeadline(t) return nil } func (s *conn) SetReadDeadline(t time.Time) error { - s.readTimeOut = t + s.receiveWindow.SetTimeOut(t) return nil } func (s *conn) SetWriteDeadline(t time.Time) error { - s.writeTimeOut = t + s.sendWindow.SetTimeOut(t) return nil } + +type window struct { + remainingWait uint64 // 64bit alignment + off uint32 + maxSize uint32 + closeOp bool + closeOpCh chan struct{} + mux *Mux +} + +func (Self *window) unpack(ptrs uint64) (remaining, wait uint32) { + const mask = 1<> dequeueBits) & mask) + wait = uint32(ptrs & mask) + return +} + +func (Self *window) pack(remaining, wait uint32) uint64 { + const mask = 1< 0 { + n = uint32(l) + } + return +} + +func (Self *ReceiveWindow) calcSize() { + // calculating maximum receive window size + if Self.count == 0 { + //logs.Warn("ping, bw", Self.mux.latency, Self.bw.Get()) + conns := Self.mux.connMap.Size() + n := uint32(math.Float64frombits(atomic.LoadUint64(&Self.mux.latency)) * + Self.mux.bw.Get() / float64(conns)) + if n < common.MAXIMUM_SEGMENT_SIZE*10 { + n = common.MAXIMUM_SEGMENT_SIZE * 10 + } + bufLen := Self.bufQueue.Len() + if n < bufLen { + n = bufLen + } + if n < Self.maxSize/2 { + n = Self.maxSize / 2 + } + // set the minimal size + if n > 2*Self.maxSize { + n = 2 * Self.maxSize + } + if n > (common.MAXIMUM_WINDOW_SIZE / uint32(conns)) { + n = common.MAXIMUM_WINDOW_SIZE / uint32(conns) + } + // set the maximum size + //logs.Warn("n", n) + atomic.StoreUint32(&Self.maxSize, n) + Self.count = -10 + } + Self.count += 1 + return +} + +func (Self *ReceiveWindow) Write(buf []byte, l uint16, part bool, id int32) (err error) { + if Self.closeOp { + return errors.New("conn.receiveWindow: write on closed window") + } + element, err := NewListElement(buf, l, part) + //logs.Warn("push the buf", len(buf), l, (&element).l) + if err != nil { + return + } + Self.calcSize() // calculate the max window size + var wait uint32 +start: + ptrs := atomic.LoadUint64(&Self.remainingWait) + _, wait = Self.unpack(ptrs) + newRemaining := Self.remainingSize(l) + // calculate the remaining window size now, plus the element we will push + if newRemaining == 0 { + //logs.Warn("window full true", remaining) + wait = 1 + } + if !atomic.CompareAndSwapUint64(&Self.remainingWait, ptrs, Self.pack(0, wait)) { + goto start + // another goroutine change the status, make sure shall we need wait + } + Self.bufQueue.Push(element) + // status check finish, now we can push the element into the queue + if wait == 0 { + Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.maxSize, newRemaining) + // send the remaining window size, not including zero size + } + return nil +} + +func (Self *ReceiveWindow) Read(p []byte, id int32) (n int, err error) { + if Self.closeOp { + return 0, io.EOF // receive close signal, returns eof + } + pOff := 0 + l := 0 + //logs.Warn("receive window read off, element.l", Self.off, Self.element.l) +copyData: + if Self.off == uint32(Self.element.L) { + // on the first Read method invoked, Self.off and Self.element.l + // both zero value + common.ListElementPool.Put(Self.element) + if Self.closeOp { + return 0, io.EOF + } + Self.element, err = Self.bufQueue.Pop() + // if the queue is empty, Pop method will wait until one element push + // into the queue successful, or timeout. + // timer start on timeout parameter is set up , + // reset to 60s if timeout and data still available + Self.off = 0 + if err != nil { + return // queue receive stop or time out, break the loop and return + } + //logs.Warn("pop element", Self.element.l, Self.element.part) + } + l = copy(p[pOff:], Self.element.Buf[Self.off:Self.element.L]) + pOff += l + Self.off += uint32(l) + //logs.Warn("window read length buf len", Self.readLength, Self.bufQueue.Len()) + n += l + l = 0 + if Self.off == uint32(Self.element.L) { + //logs.Warn("put the element end ", string(Self.element.buf[:15])) + common.WindowBuff.Put(Self.element.Buf) + Self.sendStatus(id, Self.element.L) + // check the window full status + } + if pOff < len(p) && Self.element.Part { + // element is a part of the segments, trying to fill up buf p + goto copyData + } + return // buf p is full or all of segments in buf, return +} + +func (Self *ReceiveWindow) sendStatus(id int32, l uint16) { + var remaining, wait uint32 + for { + ptrs := atomic.LoadUint64(&Self.remainingWait) + remaining, wait = Self.unpack(ptrs) + remaining += uint32(l) + if atomic.CompareAndSwapUint64(&Self.remainingWait, ptrs, Self.pack(remaining, 0)) { + break + } + runtime.Gosched() + // another goroutine change remaining or wait status, make sure + // we need acknowledge other side + } + // now we get the current window status success + if wait == 1 { + //logs.Warn("send the wait status", remaining) + Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, atomic.LoadUint32(&Self.maxSize), remaining) + } + return +} + +func (Self *ReceiveWindow) SetTimeOut(t time.Time) { + // waiting for FIFO queue Pop method + Self.bufQueue.SetTimeOut(t) +} + +func (Self *ReceiveWindow) Stop() { + // queue has no more data to push, so unblock pop method + Self.once.Do(Self.bufQueue.Stop) +} + +func (Self *ReceiveWindow) CloseWindow() { + Self.window.CloseWindow() + Self.Stop() + Self.release() +} + +func (Self *ReceiveWindow) release() { + //if Self.element != nil { + // if Self.element.Buf != nil { + // common.WindowBuff.Put(Self.element.Buf) + // } + // common.ListElementPool.Put(Self.element) + //} + for { + Self.element = Self.bufQueue.TryPop() + if Self.element == nil { + return + } + if Self.element.Buf != nil { + common.WindowBuff.Put(Self.element.Buf) + } + common.ListElementPool.Put(Self.element) + } // release resource +} + +type SendWindow struct { + window + buf []byte + setSizeCh chan struct{} + timeout time.Time +} + +func (Self *SendWindow) New(mux *Mux) { + Self.setSizeCh = make(chan struct{}) + Self.maxSize = common.MAXIMUM_SEGMENT_SIZE * 10 + atomic.AddUint64(&Self.remainingWait, uint64(common.MAXIMUM_SEGMENT_SIZE*10)< common.MAXIMUM_SEGMENT_SIZE { + sendSize = common.MAXIMUM_SEGMENT_SIZE + //logs.Warn("cut buf by mss") + } else { + sendSize = uint32(len(Self.buf[Self.off:])) + } + if remaining < sendSize { + // usable window size is small than + // window MAXIMUM_SEGMENT_SIZE or send buf left + sendSize = remaining + //logs.Warn("cut buf by remainingsize", sendSize, len(Self.buf[Self.off:])) + } + //logs.Warn("send size", sendSize) + if sendSize < uint32(len(Self.buf[Self.off:])) { + part = true + } + p = Self.buf[Self.off : sendSize+Self.off] + Self.off += sendSize + Self.sent(sendSize) + return +} + +func (Self *SendWindow) waitReceiveWindow() (err error) { + t := Self.timeout.Sub(time.Now()) + if t < 0 { + t = time.Minute * 5 + } + timer := time.NewTimer(t) + defer timer.Stop() + // waiting for receive usable window size, or timeout + select { + case _, ok := <-Self.setSizeCh: + if !ok { + return errors.New("conn.writeWindow: window closed") + } + return nil + case <-timer.C: + return errors.New("conn.writeWindow: write to time out") + case <-Self.closeOpCh: + return errors.New("conn.writeWindow: window closed") + } +} + +func (Self *SendWindow) WriteFull(buf []byte, id int32) (n int, err error) { + Self.SetSendBuf(buf) // set the buf to send window + //logs.Warn("set the buf to send window") + var bufSeg []byte + var part bool + var l uint32 + for { + bufSeg, l, part, err = Self.WriteTo() + //logs.Warn("buf seg", len(bufSeg), part, err) + // get the buf segments from send window + if bufSeg == nil && part == false && err == io.EOF { + // send window is drain, break the loop + err = nil + break + } + if err != nil { + break + } + n += int(l) + l = 0 + if part { + Self.mux.sendInfo(common.MUX_NEW_MSG_PART, id, bufSeg) + } else { + Self.mux.sendInfo(common.MUX_NEW_MSG, id, bufSeg) + //logs.Warn("buf seg sent", len(bufSeg), part, err) + } + // send to other side, not send nil data to other side + } + //logs.Warn("buf seg write success") + return +} + +func (Self *SendWindow) SetTimeOut(t time.Time) { + // waiting for receive a receive window size + Self.timeout = t +} + +//type bandwidth struct { +// readStart time.Time +// lastReadStart time.Time +// readEnd time.Time +// lastReadEnd time.Time +// bufLength int +// lastBufLength int +// count int8 +// readBW float64 +// writeBW float64 +// readBandwidth float64 +//} +// +//func (Self *bandwidth) StartRead() { +// Self.lastReadStart, Self.readStart = Self.readStart, time.Now() +// if !Self.lastReadStart.IsZero() { +// if Self.count == -5 { +// Self.calcBandWidth() +// } +// } +//} +// +//func (Self *bandwidth) EndRead() { +// Self.lastReadEnd, Self.readEnd = Self.readEnd, time.Now() +// if Self.count == -5 { +// Self.calcWriteBandwidth() +// } +// if Self.count == 0 { +// Self.calcReadBandwidth() +// Self.count = -6 +// } +// Self.count += 1 +//} +// +//func (Self *bandwidth) SetCopySize(n int) { +// // must be invoke between StartRead and EndRead +// Self.lastBufLength, Self.bufLength = Self.bufLength, n +//} +//// calculating +//// start end start end +//// read read +//// write +// +//func (Self *bandwidth) calcBandWidth() { +// t := Self.readStart.Sub(Self.lastReadStart) +// if Self.lastBufLength >= 32768 { +// Self.readBandwidth = float64(Self.lastBufLength) / t.Seconds() +// } +//} +// +//func (Self *bandwidth) calcReadBandwidth() { +// // Bandwidth between nps and npc +// readTime := Self.readEnd.Sub(Self.readStart) +// Self.readBW = float64(Self.bufLength) / readTime.Seconds() +// //logs.Warn("calc read bw", Self.readBW, Self.bufLength, readTime.Seconds()) +//} +// +//func (Self *bandwidth) calcWriteBandwidth() { +// // Bandwidth between nps and user, npc and application +// writeTime := Self.readStart.Sub(Self.lastReadEnd) +// Self.writeBW = float64(Self.lastBufLength) / writeTime.Seconds() +// //logs.Warn("calc write bw", Self.writeBW, Self.bufLength, writeTime.Seconds()) +//} +// +//func (Self *bandwidth) Get() (bw float64) { +// // The zero value, 0 for numeric types +// if Self.writeBW == 0 && Self.readBW == 0 { +// //logs.Warn("bw both 0") +// return 100 +// } +// if Self.writeBW == 0 && Self.readBW != 0 { +// return Self.readBW +// } +// if Self.readBW == 0 && Self.writeBW != 0 { +// return Self.writeBW +// } +// return Self.readBandwidth +//} diff --git a/lib/mux/map.go b/lib/mux/map.go index 0801201..86d09b5 100644 --- a/lib/mux/map.go +++ b/lib/mux/map.go @@ -2,28 +2,35 @@ package mux import ( "sync" - "time" ) type connMap struct { connMap map[int32]*conn - closeCh chan struct{} + //closeCh chan struct{} sync.RWMutex } func NewConnMap() *connMap { connMap := &connMap{ connMap: make(map[int32]*conn), - closeCh: make(chan struct{}), + //closeCh: make(chan struct{}), } - go connMap.clean() + //go connMap.clean() return connMap } +func (s *connMap) Size() (n int) { + s.Lock() + n = len(s.connMap) + s.Unlock() + return +} + func (s *connMap) Get(id int32) (*conn, bool) { s.Lock() - defer s.Unlock() - if v, ok := s.connMap[id]; ok && v != nil { + v, ok := s.connMap[id] + s.Unlock() + if ok && v != nil { return v, true } return nil, false @@ -31,40 +38,38 @@ func (s *connMap) Get(id int32) (*conn, bool) { func (s *connMap) Set(id int32, v *conn) { s.Lock() - defer s.Unlock() s.connMap[id] = v + s.Unlock() } func (s *connMap) Close() { - s.Lock() - defer s.Unlock() + //s.closeCh <- struct{}{} // stop the clean goroutine first for _, v := range s.connMap { - v.isClose = true + v.Close() // close all the connections in the mux } - s.closeCh <- struct{}{} } func (s *connMap) Delete(id int32) { s.Lock() - defer s.Unlock() delete(s.connMap, id) + s.Unlock() } -func (s *connMap) clean() { - ticker := time.NewTimer(time.Minute * 1) - for { - select { - case <-ticker.C: - s.Lock() - for _, v := range s.connMap { - if v.isClose { - delete(s.connMap, v.connId) - } - } - s.Unlock() - case <-s.closeCh: - ticker.Stop() - return - } - } -} +//func (s *connMap) clean() { +// ticker := time.NewTimer(time.Minute * 1) +// for { +// select { +// case <-ticker.C: +// s.Lock() +// for _, v := range s.connMap { +// if v.isClose { +// delete(s.connMap, v.connId) +// } +// } +// s.Unlock() +// case <-s.closeCh: +// ticker.Stop() +// return +// } +// } +//} diff --git a/lib/mux/mux.go b/lib/mux/mux.go index 0365a8b..a43510a 100644 --- a/lib/mux/mux.go +++ b/lib/mux/mux.go @@ -1,57 +1,59 @@ package mux import ( - "bytes" - "encoding/binary" "errors" + "io" "math" "net" - "sync" "sync/atomic" "time" - "github.com/cnlh/nps/lib/pool" -) - -const ( - MUX_PING_FLAG int32 = iota - MUX_NEW_CONN_OK - MUX_NEW_CONN_Fail - MUX_NEW_MSG - MUX_MSG_SEND_OK - MUX_NEW_CONN - MUX_PING - MUX_CONN_CLOSE - MUX_PING_RETURN + "github.com/astaxie/beego/logs" + "github.com/cnlh/nps/lib/common" ) type Mux struct { + latency uint64 // we store latency in bits, but it's float64 net.Listener - conn net.Conn - connMap *connMap - newConnCh chan *conn - id int32 - closeChan chan struct{} - IsClose bool - pingOk int - connType string - sync.Mutex + conn net.Conn + connMap *connMap + newConnCh chan *conn + id int32 + closeChan chan struct{} + IsClose bool + pingOk uint32 + counter *latencyCounter + bw *bandwidth + pingCh chan []byte + pingCheckTime uint32 + connType string + writeQueue PriorityQueue + newConnQueue ConnQueue } func NewMux(c net.Conn, connType string) *Mux { + //c.(*net.TCPConn).SetReadBuffer(0) + //c.(*net.TCPConn).SetWriteBuffer(0) m := &Mux{ conn: c, connMap: NewConnMap(), id: 0, - closeChan: make(chan struct{}), + closeChan: make(chan struct{}, 1), newConnCh: make(chan *conn), + bw: new(bandwidth), IsClose: false, connType: connType, + pingCh: make(chan []byte), + counter: newLatencyCounter(), } + m.writeQueue.New() + m.newConnQueue.New() //read session by flag - go m.readSession() + m.readSession() //ping - go m.ping() + m.ping() + m.pingReturn() + m.writeSession() return m } @@ -59,12 +61,10 @@ func (s *Mux) NewConn() (*conn, error) { if s.IsClose { return nil, errors.New("the mux has closed") } - conn := NewConn(s.getId(), s) + conn := NewConn(s.getId(), s, "nps ") //it must be set before send s.connMap.Set(conn.connId, conn) - if err := s.sendInfo(MUX_NEW_CONN, conn.connId, nil); err != nil { - return nil, err - } + s.sendInfo(common.MUX_NEW_CONN, conn.connId, nil) //set a timer timeout 30 second timer := time.NewTimer(time.Minute * 2) defer timer.Stop() @@ -79,7 +79,7 @@ func (s *Mux) NewConn() (*conn, error) { func (s *Mux) Accept() (net.Conn, error) { if s.IsClose { - return nil, errors.New("accpet error,the conn has closed") + return nil, errors.New("accpet error,the mux has closed") } conn := <-s.newConnCh if conn == nil { @@ -92,129 +92,417 @@ func (s *Mux) Addr() net.Addr { return s.conn.LocalAddr() } -func (s *Mux) sendInfo(flag int32, id int32, content []byte) error { - raw := bytes.NewBuffer([]byte{}) - binary.Write(raw, binary.LittleEndian, flag) - binary.Write(raw, binary.LittleEndian, id) - if content != nil && len(content) > 0 { - binary.Write(raw, binary.LittleEndian, int32(len(content))) - binary.Write(raw, binary.LittleEndian, content) +func (s *Mux) sendInfo(flag uint8, id int32, data ...interface{}) { + if s.IsClose { + return } - if _, err := s.conn.Write(raw.Bytes()); err != nil { + var err error + pack := common.MuxPack.Get() + err = pack.NewPac(flag, id, data...) + if err != nil { + common.MuxPack.Put(pack) + logs.Error("mux: new pack err", err) s.Close() - return err + return } - return nil + s.writeQueue.Push(pack) + return } +func (s *Mux) writeSession() { + go s.packBuf() + //go s.writeBuf() +} + +func (s *Mux) packBuf() { + //buffer := common.BuffPool.Get() + for { + if s.IsClose { + break + } + //buffer.Reset() + pack := s.writeQueue.Pop() + if s.IsClose { + break + } + //buffer := common.BuffPool.Get() + err := pack.Pack(s.conn) + common.MuxPack.Put(pack) + if err != nil { + logs.Error("mux: pack err", err) + //common.BuffPool.Put(buffer) + s.Close() + break + } + //logs.Warn(buffer.String()) + //s.bufQueue.Push(buffer) + //l := buffer.Len() + //n, err := buffer.WriteTo(s.conn) + //common.BuffPool.Put(buffer) + //if err != nil || int(n) != l { + // logs.Error("mux: close from write session fail ", err, n, l) + // s.Close() + // break + //} + } +} + +//func (s *Mux) writeBuf() { +// for { +// if s.IsClose { +// break +// } +// buffer, err := s.bufQueue.Pop() +// if err != nil { +// break +// } +// l := buffer.Len() +// n, err := buffer.WriteTo(s.conn) +// common.BuffPool.Put(buffer) +// if err != nil || int(n) != l { +// logs.Warn("close from write session fail ", err, n, l) +// s.Close() +// break +// } +// } +//} + func (s *Mux) ping() { go func() { - ticker := time.NewTicker(time.Second * 1) + now, _ := time.Now().UTC().MarshalText() + s.sendInfo(common.MUX_PING_FLAG, common.MUX_PING, now) + // send the ping flag and get the latency first + ticker := time.NewTicker(time.Second * 5) for { + if s.IsClose { + ticker.Stop() + break + } select { case <-ticker.C: } - //Avoid going beyond the scope - if (math.MaxInt32 - s.id) < 10000 { - s.id = 0 + if atomic.LoadUint32(&s.pingCheckTime) >= 60 { + logs.Error("mux: ping time out") + s.Close() + // more than 5 minutes not receive the ping return package, + // mux conn is damaged, maybe a packet drop, close it + break } - if err := s.sendInfo(MUX_PING_FLAG, MUX_PING, nil); err != nil || (s.pingOk > 10 && s.connType == "kcp") { + now, _ := time.Now().UTC().MarshalText() + s.sendInfo(common.MUX_PING_FLAG, common.MUX_PING, now) + atomic.AddUint32(&s.pingCheckTime, 1) + if atomic.LoadUint32(&s.pingOk) > 10 && s.connType == "kcp" { + logs.Error("mux: kcp ping err") s.Close() break } - s.pingOk++ + atomic.AddUint32(&s.pingOk, 1) + } + }() +} + +func (s *Mux) pingReturn() { + go func() { + var now time.Time + var data []byte + for { + if s.IsClose { + break + } + select { + case data = <-s.pingCh: + atomic.StoreUint32(&s.pingCheckTime, 0) + case <-s.closeChan: + break + } + _ = now.UnmarshalText(data) + latency := time.Now().UTC().Sub(now).Seconds() / 2 + if latency > 0 { + atomic.StoreUint64(&s.latency, math.Float64bits(s.counter.Latency(latency))) + // convert float64 to bits, store it atomic + } + //logs.Warn("latency", math.Float64frombits(atomic.LoadUint64(&s.latency))) + if cap(data) > 0 { + common.WindowBuff.Put(data) + } } }() - select { - case <-s.closeChan: - } } func (s *Mux) readSession() { - var buf []byte go func() { + var connection *conn for { - var flag, i int32 - var n int - var err error - if binary.Read(s.conn, binary.LittleEndian, &flag) == nil { - if binary.Read(s.conn, binary.LittleEndian, &i) != nil { - break - } - s.pingOk = 0 - switch flag { - case MUX_NEW_CONN: //new conn - conn := NewConn(i, s) - s.connMap.Set(i, conn) //it has been set before send ok - s.newConnCh <- conn - s.sendInfo(MUX_NEW_CONN_OK, i, nil) - continue - case MUX_PING_FLAG: //ping - s.sendInfo(MUX_PING_RETURN, MUX_PING, nil) - continue - case MUX_PING_RETURN: - continue - case MUX_NEW_MSG: - buf = pool.GetBufPoolCopy() - if n, err = ReadLenBytes(buf, s.conn); err != nil { - break - } - } - if conn, ok := s.connMap.Get(i); ok && !conn.isClose { - switch flag { - case MUX_NEW_MSG: //new msg from remote conn - //insert wait queue - conn.waitQueue.Push(NewBufNode(buf, n)) - //judge len if >xxx ,send stop - if conn.readWait { - conn.readWait = false - conn.readCh <- struct{}{} - } - case MUX_MSG_SEND_OK: //the remote has read - select { - case conn.getStatusCh <- struct{}{}: - default: - } - conn.hasWrite-- - case MUX_NEW_CONN_OK: //conn ok - conn.connStatusOkCh <- struct{}{} - case MUX_NEW_CONN_Fail: - conn.connStatusFailCh <- struct{}{} - case MUX_CONN_CLOSE: //close the connection - go conn.Close() - s.connMap.Delete(i) - } - } else if flag == MUX_NEW_MSG { - pool.PutBufPoolCopy(buf) - } - } else { + if s.IsClose { break } + connection = s.newConnQueue.Pop() + if s.IsClose { + break // make sure that is closed + } + s.connMap.Set(connection.connId, connection) //it has been set before send ok + s.newConnCh <- connection + s.sendInfo(common.MUX_NEW_CONN_OK, connection.connId, nil) } + }() + go func() { + pack := common.MuxPack.Get() + var l uint16 + var err error + for { + if s.IsClose { + break + } + pack = common.MuxPack.Get() + s.bw.StartRead() + if l, err = pack.UnPack(s.conn); err != nil { + logs.Error("mux: read session unpack from connection err", err) + s.Close() + break + } + s.bw.SetCopySize(l) + atomic.StoreUint32(&s.pingOk, 0) + switch pack.Flag { + case common.MUX_NEW_CONN: //new connection + connection := NewConn(pack.Id, s) + s.newConnQueue.Push(connection) + continue + case common.MUX_PING_FLAG: //ping + s.sendInfo(common.MUX_PING_RETURN, common.MUX_PING, pack.Content) + common.WindowBuff.Put(pack.Content) + continue + case common.MUX_PING_RETURN: + //go func(content []byte) { + s.pingCh <- pack.Content + //}(pack.Content) + continue + } + if connection, ok := s.connMap.Get(pack.Id); ok && !connection.isClose { + switch pack.Flag { + case common.MUX_NEW_MSG, common.MUX_NEW_MSG_PART: //new msg from remote connection + err = s.newMsg(connection, pack) + if err != nil { + logs.Error("mux: read session connection new msg err", err) + connection.Close() + } + continue + case common.MUX_NEW_CONN_OK: //connection ok + connection.connStatusOkCh <- struct{}{} + continue + case common.MUX_NEW_CONN_Fail: + connection.connStatusFailCh <- struct{}{} + continue + case common.MUX_MSG_SEND_OK: + if connection.isClose { + continue + } + connection.sendWindow.SetSize(pack.Window, pack.ReadLength) + continue + case common.MUX_CONN_CLOSE: //close the connection + connection.closeFlag = true + //s.connMap.Delete(pack.Id) + //go func(connection *conn) { + connection.receiveWindow.Stop() // close signal to receive window + //}(connection) + continue + } + } else if pack.Flag == common.MUX_CONN_CLOSE { + continue + } + common.MuxPack.Put(pack) + } + common.MuxPack.Put(pack) s.Close() }() - select { - case <-s.closeChan: - } } -func (s *Mux) Close() error { +func (s *Mux) newMsg(connection *conn, pack *common.MuxPackager) (err error) { + if connection.isClose { + err = io.ErrClosedPipe + return + } + //logs.Warn("read session receive new msg", pack.Length) + //go func(connection *conn, pack *common.MuxPackager) { // do not block read session + //insert into queue + if pack.Flag == common.MUX_NEW_MSG_PART { + err = connection.receiveWindow.Write(pack.Content, pack.Length, true, pack.Id) + } + if pack.Flag == common.MUX_NEW_MSG { + err = connection.receiveWindow.Write(pack.Content, pack.Length, false, pack.Id) + } + //logs.Warn("read session write success", pack.Length) + return +} + +func (s *Mux) Close() (err error) { + logs.Warn("close mux") if s.IsClose { return errors.New("the mux has closed") } s.IsClose = true s.connMap.Close() - select { - case s.closeChan <- struct{}{}: - } - select { - case s.closeChan <- struct{}{}: - } + s.connMap = nil + //s.bufQueue.Stop() + s.closeChan <- struct{}{} close(s.newConnCh) - return s.conn.Close() + err = s.conn.Close() + s.release() + return +} + +func (s *Mux) release() { + for { + pack := s.writeQueue.TryPop() + if pack == nil { + break + } + if pack.BasePackager.Content != nil { + common.WindowBuff.Put(pack.BasePackager.Content) + } + common.MuxPack.Put(pack) + } + for { + connection := s.newConnQueue.TryPop() + if connection == nil { + break + } + connection = nil + } + s.writeQueue.Stop() + s.newConnQueue.Stop() } //get new connId as unique flag -func (s *Mux) getId() int32 { - return atomic.AddInt32(&s.id, 1) +func (s *Mux) getId() (id int32) { + //Avoid going beyond the scope + if (math.MaxInt32 - s.id) < 10000 { + atomic.StoreInt32(&s.id, 0) + } + id = atomic.AddInt32(&s.id, 1) + if _, ok := s.connMap.Get(id); ok { + return s.getId() + } + return +} + +type bandwidth struct { + readBandwidth uint64 // store in bits, but it's float64 + readStart time.Time + lastReadStart time.Time + bufLength uint32 +} + +func (Self *bandwidth) StartRead() { + if Self.readStart.IsZero() { + Self.readStart = time.Now() + } + if Self.bufLength >= common.MAXIMUM_SEGMENT_SIZE*300 { + Self.lastReadStart, Self.readStart = Self.readStart, time.Now() + Self.calcBandWidth() + } +} + +func (Self *bandwidth) SetCopySize(n uint16) { + Self.bufLength += uint32(n) +} + +func (Self *bandwidth) calcBandWidth() { + t := Self.readStart.Sub(Self.lastReadStart) + atomic.StoreUint64(&Self.readBandwidth, math.Float64bits(float64(Self.bufLength)/t.Seconds())) + Self.bufLength = 0 +} + +func (Self *bandwidth) Get() (bw float64) { + // The zero value, 0 for numeric types + bw = math.Float64frombits(atomic.LoadUint64(&Self.readBandwidth)) + if bw <= 0 { + bw = 100 + } + //logs.Warn(bw) + return +} + +const counterBits = 4 +const counterMask = 1<> counterBits) & counterMask) + // we set head is 4 bits + min = uint8(idxs & counterMask) + return +} + +func (Self *latencyCounter) pack(head, min uint8) uint8 { + return uint8(head< value { + min = head + } + head++ + Self.headMin = Self.pack(head, min) +} + +func (Self *latencyCounter) minimal() (min uint8) { + var val float64 + var i uint8 + for i = 0; i < counterMask; i++ { + if Self.buf[i] > 0 { + if val > Self.buf[i] { + val = Self.buf[i] + min = i + } + } + } + return +} + +func (Self *latencyCounter) Latency(value float64) (latency float64) { + Self.add(value) + _, min := Self.unpack(Self.headMin) + latency = Self.buf[min] * Self.countSuccess() + return +} + +const lossRatio = 1.6 + +func (Self *latencyCounter) countSuccess() (successRate float64) { + var success, loss, i uint8 + _, min := Self.unpack(Self.headMin) + for i = 0; i < counterMask; i++ { + if Self.buf[i] > lossRatio*Self.buf[min] && Self.buf[i] > 0 { + loss++ + } + if Self.buf[i] <= lossRatio*Self.buf[min] && Self.buf[i] > 0 { + success++ + } + } + // counting all the data in the ring buf, except zero + successRate = float64(success) / float64(loss+success) + return } diff --git a/lib/mux/mux_test.go b/lib/mux/mux_test.go index b89e4e9..151def1 100644 --- a/lib/mux/mux_test.go +++ b/lib/mux/mux_test.go @@ -1,16 +1,22 @@ package mux import ( + "bufio" + "fmt" + "github.com/cnlh/nps/lib/common" + "github.com/cnlh/nps/lib/goroutine" + "io" "log" "net" "net/http" + "net/http/httputil" _ "net/http/pprof" + "strconv" "testing" "time" + "unsafe" "github.com/astaxie/beego/logs" - "github.com/cnlh/nps/lib/common" - "github.com/cnlh/nps/lib/pool" ) var conn1 net.Conn @@ -24,24 +30,54 @@ func TestNewMux(t *testing.T) { logs.SetLogFuncCallDepth(3) server() client() + //poolConnCopy, _ := ants.NewPoolWithFunc(200000, common.copyConn, ants.WithNonblocking(false)) time.Sleep(time.Second * 3) go func() { m2 := NewMux(conn2, "tcp") for { + //logs.Warn("npc starting accept") c, err := m2.Accept() if err != nil { - log.Fatalln(err) + logs.Warn(err) + continue } - go func(c net.Conn) { - c2, err := net.Dial("tcp", "127.0.0.1:8082") - if err != nil { - log.Fatalln(err) - } - go common.CopyBuffer(c2, c) - common.CopyBuffer(c, c2) + //logs.Warn("npc accept success ") + c2, err := net.Dial("tcp", "127.0.0.1:80") + if err != nil { + logs.Warn(err) c.Close() - c2.Close() - }(c) + continue + } + //c2.(*net.TCPConn).SetReadBuffer(0) + //c2.(*net.TCPConn).SetReadBuffer(0) + _ = goroutine.CopyConnsPool.Invoke(goroutine.NewConns(c, c2, nil)) + //go func(c2 net.Conn, c *conn) { + // wg := new(sync.WaitGroup) + // wg.Add(2) + // _ = poolConnCopy.Invoke(common.newConnGroup(c2, c, wg)) + // //go func() { + // // _, err = common.CopyBuffer(c2, c) + // // if err != nil { + // // c2.Close() + // // c.Close() + // // //logs.Warn("close npc by copy from nps", err, c.connId) + // // } + // // wg.Done() + // //}() + // //wg.Add(1) + // _ = poolConnCopy.Invoke(common.newConnGroup(c, c2, wg)) + // //go func() { + // // _, err = common.CopyBuffer(c, c2) + // // if err != nil { + // // c2.Close() + // // c.Close() + // // //logs.Warn("close npc by copy from server", err, c.connId) + // // } + // // wg.Done() + // //}() + // //logs.Warn("npc wait") + // wg.Wait() + //}(c2, c.(*conn)) } }() @@ -49,26 +85,58 @@ func TestNewMux(t *testing.T) { m1 := NewMux(conn1, "tcp") l, err := net.Listen("tcp", "127.0.0.1:7777") if err != nil { - log.Fatalln(err) + logs.Warn(err) } for { - conn, err := l.Accept() + //logs.Warn("nps starting accept") + conns, err := l.Accept() if err != nil { - log.Fatalln(err) + logs.Warn(err) + continue } - go func(conn net.Conn) { - tmpCpnn, err := m1.NewConn() - if err != nil { - log.Fatalln(err) - } - go common.CopyBuffer(tmpCpnn, conn) - common.CopyBuffer(conn, tmpCpnn) - conn.Close() - tmpCpnn.Close() - }(conn) + //conns.(*net.TCPConn).SetReadBuffer(0) + //conns.(*net.TCPConn).SetReadBuffer(0) + //logs.Warn("nps accept success starting new conn") + tmpCpnn, err := m1.NewConn() + if err != nil { + logs.Warn("nps new conn err ", err) + continue + } + //logs.Warn("nps new conn success ", tmpCpnn.connId) + _ = goroutine.CopyConnsPool.Invoke(goroutine.NewConns(tmpCpnn, conns, nil)) + //go func(tmpCpnn *conn, conns net.Conn) { + // wg := new(sync.WaitGroup) + // wg.Add(2) + // _ = poolConnCopy.Invoke(common.newConnGroup(tmpCpnn, conns, wg)) + // //go func() { + // // _, err := common.CopyBuffer(tmpCpnn, conns) + // // if err != nil { + // // conns.Close() + // // tmpCpnn.Close() + // // //logs.Warn("close nps by copy from user", tmpCpnn.connId, err) + // // } + // //}() + // //wg.Add(1) + // _ = poolConnCopy.Invoke(common.newConnGroup(conns, tmpCpnn, wg)) + // //time.Sleep(time.Second) + // //_, err = common.CopyBuffer(conns, tmpCpnn) + // //if err != nil { + // // conns.Close() + // // tmpCpnn.Close() + // // //logs.Warn("close nps by copy from npc ", tmpCpnn.connId, err) + // //} + // wg.Wait() + //}(tmpCpnn, conns) } }() + //go NewLogServer() + time.Sleep(time.Second * 5) + //for i := 0; i < 1; i++ { + // go test_raw(i) + //} + //test_request() + for { time.Sleep(time.Second * 5) } @@ -78,12 +146,12 @@ func server() { var err error l, err := net.Listen("tcp", "127.0.0.1:9999") if err != nil { - log.Fatalln(err) + logs.Warn(err) } go func() { conn1, err = l.Accept() if err != nil { - log.Fatalln(err) + logs.Warn(err) } }() return @@ -93,12 +161,79 @@ func client() { var err error conn2, err = net.Dial("tcp", "127.0.0.1:9999") if err != nil { - log.Fatalln(err) + logs.Warn(err) } } +func test_request() { + conn, _ := net.Dial("tcp", "127.0.0.1:7777") + for i := 0; i < 1000; i++ { + conn.Write([]byte(`GET / HTTP/1.1 +Host: 127.0.0.1:7777 +Connection: keep-alive + + +`)) + r, err := http.ReadResponse(bufio.NewReader(conn), nil) + if err != nil { + logs.Warn("close by read response err", err) + break + } + logs.Warn("read response success", r) + b, err := httputil.DumpResponse(r, true) + if err != nil { + logs.Warn("close by dump response err", err) + break + } + fmt.Println(string(b[:20]), err) + //time.Sleep(time.Second) + } + logs.Warn("finish") +} + +func test_raw(k int) { + for i := 0; i < 1000; i++ { + ti := time.Now() + conn, err := net.Dial("tcp", "127.0.0.1:7777") + if err != nil { + logs.Warn("conn dial err", err) + } + tid := time.Now() + conn.Write([]byte(`GET /videojs5/video.js HTTP/1.1 +Host: 127.0.0.1:7777 + + +`)) + tiw := time.Now() + buf := make([]byte, 3572) + n, err := io.ReadFull(conn, buf) + //n, err := conn.Read(buf) + if err != nil { + logs.Warn("close by read response err", err) + break + } + logs.Warn(n, string(buf[:50]), "\n--------------\n", string(buf[n-50:n])) + //time.Sleep(time.Second) + err = conn.Close() + if err != nil { + logs.Warn("close conn err ", err) + } + now := time.Now() + du := now.Sub(ti).Seconds() + dud := now.Sub(tid).Seconds() + duw := now.Sub(tiw).Seconds() + if du > 1 { + logs.Warn("duration long", du, dud, duw, k, i) + } + if n != 3572 { + logs.Warn("n loss", n, string(buf)) + } + } + logs.Warn("finish") +} + func TestNewConn(t *testing.T) { - buf := pool.GetBufPoolCopy() + buf := common.GetBufPoolCopy() logs.Warn(len(buf), cap(buf)) //b := pool.GetBufPoolCopy() //b[0] = 1 @@ -108,3 +243,205 @@ func TestNewConn(t *testing.T) { logs.Warn(copy(buf[:3], b), len(buf), cap(buf)) logs.Warn(len(buf), buf[0]) } + +func TestDQueue(t *testing.T) { + logs.EnableFuncCallDepth(true) + logs.SetLogFuncCallDepth(3) + d := new(bufDequeue) + d.vals = make([]unsafe.Pointer, 8) + go func() { + time.Sleep(time.Second) + for i := 0; i < 10; i++ { + logs.Warn(i) + logs.Warn(d.popTail()) + } + }() + go func() { + time.Sleep(time.Second) + for i := 0; i < 10; i++ { + data := "test" + go logs.Warn(i, unsafe.Pointer(&data), d.pushHead(unsafe.Pointer(&data))) + } + }() + time.Sleep(time.Second * 3) +} + +func TestChain(t *testing.T) { + go func() { + log.Println(http.ListenAndServe("0.0.0.0:8889", nil)) + }() + logs.EnableFuncCallDepth(true) + logs.SetLogFuncCallDepth(3) + time.Sleep(time.Second * 5) + d := new(bufChain) + d.new(256) + go func() { + time.Sleep(time.Second) + for i := 0; i < 30000; i++ { + unsa, ok := d.popTail() + str := (*string)(unsa) + if ok { + fmt.Println(i, str, *str, ok) + //logs.Warn(i, str, *str, ok) + } else { + fmt.Println("nil", i, ok) + //logs.Warn("nil", i, ok) + } + } + }() + go func() { + time.Sleep(time.Second) + for i := 0; i < 3000; i++ { + go func(i int) { + for n := 0; n < 10; n++ { + data := "test " + strconv.Itoa(i) + strconv.Itoa(n) + fmt.Println(data, unsafe.Pointer(&data)) + //logs.Warn(data, unsafe.Pointer(&data)) + d.pushHead(unsafe.Pointer(&data)) + } + }(i) + } + }() + time.Sleep(time.Second * 100000) +} + +func TestFIFO(t *testing.T) { + go func() { + log.Println(http.ListenAndServe("0.0.0.0:8889", nil)) + }() + logs.EnableFuncCallDepth(true) + logs.SetLogFuncCallDepth(3) + time.Sleep(time.Second * 5) + d := new(ReceiveWindowQueue) + d.New() + go func() { + time.Sleep(time.Second) + for i := 0; i < 1001; i++ { + data, err := d.Pop() + if err == nil { + //fmt.Println(i, string(data.buf), err) + logs.Warn(i, string(data.Buf), err) + common.ListElementPool.Put(data) + } else { + //fmt.Println("err", err) + logs.Warn("err", err) + } + //logs.Warn(d.Len()) + } + logs.Warn("pop finish") + }() + go func() { + time.Sleep(time.Second * 10) + for i := 0; i < 1000; i++ { + by := []byte("test " + strconv.Itoa(i) + " ") // + data, _ := NewListElement(by, uint16(len(by)), true) + //fmt.Println(string((*data).buf), data) + //logs.Warn(string((*data).buf), data) + d.Push(data) + } + }() + time.Sleep(time.Second * 100000) +} + +func TestPriority(t *testing.T) { + go func() { + log.Println(http.ListenAndServe("0.0.0.0:8889", nil)) + }() + logs.EnableFuncCallDepth(true) + logs.SetLogFuncCallDepth(3) + time.Sleep(time.Second * 5) + d := new(PriorityQueue) + d.New() + go func() { + time.Sleep(time.Second) + for i := 0; i < 360050; i++ { + data := d.Pop() + //fmt.Println(i, string(data.buf), err) + logs.Warn(i, string(data.Content), data) + } + logs.Warn("pop finish") + }() + go func() { + time.Sleep(time.Second * 10) + for i := 0; i < 30000; i++ { + go func(i int) { + for n := 0; n < 10; n++ { + data := new(common.MuxPackager) + by := []byte("test " + strconv.Itoa(i) + strconv.Itoa(n)) + _ = data.NewPac(common.MUX_NEW_MSG_PART, int32(i), by) + //fmt.Println(string((*data).buf), data) + logs.Warn(string((*data).Content), data) + d.Push(data) + } + }(i) + go func(i int) { + data := new(common.MuxPackager) + _ = data.NewPac(common.MUX_NEW_CONN, int32(i), nil) + //fmt.Println(string((*data).buf), data) + logs.Warn(data) + d.Push(data) + }(i) + go func(i int) { + data := new(common.MuxPackager) + _ = data.NewPac(common.MUX_NEW_CONN_OK, int32(i), nil) + //fmt.Println(string((*data).buf), data) + logs.Warn(data) + d.Push(data) + }(i) + } + }() + time.Sleep(time.Second * 100000) +} + +//func TestReceive(t *testing.T) { +// go func() { +// log.Println(http.ListenAndServe("0.0.0.0:8889", nil)) +// }() +// logs.EnableFuncCallDepth(true) +// logs.SetLogFuncCallDepth(3) +// time.Sleep(time.Second * 5) +// mux := new(Mux) +// mux.bw.readBandwidth = float64(1*1024*1024) +// mux.latency = float64(1/1000) +// wind := new(ReceiveWindow) +// wind.New(mux) +// wind. +// go func() { +// time.Sleep(time.Second) +// for i := 0; i < 36000; i++ { +// data := d.Pop() +// //fmt.Println(i, string(data.buf), err) +// logs.Warn(i, string(data.Content), data) +// } +// }() +// go func() { +// time.Sleep(time.Second*10) +// for i := 0; i < 3000; i++ { +// go func(i int) { +// for n := 0; n < 10; n++{ +// data := new(common.MuxPackager) +// by := []byte("test " + strconv.Itoa(i) + strconv.Itoa(n)) +// _ = data.NewPac(common.MUX_NEW_MSG_PART, int32(i), by) +// //fmt.Println(string((*data).buf), data) +// logs.Warn(string((*data).Content), data) +// d.Push(data) +// } +// }(i) +// go func(i int) { +// data := new(common.MuxPackager) +// _ = data.NewPac(common.MUX_NEW_CONN, int32(i), nil) +// //fmt.Println(string((*data).buf), data) +// logs.Warn(data) +// d.Push(data) +// }(i) +// go func(i int) { +// data := new(common.MuxPackager) +// _ = data.NewPac(common.MUX_NEW_CONN_OK, int32(i), nil) +// //fmt.Println(string((*data).buf), data) +// logs.Warn(data) +// d.Push(data) +// }(i) +// } +// }() +// time.Sleep(time.Second * 100000) +//} diff --git a/lib/mux/queue.go b/lib/mux/queue.go index 9487845..212563c 100644 --- a/lib/mux/queue.go +++ b/lib/mux/queue.go @@ -2,82 +2,577 @@ package mux import ( "errors" + "github.com/cnlh/nps/lib/common" + "io" + "math" + "runtime" "sync" - - "github.com/cnlh/nps/lib/pool" + "sync/atomic" + "time" + "unsafe" ) -type Element *bufNode - -type bufNode struct { - val []byte //buf value - l int //length +type PriorityQueue struct { + highestChain *bufChain + middleChain *bufChain + lowestChain *bufChain + starving uint8 + stop bool + cond *sync.Cond } -func NewBufNode(buf []byte, l int) *bufNode { - return &bufNode{ - val: buf, - l: l, +func (Self *PriorityQueue) New() { + Self.highestChain = new(bufChain) + Self.highestChain.new(4) + Self.middleChain = new(bufChain) + Self.middleChain.new(32) + Self.lowestChain = new(bufChain) + Self.lowestChain.new(256) + locker := new(sync.Mutex) + Self.cond = sync.NewCond(locker) +} + +func (Self *PriorityQueue) Push(packager *common.MuxPackager) { + //logs.Warn("push start") + Self.push(packager) + Self.cond.Broadcast() + //logs.Warn("push finish") + return +} + +func (Self *PriorityQueue) push(packager *common.MuxPackager) { + switch packager.Flag { + case common.MUX_PING_FLAG, common.MUX_PING_RETURN: + Self.highestChain.pushHead(unsafe.Pointer(packager)) + // the ping package need highest priority + // prevent ping calculation error + case common.MUX_NEW_CONN, common.MUX_NEW_CONN_OK, common.MUX_NEW_CONN_Fail: + // the new conn package need some priority too + Self.middleChain.pushHead(unsafe.Pointer(packager)) + default: + Self.lowestChain.pushHead(unsafe.Pointer(packager)) } } -type Queue interface { - Push(e Element) //向队列中添加元素 - Pop() Element //移除队列中最前面的元素 - Clear() bool //清空队列 - Size() int //获取队列的元素个数 - IsEmpty() bool //判断队列是否是空 -} +const maxStarving uint8 = 8 -type sliceEntry struct { - element []Element - sync.Mutex -} - -func NewQueue() *sliceEntry { - return &sliceEntry{} -} - -//向队列中添加元素 -func (entry *sliceEntry) Push(e Element) { - entry.Lock() - defer entry.Unlock() - entry.element = append(entry.element, e) -} - -//移除队列中最前面的额元素 -func (entry *sliceEntry) Pop() (Element, error) { - if entry.IsEmpty() { - return nil, errors.New("queue is empty!") +func (Self *PriorityQueue) Pop() (packager *common.MuxPackager) { + var iter bool + for { + packager = Self.TryPop() + if packager != nil { + return + } + if Self.stop { + return + } + if iter { + break + // trying to pop twice + } + iter = true + runtime.Gosched() } - entry.Lock() - defer entry.Unlock() - firstElement := entry.element[0] - entry.element = entry.element[1:] - return firstElement, nil + Self.cond.L.Lock() + defer Self.cond.L.Unlock() + for packager = Self.TryPop(); packager == nil; { + if Self.stop { + return + } + //logs.Warn("queue into wait") + Self.cond.Wait() + // wait for it with no more iter + packager = Self.TryPop() + //logs.Warn("queue wait finish", packager) + } + return } -func (entry *sliceEntry) Clear() bool { - entry.Lock() - defer entry.Unlock() - if entry.IsEmpty() { +func (Self *PriorityQueue) TryPop() (packager *common.MuxPackager) { + ptr, ok := Self.highestChain.popTail() + if ok { + packager = (*common.MuxPackager)(ptr) + return + } + if Self.starving < maxStarving { + // not pop too much, lowestChain will wait too long + ptr, ok = Self.middleChain.popTail() + if ok { + packager = (*common.MuxPackager)(ptr) + Self.starving++ + return + } + } + ptr, ok = Self.lowestChain.popTail() + if ok { + packager = (*common.MuxPackager)(ptr) + if Self.starving > 0 { + Self.starving = uint8(Self.starving / 2) + } + return + } + if Self.starving > 0 { + ptr, ok = Self.middleChain.popTail() + if ok { + packager = (*common.MuxPackager)(ptr) + Self.starving++ + return + } + } + return +} + +func (Self *PriorityQueue) Stop() { + Self.stop = true + Self.cond.Broadcast() +} + +type ConnQueue struct { + chain *bufChain + starving uint8 + stop bool + cond *sync.Cond +} + +func (Self *ConnQueue) New() { + Self.chain = new(bufChain) + Self.chain.new(32) + locker := new(sync.Mutex) + Self.cond = sync.NewCond(locker) +} + +func (Self *ConnQueue) Push(connection *conn) { + Self.chain.pushHead(unsafe.Pointer(connection)) + Self.cond.Broadcast() + return +} + +func (Self *ConnQueue) Pop() (connection *conn) { + var iter bool + for { + connection = Self.TryPop() + if connection != nil { + return + } + if Self.stop { + return + } + if iter { + break + // trying to pop twice + } + iter = true + runtime.Gosched() + } + Self.cond.L.Lock() + defer Self.cond.L.Unlock() + for connection = Self.TryPop(); connection == nil; { + if Self.stop { + return + } + //logs.Warn("queue into wait") + Self.cond.Wait() + // wait for it with no more iter + connection = Self.TryPop() + //logs.Warn("queue wait finish", packager) + } + return +} + +func (Self *ConnQueue) TryPop() (connection *conn) { + ptr, ok := Self.chain.popTail() + if ok { + connection = (*conn)(ptr) + return + } + return +} + +func (Self *ConnQueue) Stop() { + Self.stop = true + Self.cond.Broadcast() +} + +func NewListElement(buf []byte, l uint16, part bool) (element *common.ListElement, err error) { + if uint16(len(buf)) != l { + err = errors.New("ListElement: buf length not match") + return + } + //if l == 0 { + // logs.Warn("push zero") + //} + element = common.ListElementPool.Get() + element.Buf = buf + element.L = l + element.Part = part + return +} + +type ReceiveWindowQueue struct { + lengthWait uint64 + chain *bufChain + stopOp chan struct{} + readOp chan struct{} + timeout time.Time +} + +func (Self *ReceiveWindowQueue) New() { + Self.readOp = make(chan struct{}) + Self.chain = new(bufChain) + Self.chain.new(64) + Self.stopOp = make(chan struct{}, 2) +} + +func (Self *ReceiveWindowQueue) Push(element *common.ListElement) { + var length, wait uint32 + for { + ptrs := atomic.LoadUint64(&Self.lengthWait) + length, wait = Self.chain.head.unpack(ptrs) + length += uint32(element.L) + if atomic.CompareAndSwapUint64(&Self.lengthWait, ptrs, Self.chain.head.pack(length, 0)) { + break + } + // another goroutine change the length or into wait, make sure + } + //logs.Warn("window push before", Self.Len(), uint32(element.l), len(element.buf)) + Self.chain.pushHead(unsafe.Pointer(element)) + //logs.Warn("window push", Self.Len()) + if wait == 1 { + Self.allowPop() + } + return +} + +func (Self *ReceiveWindowQueue) Pop() (element *common.ListElement, err error) { + var length uint32 +startPop: + ptrs := atomic.LoadUint64(&Self.lengthWait) + length, _ = Self.chain.head.unpack(ptrs) + if length == 0 { + if !atomic.CompareAndSwapUint64(&Self.lengthWait, ptrs, Self.chain.head.pack(0, 1)) { + goto startPop // another goroutine is pushing + } + err = Self.waitPush() + // there is no more data in queue, wait for it + if err != nil { + return + } + goto startPop // wait finish, trying to get the new status + } + // length is not zero, so try to pop + for { + element = Self.TryPop() + if element != nil { + return + } + runtime.Gosched() // another goroutine is still pushing + } +} + +func (Self *ReceiveWindowQueue) TryPop() (element *common.ListElement) { + ptr, ok := Self.chain.popTail() + if ok { + //logs.Warn("window pop before", Self.Len()) + element = (*common.ListElement)(ptr) + atomic.AddUint64(&Self.lengthWait, ^(uint64(element.L)<> dequeueBits) & mask) + tail = uint32(ptrs & mask) + return +} + +func (d *bufDequeue) pack(head, tail uint32) uint64 { + const mask = 1< 0 { + runtime.Gosched() + } + for { + ptrs := atomic.LoadUint64(&d.headTail) + head, tail := d.unpack(ptrs) + if (tail+uint32(len(d.vals)))&(1<= 3 && atomic.LoadUint32(&d.starving) > 0 { + atomic.StoreUint32(&d.starving, 0) + } + break + } + starve++ + if starve >= 3 { + atomic.StoreUint32(&d.starving, 1) + } + } + // The head slot is free, so we own it. + *slot = val return true } -func (entry *sliceEntry) Size() int { - return len(entry.element) +// popTail removes and returns the element at the tail of the queue. +// It returns false if the queue is empty. It may be called by any +// number of consumers. +func (d *bufDequeue) popTail() (unsafe.Pointer, bool) { + ptrs := atomic.LoadUint64(&d.headTail) + head, tail := d.unpack(ptrs) + if tail == head { + // Queue is empty. + return nil, false + } + slot := &d.vals[tail&uint32(len(d.vals)-1)] + var val unsafe.Pointer + for { + val = atomic.LoadPointer(slot) + if val != nil { + // We now own slot. + break + } + // Another goroutine is still pushing data on the tail. + } + + // Tell pushHead that we're done with this slot. Zeroing the + // slot is also important so we don't leave behind references + // that could keep this object live longer than necessary. + // + // We write to val first and then publish that we're done with + atomic.StorePointer(slot, nil) + // At this point pushHead owns the slot. + if tail < math.MaxUint32 { + atomic.AddUint64(&d.headTail, 1) + } else { + atomic.AddUint64(&d.headTail, ^uint64(math.MaxUint32-1)) + } + return val, true } -func (entry *sliceEntry) IsEmpty() bool { - if len(entry.element) == 0 { - return true - } - return false +// bufChain is a dynamically-sized version of bufDequeue. +// +// This is implemented as a doubly-linked list queue of poolDequeues +// where each dequeue is double the size of the previous one. Once a +// dequeue fills up, this allocates a new one and only ever pushes to +// the latest dequeue. Pops happen from the other end of the list and +// once a dequeue is exhausted, it gets removed from the list. +type bufChain struct { + // head is the bufDequeue to push to. This is only accessed + // by the producer, so doesn't need to be synchronized. + head *bufChainElt + + // tail is the bufDequeue to popTail from. This is accessed + // by consumers, so reads and writes must be atomic. + tail *bufChainElt + newChain uint32 +} + +type bufChainElt struct { + bufDequeue + + // next and prev link to the adjacent poolChainElts in this + // bufChain. + // + // next is written atomically by the producer and read + // atomically by the consumer. It only transitions from nil to + // non-nil. + // + // prev is written atomically by the consumer and read + // atomically by the producer. It only transitions from + // non-nil to nil. + next, prev *bufChainElt +} + +func storePoolChainElt(pp **bufChainElt, v *bufChainElt) { + atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(pp)), unsafe.Pointer(v)) +} + +func loadPoolChainElt(pp **bufChainElt) *bufChainElt { + return (*bufChainElt)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(pp)))) +} + +func (c *bufChain) new(initSize int) { + // Initialize the chain. + // initSize must be a power of 2 + d := new(bufChainElt) + d.vals = make([]unsafe.Pointer, initSize) + storePoolChainElt(&c.head, d) + storePoolChainElt(&c.tail, d) +} + +func (c *bufChain) pushHead(val unsafe.Pointer) { +startPush: + for { + if atomic.LoadUint32(&c.newChain) > 0 { + runtime.Gosched() + } else { + break + } + } + + d := loadPoolChainElt(&c.head) + + if d.pushHead(val) { + return + } + + // The current dequeue is full. Allocate a new one of twice + // the size. + if atomic.CompareAndSwapUint32(&c.newChain, 0, 1) { + newSize := len(d.vals) * 2 + if newSize >= dequeueLimit { + // Can't make it any bigger. + newSize = dequeueLimit + } + + d2 := &bufChainElt{prev: d} + d2.vals = make([]unsafe.Pointer, newSize) + d2.pushHead(val) + storePoolChainElt(&c.head, d2) + storePoolChainElt(&d.next, d2) + atomic.StoreUint32(&c.newChain, 0) + return + } + goto startPush +} + +func (c *bufChain) popTail() (unsafe.Pointer, bool) { + d := loadPoolChainElt(&c.tail) + if d == nil { + return nil, false + } + + for { + // It's important that we load the next pointer + // *before* popping the tail. In general, d may be + // transiently empty, but if next is non-nil before + // the TryPop and the TryPop fails, then d is permanently + // empty, which is the only condition under which it's + // safe to drop d from the chain. + d2 := loadPoolChainElt(&d.next) + + if val, ok := d.popTail(); ok { + return val, ok + } + + if d2 == nil { + // This is the only dequeue. It's empty right + // now, but could be pushed to in the future. + return nil, false + } + + // The tail of the chain has been drained, so move on + // to the next dequeue. Try to drop it from the chain + // so the next TryPop doesn't have to look at the empty + // dequeue again. + if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) { + // We won the race. Clear the prev pointer so + // the garbage collector can collect the empty + // dequeue and so popHead doesn't back up + // further than necessary. + storePoolChainElt(&d2.prev, nil) + } + d = d2 + } } diff --git a/lib/mux/web.go b/lib/mux/web.go new file mode 100644 index 0000000..36b2017 --- /dev/null +++ b/lib/mux/web.go @@ -0,0 +1,154 @@ +package mux + +import ( + "fmt" + "github.com/astaxie/beego/logs" + "net/http" + "sort" + "strconv" + "strings" + "sync" + "time" +) + +type connLog struct { + startTime time.Time + isClose bool + logs []string +} + +var logms map[int]*connLog +var logmc map[int]*connLog + +var copyMaps map[int]*connLog +var copyMapc map[int]*connLog +var stashTimeNow time.Time +var mutex sync.Mutex + +func deepCopyMaps() { + copyMaps = make(map[int]*connLog) + for k, v := range logms { + copyMaps[k] = &connLog{ + startTime: v.startTime, + isClose: v.isClose, + logs: v.logs, + } + } +} + +func deepCopyMapc() { + copyMapc = make(map[int]*connLog) + for k, v := range logmc { + copyMapc[k] = &connLog{ + startTime: v.startTime, + isClose: v.isClose, + logs: v.logs, + } + } +} + +func init() { + logms = make(map[int]*connLog) + logmc = make(map[int]*connLog) +} + +type IntSlice []int + +func (s IntSlice) Len() int { return len(s) } + +func (s IntSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } + +func (s IntSlice) Less(i, j int) bool { return s[i] < s[j] } + +func NewLogServer() { + http.HandleFunc("/", index) + http.HandleFunc("/detail", detail) + http.HandleFunc("/stash", stash) + fmt.Println(http.ListenAndServe(":8899", nil)) +} + +func stash(w http.ResponseWriter, r *http.Request) { + stashTimeNow = time.Now() + deepCopyMaps() + deepCopyMapc() + w.Write([]byte("ok")) +} + +func getM(label string, id int) (cL *connLog) { + label = strings.TrimSpace(label) + mutex.Lock() + defer mutex.Unlock() + if label == "nps" { + cL = logms[id] + } + if label == "npc" { + cL = logmc[id] + } + return +} + +func setM(label string, id int, cL *connLog) { + label = strings.TrimSpace(label) + mutex.Lock() + defer mutex.Unlock() + if label == "nps" { + logms[id] = cL + } + if label == "npc" { + logmc[id] = cL + } +} + +func index(w http.ResponseWriter, r *http.Request) { + var keys []int + for k := range copyMaps { + keys = append(keys, k) + } + sort.Sort(IntSlice(keys)) + var s string + s += "

nps

" + for _, v := range keys { + connL := copyMaps[v] + s += "" + strconv.Itoa(v) + "----------" + s += strconv.Itoa(int(stashTimeNow.Sub(connL.startTime).Milliseconds())) + "ms----------" + s += strconv.FormatBool(connL.isClose) + s += "
" + } + + keys = keys[:0] + s += "

npc

" + for k := range copyMapc { + keys = append(keys, k) + } + sort.Sort(IntSlice(keys)) + + for _, v := range keys { + connL := copyMapc[v] + s += "" + strconv.Itoa(v) + "----------" + s += strconv.Itoa(int(stashTimeNow.Sub(connL.startTime).Milliseconds())) + "ms----------" + s += strconv.FormatBool(connL.isClose) + s += "
" + } + w.Write([]byte(s)) +} + +func detail(w http.ResponseWriter, r *http.Request) { + id := r.FormValue("id") + label := r.FormValue("label") + logs.Warn(label) + i, _ := strconv.Atoi(id) + var v *connLog + if label == "nps" { + v, _ = copyMaps[i] + } + if label == "npc" { + v, _ = copyMapc[i] + } + var s string + if v != nil { + for i, vv := range v.logs { + s += "

" + strconv.Itoa(i+1) + ":" + vv + "

" + } + } + w.Write([]byte(s)) +} diff --git a/lib/mux/web_test.go b/lib/mux/web_test.go new file mode 100644 index 0000000..91a0430 --- /dev/null +++ b/lib/mux/web_test.go @@ -0,0 +1,7 @@ +package mux + +import "testing" + +func TestWeb(t *testing.T) { + NewLogServer() +} diff --git a/lib/pool/pool.go b/lib/pool/pool.go deleted file mode 100644 index ace8b07..0000000 --- a/lib/pool/pool.go +++ /dev/null @@ -1,60 +0,0 @@ -package pool - -import ( - "sync" -) - -const PoolSize = 64 * 1024 -const PoolSizeSmall = 100 -const PoolSizeUdp = 1472 -const PoolSizeCopy = 32 << 10 - -var BufPool = sync.Pool{ - New: func() interface{} { - return make([]byte, PoolSize) - }, -} - -var BufPoolUdp = sync.Pool{ - New: func() interface{} { - return make([]byte, PoolSizeUdp) - }, -} -var BufPoolMax = sync.Pool{ - New: func() interface{} { - return make([]byte, PoolSize) - }, -} -var BufPoolSmall = sync.Pool{ - New: func() interface{} { - return make([]byte, PoolSizeSmall) - }, -} -var BufPoolCopy = sync.Pool{ - New: func() interface{} { - buf := make([]byte, PoolSizeCopy) - return &buf - }, -} - -func PutBufPoolUdp(buf []byte) { - if cap(buf) == PoolSizeUdp { - BufPoolUdp.Put(buf[:PoolSizeUdp]) - } -} - -func PutBufPoolCopy(buf []byte) { - if cap(buf) == PoolSizeCopy { - BufPoolCopy.Put(&buf) - } -} - -func GetBufPoolCopy() []byte { - return (*BufPoolCopy.Get().(*[]byte))[:PoolSizeCopy] -} - -func PutBufPoolMax(buf []byte) { - if cap(buf) == PoolSize { - BufPoolMax.Put(buf[:PoolSize]) - } -} diff --git a/lib/version/version.go b/lib/version/version.go index 902b30b..1f14ef1 100644 --- a/lib/version/version.go +++ b/lib/version/version.go @@ -1,8 +1,8 @@ package version -const VERSION = "0.23.2" +const VERSION = "0.24.0" // Compulsory minimum version, Minimum downward compatibility to this version func GetVersion() string { - return "0.21.0" + return "0.24.0" } diff --git a/server/proxy/p2p.go b/server/proxy/p2p.go index dc6eb07..b789ced 100644 --- a/server/proxy/p2p.go +++ b/server/proxy/p2p.go @@ -7,7 +7,6 @@ import ( "github.com/astaxie/beego/logs" "github.com/cnlh/nps/lib/common" - "github.com/cnlh/nps/lib/pool" ) type P2PServer struct { @@ -37,7 +36,7 @@ func (s *P2PServer) Start() error { return err } for { - buf := pool.BufPoolUdp.Get().([]byte) + buf := common.BufPoolUdp.Get().([]byte) n, addr, err := s.listener.ReadFromUDP(buf) if err != nil { if strings.Contains(err.Error(), "use of closed network connection") { diff --git a/server/proxy/udp.go b/server/proxy/udp.go index d2bc130..958e3c2 100755 --- a/server/proxy/udp.go +++ b/server/proxy/udp.go @@ -9,7 +9,6 @@ import ( "github.com/cnlh/nps/lib/common" "github.com/cnlh/nps/lib/conn" "github.com/cnlh/nps/lib/file" - "github.com/cnlh/nps/lib/pool" ) type UdpModeServer struct { @@ -34,7 +33,7 @@ func (s *UdpModeServer) Start() error { if err != nil { return err } - buf := pool.BufPoolUdp.Get().([]byte) + buf := common.BufPoolUdp.Get().([]byte) for { n, addr, err := s.listener.ReadFromUDP(buf) if err != nil { @@ -60,8 +59,8 @@ func (s *UdpModeServer) process(addr *net.UDPAddr, data []byte) { return } else { s.task.Flow.Add(int64(len(data)), 0) - buf := pool.BufPoolUdp.Get().([]byte) - defer pool.BufPoolUdp.Put(buf) + buf := common.BufPoolUdp.Get().([]byte) + defer common.BufPoolUdp.Put(buf) target.Write(data) s.task.Flow.Add(int64(len(data)), 0) if n, err := target.Read(buf); err != nil {