From 1f8e4410906a8e5a557535078818e22dff7bf35f Mon Sep 17 00:00:00 2001 From: ffdfgdfg Date: Sat, 12 Oct 2019 22:56:37 +0800 Subject: [PATCH] multiple changes --- README.md | 11 ++++++++- client/client.go | 5 ++++ client/control.go | 9 +++++-- cmd/nps/nps.go | 2 +- lib/common/const.go | 1 + lib/common/netpackager.go | 9 +++++-- lib/common/pool.go | 11 ++------- lib/common/util.go | 3 +++ lib/install/install.go | 38 +++++++++++++++++++++++++--- lib/mux/conn.go | 8 ++++++ lib/mux/mux.go | 17 +++++++++++-- lib/mux/mux_test.go | 52 +++++++++++++++++++++++++++++++++++++++ lib/mux/queue.go | 39 ++++++++++++++++++++--------- lib/version/version.go | 4 +-- 14 files changed, 176 insertions(+), 33 deletions(-) diff --git a/README.md b/README.md index 1dd7f77..f56c32a 100644 --- a/README.md +++ b/README.md @@ -197,6 +197,9 @@ nps是一款轻量级、高性能、功能强大的**内网穿透**代理服务 - 在刚才创建的客户端隧道管理中添加一条socks5代理,填写监听的端口(8003),保存。 - 在外网环境的本机配置socks5代理(例如使用proxifier进行全局代理),ip为公网服务器ip(1.1.1.1),端口为填写的监听端口(8003),即可畅享内网了 +**注意** +经过socks5代理,当收到socks5数据包时socket已经是accept状态。表现是扫描端口全open,建立连接后短时间关闭。若想同内网表现一致,建议远程连接一台设备。 + ### http正向代理 **适用范围:** 在外网环境下使用http正向代理访问内网站点 @@ -375,7 +378,13 @@ server { ``` (./nps|nps.exe) install ``` -安装成功后,对于linux,darwin,将会把配置文件和静态文件放置于/etc/nps/,并将可执行文件nps复制到/usr/bin/nps或者/usr/local/bin/nps,安装成功后可在任何位置执行 +安装成功后,对于linux,darwin,将会把配置文件和静态文件放置于/etc/nps/,并将可执行文件nps复制到/usr/bin/nps或者/usr/local/bin/nps,安装成功后可在任何位置执行,同时也会添加systemd配置。 + +``` +sudo systemctl enable|disable|start|stop|restart|status nps +``` +systemd,带有开机自启,自动重启配置,当进程结束后15秒会启动,日志输出至/var/log/nps/nps.log。 +建议采用此方式启动,能够捕获panic信息,便于排查问题。 ``` nps test|start|stop|restart|status diff --git a/client/client.go b/client/client.go index 52da907..5bd0b01 100755 --- a/client/client.go +++ b/client/client.go @@ -49,6 +49,11 @@ retry: time.Sleep(time.Second * 5) goto retry } + if c == nil { + logs.Error("Error data from server, and will be reconnected in five seconds") + time.Sleep(time.Second * 5) + goto retry + } logs.Info("Successful connection with server %s", s.svrAddr) //monitor the connection go s.ping() diff --git a/client/control.go b/client/control.go index 5673f14..3260113 100644 --- a/client/control.go +++ b/client/control.go @@ -223,8 +223,13 @@ func NewConn(tp string, vkey string, server string, connType string, proxyUrl st if _, err := c.Write([]byte(crypt.Md5(version.GetVersion()))); err != nil { return nil, err } - if b, err := c.GetShortContent(32); err != nil || crypt.Md5(version.GetVersion()) != string(b) { - logs.Error("The client does not match the server version. The current version of the client is", version.GetVersion()) + b, err := c.GetShortContent(32) + if err != nil { + logs.Error(err) + return nil, err + } + if crypt.Md5(version.GetVersion()) != string(b) { + logs.Error("The client does not match the server version. The current core version of the client is", version.GetVersion()) return nil, err } if _, err := c.Write([]byte(common.Getverifyval(vkey))); err != nil { diff --git a/cmd/nps/nps.go b/cmd/nps/nps.go index f66fe66..22835a2 100644 --- a/cmd/nps/nps.go +++ b/cmd/nps/nps.go @@ -61,7 +61,7 @@ func main() { logs.Error("Getting bridge_port error", err) os.Exit(0) } - logs.Info("the version of server is %s ,allow client version to be %s", version.VERSION, version.GetVersion()) + logs.Info("the version of server is %s ,allow client core version to be %s", version.VERSION, version.GetVersion()) connection.InitConnectionService() crypt.InitTls(filepath.Join(common.GetRunPath(), "conf", "server.pem"), filepath.Join(common.GetRunPath(), "conf", "server.key")) tool.InitAllowPort() diff --git a/lib/common/const.go b/lib/common/const.go index b4eac1a..f57ce4f 100644 --- a/lib/common/const.go +++ b/lib/common/const.go @@ -49,4 +49,5 @@ const ( MUX_PING_RETURN MUX_PING int32 = -1 MAXIMUM_SEGMENT_SIZE = PoolSizeWindow + MAXIMUM_WINDOW_SIZE = 1<<31 - 1 ) diff --git a/lib/common/netpackager.go b/lib/common/netpackager.go index 69ce96a..567a48f 100644 --- a/lib/common/netpackager.go +++ b/lib/common/netpackager.go @@ -158,8 +158,10 @@ func (Self *MuxPackager) NewPac(flag uint8, id int32, content ...interface{}) (e Self.Flag = flag Self.Id = id switch flag { - case MUX_NEW_MSG, MUX_NEW_MSG_PART, MUX_PING_FLAG, MUX_PING_RETURN: + case MUX_PING_FLAG, MUX_PING_RETURN, MUX_NEW_MSG, MUX_NEW_MSG_PART: + Self.Content = WindowBuff.Get() err = Self.BasePackager.NewPac(content...) + //logs.Warn(Self.Length, string(Self.Content)) case MUX_MSG_SEND_OK: // MUX_MSG_SEND_OK contains two data switch content[0].(type) { @@ -190,6 +192,7 @@ func (Self *MuxPackager) Pack(writer io.Writer) (err error) { switch Self.Flag { case MUX_NEW_MSG, MUX_NEW_MSG_PART, MUX_PING_FLAG, MUX_PING_RETURN: err = Self.BasePackager.Pack(writer) + WindowBuff.Put(Self.Content) case MUX_MSG_SEND_OK: err = binary.Write(writer, binary.LittleEndian, Self.Window) if err != nil { @@ -201,7 +204,6 @@ func (Self *MuxPackager) Pack(writer io.Writer) (err error) { } func (Self *MuxPackager) UnPack(reader io.Reader) (err error) { - Self.BasePackager.clean() // also clean the content err = binary.Read(reader, binary.LittleEndian, &Self.Flag) if err != nil { return @@ -212,7 +214,10 @@ func (Self *MuxPackager) UnPack(reader io.Reader) (err error) { } switch Self.Flag { case MUX_NEW_MSG, MUX_NEW_MSG_PART, MUX_PING_FLAG, MUX_PING_RETURN: + Self.Content = WindowBuff.Get() // need get a window buf from pool + Self.BasePackager.clean() // also clean the content err = Self.BasePackager.UnPack(reader) + //logs.Warn("unpack", Self.Length, string(Self.Content)) case MUX_MSG_SEND_OK: err = binary.Read(reader, binary.LittleEndian, &Self.Window) if err != nil { diff --git a/lib/common/pool.go b/lib/common/pool.go index 24efc60..240f7f9 100644 --- a/lib/common/pool.go +++ b/lib/common/pool.go @@ -104,11 +104,7 @@ func (Self *windowBufferPool) Get() (buf []byte) { } func (Self *windowBufferPool) Put(x []byte) { - if len(x) == PoolSizeWindow { - Self.pool.Put(x[:PoolSizeWindow]) // make buf to full - } else { - x = nil - } + Self.pool.Put(x[:PoolSizeWindow]) // make buf to full } type bufferPool struct { @@ -146,13 +142,10 @@ func (Self *muxPackagerPool) New() { } func (Self *muxPackagerPool) Get() *MuxPackager { - pack := Self.pool.Get().(*MuxPackager) - pack.Content = WindowBuff.Get() - return pack + return Self.pool.Get().(*MuxPackager) } func (Self *muxPackagerPool) Put(pack *MuxPackager) { - WindowBuff.Put(pack.Content) Self.pool.Put(pack) } diff --git a/lib/common/util.go b/lib/common/util.go index dc9afbe..e3dfb4f 100755 --- a/lib/common/util.go +++ b/lib/common/util.go @@ -268,6 +268,9 @@ func CopyBuffer(dst io.Writer, src io.Reader) (written int64, err error) { defer CopyBuff.Put(buf) for { nr, er := src.Read(buf) + //if len(pr)>0 && pr[0] && nr > 50 { + // logs.Warn(string(buf[:50])) + //} if nr > 0 { nw, ew := dst.Write(buf[0:nr]) if nw > 0 { diff --git a/lib/install/install.go b/lib/install/install.go index 56f3cc5..24af9b9 100644 --- a/lib/install/install.go +++ b/lib/install/install.go @@ -4,6 +4,7 @@ import ( "errors" "fmt" "io" + "io/ioutil" "log" "os" "path/filepath" @@ -13,6 +14,23 @@ import ( ) func InstallNps() { + unit := `[Unit] +Description=nps - convenient proxy server +Documentation=https://github.com/cnlh/nps/ +After=network-online.target remote-fs.target nss-lookup.target +Wants=network-online.target` + service := `[Service] +Type=simple +KillMode=process +Restart=always +RestartSec=15s +StandardOutput=append:/var/log/nps/nps.log +ExecStartPre=/bin/echo 'Starting nps' +ExecStopPost=/bin/echo 'Stopping nps' +ExecStart=` + install := `[Install] +WantedBy=multi-user.target` + path := common.GetInstallPath() if common.FileExists(path) { log.Fatalf("the path %s has exist, does not support install", path) @@ -35,21 +53,35 @@ func InstallNps() { log.Fatalln(err) } else { os.Chmod("/usr/local/bin/nps", 0755) + service += "/usr/local/bin/nps" log.Println("Executable files have been copied to", "/usr/local/bin/nps") } } else { os.Chmod("/usr/bin/nps", 0755) + service += "/usr/bin/nps" log.Println("Executable files have been copied to", "/usr/bin/nps") } - + systemd := unit + "\n\n" + service + "\n\n" + install + _ = os.Remove("/usr/lib/systemd/system/nps.service") + err := ioutil.WriteFile("/usr/lib/systemd/system/nps.service", []byte(systemd), 0644) + if err != nil { + log.Println("Write systemd service err ", err) + } + _ = os.Mkdir("/var/log/nps", 644) } log.Println("install ok!") log.Println("Static files and configuration files in the current directory will be useless") log.Println("The new configuration file is located in", path, "you can edit them") if !common.IsWindows() { - log.Println("You can start with nps test|start|stop|restart|status anywhere") + log.Println(`You can start with: +sudo systemctl enable|disable|start|stop|restart|status nps +or: +nps test|start|stop|restart|status +anywhere!`) } else { - log.Println("You can copy executable files to any directory and start working with nps.exe test|start|stop|restart|status") + log.Println(`You can copy executable files to any directory and start working with: +nps.exe test|start|stop|restart|status +now!`) } } func MkidrDirAll(path string, v ...string) { diff --git a/lib/mux/conn.go b/lib/mux/conn.go index 5dd69ea..c4b47f3 100644 --- a/lib/mux/conn.go +++ b/lib/mux/conn.go @@ -183,6 +183,10 @@ func (Self *ReceiveWindow) CalcSize() { n = Self.bufQueue.Len() } // set the minimal size + if n > common.MAXIMUM_WINDOW_SIZE { + n = common.MAXIMUM_WINDOW_SIZE + } + // set the maximum size //logs.Warn("n", n) Self.maxSize = n Self.count = -5 @@ -248,6 +252,10 @@ copyData: l = 0 Self.bw.EndRead() Self.sendStatus(id) + if Self.off == uint32(Self.element.l) { + //logs.Warn("put the element end ", string(Self.element.buf[:15])) + common.WindowBuff.Put(Self.element.buf) + } if pOff < len(p) && Self.element.part { // element is a part of the segments, trying to fill up buf p goto copyData diff --git a/lib/mux/mux.go b/lib/mux/mux.go index 9023b82..529b7dc 100644 --- a/lib/mux/mux.go +++ b/lib/mux/mux.go @@ -25,6 +25,7 @@ type Mux struct { pingOk int latency float64 pingCh chan []byte + pingTimer *time.Timer connType string writeQueue PriorityQueue bufCh chan *bytes.Buffer @@ -42,6 +43,7 @@ func NewMux(c net.Conn, connType string) *Mux { connType: connType, bufCh: make(chan *bytes.Buffer), pingCh: make(chan []byte), + pingTimer: time.NewTimer(15 * time.Second), } m.writeQueue.New() //read session by flag @@ -119,6 +121,7 @@ func (s *Mux) packBuf() { common.BuffPool.Put(buffer) break } + //logs.Warn(buffer.String()) select { case s.bufCh <- buffer: case <-s.closeChan: @@ -153,7 +156,7 @@ func (s *Mux) ping() { now, _ := time.Now().UTC().MarshalText() s.sendInfo(common.MUX_PING_FLAG, common.MUX_PING, now) // send the ping flag and get the latency first - ticker := time.NewTicker(time.Second * 15) + ticker := time.NewTicker(time.Second * 5) for { if s.IsClose { ticker.Stop() @@ -168,6 +171,10 @@ func (s *Mux) ping() { } now, _ := time.Now().UTC().MarshalText() s.sendInfo(common.MUX_PING_FLAG, common.MUX_PING, now) + if !s.pingTimer.Stop() { + <-s.pingTimer.C + } + s.pingTimer.Reset(15 * time.Second) if s.pingOk > 10 && s.connType == "kcp" { s.Close() break @@ -186,10 +193,15 @@ func (s *Mux) pingReturn() { case data = <-s.pingCh: case <-s.closeChan: break + case <-s.pingTimer.C: + logs.Error("mux: ping time out") + s.Close() + break } _ = now.UnmarshalText(data) s.latency = time.Now().UTC().Sub(now).Seconds() / 2 - //logs.Warn("latency", s.latency) + logs.Warn("latency", s.latency) + common.WindowBuff.Put(data) if s.latency <= 0 { logs.Warn("latency err", s.latency) } @@ -218,6 +230,7 @@ func (s *Mux) readSession() { continue case common.MUX_PING_FLAG: //ping s.sendInfo(common.MUX_PING_RETURN, common.MUX_PING, pack.Content) + common.WindowBuff.Put(pack.Content) continue case common.MUX_PING_RETURN: s.pingCh <- pack.Content diff --git a/lib/mux/mux_test.go b/lib/mux/mux_test.go index c7b10e0..abc4eb4 100644 --- a/lib/mux/mux_test.go +++ b/lib/mux/mux_test.go @@ -1,8 +1,11 @@ package mux import ( + "bufio" + "fmt" "net" "net/http" + "net/http/httputil" _ "net/http/pprof" "sync" "testing" @@ -37,6 +40,7 @@ func TestNewMux(t *testing.T) { c2, err := net.Dial("tcp", "127.0.0.1:80") if err != nil { logs.Warn(err) + c.Close() continue } go func(c2 net.Conn, c net.Conn) { @@ -107,6 +111,9 @@ func TestNewMux(t *testing.T) { } }() + time.Sleep(time.Second * 5) + //go test_request() + for { time.Sleep(time.Second * 5) } @@ -135,6 +142,51 @@ func client() { } } +func test_request() { + conn, _ := net.Dial("tcp", "127.0.0.1:7777") + for { + conn.Write([]byte(`GET /videojs5/video.js HTTP/1.1 +Host: 127.0.0.1:7777 +Connection: keep-alive + + +`)) + r, err := http.ReadResponse(bufio.NewReader(conn), nil) + if err != nil { + logs.Warn("close by read response err", err) + break + } + logs.Warn("read response success", r) + b, err := httputil.DumpResponse(r, true) + if err != nil { + logs.Warn("close by dump response err", err) + break + } + fmt.Println(string(b[:20]), err) + time.Sleep(time.Second) + } +} + +func test_raw() { + conn, _ := net.Dial("tcp", "127.0.0.1:7777") + for { + conn.Write([]byte(`GET /videojs5/test HTTP/1.1 +Host: 127.0.0.1:7777 +Connection: keep-alive + + +`)) + buf := make([]byte, 1000000) + n, err := conn.Read(buf) + if err != nil { + logs.Warn("close by read response err", err) + break + } + logs.Warn(n, string(buf[:50]), "\n--------------\n", string(buf[n-50:n])) + time.Sleep(time.Second) + } +} + func TestNewConn(t *testing.T) { buf := common.GetBufPoolCopy() logs.Warn(len(buf), cap(buf)) diff --git a/lib/mux/queue.go b/lib/mux/queue.go index 5a57151..a835e2a 100644 --- a/lib/mux/queue.go +++ b/lib/mux/queue.go @@ -51,15 +51,23 @@ func (Self *PriorityQueue) New() { func (Self *PriorityQueue) Push(packager *common.MuxPackager) { Self.mutex.Lock() - if Self.popWait { - defer Self.allowPop() - } - if packager.Flag == common.MUX_CONN_CLOSE { - Self.insert(packager) // the close package may need priority, - // prevent wait too long to close - } else { + switch packager.Flag { + case common.MUX_PING_FLAG, common.MUX_PING_RETURN: + Self.list.PushFront(packager) + // the ping package need highest priority + // prevent ping calculation error + case common.MUX_CONN_CLOSE: + Self.insert(packager) + // the close package may need priority too, set second + // prevent wait too long to close conn + default: Self.list.PushBack(packager) } + if Self.popWait { + Self.mutex.Unlock() + Self.allowPop() + return + } Self.mutex.Unlock() return } @@ -68,7 +76,14 @@ func (Self *PriorityQueue) insert(packager *common.MuxPackager) { element := Self.list.Back() for { if element == nil { // PriorityQueue dose not have any of msg package with this close package id - Self.list.PushFront(packager) // insert close package to first + element = Self.list.Front() + if element != nil { + Self.list.InsertAfter(packager, element) + // insert close package to second + } else { + Self.list.PushFront(packager) + // list is empty, push to front + } break } if element.Value.(*common.MuxPackager).Flag == common.MUX_NEW_MSG && @@ -136,11 +151,13 @@ func (Self *FIFOQueue) New() { func (Self *FIFOQueue) Push(element *ListElement) { Self.mutex.Lock() - if Self.popWait { - defer Self.allowPop() - } Self.list = append(Self.list, element) Self.length += uint32(element.l) + if Self.popWait { + Self.mutex.Unlock() + Self.allowPop() + return + } Self.mutex.Unlock() return } diff --git a/lib/version/version.go b/lib/version/version.go index 902b30b..4cc0532 100644 --- a/lib/version/version.go +++ b/lib/version/version.go @@ -1,8 +1,8 @@ package version -const VERSION = "0.23.2" +const VERSION = "0.23.3" // Compulsory minimum version, Minimum downward compatibility to this version func GetVersion() string { - return "0.21.0" + return "0.22.0" }