From fe806500e9bbabfb151d80d68b1a34049ad32863 Mon Sep 17 00:00:00 2001 From: hp Date: Wed, 3 Feb 2016 11:07:36 +0800 Subject: [PATCH 01/15] add pcrupto.go --- pkg/utils/pcrypto/pcrypto.go | 87 ++++++++++++++++++++++++++++++++++++ 1 file changed, 87 insertions(+) create mode 100644 pkg/utils/pcrypto/pcrypto.go diff --git a/pkg/utils/pcrypto/pcrypto.go b/pkg/utils/pcrypto/pcrypto.go new file mode 100644 index 0000000..729db90 --- /dev/null +++ b/pkg/utils/pcrypto/pcrypto.go @@ -0,0 +1,87 @@ +package pcrypto + +import ( + "bytes" + "compress/gzip" + "crypto/aes" + "crypto/cipher" + "encoding/base64" + "encoding/hex" + "errors" + "fmt" + "io/ioutil" +) + +type Pcrypto struct { + pkey []byte + paes cipher.Block +} + +func (pc *Pcrypto) Init(key []byte) error { + var err error + pc.pkey = PKCS7Padding(key, aes.BlockSize) + pc.paes, err = aes.NewCipher(pc.pkey) + + return err +} + +func (pc *Pcrypto) Encrypto(src []byte) ([]byte, error) { + // aes + src = PKCS7Padding(src, aes.BlockSize) + blockMode := cipher.NewCBCEncrypter(pc.paes, pc.pkey) + crypted := make([]byte, len(src)) + blockMode.CryptBlocks(crypted, src) + + // gzip + var zbuf bytes.Buffer + zwr := gzip.NewWriter(&zbuf) + defer zwr.Close() + zwr.Write(crypted) + zwr.Flush() + + // base64 + return []byte(base64.StdEncoding.EncodeToString(zbuf.Bytes())), nil +} + +func (pc *Pcrypto) Decrypto(str []byte) ([]byte, error) { + // base64 + data, err := base64.StdEncoding.DecodeString(string(str)) + if err != nil { + return nil, err + } + + // gunzip + zbuf := bytes.NewBuffer(data) + zrd, _ := gzip.NewReader(zbuf) + defer zrd.Close() + data, _ = ioutil.ReadAll(zrd) + + // aes + decryptText, err := hex.DecodeString(fmt.Sprintf("%x", data)) + if err != nil { + return nil, err + } + + if len(decryptText)%aes.BlockSize != 0 { + return nil, errors.New("crypto/cipher: ciphertext is not a multiple of the block size") + } + + blockMode := cipher.NewCBCDecrypter(pc.paes, pc.pkey) + + blockMode.CryptBlocks(decryptText, decryptText) + decryptText = PKCS7UnPadding(decryptText) + + return decryptText, nil +} + +func PKCS7Padding(ciphertext []byte, blockSize int) []byte { + padding := blockSize - len(ciphertext)%blockSize + padtext := bytes.Repeat([]byte{byte(padding)}, padding) + return append(ciphertext, padtext...) +} + +func PKCS7UnPadding(origData []byte) []byte { + length := len(origData) + unpadding := int(origData[length-1]) + return origData[:(length - unpadding)] +} From b77738e619d3474de3511332cfb65f214f2d9480 Mon Sep 17 00:00:00 2001 From: hp Date: Wed, 3 Feb 2016 11:46:12 +0800 Subject: [PATCH 02/15] add ppcrypto_test --- pkg/utils/pcrypto/pcrypto_test.go | 47 +++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) create mode 100644 pkg/utils/pcrypto/pcrypto_test.go diff --git a/pkg/utils/pcrypto/pcrypto_test.go b/pkg/utils/pcrypto/pcrypto_test.go new file mode 100644 index 0000000..f83c003 --- /dev/null +++ b/pkg/utils/pcrypto/pcrypto_test.go @@ -0,0 +1,47 @@ +package pcrypto + +import ( + "crypto/aes" + "fmt" + "testing" +) + +func Test_Encrypto(t *testing.T) { + pp := new(Pcrypto) + pp.Init([]byte("Hana")) + res, err := pp.Encrypto([]byte("Just One Test!")) + if err != nil { + t.Error(err) + } + + fmt.Printf("[%x]\n", res) +} + +func Test_Decrypto(t *testing.T) { + pp := new(Pcrypto) + pp.Init([]byte("Hana")) + res, err := pp.Encrypto([]byte("Just One Test!")) + if err != nil { + t.Error(err) + } + + res, err = pp.Decrypto(res) + if err != nil { + t.Error(err) + } + + fmt.Printf("[%s]\n", string(res)) +} + +func Test_PKCS7Padding(t *testing.T) { + ltt := []byte("Test_PKCS7Padding") + ltt = PKCS7Padding(ltt, aes.BlockSize) + fmt.Printf("[%x]\n", (ltt)) +} + +func Test_PKCS7UnPadding(t *testing.T) { + ltt := []byte("Test_PKCS7Padding") + ltt = PKCS7Padding(ltt, aes.BlockSize) + ltt = PKCS7UnPadding(ltt) + fmt.Printf("[%x]\n", ltt) +} From 0b719326bc456e16cf0fd42ae47ee78265ae2220 Mon Sep 17 00:00:00 2001 From: fatedier <512654112@qq.com> Date: Wed, 3 Feb 2016 18:14:16 +0800 Subject: [PATCH 03/15] Support travis-ci --- .travis.yml | 14 ++++++++++++++ Makefile | 3 +++ 2 files changed, 17 insertions(+) create mode 100644 .travis.yml diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..f41bc59 --- /dev/null +++ b/.travis.yml @@ -0,0 +1,14 @@ +go_import_path: github.com/fatedier/frp +sudo: false +language: go + +go: + - 1.4.2 + - 1.5.2 + - tip + +install: + - make + +script: + - make test diff --git a/Makefile b/Makefile index 06a5054..682c9e4 100644 --- a/Makefile +++ b/Makefile @@ -13,3 +13,6 @@ frps: frpc: godep go build -o bin/frpc ./cmd/frpc + +test: + @godep go test ./... From 79195d71777e8ed157ff852fadcbe6ed4004cf77 Mon Sep 17 00:00:00 2001 From: fatedier <512654112@qq.com> Date: Wed, 3 Feb 2016 18:23:30 +0800 Subject: [PATCH 04/15] Change internal package import to github/fatedier --- cmd/frpc/config.go | 2 +- cmd/frpc/control.go | 6 +++--- cmd/frpc/main.go | 2 +- cmd/frps/config.go | 2 +- cmd/frps/control.go | 6 +++--- cmd/frps/main.go | 4 ++-- pkg/models/client.go | 4 ++-- pkg/models/server.go | 4 ++-- pkg/utils/conn/conn.go | 2 +- 9 files changed, 16 insertions(+), 16 deletions(-) diff --git a/cmd/frpc/config.go b/cmd/frpc/config.go index a26fe6a..c543793 100644 --- a/cmd/frpc/config.go +++ b/cmd/frpc/config.go @@ -4,7 +4,7 @@ import ( "fmt" "strconv" - "frp/pkg/models" + "github.com/fatedier/frp/pkg/models" ini "github.com/vaughan0/go-ini" ) diff --git a/cmd/frpc/control.go b/cmd/frpc/control.go index e917a93..87621e6 100644 --- a/cmd/frpc/control.go +++ b/cmd/frpc/control.go @@ -5,9 +5,9 @@ import ( "sync" "encoding/json" - "frp/pkg/models" - "frp/pkg/utils/conn" - "frp/pkg/utils/log" + "github.com/fatedier/frp/pkg/models" + "github.com/fatedier/frp/pkg/utils/conn" + "github.com/fatedier/frp/pkg/utils/log" ) func ControlProcess(cli *models.ProxyClient, wait *sync.WaitGroup) { diff --git a/cmd/frpc/main.go b/cmd/frpc/main.go index 7f07282..df87873 100644 --- a/cmd/frpc/main.go +++ b/cmd/frpc/main.go @@ -4,7 +4,7 @@ import ( "os" "sync" - "frp/pkg/utils/log" + "github.com/fatedier/frp/pkg/utils/log" ) func main() { diff --git a/cmd/frps/config.go b/cmd/frps/config.go index feb07d5..275e9e9 100644 --- a/cmd/frps/config.go +++ b/cmd/frps/config.go @@ -4,7 +4,7 @@ import ( "fmt" "strconv" - "frp/pkg/models" + "github.com/fatedier/frp/pkg/models" ini "github.com/vaughan0/go-ini" ) diff --git a/cmd/frps/control.go b/cmd/frps/control.go index 62d141e..18ddb0a 100644 --- a/cmd/frps/control.go +++ b/cmd/frps/control.go @@ -4,9 +4,9 @@ import ( "fmt" "encoding/json" - "frp/pkg/utils/log" - "frp/pkg/utils/conn" - "frp/pkg/models" + "github.com/fatedier/frp/pkg/utils/log" + "github.com/fatedier/frp/pkg/utils/conn" + "github.com/fatedier/frp/pkg/models" ) func ProcessControlConn(l *conn.Listener) { diff --git a/cmd/frps/main.go b/cmd/frps/main.go index 1288622..3970341 100644 --- a/cmd/frps/main.go +++ b/cmd/frps/main.go @@ -3,8 +3,8 @@ package main import ( "os" - "frp/pkg/utils/log" - "frp/pkg/utils/conn" + "github.com/fatedier/frp/pkg/utils/log" + "github.com/fatedier/frp/pkg/utils/conn" ) func main() { diff --git a/pkg/models/client.go b/pkg/models/client.go index 1f01d50..042e75b 100644 --- a/pkg/models/client.go +++ b/pkg/models/client.go @@ -3,8 +3,8 @@ package models import ( "encoding/json" - "frp/pkg/utils/conn" - "frp/pkg/utils/log" + "github.com/fatedier/frp/pkg/utils/conn" + "github.com/fatedier/frp/pkg/utils/log" ) type ProxyClient struct { diff --git a/pkg/models/server.go b/pkg/models/server.go index bd6baa8..172a290 100644 --- a/pkg/models/server.go +++ b/pkg/models/server.go @@ -4,8 +4,8 @@ import ( "sync" "container/list" - "frp/pkg/utils/conn" - "frp/pkg/utils/log" + "github.com/fatedier/frp/pkg/utils/conn" + "github.com/fatedier/frp/pkg/utils/log" ) const ( diff --git a/pkg/utils/conn/conn.go b/pkg/utils/conn/conn.go index 60929ac..b600f39 100644 --- a/pkg/utils/conn/conn.go +++ b/pkg/utils/conn/conn.go @@ -7,7 +7,7 @@ import ( "sync" "io" - "frp/pkg/utils/log" + "github.com/fatedier/frp/pkg/utils/log" ) type Listener struct { From bdcdafd7682d8311103b5969d27891c9585c4da5 Mon Sep 17 00:00:00 2001 From: fatedier <512654112@qq.com> Date: Wed, 3 Feb 2016 18:40:46 +0800 Subject: [PATCH 05/15] Add travis-ci build status in README.md --- README.md | 3 +++ 1 file changed, 3 insertions(+) diff --git a/README.md b/README.md index f6f1b7f..766797f 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,5 @@ # frp + +[![Build Status](https://travis-ci.org/fatedier/frp.svg)](https://travis-ci.org/fatedier/frp) + A fast reverse proxy. From fa9356936b49f60eba62fa49b05869eddbf461b1 Mon Sep 17 00:00:00 2001 From: fatedier <512654112@qq.com> Date: Wed, 3 Feb 2016 18:46:24 +0800 Subject: [PATCH 06/15] Use go fmt before build --- Makefile | 5 ++++- cmd/frpc/config.go | 11 +++++------ cmd/frpc/control.go | 8 ++++---- cmd/frpc/main.go | 2 +- cmd/frps/config.go | 11 +++++------ cmd/frps/control.go | 14 +++++++------- cmd/frps/main.go | 2 +- pkg/models/client.go | 16 ++++++++-------- pkg/models/msg.go | 13 ++++++------- pkg/models/server.go | 30 +++++++++++++++--------------- pkg/utils/conn/conn.go | 18 +++++++++--------- pkg/utils/log/log.go | 2 +- 12 files changed, 66 insertions(+), 66 deletions(-) diff --git a/Makefile b/Makefile index 682c9e4..0151542 100644 --- a/Makefile +++ b/Makefile @@ -2,12 +2,15 @@ export PATH := $(GOPATH)/bin:$(PATH) all: build -build: godep frps frpc +build: godep fmt frps frpc godep: @go get github.com/tools/godep godep restore +fmt: + @godep go fmt ./... + frps: godep go build -o bin/frps ./cmd/frps diff --git a/cmd/frpc/config.go b/cmd/frpc/config.go index c543793..5374222 100644 --- a/cmd/frpc/config.go +++ b/cmd/frpc/config.go @@ -11,16 +11,15 @@ import ( // common config var ( - ServerAddr string = "0.0.0.0" - ServerPort int64 = 7000 - LogFile string = "./frpc.log" - LogLevel string = "warn" - LogWay string = "file" + ServerAddr string = "0.0.0.0" + ServerPort int64 = 7000 + LogFile string = "./frpc.log" + LogLevel string = "warn" + LogWay string = "file" ) var ProxyClients map[string]*models.ProxyClient = make(map[string]*models.ProxyClient) - func LoadConf(confFile string) (err error) { var tmpStr string var ok bool diff --git a/cmd/frpc/control.go b/cmd/frpc/control.go index 87621e6..313bfcb 100644 --- a/cmd/frpc/control.go +++ b/cmd/frpc/control.go @@ -1,9 +1,9 @@ package main import ( + "encoding/json" "io" "sync" - "encoding/json" "github.com/fatedier/frp/pkg/models" "github.com/fatedier/frp/pkg/utils/conn" @@ -22,9 +22,9 @@ func ControlProcess(cli *models.ProxyClient, wait *sync.WaitGroup) { defer c.Close() req := &models.ClientCtlReq{ - Type: models.ControlConn, - ProxyName: cli.Name, - Passwd: cli.Passwd, + Type: models.ControlConn, + ProxyName: cli.Name, + Passwd: cli.Passwd, } buf, _ := json.Marshal(req) err = c.Write(string(buf) + "\n") diff --git a/cmd/frpc/main.go b/cmd/frpc/main.go index df87873..1bb5eb3 100644 --- a/cmd/frpc/main.go +++ b/cmd/frpc/main.go @@ -3,7 +3,7 @@ package main import ( "os" "sync" - + "github.com/fatedier/frp/pkg/utils/log" ) diff --git a/cmd/frps/config.go b/cmd/frps/config.go index 275e9e9..b7564c2 100644 --- a/cmd/frps/config.go +++ b/cmd/frps/config.go @@ -11,16 +11,15 @@ import ( // common config var ( - BindAddr string = "0.0.0.0" - BindPort int64 = 9527 - LogFile string = "./frps.log" - LogLevel string = "warn" - LogWay string = "file" + BindAddr string = "0.0.0.0" + BindPort int64 = 9527 + LogFile string = "./frps.log" + LogLevel string = "warn" + LogWay string = "file" ) var ProxyServers map[string]*models.ProxyServer = make(map[string]*models.ProxyServer) - func LoadConf(confFile string) (err error) { var tmpStr string var ok bool diff --git a/cmd/frps/control.go b/cmd/frps/control.go index 18ddb0a..4e58738 100644 --- a/cmd/frps/control.go +++ b/cmd/frps/control.go @@ -1,12 +1,12 @@ package main import ( - "fmt" "encoding/json" + "fmt" - "github.com/fatedier/frp/pkg/utils/log" - "github.com/fatedier/frp/pkg/utils/conn" "github.com/fatedier/frp/pkg/models" + "github.com/fatedier/frp/pkg/utils/conn" + "github.com/fatedier/frp/pkg/utils/log" ) func ProcessControlConn(l *conn.Listener) { @@ -41,7 +41,7 @@ func controlWorker(c *conn.Conn) { clientCtlRes.Code = 1 clientCtlRes.Msg = msg } - + if needRes { buf, _ := json.Marshal(clientCtlRes) err = c.Write(string(buf) + "\n") @@ -49,7 +49,7 @@ func controlWorker(c *conn.Conn) { log.Warn("Write error, %v", err) } } else { - // work conn, just return + // work conn, just return return } @@ -96,7 +96,7 @@ func checkProxy(req *models.ClientCtlReq, c *conn.Conn) (succ bool, msg string, log.Warn(msg) return } - + // control conn if req.Type == models.ControlConn { if server.Status != models.Idle { @@ -115,7 +115,7 @@ func checkProxy(req *models.ClientCtlReq, c *conn.Conn) (succ bool, msg string, log.Info("ProxyName [%s], start proxy success", req.ProxyName) } else if req.Type == models.WorkConn { - // work conn + // work conn needRes = false if server.Status != models.Working { log.Warn("ProxyName [%s], is not working when it gets one new work conn", req.ProxyName) diff --git a/cmd/frps/main.go b/cmd/frps/main.go index 3970341..83fdc48 100644 --- a/cmd/frps/main.go +++ b/cmd/frps/main.go @@ -3,8 +3,8 @@ package main import ( "os" - "github.com/fatedier/frp/pkg/utils/log" "github.com/fatedier/frp/pkg/utils/conn" + "github.com/fatedier/frp/pkg/utils/log" ) func main() { diff --git a/pkg/models/client.go b/pkg/models/client.go index 042e75b..38fa9be 100644 --- a/pkg/models/client.go +++ b/pkg/models/client.go @@ -8,9 +8,9 @@ import ( ) type ProxyClient struct { - Name string - Passwd string - LocalPort int64 + Name string + Passwd string + LocalPort int64 } func (p *ProxyClient) GetLocalConn() (c *conn.Conn, err error) { @@ -24,7 +24,7 @@ func (p *ProxyClient) GetLocalConn() (c *conn.Conn, err error) { func (p *ProxyClient) GetRemoteConn(addr string, port int64) (c *conn.Conn, err error) { c = &conn.Conn{} - defer func(){ + defer func() { if err != nil { c.Close() } @@ -37,9 +37,9 @@ func (p *ProxyClient) GetRemoteConn(addr string, port int64) (c *conn.Conn, err } req := &ClientCtlReq{ - Type: WorkConn, - ProxyName: p.Name, - Passwd: p.Passwd, + Type: WorkConn, + ProxyName: p.Name, + Passwd: p.Passwd, } buf, _ := json.Marshal(req) @@ -64,7 +64,7 @@ func (p *ProxyClient) StartTunnel(serverAddr string, serverPort int64) (err erro } log.Debug("Join two conns, (l[%s] r[%s]) (l[%s] r[%s])", localConn.GetLocalAddr(), localConn.GetRemoteAddr(), - remoteConn.GetLocalAddr(), remoteConn.GetRemoteAddr()) + remoteConn.GetLocalAddr(), remoteConn.GetRemoteAddr()) go conn.Join(localConn, remoteConn) return nil } diff --git a/pkg/models/msg.go b/pkg/models/msg.go index 0062556..a3018e7 100644 --- a/pkg/models/msg.go +++ b/pkg/models/msg.go @@ -1,8 +1,8 @@ package models type GeneralRes struct { - Code int64 `json:"code"` - Msg string `json:"msg"` + Code int64 `json:"code"` + Msg string `json:"msg"` } // type @@ -12,16 +12,15 @@ const ( ) type ClientCtlReq struct { - Type int64 `json:"type"` - ProxyName string `json:"proxy_name"` - Passwd string `json:"passwd"` + Type int64 `json:"type"` + ProxyName string `json:"proxy_name"` + Passwd string `json:"passwd"` } type ClientCtlRes struct { GeneralRes } - type ServerCtlReq struct { - Type int64 `json:"type"` + Type int64 `json:"type"` } diff --git a/pkg/models/server.go b/pkg/models/server.go index 172a290..b6bff36 100644 --- a/pkg/models/server.go +++ b/pkg/models/server.go @@ -1,8 +1,8 @@ package models import ( - "sync" "container/list" + "sync" "github.com/fatedier/frp/pkg/utils/conn" "github.com/fatedier/frp/pkg/utils/log" @@ -14,17 +14,17 @@ const ( ) type ProxyServer struct { - Name string - Passwd string - BindAddr string - ListenPort int64 - - Status int64 - Listener *conn.Listener // accept new connection from remote users - CtlMsgChan chan int64 // every time accept a new user conn, put "1" to the channel - CliConnChan chan *conn.Conn // get client conns from control goroutine - UserConnList *list.List // store user conns - Mutex sync.Mutex + Name string + Passwd string + BindAddr string + ListenPort int64 + + Status int64 + Listener *conn.Listener // accept new connection from remote users + CtlMsgChan chan int64 // every time accept a new user conn, put "1" to the channel + CliConnChan chan *conn.Conn // get client conns from control goroutine + UserConnList *list.List // store user conns + Mutex sync.Mutex } func (p *ProxyServer) Init() { @@ -55,7 +55,7 @@ func (p *ProxyServer) Start() (err error) { go func() { for { // block - c := p.Listener.GetConn() + c := p.Listener.GetConn() log.Debug("ProxyName [%s], get one new user conn [%s]", p.Name, c.GetRemoteAddr()) // put to list @@ -93,7 +93,7 @@ func (p *ProxyServer) Start() (err error) { // msg will transfer to another without modifying log.Debug("Join two conns, (l[%s] r[%s]) (l[%s] r[%s])", cliConn.GetLocalAddr(), cliConn.GetRemoteAddr(), - userConn.GetLocalAddr(), userConn.GetRemoteAddr()) + userConn.GetLocalAddr(), userConn.GetRemoteAddr()) go conn.Join(cliConn, userConn) } }() @@ -112,5 +112,5 @@ func (p *ProxyServer) Close() { func (p *ProxyServer) WaitUserConn() (res int64) { res = <-p.CtlMsgChan - return + return } diff --git a/pkg/utils/conn/conn.go b/pkg/utils/conn/conn.go index b600f39..f8e352f 100644 --- a/pkg/utils/conn/conn.go +++ b/pkg/utils/conn/conn.go @@ -1,18 +1,18 @@ package conn import ( + "bufio" "fmt" + "io" "net" - "bufio" "sync" - "io" "github.com/fatedier/frp/pkg/utils/log" ) type Listener struct { - Addr net.Addr - Conns chan *Conn + Addr net.Addr + Conns chan *Conn } // wait util get one @@ -22,8 +22,8 @@ func (l *Listener) GetConn() (conn *Conn) { } type Conn struct { - TcpConn *net.TCPConn - Reader *bufio.Reader + TcpConn *net.TCPConn + Reader *bufio.Reader } func (c *Conn) ConnectServer(host string, port int64) (err error) { @@ -70,8 +70,8 @@ func Listen(bindAddr string, bindPort int64) (l *Listener, err error) { } l = &Listener{ - Addr: listener.Addr(), - Conns: make(chan *Conn), + Addr: listener.Addr(), + Conns: make(chan *Conn), } go func() { @@ -83,7 +83,7 @@ func Listen(bindAddr string, bindPort int64) (l *Listener, err error) { } c := &Conn{ - TcpConn: conn, + TcpConn: conn, } c.Reader = bufio.NewReader(c.TcpConn) l.Conns <- c diff --git a/pkg/utils/log/log.go b/pkg/utils/log/log.go index 1a55c3c..f6587cd 100644 --- a/pkg/utils/log/log.go +++ b/pkg/utils/log/log.go @@ -22,7 +22,7 @@ func SetLogFile(logWay string, logFile string) { if logWay == "console" { Log.SetLogger("console", "") } else { - Log.SetLogger("file", `{"filename": "` + logFile + `"}`) + Log.SetLogger("file", `{"filename": "`+logFile+`"}`) } } From f065562ec3e570dda5f560840653ae761a962f67 Mon Sep 17 00:00:00 2001 From: Hurricanezwf <1094646850@qq.com> Date: Thu, 4 Feb 2016 10:55:10 +0800 Subject: [PATCH 07/15] =?UTF-8?q?(1)=E5=88=86=E7=A6=BB=E5=87=BA=E8=BF=9E?= =?UTF-8?q?=E6=8E=A5=E6=9C=8D=E5=8A=A1=E5=99=A8=E7=9A=84=E9=80=BB=E8=BE=91?= =?UTF-8?q?=20=20(2)=E6=96=B0=E5=A2=9Eclient=E6=96=AD=E7=BA=BF=E9=87=8D?= =?UTF-8?q?=E8=BF=9E=E5=8A=9F=E8=83=BD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/frpc/control.go | 115 ++++++++++++++++++++++++++++++-------------- 1 file changed, 79 insertions(+), 36 deletions(-) diff --git a/cmd/frpc/control.go b/cmd/frpc/control.go index e917a93..78facb2 100644 --- a/cmd/frpc/control.go +++ b/cmd/frpc/control.go @@ -1,62 +1,54 @@ package main import ( + "encoding/json" "io" "sync" - "encoding/json" + "time" "frp/pkg/models" "frp/pkg/utils/conn" "frp/pkg/utils/log" ) +// 重连时的间隔时间区间 +const ( + sleepMinDuration = 1 + sleepMaxDuration = 60 +) + func ControlProcess(cli *models.ProxyClient, wait *sync.WaitGroup) { defer wait.Done() - c := &conn.Conn{} - err := c.ConnectServer(ServerAddr, ServerPort) - if err != nil { - log.Error("ProxyName [%s], connect to server [%s:%d] error, %v", cli.Name, ServerAddr, ServerPort, err) + c := loginToServer(cli) + if c == nil { + log.Error("ProxyName [%s], connect to server failed!", cli.Name) return } defer c.Close() - req := &models.ClientCtlReq{ - Type: models.ControlConn, - ProxyName: cli.Name, - Passwd: cli.Passwd, - } - buf, _ := json.Marshal(req) - err = c.Write(string(buf) + "\n") - if err != nil { - log.Error("ProxyName [%s], write to server error, %v", cli.Name, err) - return - } - - res, err := c.ReadLine() - if err != nil { - log.Error("ProxyName [%s], read from server error, %v", cli.Name, err) - return - } - log.Debug("ProxyName [%s], read [%s]", cli.Name, res) - - clientCtlRes := &models.ClientCtlRes{} - if err = json.Unmarshal([]byte(res), &clientCtlRes); err != nil { - log.Error("ProxyName [%s], format server response error, %v", cli.Name, err) - return - } - - if clientCtlRes.Code != 0 { - log.Error("ProxyName [%s], start proxy error, %s", cli.Name, clientCtlRes.Msg) - return - } - for { // ignore response content now _, err := c.ReadLine() if err == io.EOF { + // reconnect when disconnect log.Debug("ProxyName [%s], server close this control conn", cli.Name) - break + var sleepTime time.Duration = 1 + for { + log.Debug("ProxyName [%s], try to reconnect to server[%s:%d]...", cli.Name, ServerAddr, ServerPort) + tmpConn := loginToServer(cli) + if tmpConn != nil { + c.Close() + c = tmpConn + break + } + + if sleepTime < 60 { + sleepTime++ + } + time.Sleep(sleepTime * time.Second) + } + continue } else if err != nil { log.Warn("ProxyName [%s], read from server error, %v", cli.Name, err) continue @@ -65,3 +57,54 @@ func ControlProcess(cli *models.ProxyClient, wait *sync.WaitGroup) { cli.StartTunnel(ServerAddr, ServerPort) } } + +func loginToServer(cli *models.ProxyClient) (connection *conn.Conn) { + c := &conn.Conn{} + + connection = nil + for i := 0; i < 1; i++ { // ZWF: 此处的for作为控制流使用 + err := c.ConnectServer(ServerAddr, ServerPort) + if err != nil { + log.Error("ProxyName [%s], connect to server [%s:%d] error, %v", cli.Name, ServerAddr, ServerPort, err) + break + } + + req := &models.ClientCtlReq{ + Type: models.ControlConn, + ProxyName: cli.Name, + Passwd: cli.Passwd, + } + buf, _ := json.Marshal(req) + err = c.Write(string(buf) + "\n") + if err != nil { + log.Error("ProxyName [%s], write to server error, %v", cli.Name, err) + break + } + + res, err := c.ReadLine() + if err != nil { + log.Error("ProxyName [%s], read from server error, %v", cli.Name, err) + break + } + log.Debug("ProxyName [%s], read [%s]", cli.Name, res) + + clientCtlRes := &models.ClientCtlRes{} + if err = json.Unmarshal([]byte(res), &clientCtlRes); err != nil { + log.Error("ProxyName [%s], format server response error, %v", cli.Name, err) + break + } + + if clientCtlRes.Code != 0 { + log.Error("ProxyName [%s], start proxy error, %s", cli.Name, clientCtlRes.Msg) + break + } + + connection = c + } + + if connection == nil { + c.Close() + } + + return +} From af6fc61537eaa8012c0091a0eb18353de9604c2c Mon Sep 17 00:00:00 2001 From: Hurricanezwf <1094646850@qq.com> Date: Thu, 4 Feb 2016 11:29:04 +0800 Subject: [PATCH 08/15] =?UTF-8?q?(1)=E6=96=B0=E5=A2=9Eclient=E5=90=91serve?= =?UTF-8?q?r=E5=8F=91=E9=80=81=E5=BF=83=E8=B7=B3=E6=A3=80=E6=B5=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/frpc/control.go | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/cmd/frpc/control.go b/cmd/frpc/control.go index 78facb2..332a06c 100644 --- a/cmd/frpc/control.go +++ b/cmd/frpc/control.go @@ -11,12 +11,13 @@ import ( "frp/pkg/utils/log" ) -// 重连时的间隔时间区间 const ( - sleepMinDuration = 1 - sleepMaxDuration = 60 + heartbeatDuration = 2 //心跳检测时间间隔,单位秒 ) +// client与server之间连接的保护锁 +var connProtect sync.Mutex + func ControlProcess(cli *models.ProxyClient, wait *sync.WaitGroup) { defer wait.Done() @@ -27,11 +28,13 @@ func ControlProcess(cli *models.ProxyClient, wait *sync.WaitGroup) { } defer c.Close() + go startHeartBeat(c) + for { // ignore response content now _, err := c.ReadLine() if err == io.EOF { - // reconnect when disconnect + connProtect.Lock() // 除了这里,其他地方禁止对连接进行任何操作 log.Debug("ProxyName [%s], server close this control conn", cli.Name) var sleepTime time.Duration = 1 for { @@ -48,6 +51,7 @@ func ControlProcess(cli *models.ProxyClient, wait *sync.WaitGroup) { } time.Sleep(sleepTime * time.Second) } + connProtect.Unlock() continue } else if err != nil { log.Warn("ProxyName [%s], read from server error, %v", cli.Name, err) @@ -108,3 +112,16 @@ func loginToServer(cli *models.ProxyClient) (connection *conn.Conn) { return } + +func startHeartBeat(con *conn.Conn) { + for { + time.Sleep(heartbeatDuration * time.Second) + + connProtect.Lock() + err := con.Write("\r\n") + connProtect.Unlock() + if err != nil { + log.Error("Send hearbeat to server failed! Err:%s", err.Error()) + } + } +} From 5d6f37aa82b8e0a5e3cbcb78b6febcb8a44299e0 Mon Sep 17 00:00:00 2001 From: Hurricanezwf <1094646850@qq.com> Date: Fri, 5 Feb 2016 14:18:26 +0800 Subject: [PATCH 09/15] =?UTF-8?q?(1)=E4=BC=98=E5=8C=96=E9=87=8D=E8=BF=9E?= =?UTF-8?q?=E5=92=8C=E5=BF=83=E8=B7=B3=E6=A3=80=E6=B5=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 2 ++ cmd/frpc/control.go | 25 +++++++++--------- cmd/frps/control.go | 59 +++++++++++++++++++++++++++++++++++------- pkg/models/server.go | 44 +++++++++++++++++++------------ pkg/utils/conn/conn.go | 22 +++++++++------- 5 files changed, 104 insertions(+), 48 deletions(-) diff --git a/.gitignore b/.gitignore index fab4548..72c9fd3 100644 --- a/.gitignore +++ b/.gitignore @@ -26,3 +26,5 @@ _testmain.go # Self bin/ +# Cache +*.swp diff --git a/cmd/frpc/control.go b/cmd/frpc/control.go index 332a06c..3bca709 100644 --- a/cmd/frpc/control.go +++ b/cmd/frpc/control.go @@ -15,8 +15,7 @@ const ( heartbeatDuration = 2 //心跳检测时间间隔,单位秒 ) -// client与server之间连接的保护锁 -var connProtect sync.Mutex +var isHeartBeatContinue bool = true func ControlProcess(cli *models.ProxyClient, wait *sync.WaitGroup) { defer wait.Done() @@ -28,13 +27,11 @@ func ControlProcess(cli *models.ProxyClient, wait *sync.WaitGroup) { } defer c.Close() - go startHeartBeat(c) - for { // ignore response content now _, err := c.ReadLine() if err == io.EOF { - connProtect.Lock() // 除了这里,其他地方禁止对连接进行任何操作 + isHeartBeatContinue = false log.Debug("ProxyName [%s], server close this control conn", cli.Name) var sleepTime time.Duration = 1 for { @@ -51,7 +48,6 @@ func ControlProcess(cli *models.ProxyClient, wait *sync.WaitGroup) { } time.Sleep(sleepTime * time.Second) } - connProtect.Unlock() continue } else if err != nil { log.Warn("ProxyName [%s], read from server error, %v", cli.Name, err) @@ -104,6 +100,8 @@ func loginToServer(cli *models.ProxyClient) (connection *conn.Conn) { } connection = c + go startHeartBeat(connection) + log.Debug("ProxyName [%s], connect to server[%s:%d] success!", cli.Name, ServerAddr, ServerPort) } if connection == nil { @@ -114,14 +112,17 @@ func loginToServer(cli *models.ProxyClient) (connection *conn.Conn) { } func startHeartBeat(con *conn.Conn) { + isHeartBeatContinue = true for { time.Sleep(heartbeatDuration * time.Second) - - connProtect.Lock() - err := con.Write("\r\n") - connProtect.Unlock() - if err != nil { - log.Error("Send hearbeat to server failed! Err:%s", err.Error()) + if isHeartBeatContinue { // 把isHeartBeatContinue放在这里是为了防止SIGPIPE + err := con.Write("\r\n") + //log.Debug("send heart beat to server!") + if err != nil { + log.Error("Send hearbeat to server failed! Err:%s", err.Error()) + } + } else { + break } } } diff --git a/cmd/frps/control.go b/cmd/frps/control.go index 62d141e..d6cfaa3 100644 --- a/cmd/frps/control.go +++ b/cmd/frps/control.go @@ -1,12 +1,14 @@ package main import ( - "fmt" "encoding/json" + "fmt" + "io" + "time" - "frp/pkg/utils/log" - "frp/pkg/utils/conn" "frp/pkg/models" + "frp/pkg/utils/conn" + "frp/pkg/utils/log" ) func ProcessControlConn(l *conn.Listener) { @@ -19,6 +21,8 @@ func ProcessControlConn(l *conn.Listener) { // control connection from every client and server func controlWorker(c *conn.Conn) { + defer c.Close() + // the first message is from client to server // if error, close connection res, err := c.ReadLine() @@ -41,19 +45,20 @@ func controlWorker(c *conn.Conn) { clientCtlRes.Code = 1 clientCtlRes.Msg = msg } - + if needRes { buf, _ := json.Marshal(clientCtlRes) err = c.Write(string(buf) + "\n") if err != nil { log.Warn("Write error, %v", err) + time.Sleep(1 * time.Second) + return } } else { - // work conn, just return + // work conn, just return return } - defer c.Close() // others is from server to client server, ok := ProxyServers[clientCtlReq.ProxyName] if !ok { @@ -61,10 +66,16 @@ func controlWorker(c *conn.Conn) { return } + // read control msg from client + go readControlMsgFromClient(server, c) + serverCtlReq := &models.ClientCtlReq{} serverCtlReq.Type = models.WorkConn for { - server.WaitUserConn() + _, isStop := server.WaitUserConn() + if isStop { + break + } buf, _ := json.Marshal(serverCtlReq) err = c.Write(string(buf) + "\n") if err != nil { @@ -76,6 +87,7 @@ func controlWorker(c *conn.Conn) { log.Debug("ProxyName [%s], write to client to add work conn success", server.Name) } + log.Error("ProxyName [%s], I'm dead!", server.Name) return } @@ -96,7 +108,7 @@ func checkProxy(req *models.ClientCtlReq, c *conn.Conn) (succ bool, msg string, log.Warn(msg) return } - + // control conn if req.Type == models.ControlConn { if server.Status != models.Idle { @@ -115,7 +127,7 @@ func checkProxy(req *models.ClientCtlReq, c *conn.Conn) (succ bool, msg string, log.Info("ProxyName [%s], start proxy success", req.ProxyName) } else if req.Type == models.WorkConn { - // work conn + // work conn needRes = false if server.Status != models.Working { log.Warn("ProxyName [%s], is not working when it gets one new work conn", req.ProxyName) @@ -132,3 +144,32 @@ func checkProxy(req *models.ClientCtlReq, c *conn.Conn) (succ bool, msg string, succ = true return } + +func readControlMsgFromClient(server *models.ProxyServer, c *conn.Conn) { + isContinueRead := true + f := func() { + isContinueRead = false + server.StopWaitUserConn() + } + timer := time.AfterFunc(10*time.Second, f) + defer timer.Stop() + + for isContinueRead { + content, err := c.ReadLine() + //log.Debug("Receive msg from client! content:%s", content) + if err != nil { + if err == io.EOF { + log.Warn("Server detect client[%s] is dead!", server.Name) + server.StopWaitUserConn() + break + } + log.Error("ProxyName [%s], read error:%s", server.Name, err.Error()) + continue + } + + if content == "\r\n" { + log.Debug("receive hearbeat:%s", content) + timer.Reset(10 * time.Second) + } + } +} diff --git a/pkg/models/server.go b/pkg/models/server.go index bd6baa8..2c503dd 100644 --- a/pkg/models/server.go +++ b/pkg/models/server.go @@ -1,8 +1,8 @@ package models import ( - "sync" "container/list" + "sync" "frp/pkg/utils/conn" "frp/pkg/utils/log" @@ -14,22 +14,24 @@ const ( ) type ProxyServer struct { - Name string - Passwd string - BindAddr string - ListenPort int64 - - Status int64 - Listener *conn.Listener // accept new connection from remote users - CtlMsgChan chan int64 // every time accept a new user conn, put "1" to the channel - CliConnChan chan *conn.Conn // get client conns from control goroutine - UserConnList *list.List // store user conns - Mutex sync.Mutex + Name string + Passwd string + BindAddr string + ListenPort int64 + + Status int64 + Listener *conn.Listener // accept new connection from remote users + CtlMsgChan chan int64 // every time accept a new user conn, put "1" to the channel + StopBlockChan chan int64 // put any number to the channel, if you want to stop wait user conn + CliConnChan chan *conn.Conn // get client conns from control goroutine + UserConnList *list.List // store user conns + Mutex sync.Mutex } func (p *ProxyServer) Init() { p.Status = Idle p.CtlMsgChan = make(chan int64) + p.StopBlockChan = make(chan int64) p.CliConnChan = make(chan *conn.Conn) p.UserConnList = list.New() } @@ -55,7 +57,7 @@ func (p *ProxyServer) Start() (err error) { go func() { for { // block - c := p.Listener.GetConn() + c := p.Listener.GetConn() log.Debug("ProxyName [%s], get one new user conn [%s]", p.Name, c.GetRemoteAddr()) // put to list @@ -93,7 +95,7 @@ func (p *ProxyServer) Start() (err error) { // msg will transfer to another without modifying log.Debug("Join two conns, (l[%s] r[%s]) (l[%s] r[%s])", cliConn.GetLocalAddr(), cliConn.GetRemoteAddr(), - userConn.GetLocalAddr(), userConn.GetRemoteAddr()) + userConn.GetLocalAddr(), userConn.GetRemoteAddr()) go conn.Join(cliConn, userConn) } }() @@ -110,7 +112,15 @@ func (p *ProxyServer) Close() { p.Unlock() } -func (p *ProxyServer) WaitUserConn() (res int64) { - res = <-p.CtlMsgChan - return +func (p *ProxyServer) WaitUserConn() (res int64, isStop bool) { + select { + case res = <-p.CtlMsgChan: + return res, false + case <-p.StopBlockChan: + return 0, true + } +} + +func (p *ProxyServer) StopWaitUserConn() { + p.StopBlockChan <- 1 } diff --git a/pkg/utils/conn/conn.go b/pkg/utils/conn/conn.go index 60929ac..42c9468 100644 --- a/pkg/utils/conn/conn.go +++ b/pkg/utils/conn/conn.go @@ -1,18 +1,18 @@ package conn import ( + "bufio" "fmt" + "io" "net" - "bufio" "sync" - "io" "frp/pkg/utils/log" ) type Listener struct { - Addr net.Addr - Conns chan *Conn + Addr net.Addr + Conns chan *Conn } // wait util get one @@ -22,8 +22,8 @@ func (l *Listener) GetConn() (conn *Conn) { } type Conn struct { - TcpConn *net.TCPConn - Reader *bufio.Reader + TcpConn *net.TCPConn + Reader *bufio.Reader } func (c *Conn) ConnectServer(host string, port int64) (err error) { @@ -59,7 +59,9 @@ func (c *Conn) Write(content string) (err error) { } func (c *Conn) Close() { - c.TcpConn.Close() + if c.TcpConn != nil { // ZWF:我觉得应该加一个非空保护 + c.TcpConn.Close() + } } func Listen(bindAddr string, bindPort int64) (l *Listener, err error) { @@ -70,8 +72,8 @@ func Listen(bindAddr string, bindPort int64) (l *Listener, err error) { } l = &Listener{ - Addr: listener.Addr(), - Conns: make(chan *Conn), + Addr: listener.Addr(), + Conns: make(chan *Conn), } go func() { @@ -83,7 +85,7 @@ func Listen(bindAddr string, bindPort int64) (l *Listener, err error) { } c := &Conn{ - TcpConn: conn, + TcpConn: conn, } c.Reader = bufio.NewReader(c.TcpConn) l.Conns <- c From 04c26d1c31c6994d8c77c3982b9f2b38a5776abe Mon Sep 17 00:00:00 2001 From: Hurricanezwf <1094646850@qq.com> Date: Fri, 5 Feb 2016 14:36:04 +0800 Subject: [PATCH 10/15] =?UTF-8?q?(1)=E6=96=B0=E5=A2=9E=E5=BF=83=E8=B7=B3?= =?UTF-8?q?=E6=A3=80=E6=B5=8B=E5=8F=91=E9=80=81=E9=97=B4=E9=9A=94=E5=92=8C?= =?UTF-8?q?=E8=B6=85=E6=97=B6=E6=97=B6=E9=97=B4=E7=9A=84=E9=85=8D=E7=BD=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cmd/frpc/config.go | 17 +++++++++++------ cmd/frpc/control.go | 6 +----- cmd/frps/config.go | 12 ++++++------ cmd/frps/control.go | 4 ++-- conf/frpc.ini | 4 +++- conf/frps.ini | 4 +++- 6 files changed, 26 insertions(+), 21 deletions(-) diff --git a/cmd/frpc/config.go b/cmd/frpc/config.go index a26fe6a..c6b232f 100644 --- a/cmd/frpc/config.go +++ b/cmd/frpc/config.go @@ -11,16 +11,16 @@ import ( // common config var ( - ServerAddr string = "0.0.0.0" - ServerPort int64 = 7000 - LogFile string = "./frpc.log" - LogLevel string = "warn" - LogWay string = "file" + ServerAddr string = "0.0.0.0" + ServerPort int64 = 7000 + LogFile string = "./frpc.log" + LogLevel string = "warn" + LogWay string = "file" + HeartBeatInterval int64 = 5 ) var ProxyClients map[string]*models.ProxyClient = make(map[string]*models.ProxyClient) - func LoadConf(confFile string) (err error) { var tmpStr string var ok bool @@ -56,6 +56,11 @@ func LoadConf(confFile string) (err error) { LogWay = tmpStr } + tmpStr, ok = conf.Get("common", "heartbeat_interval") + if ok { + HeartBeatInterval, _ = strconv.ParseInt(tmpStr, 10, 64) + } + // servers for name, section := range conf { if name != "common" { diff --git a/cmd/frpc/control.go b/cmd/frpc/control.go index 3bca709..6bf417a 100644 --- a/cmd/frpc/control.go +++ b/cmd/frpc/control.go @@ -11,10 +11,6 @@ import ( "frp/pkg/utils/log" ) -const ( - heartbeatDuration = 2 //心跳检测时间间隔,单位秒 -) - var isHeartBeatContinue bool = true func ControlProcess(cli *models.ProxyClient, wait *sync.WaitGroup) { @@ -114,7 +110,7 @@ func loginToServer(cli *models.ProxyClient) (connection *conn.Conn) { func startHeartBeat(con *conn.Conn) { isHeartBeatContinue = true for { - time.Sleep(heartbeatDuration * time.Second) + time.Sleep(time.Duration(HeartBeatInterval) * time.Second) if isHeartBeatContinue { // 把isHeartBeatContinue放在这里是为了防止SIGPIPE err := con.Write("\r\n") //log.Debug("send heart beat to server!") diff --git a/cmd/frps/config.go b/cmd/frps/config.go index feb07d5..995557e 100644 --- a/cmd/frps/config.go +++ b/cmd/frps/config.go @@ -11,16 +11,16 @@ import ( // common config var ( - BindAddr string = "0.0.0.0" - BindPort int64 = 9527 - LogFile string = "./frps.log" - LogLevel string = "warn" - LogWay string = "file" + BindAddr string = "0.0.0.0" + BindPort int64 = 9527 + LogFile string = "./frps.log" + LogLevel string = "warn" + LogWay string = "file" + HeartBeatTimeout int64 = 30 ) var ProxyServers map[string]*models.ProxyServer = make(map[string]*models.ProxyServer) - func LoadConf(confFile string) (err error) { var tmpStr string var ok bool diff --git a/cmd/frps/control.go b/cmd/frps/control.go index d6cfaa3..04f7a96 100644 --- a/cmd/frps/control.go +++ b/cmd/frps/control.go @@ -151,7 +151,7 @@ func readControlMsgFromClient(server *models.ProxyServer, c *conn.Conn) { isContinueRead = false server.StopWaitUserConn() } - timer := time.AfterFunc(10*time.Second, f) + timer := time.AfterFunc(time.Duration(HeartBeatTimeout)*time.Second, f) defer timer.Stop() for isContinueRead { @@ -169,7 +169,7 @@ func readControlMsgFromClient(server *models.ProxyServer, c *conn.Conn) { if content == "\r\n" { log.Debug("receive hearbeat:%s", content) - timer.Reset(10 * time.Second) + timer.Reset(time.Duration(HeartBeatTimeout) * time.Second) } } } diff --git a/conf/frpc.ini b/conf/frpc.ini index d2ba710..f9c7c11 100644 --- a/conf/frpc.ini +++ b/conf/frpc.ini @@ -6,7 +6,9 @@ log_file = ./frpc.log # debug, info, warn, error log_level = info # file, console -log_way = file +log_way = console +# 心跳检测时间间隔,单位秒,默认为2 +heartbeat_interval = 2 # test1即为name [test1] diff --git a/conf/frps.ini b/conf/frps.ini index f6a6995..4f1cfdb 100644 --- a/conf/frps.ini +++ b/conf/frps.ini @@ -6,7 +6,9 @@ log_file = ./frps.log # debug, info, warn, error log_level = info # file, console -log_way = file +log_way = console +# 心跳检测超时时间,单位秒,默认为30 +heartbeat_timeout = 30 # test1即为name [test1] From 60c9804776ffb7455b80d2110733805c93636a3c Mon Sep 17 00:00:00 2001 From: Hurricanezwf <1094646850@qq.com> Date: Fri, 5 Feb 2016 16:49:52 +0800 Subject: [PATCH 11/15] =?UTF-8?q?format=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 1 + cmd/frpc/main.go | 2 +- cmd/frps/main.go | 2 +- pkg/models/client.go | 16 ++++++++-------- pkg/models/msg.go | 13 ++++++------- pkg/models/server.go | 1 + pkg/utils/log/log.go | 2 +- 7 files changed, 19 insertions(+), 18 deletions(-) diff --git a/.gitignore b/.gitignore index 72c9fd3..e237cc4 100644 --- a/.gitignore +++ b/.gitignore @@ -28,3 +28,4 @@ bin/ # Cache *.swp +*.swo diff --git a/cmd/frpc/main.go b/cmd/frpc/main.go index 7f07282..8f5b08b 100644 --- a/cmd/frpc/main.go +++ b/cmd/frpc/main.go @@ -3,7 +3,7 @@ package main import ( "os" "sync" - + "frp/pkg/utils/log" ) diff --git a/cmd/frps/main.go b/cmd/frps/main.go index 1288622..d2c98be 100644 --- a/cmd/frps/main.go +++ b/cmd/frps/main.go @@ -3,8 +3,8 @@ package main import ( "os" - "frp/pkg/utils/log" "frp/pkg/utils/conn" + "frp/pkg/utils/log" ) func main() { diff --git a/pkg/models/client.go b/pkg/models/client.go index 1f01d50..0ebac20 100644 --- a/pkg/models/client.go +++ b/pkg/models/client.go @@ -8,9 +8,9 @@ import ( ) type ProxyClient struct { - Name string - Passwd string - LocalPort int64 + Name string + Passwd string + LocalPort int64 } func (p *ProxyClient) GetLocalConn() (c *conn.Conn, err error) { @@ -24,7 +24,7 @@ func (p *ProxyClient) GetLocalConn() (c *conn.Conn, err error) { func (p *ProxyClient) GetRemoteConn(addr string, port int64) (c *conn.Conn, err error) { c = &conn.Conn{} - defer func(){ + defer func() { if err != nil { c.Close() } @@ -37,9 +37,9 @@ func (p *ProxyClient) GetRemoteConn(addr string, port int64) (c *conn.Conn, err } req := &ClientCtlReq{ - Type: WorkConn, - ProxyName: p.Name, - Passwd: p.Passwd, + Type: WorkConn, + ProxyName: p.Name, + Passwd: p.Passwd, } buf, _ := json.Marshal(req) @@ -64,7 +64,7 @@ func (p *ProxyClient) StartTunnel(serverAddr string, serverPort int64) (err erro } log.Debug("Join two conns, (l[%s] r[%s]) (l[%s] r[%s])", localConn.GetLocalAddr(), localConn.GetRemoteAddr(), - remoteConn.GetLocalAddr(), remoteConn.GetRemoteAddr()) + remoteConn.GetLocalAddr(), remoteConn.GetRemoteAddr()) go conn.Join(localConn, remoteConn) return nil } diff --git a/pkg/models/msg.go b/pkg/models/msg.go index 0062556..a3018e7 100644 --- a/pkg/models/msg.go +++ b/pkg/models/msg.go @@ -1,8 +1,8 @@ package models type GeneralRes struct { - Code int64 `json:"code"` - Msg string `json:"msg"` + Code int64 `json:"code"` + Msg string `json:"msg"` } // type @@ -12,16 +12,15 @@ const ( ) type ClientCtlReq struct { - Type int64 `json:"type"` - ProxyName string `json:"proxy_name"` - Passwd string `json:"passwd"` + Type int64 `json:"type"` + ProxyName string `json:"proxy_name"` + Passwd string `json:"passwd"` } type ClientCtlRes struct { GeneralRes } - type ServerCtlReq struct { - Type int64 `json:"type"` + Type int64 `json:"type"` } diff --git a/pkg/models/server.go b/pkg/models/server.go index 2c503dd..f8c9451 100644 --- a/pkg/models/server.go +++ b/pkg/models/server.go @@ -89,6 +89,7 @@ func (p *ProxyServer) Start() (err error) { p.UserConnList.Remove(element) } else { cliConn.Close() + p.Unlock() continue } p.Unlock() diff --git a/pkg/utils/log/log.go b/pkg/utils/log/log.go index 1a55c3c..f6587cd 100644 --- a/pkg/utils/log/log.go +++ b/pkg/utils/log/log.go @@ -22,7 +22,7 @@ func SetLogFile(logWay string, logFile string) { if logWay == "console" { Log.SetLogger("console", "") } else { - Log.SetLogger("file", `{"filename": "` + logFile + `"}`) + Log.SetLogger("file", `{"filename": "`+logFile+`"}`) } } From 50165053f864cf1db8fb82ec7b8da2ee46e3b79f Mon Sep 17 00:00:00 2001 From: fatedier Date: Thu, 18 Feb 2016 16:56:55 +0800 Subject: [PATCH 12/15] Change directory structure, move models and utils to root directory --- cmd/frpc/config.go | 7 ++- cmd/frpc/control.go | 18 ++++--- cmd/frpc/main.go | 2 +- cmd/frps/config.go | 6 +-- cmd/frps/control.go | 51 +++++++++++--------- cmd/frps/main.go | 4 +- {pkg/models => models/client}/client.go | 12 +++-- models/consts/consts.go | 13 +++++ {pkg/models => models/msg}/msg.go | 8 +-- {pkg/models => models/server}/server.go | 20 +++----- {pkg/utils => utils}/conn/conn.go | 2 +- {pkg/utils => utils}/log/log.go | 0 {pkg/utils => utils}/pcrypto/pcrypto.go | 0 {pkg/utils => utils}/pcrypto/pcrypto_test.go | 0 14 files changed, 76 insertions(+), 67 deletions(-) rename {pkg/models => models/client}/client.go (86%) create mode 100644 models/consts/consts.go rename {pkg/models => models/msg}/msg.go (83%) rename {pkg/models => models/server}/server.go (91%) rename {pkg/utils => utils}/conn/conn.go (98%) rename {pkg/utils => utils}/log/log.go (100%) rename {pkg/utils => utils}/pcrypto/pcrypto.go (100%) rename {pkg/utils => utils}/pcrypto/pcrypto_test.go (100%) diff --git a/cmd/frpc/config.go b/cmd/frpc/config.go index ff6f1ad..b500e47 100644 --- a/cmd/frpc/config.go +++ b/cmd/frpc/config.go @@ -4,8 +4,7 @@ import ( "fmt" "strconv" - "github.com/fatedier/frp/pkg/models" - + "github.com/fatedier/frp/models/client" ini "github.com/vaughan0/go-ini" ) @@ -19,7 +18,7 @@ var ( HeartBeatInterval int64 = 5 ) -var ProxyClients map[string]*models.ProxyClient = make(map[string]*models.ProxyClient) +var ProxyClients map[string]*client.ProxyClient = make(map[string]*client.ProxyClient) func LoadConf(confFile string) (err error) { var tmpStr string @@ -59,7 +58,7 @@ func LoadConf(confFile string) (err error) { // servers for name, section := range conf { if name != "common" { - proxyClient := &models.ProxyClient{} + proxyClient := &client.ProxyClient{} proxyClient.Name = name proxyClient.Passwd, ok = section["passwd"] diff --git a/cmd/frpc/control.go b/cmd/frpc/control.go index 57fce55..2b0c59b 100644 --- a/cmd/frpc/control.go +++ b/cmd/frpc/control.go @@ -6,14 +6,16 @@ import ( "sync" "time" - "github.com/fatedier/frp/pkg/models" - "github.com/fatedier/frp/pkg/utils/conn" - "github.com/fatedier/frp/pkg/utils/log" + "github.com/fatedier/frp/models/client" + "github.com/fatedier/frp/models/consts" + "github.com/fatedier/frp/models/msg" + "github.com/fatedier/frp/utils/conn" + "github.com/fatedier/frp/utils/log" ) var isHeartBeatContinue bool = true -func ControlProcess(cli *models.ProxyClient, wait *sync.WaitGroup) { +func ControlProcess(cli *client.ProxyClient, wait *sync.WaitGroup) { defer wait.Done() c := loginToServer(cli) @@ -54,7 +56,7 @@ func ControlProcess(cli *models.ProxyClient, wait *sync.WaitGroup) { } } -func loginToServer(cli *models.ProxyClient) (connection *conn.Conn) { +func loginToServer(cli *client.ProxyClient) (connection *conn.Conn) { c := &conn.Conn{} connection = nil @@ -65,8 +67,8 @@ func loginToServer(cli *models.ProxyClient) (connection *conn.Conn) { break } - req := &models.ClientCtlReq{ - Type: models.ControlConn, + req := &msg.ClientCtlReq{ + Type: consts.CtlConn, ProxyName: cli.Name, Passwd: cli.Passwd, } @@ -84,7 +86,7 @@ func loginToServer(cli *models.ProxyClient) (connection *conn.Conn) { } log.Debug("ProxyName [%s], read [%s]", cli.Name, res) - clientCtlRes := &models.ClientCtlRes{} + clientCtlRes := &msg.ClientCtlRes{} if err = json.Unmarshal([]byte(res), &clientCtlRes); err != nil { log.Error("ProxyName [%s], format server response error, %v", cli.Name, err) break diff --git a/cmd/frpc/main.go b/cmd/frpc/main.go index 1bb5eb3..7e7fe4a 100644 --- a/cmd/frpc/main.go +++ b/cmd/frpc/main.go @@ -4,7 +4,7 @@ import ( "os" "sync" - "github.com/fatedier/frp/pkg/utils/log" + "github.com/fatedier/frp/utils/log" ) func main() { diff --git a/cmd/frps/config.go b/cmd/frps/config.go index af523e0..d3b829f 100644 --- a/cmd/frps/config.go +++ b/cmd/frps/config.go @@ -4,7 +4,7 @@ import ( "fmt" "strconv" - "github.com/fatedier/frp/pkg/models" + "github.com/fatedier/frp/models/server" ini "github.com/vaughan0/go-ini" ) @@ -19,7 +19,7 @@ var ( HeartBeatTimeout int64 = 30 ) -var ProxyServers map[string]*models.ProxyServer = make(map[string]*models.ProxyServer) +var ProxyServers map[string]*server.ProxyServer = make(map[string]*server.ProxyServer) func LoadConf(confFile string) (err error) { var tmpStr string @@ -59,7 +59,7 @@ func LoadConf(confFile string) (err error) { // servers for name, section := range conf { if name != "common" { - proxyServer := &models.ProxyServer{} + proxyServer := &server.ProxyServer{} proxyServer.Name = name proxyServer.Passwd, ok = section["passwd"] diff --git a/cmd/frps/control.go b/cmd/frps/control.go index 609b25a..1797385 100644 --- a/cmd/frps/control.go +++ b/cmd/frps/control.go @@ -6,9 +6,11 @@ import ( "io" "time" - "github.com/fatedier/frp/pkg/models" - "github.com/fatedier/frp/pkg/utils/conn" - "github.com/fatedier/frp/pkg/utils/log" + "github.com/fatedier/frp/models/consts" + "github.com/fatedier/frp/models/msg" + "github.com/fatedier/frp/models/server" + "github.com/fatedier/frp/utils/conn" + "github.com/fatedier/frp/utils/log" ) func ProcessControlConn(l *conn.Listener) { @@ -30,18 +32,18 @@ func controlWorker(c *conn.Conn) { } log.Debug("get: %s", res) - clientCtlReq := &models.ClientCtlReq{} - clientCtlRes := &models.ClientCtlRes{} + clientCtlReq := &msg.ClientCtlReq{} + clientCtlRes := &msg.ClientCtlRes{} if err := json.Unmarshal([]byte(res), &clientCtlReq); err != nil { log.Warn("Parse err: %v : %s", err, res) return } // check - succ, msg, needRes := checkProxy(clientCtlReq, c) + succ, info, needRes := checkProxy(clientCtlReq, c) if !succ { clientCtlRes.Code = 1 - clientCtlRes.Msg = msg + clientCtlRes.Msg = info } if needRes { @@ -70,8 +72,8 @@ func controlWorker(c *conn.Conn) { // read control msg from client go readControlMsgFromClient(server, c) - serverCtlReq := &models.ClientCtlReq{} - serverCtlReq.Type = models.WorkConn + serverCtlReq := &msg.ClientCtlReq{} + serverCtlReq.Type = consts.WorkConn for { _, isStop := server.WaitUserConn() if isStop { @@ -92,52 +94,53 @@ func controlWorker(c *conn.Conn) { return } -func checkProxy(req *models.ClientCtlReq, c *conn.Conn) (succ bool, msg string, needRes bool) { +func checkProxy(req *msg.ClientCtlReq, c *conn.Conn) (succ bool, info string, needRes bool) { succ = false needRes = true // check if proxy name exist server, ok := ProxyServers[req.ProxyName] if !ok { - msg = fmt.Sprintf("ProxyName [%s] is not exist", req.ProxyName) - log.Warn(msg) + info = fmt.Sprintf("ProxyName [%s] is not exist", req.ProxyName) + log.Warn(info) return } // check password if req.Passwd != server.Passwd { - msg = fmt.Sprintf("ProxyName [%s], password is not correct", req.ProxyName) - log.Warn(msg) + info = fmt.Sprintf("ProxyName [%s], password is not correct", req.ProxyName) + log.Warn(info) return } // control conn - if req.Type == models.ControlConn { - if server.Status != models.Idle { - msg = fmt.Sprintf("ProxyName [%s], already in use", req.ProxyName) - log.Warn(msg) + if req.Type == consts.CtlConn { + if server.Status != consts.Idle { + info = fmt.Sprintf("ProxyName [%s], already in use", req.ProxyName) + log.Warn(info) return } // start proxy and listen for user conn, no block err := server.Start() if err != nil { - msg = fmt.Sprintf("ProxyName [%s], start proxy error: %v", req.ProxyName, err.Error()) - log.Warn(msg) + info = fmt.Sprintf("ProxyName [%s], start proxy error: %v", req.ProxyName, err.Error()) + log.Warn(info) return } log.Info("ProxyName [%s], start proxy success", req.ProxyName) - } else if req.Type == models.WorkConn { + } else if req.Type == consts.WorkConn { // work conn needRes = false - if server.Status != models.Working { + if server.Status != consts.Working { log.Warn("ProxyName [%s], is not working when it gets one new work conn", req.ProxyName) return } server.CliConnChan <- c } else { - log.Warn("ProxyName [%s], type [%d] unsupport", req.ProxyName, req.Type) + info = fmt.Sprintf("ProxyName [%s], type [%d] unsupport", req.ProxyName, req.Type) + log.Warn(info) return } @@ -145,7 +148,7 @@ func checkProxy(req *models.ClientCtlReq, c *conn.Conn) (succ bool, msg string, return } -func readControlMsgFromClient(server *models.ProxyServer, c *conn.Conn) { +func readControlMsgFromClient(server *server.ProxyServer, c *conn.Conn) { isContinueRead := true f := func() { isContinueRead = false diff --git a/cmd/frps/main.go b/cmd/frps/main.go index 83fdc48..4aef278 100644 --- a/cmd/frps/main.go +++ b/cmd/frps/main.go @@ -3,8 +3,8 @@ package main import ( "os" - "github.com/fatedier/frp/pkg/utils/conn" - "github.com/fatedier/frp/pkg/utils/log" + "github.com/fatedier/frp/utils/conn" + "github.com/fatedier/frp/utils/log" ) func main() { diff --git a/pkg/models/client.go b/models/client/client.go similarity index 86% rename from pkg/models/client.go rename to models/client/client.go index 3fc5f57..481ba8e 100644 --- a/pkg/models/client.go +++ b/models/client/client.go @@ -1,10 +1,12 @@ -package models +package client import ( "encoding/json" - "github.com/fatedier/frp/pkg/utils/conn" - "github.com/fatedier/frp/pkg/utils/log" + "github.com/fatedier/frp/models/consts" + "github.com/fatedier/frp/models/msg" + "github.com/fatedier/frp/utils/conn" + "github.com/fatedier/frp/utils/log" ) type ProxyClient struct { @@ -36,8 +38,8 @@ func (p *ProxyClient) GetRemoteConn(addr string, port int64) (c *conn.Conn, err return } - req := &ClientCtlReq{ - Type: WorkConn, + req := &msg.ClientCtlReq{ + Type: consts.WorkConn, ProxyName: p.Name, Passwd: p.Passwd, } diff --git a/models/consts/consts.go b/models/consts/consts.go new file mode 100644 index 0000000..51dfe20 --- /dev/null +++ b/models/consts/consts.go @@ -0,0 +1,13 @@ +package consts + +// server status +const ( + Idle = iota + Working +) + +// connection type +const ( + CtlConn = iota + WorkConn +) diff --git a/pkg/models/msg.go b/models/msg/msg.go similarity index 83% rename from pkg/models/msg.go rename to models/msg/msg.go index a3018e7..6555296 100644 --- a/pkg/models/msg.go +++ b/models/msg/msg.go @@ -1,16 +1,10 @@ -package models +package msg type GeneralRes struct { Code int64 `json:"code"` Msg string `json:"msg"` } -// type -const ( - ControlConn = iota - WorkConn -) - type ClientCtlReq struct { Type int64 `json:"type"` ProxyName string `json:"proxy_name"` diff --git a/pkg/models/server.go b/models/server/server.go similarity index 91% rename from pkg/models/server.go rename to models/server/server.go index 7f58e5e..0d11717 100644 --- a/pkg/models/server.go +++ b/models/server/server.go @@ -1,16 +1,12 @@ -package models +package server import ( "container/list" "sync" - "github.com/fatedier/frp/pkg/utils/conn" - "github.com/fatedier/frp/pkg/utils/log" -) - -const ( - Idle = iota - Working + "github.com/fatedier/frp/models/consts" + "github.com/fatedier/frp/utils/conn" + "github.com/fatedier/frp/utils/log" ) type ProxyServer struct { @@ -29,7 +25,7 @@ type ProxyServer struct { } func (p *ProxyServer) Init() { - p.Status = Idle + p.Status = consts.Idle p.CtlMsgChan = make(chan int64) p.StopBlockChan = make(chan int64) p.CliConnChan = make(chan *conn.Conn) @@ -51,7 +47,7 @@ func (p *ProxyServer) Start() (err error) { return err } - p.Status = Working + p.Status = consts.Working // start a goroutine for listener go func() { @@ -62,7 +58,7 @@ func (p *ProxyServer) Start() (err error) { // put to list p.Lock() - if p.Status != Working { + if p.Status != consts.Working { log.Debug("ProxyName [%s] is not working, new user conn close", p.Name) c.Close() p.Unlock() @@ -107,7 +103,7 @@ func (p *ProxyServer) Start() (err error) { func (p *ProxyServer) Close() { p.Lock() - p.Status = Idle + p.Status = consts.Idle p.CtlMsgChan = make(chan int64) p.CliConnChan = make(chan *conn.Conn) p.UserConnList = list.New() diff --git a/pkg/utils/conn/conn.go b/utils/conn/conn.go similarity index 98% rename from pkg/utils/conn/conn.go rename to utils/conn/conn.go index 5f65329..1fa0cb3 100644 --- a/pkg/utils/conn/conn.go +++ b/utils/conn/conn.go @@ -7,7 +7,7 @@ import ( "net" "sync" - "github.com/fatedier/frp/pkg/utils/log" + "github.com/fatedier/frp/utils/log" ) type Listener struct { diff --git a/pkg/utils/log/log.go b/utils/log/log.go similarity index 100% rename from pkg/utils/log/log.go rename to utils/log/log.go diff --git a/pkg/utils/pcrypto/pcrypto.go b/utils/pcrypto/pcrypto.go similarity index 100% rename from pkg/utils/pcrypto/pcrypto.go rename to utils/pcrypto/pcrypto.go diff --git a/pkg/utils/pcrypto/pcrypto_test.go b/utils/pcrypto/pcrypto_test.go similarity index 100% rename from pkg/utils/pcrypto/pcrypto_test.go rename to utils/pcrypto/pcrypto_test.go From 84f8addd6a6119dec46f41d23f3a79a919632837 Mon Sep 17 00:00:00 2001 From: fatedier Date: Thu, 18 Feb 2016 18:24:48 +0800 Subject: [PATCH 13/15] Move config.go to models/xxx --- cmd/frpc/control.go | 12 ++++---- cmd/frpc/main.go | 9 +++--- cmd/frps/control.go | 40 +++++++++++++-------------- cmd/frps/main.go | 7 +++-- {cmd/frpc => models/client}/config.go | 7 ++--- {cmd/frps => models/server}/config.go | 8 ++---- utils/conn/conn.go | 1 - 7 files changed, 41 insertions(+), 43 deletions(-) rename {cmd/frpc => models/client}/config.go (90%) rename {cmd/frps => models/server}/config.go (90%) diff --git a/cmd/frpc/control.go b/cmd/frpc/control.go index 2b0c59b..971c595 100644 --- a/cmd/frpc/control.go +++ b/cmd/frpc/control.go @@ -33,7 +33,7 @@ func ControlProcess(cli *client.ProxyClient, wait *sync.WaitGroup) { log.Debug("ProxyName [%s], server close this control conn", cli.Name) var sleepTime time.Duration = 1 for { - log.Debug("ProxyName [%s], try to reconnect to server[%s:%d]...", cli.Name, ServerAddr, ServerPort) + log.Debug("ProxyName [%s], try to reconnect to server[%s:%d]...", cli.Name, client.ServerAddr, client.ServerPort) tmpConn := loginToServer(cli) if tmpConn != nil { c.Close() @@ -52,7 +52,7 @@ func ControlProcess(cli *client.ProxyClient, wait *sync.WaitGroup) { continue } - cli.StartTunnel(ServerAddr, ServerPort) + cli.StartTunnel(client.ServerAddr, client.ServerPort) } } @@ -61,9 +61,9 @@ func loginToServer(cli *client.ProxyClient) (connection *conn.Conn) { connection = nil for i := 0; i < 1; i++ { - err := c.ConnectServer(ServerAddr, ServerPort) + err := c.ConnectServer(client.ServerAddr, client.ServerPort) if err != nil { - log.Error("ProxyName [%s], connect to server [%s:%d] error, %v", cli.Name, ServerAddr, ServerPort, err) + log.Error("ProxyName [%s], connect to server [%s:%d] error, %v", cli.Name, client.ServerAddr, client.ServerPort, err) break } @@ -99,7 +99,7 @@ func loginToServer(cli *client.ProxyClient) (connection *conn.Conn) { connection = c go startHeartBeat(connection) - log.Debug("ProxyName [%s], connect to server[%s:%d] success!", cli.Name, ServerAddr, ServerPort) + log.Debug("ProxyName [%s], connect to server[%s:%d] success!", cli.Name, client.ServerAddr, client.ServerPort) } if connection == nil { @@ -113,7 +113,7 @@ func startHeartBeat(con *conn.Conn) { isHeartBeatContinue = true log.Debug("Start to send heartbeat") for { - time.Sleep(time.Duration(HeartBeatInterval) * time.Second) + time.Sleep(time.Duration(client.HeartBeatInterval) * time.Second) if isHeartBeatContinue { err := con.Write("\n") if err != nil { diff --git a/cmd/frpc/main.go b/cmd/frpc/main.go index 7e7fe4a..c17f3e7 100644 --- a/cmd/frpc/main.go +++ b/cmd/frpc/main.go @@ -4,22 +4,23 @@ import ( "os" "sync" + "github.com/fatedier/frp/models/client" "github.com/fatedier/frp/utils/log" ) func main() { - err := LoadConf("./frpc.ini") + err := client.LoadConf("./frpc.ini") if err != nil { os.Exit(-1) } - log.InitLog(LogWay, LogFile, LogLevel) + log.InitLog(client.LogWay, client.LogFile, client.LogLevel) // wait until all control goroutine exit var wait sync.WaitGroup - wait.Add(len(ProxyClients)) + wait.Add(len(client.ProxyClients)) - for _, client := range ProxyClients { + for _, client := range client.ProxyClients { go ControlProcess(client, &wait) } diff --git a/cmd/frps/control.go b/cmd/frps/control.go index 1797385..5f2eab9 100644 --- a/cmd/frps/control.go +++ b/cmd/frps/control.go @@ -63,34 +63,34 @@ func controlWorker(c *conn.Conn) { } // others is from server to client - server, ok := ProxyServers[clientCtlReq.ProxyName] + s, ok := server.ProxyServers[clientCtlReq.ProxyName] if !ok { log.Warn("ProxyName [%s] is not exist", clientCtlReq.ProxyName) return } // read control msg from client - go readControlMsgFromClient(server, c) + go readControlMsgFromClient(s, c) serverCtlReq := &msg.ClientCtlReq{} serverCtlReq.Type = consts.WorkConn for { - _, isStop := server.WaitUserConn() + _, isStop := s.WaitUserConn() if isStop { break } buf, _ := json.Marshal(serverCtlReq) err = c.Write(string(buf) + "\n") if err != nil { - log.Warn("ProxyName [%s], write to client error, proxy exit", server.Name) - server.Close() + log.Warn("ProxyName [%s], write to client error, proxy exit", s.Name) + s.Close() return } - log.Debug("ProxyName [%s], write to client to add work conn success", server.Name) + log.Debug("ProxyName [%s], write to client to add work conn success", s.Name) } - log.Error("ProxyName [%s], I'm dead!", server.Name) + log.Error("ProxyName [%s], I'm dead!", s.Name) return } @@ -98,7 +98,7 @@ func checkProxy(req *msg.ClientCtlReq, c *conn.Conn) (succ bool, info string, ne succ = false needRes = true // check if proxy name exist - server, ok := ProxyServers[req.ProxyName] + s, ok := server.ProxyServers[req.ProxyName] if !ok { info = fmt.Sprintf("ProxyName [%s] is not exist", req.ProxyName) log.Warn(info) @@ -106,7 +106,7 @@ func checkProxy(req *msg.ClientCtlReq, c *conn.Conn) (succ bool, info string, ne } // check password - if req.Passwd != server.Passwd { + if req.Passwd != s.Passwd { info = fmt.Sprintf("ProxyName [%s], password is not correct", req.ProxyName) log.Warn(info) return @@ -114,14 +114,14 @@ func checkProxy(req *msg.ClientCtlReq, c *conn.Conn) (succ bool, info string, ne // control conn if req.Type == consts.CtlConn { - if server.Status != consts.Idle { + if s.Status != consts.Idle { info = fmt.Sprintf("ProxyName [%s], already in use", req.ProxyName) log.Warn(info) return } // start proxy and listen for user conn, no block - err := server.Start() + err := s.Start() if err != nil { info = fmt.Sprintf("ProxyName [%s], start proxy error: %v", req.ProxyName, err.Error()) log.Warn(info) @@ -132,12 +132,12 @@ func checkProxy(req *msg.ClientCtlReq, c *conn.Conn) (succ bool, info string, ne } else if req.Type == consts.WorkConn { // work conn needRes = false - if server.Status != consts.Working { + if s.Status != consts.Working { log.Warn("ProxyName [%s], is not working when it gets one new work conn", req.ProxyName) return } - server.CliConnChan <- c + s.CliConnChan <- c } else { info = fmt.Sprintf("ProxyName [%s], type [%d] unsupport", req.ProxyName, req.Type) log.Warn(info) @@ -148,13 +148,13 @@ func checkProxy(req *msg.ClientCtlReq, c *conn.Conn) (succ bool, info string, ne return } -func readControlMsgFromClient(server *server.ProxyServer, c *conn.Conn) { +func readControlMsgFromClient(s *server.ProxyServer, c *conn.Conn) { isContinueRead := true f := func() { isContinueRead = false - server.StopWaitUserConn() + s.StopWaitUserConn() } - timer := time.AfterFunc(time.Duration(HeartBeatTimeout)*time.Second, f) + timer := time.AfterFunc(time.Duration(server.HeartBeatTimeout)*time.Second, f) defer timer.Stop() for isContinueRead { @@ -162,16 +162,16 @@ func readControlMsgFromClient(server *server.ProxyServer, c *conn.Conn) { //log.Debug("Receive msg from client! content:%s", content) if err != nil { if err == io.EOF { - log.Warn("Server detect client[%s] is dead!", server.Name) - server.StopWaitUserConn() + log.Warn("Server detect client[%s] is dead!", s.Name) + s.StopWaitUserConn() break } - log.Error("ProxyName [%s], read error:%s", server.Name, err.Error()) + log.Error("ProxyName [%s], read error:%s", s.Name, err.Error()) continue } if content == "\n" { - timer.Reset(time.Duration(HeartBeatTimeout) * time.Second) + timer.Reset(time.Duration(server.HeartBeatTimeout) * time.Second) } } } diff --git a/cmd/frps/main.go b/cmd/frps/main.go index 4aef278..e21f927 100644 --- a/cmd/frps/main.go +++ b/cmd/frps/main.go @@ -3,19 +3,20 @@ package main import ( "os" + "github.com/fatedier/frp/models/server" "github.com/fatedier/frp/utils/conn" "github.com/fatedier/frp/utils/log" ) func main() { - err := LoadConf("./frps.ini") + err := server.LoadConf("./frps.ini") if err != nil { os.Exit(-1) } - log.InitLog(LogWay, LogFile, LogLevel) + log.InitLog(server.LogWay, server.LogFile, server.LogLevel) - l, err := conn.Listen(BindAddr, BindPort) + l, err := conn.Listen(server.BindAddr, server.BindPort) if err != nil { log.Error("Create listener error, %v", err) os.Exit(-1) diff --git a/cmd/frpc/config.go b/models/client/config.go similarity index 90% rename from cmd/frpc/config.go rename to models/client/config.go index b500e47..063b216 100644 --- a/cmd/frpc/config.go +++ b/models/client/config.go @@ -1,10 +1,9 @@ -package main +package client import ( "fmt" "strconv" - "github.com/fatedier/frp/models/client" ini "github.com/vaughan0/go-ini" ) @@ -18,7 +17,7 @@ var ( HeartBeatInterval int64 = 5 ) -var ProxyClients map[string]*client.ProxyClient = make(map[string]*client.ProxyClient) +var ProxyClients map[string]*ProxyClient = make(map[string]*ProxyClient) func LoadConf(confFile string) (err error) { var tmpStr string @@ -58,7 +57,7 @@ func LoadConf(confFile string) (err error) { // servers for name, section := range conf { if name != "common" { - proxyClient := &client.ProxyClient{} + proxyClient := &ProxyClient{} proxyClient.Name = name proxyClient.Passwd, ok = section["passwd"] diff --git a/cmd/frps/config.go b/models/server/config.go similarity index 90% rename from cmd/frps/config.go rename to models/server/config.go index d3b829f..f9e974e 100644 --- a/cmd/frps/config.go +++ b/models/server/config.go @@ -1,11 +1,9 @@ -package main +package server import ( "fmt" "strconv" - "github.com/fatedier/frp/models/server" - ini "github.com/vaughan0/go-ini" ) @@ -19,7 +17,7 @@ var ( HeartBeatTimeout int64 = 30 ) -var ProxyServers map[string]*server.ProxyServer = make(map[string]*server.ProxyServer) +var ProxyServers map[string]*ProxyServer = make(map[string]*ProxyServer) func LoadConf(confFile string) (err error) { var tmpStr string @@ -59,7 +57,7 @@ func LoadConf(confFile string) (err error) { // servers for name, section := range conf { if name != "common" { - proxyServer := &server.ProxyServer{} + proxyServer := &ProxyServer{} proxyServer.Name = name proxyServer.Passwd, ok = section["passwd"] diff --git a/utils/conn/conn.go b/utils/conn/conn.go index 1fa0cb3..7607ff4 100644 --- a/utils/conn/conn.go +++ b/utils/conn/conn.go @@ -80,7 +80,6 @@ func Listen(bindAddr string, bindPort int64) (l *Listener, err error) { for { conn, err := listener.AcceptTCP() if err != nil { - log.Error("Accept new tcp connection error, %v", err) continue } From 0f7271312a78fe82d3de38b52a431a3ef9630f68 Mon Sep 17 00:00:00 2001 From: fatedier Date: Fri, 19 Feb 2016 00:18:34 +0800 Subject: [PATCH 14/15] Add package "broadcast" for transmitting message to goroutines at the same time --- utils/broadcast/broadcast.go | 73 +++++++++++++++++++++++++++++++ utils/broadcast/broadcast_test.go | 63 ++++++++++++++++++++++++++ 2 files changed, 136 insertions(+) create mode 100644 utils/broadcast/broadcast.go create mode 100644 utils/broadcast/broadcast_test.go diff --git a/utils/broadcast/broadcast.go b/utils/broadcast/broadcast.go new file mode 100644 index 0000000..4d45012 --- /dev/null +++ b/utils/broadcast/broadcast.go @@ -0,0 +1,73 @@ +package broadcast + +type Broadcast struct { + listeners []chan interface{} + reg chan (chan interface{}) + unreg chan (chan interface{}) + in chan interface{} + stop chan int64 + stopStatus bool +} + +func NewBroadcast() *Broadcast { + b := &Broadcast{ + listeners: make([]chan interface{}, 0), + reg: make(chan (chan interface{})), + unreg: make(chan (chan interface{})), + in: make(chan interface{}), + stop: make(chan int64), + stopStatus: false, + } + + go func() { + for { + select { + case l := <-b.unreg: + // remove L from b.listeners + // this operation is slow: O(n) but not used frequently + // unlike iterating over listeners + oldListeners := b.listeners + b.listeners = make([]chan interface{}, 0, len(oldListeners)) + for _, oldL := range oldListeners { + if l != oldL { + b.listeners = append(b.listeners, oldL) + } + } + + case l := <-b.reg: + b.listeners = append(b.listeners, l) + + case item := <-b.in: + for _, l := range b.listeners { + l <- item + } + + case _ = <-b.stop: + b.stopStatus = true + break + } + } + }() + + return b +} + +func (b *Broadcast) In() chan interface{} { + return b.in +} + +func (b *Broadcast) Reg() chan interface{} { + listener := make(chan interface{}) + b.reg <- listener + return listener +} + +func (b *Broadcast) UnReg(listener chan interface{}) { + b.unreg <- listener +} + +func (b *Broadcast) Close() { + if b.stopStatus == false { + b.stop <- 1 + } +} diff --git a/utils/broadcast/broadcast_test.go b/utils/broadcast/broadcast_test.go new file mode 100644 index 0000000..3354adc --- /dev/null +++ b/utils/broadcast/broadcast_test.go @@ -0,0 +1,63 @@ +package broadcast + +import ( + "sync" + "testing" + "time" +) + +var ( + totalNum int = 5 + succNum int = 0 + mutex sync.Mutex +) + +func TestBroadcast(t *testing.T) { + b := NewBroadcast() + if b == nil { + t.Errorf("New Broadcast error, nil return") + } + defer b.Close() + + var wait sync.WaitGroup + wait.Add(totalNum) + for i := 0; i < totalNum; i++ { + go worker(b, &wait) + } + + time.Sleep(1e6 * 20) + msg := "test" + b.In() <- msg + + wait.Wait() + if succNum != totalNum { + t.Errorf("TotalNum %d, FailNum(timeout) %d", totalNum, totalNum-succNum) + } +} + +func worker(b *Broadcast, wait *sync.WaitGroup) { + defer wait.Done() + msgChan := b.Reg() + + // exit if nothing got in 2 seconds + timeout := make(chan bool, 1) + go func() { + time.Sleep(time.Duration(2) * time.Second) + timeout <- true + }() + + select { + case item := <-msgChan: + msg := item.(string) + if msg == "test" { + mutex.Lock() + succNum++ + mutex.Unlock() + } else { + break + } + + case <-timeout: + break + } +} From 26479cf92aa6f0501302e1188531559c2747fb2a Mon Sep 17 00:00:00 2001 From: fatedier Date: Fri, 19 Feb 2016 17:01:47 +0800 Subject: [PATCH 15/15] all: fix bug when client shutdown and reconnect, server response already use 1. if client is offline, server will release all resources 2. use a graceful method to shutdown go net.Listeners 3. add closeFlag for Conn, so startHeartBeat func can exit correctly now --- .travis.yml | 2 - cmd/frpc/control.go | 96 ++++++++++++++++------------------- cmd/frps/control.go | 24 ++++----- models/client/client.go | 6 +-- models/server/server.go | 82 ++++++++++++++++-------------- utils/conn/conn.go | 109 ++++++++++++++++++++++++++-------------- 6 files changed, 173 insertions(+), 146 deletions(-) diff --git a/.travis.yml b/.travis.yml index f41bc59..68fda64 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,11 +1,9 @@ -go_import_path: github.com/fatedier/frp sudo: false language: go go: - 1.4.2 - 1.5.2 - - tip install: - make diff --git a/cmd/frpc/control.go b/cmd/frpc/control.go index 971c595..c328555 100644 --- a/cmd/frpc/control.go +++ b/cmd/frpc/control.go @@ -2,6 +2,7 @@ package main import ( "encoding/json" + "fmt" "io" "sync" "time" @@ -18,8 +19,8 @@ var isHeartBeatContinue bool = true func ControlProcess(cli *client.ProxyClient, wait *sync.WaitGroup) { defer wait.Done() - c := loginToServer(cli) - if c == nil { + c, err := loginToServer(cli) + if err != nil { log.Error("ProxyName [%s], connect to server failed!", cli.Name) return } @@ -34,15 +35,15 @@ func ControlProcess(cli *client.ProxyClient, wait *sync.WaitGroup) { var sleepTime time.Duration = 1 for { log.Debug("ProxyName [%s], try to reconnect to server[%s:%d]...", cli.Name, client.ServerAddr, client.ServerPort) - tmpConn := loginToServer(cli) - if tmpConn != nil { + tmpConn, err := loginToServer(cli) + if err == nil { c.Close() c = tmpConn break } if sleepTime < 60 { - sleepTime++ + sleepTime = sleepTime * 2 } time.Sleep(sleepTime * time.Second) } @@ -56,71 +57,62 @@ func ControlProcess(cli *client.ProxyClient, wait *sync.WaitGroup) { } } -func loginToServer(cli *client.ProxyClient) (connection *conn.Conn) { - c := &conn.Conn{} - - connection = nil - for i := 0; i < 1; i++ { - err := c.ConnectServer(client.ServerAddr, client.ServerPort) - if err != nil { - log.Error("ProxyName [%s], connect to server [%s:%d] error, %v", cli.Name, client.ServerAddr, client.ServerPort, err) - break - } - - req := &msg.ClientCtlReq{ - Type: consts.CtlConn, - ProxyName: cli.Name, - Passwd: cli.Passwd, - } - buf, _ := json.Marshal(req) - err = c.Write(string(buf) + "\n") - if err != nil { - log.Error("ProxyName [%s], write to server error, %v", cli.Name, err) - break - } - - res, err := c.ReadLine() - if err != nil { - log.Error("ProxyName [%s], read from server error, %v", cli.Name, err) - break - } - log.Debug("ProxyName [%s], read [%s]", cli.Name, res) +func loginToServer(cli *client.ProxyClient) (c *conn.Conn, err error) { + c, err = conn.ConnectServer(client.ServerAddr, client.ServerPort) + if err != nil { + log.Error("ProxyName [%s], connect to server [%s:%d] error, %v", cli.Name, client.ServerAddr, client.ServerPort, err) + return + } - clientCtlRes := &msg.ClientCtlRes{} - if err = json.Unmarshal([]byte(res), &clientCtlRes); err != nil { - log.Error("ProxyName [%s], format server response error, %v", cli.Name, err) - break - } + req := &msg.ClientCtlReq{ + Type: consts.CtlConn, + ProxyName: cli.Name, + Passwd: cli.Passwd, + } + buf, _ := json.Marshal(req) + err = c.Write(string(buf) + "\n") + if err != nil { + log.Error("ProxyName [%s], write to server error, %v", cli.Name, err) + return + } - if clientCtlRes.Code != 0 { - log.Error("ProxyName [%s], start proxy error, %s", cli.Name, clientCtlRes.Msg) - break - } + res, err := c.ReadLine() + if err != nil { + log.Error("ProxyName [%s], read from server error, %v", cli.Name, err) + return + } + log.Debug("ProxyName [%s], read [%s]", cli.Name, res) - connection = c - go startHeartBeat(connection) - log.Debug("ProxyName [%s], connect to server[%s:%d] success!", cli.Name, client.ServerAddr, client.ServerPort) + clientCtlRes := &msg.ClientCtlRes{} + if err = json.Unmarshal([]byte(res), &clientCtlRes); err != nil { + log.Error("ProxyName [%s], format server response error, %v", cli.Name, err) + return } - if connection == nil { - c.Close() + if clientCtlRes.Code != 0 { + log.Error("ProxyName [%s], start proxy error, %s", cli.Name, clientCtlRes.Msg) + return c, fmt.Errorf("%s", clientCtlRes.Msg) } + go startHeartBeat(c) + log.Debug("ProxyName [%s], connect to server[%s:%d] success!", cli.Name, client.ServerAddr, client.ServerPort) + return } -func startHeartBeat(con *conn.Conn) { - isHeartBeatContinue = true +func startHeartBeat(c *conn.Conn) { log.Debug("Start to send heartbeat") for { time.Sleep(time.Duration(client.HeartBeatInterval) * time.Second) - if isHeartBeatContinue { - err := con.Write("\n") + if !c.IsClosed() { + err := c.Write("\n") if err != nil { log.Error("Send hearbeat to server failed! Err:%s", err.Error()) + continue } } else { break } } + log.Info("heartbeat exit") } diff --git a/cmd/frps/control.go b/cmd/frps/control.go index 5f2eab9..02ff86e 100644 --- a/cmd/frps/control.go +++ b/cmd/frps/control.go @@ -75,8 +75,9 @@ func controlWorker(c *conn.Conn) { serverCtlReq := &msg.ClientCtlReq{} serverCtlReq.Type = consts.WorkConn for { - _, isStop := s.WaitUserConn() - if isStop { + closeFlag := s.WaitUserConn() + if closeFlag { + log.Debug("ProxyName [%s], goroutine for dealing user conn is closed", s.Name) break } buf, _ := json.Marshal(serverCtlReq) @@ -90,7 +91,7 @@ func controlWorker(c *conn.Conn) { log.Debug("ProxyName [%s], write to client to add work conn success", s.Name) } - log.Error("ProxyName [%s], I'm dead!", s.Name) + log.Info("ProxyName [%s], I'm dead!", s.Name) return } @@ -152,26 +153,25 @@ func readControlMsgFromClient(s *server.ProxyServer, c *conn.Conn) { isContinueRead := true f := func() { isContinueRead = false - s.StopWaitUserConn() + c.Close() + s.Close() } timer := time.AfterFunc(time.Duration(server.HeartBeatTimeout)*time.Second, f) defer timer.Stop() for isContinueRead { - content, err := c.ReadLine() - //log.Debug("Receive msg from client! content:%s", content) + _, err := c.ReadLine() if err != nil { if err == io.EOF { - log.Warn("Server detect client[%s] is dead!", s.Name) - s.StopWaitUserConn() + log.Warn("ProxyName [%s], client is dead!", s.Name) + c.Close() + s.Close() break } - log.Error("ProxyName [%s], read error:%s", s.Name, err.Error()) + log.Error("ProxyName [%s], read error: %v", s.Name, err) continue } - if content == "\n" { - timer.Reset(time.Duration(server.HeartBeatTimeout) * time.Second) - } + timer.Reset(time.Duration(server.HeartBeatTimeout) * time.Second) } } diff --git a/models/client/client.go b/models/client/client.go index 481ba8e..81a0448 100644 --- a/models/client/client.go +++ b/models/client/client.go @@ -16,8 +16,7 @@ type ProxyClient struct { } func (p *ProxyClient) GetLocalConn() (c *conn.Conn, err error) { - c = &conn.Conn{} - err = c.ConnectServer("127.0.0.1", p.LocalPort) + c, err = conn.ConnectServer("127.0.0.1", p.LocalPort) if err != nil { log.Error("ProxyName [%s], connect to local port error, %v", p.Name, err) } @@ -25,14 +24,13 @@ func (p *ProxyClient) GetLocalConn() (c *conn.Conn, err error) { } func (p *ProxyClient) GetRemoteConn(addr string, port int64) (c *conn.Conn, err error) { - c = &conn.Conn{} defer func() { if err != nil { c.Close() } }() - err = c.ConnectServer(addr, port) + c, err = conn.ConnectServer(addr, port) if err != nil { log.Error("ProxyName [%s], connect to server [%s:%d] error, %v", p.Name, addr, port, err) return diff --git a/models/server/server.go b/models/server/server.go index 0d11717..889b2d7 100644 --- a/models/server/server.go +++ b/models/server/server.go @@ -10,39 +10,38 @@ import ( ) type ProxyServer struct { - Name string - Passwd string - BindAddr string - ListenPort int64 - - Status int64 - Listener *conn.Listener // accept new connection from remote users - CtlMsgChan chan int64 // every time accept a new user conn, put "1" to the channel - StopBlockChan chan int64 // put any number to the channel, if you want to stop wait user conn - CliConnChan chan *conn.Conn // get client conns from control goroutine - UserConnList *list.List // store user conns - Mutex sync.Mutex + Name string + Passwd string + BindAddr string + ListenPort int64 + Status int64 + CliConnChan chan *conn.Conn // get client conns from control goroutine + + listener *conn.Listener // accept new connection from remote users + ctlMsgChan chan int64 // every time accept a new user conn, put "1" to the channel + userConnList *list.List // store user conns + mutex sync.Mutex } func (p *ProxyServer) Init() { p.Status = consts.Idle - p.CtlMsgChan = make(chan int64) - p.StopBlockChan = make(chan int64) p.CliConnChan = make(chan *conn.Conn) - p.UserConnList = list.New() + p.ctlMsgChan = make(chan int64) + p.userConnList = list.New() } func (p *ProxyServer) Lock() { - p.Mutex.Lock() + p.mutex.Lock() } func (p *ProxyServer) Unlock() { - p.Mutex.Unlock() + p.mutex.Unlock() } // start listening for user conns func (p *ProxyServer) Start() (err error) { - p.Listener, err = conn.Listen(p.BindAddr, p.ListenPort) + p.Init() + p.listener, err = conn.Listen(p.BindAddr, p.ListenPort) if err != nil { return err } @@ -53,10 +52,15 @@ func (p *ProxyServer) Start() (err error) { go func() { for { // block - c := p.Listener.GetConn() + // if listener is closed, get nil + c := p.listener.GetConn() + if c == nil { + log.Info("ProxyName [%s], listener is closed", p.Name) + return + } log.Debug("ProxyName [%s], get one new user conn [%s]", p.Name, c.GetRemoteAddr()) - // put to list + // insert into list p.Lock() if p.Status != consts.Working { log.Debug("ProxyName [%s] is not working, new user conn close", p.Name) @@ -64,25 +68,29 @@ func (p *ProxyServer) Start() (err error) { p.Unlock() return } - p.UserConnList.PushBack(c) + p.userConnList.PushBack(c) p.Unlock() // put msg to control conn - p.CtlMsgChan <- 1 + p.ctlMsgChan <- 1 } }() // start another goroutine for join two conns from client and user go func() { for { - cliConn := <-p.CliConnChan + cliConn, ok := <-p.CliConnChan + if !ok { + return + } + p.Lock() - element := p.UserConnList.Front() + element := p.userConnList.Front() var userConn *conn.Conn if element != nil { userConn = element.Value.(*conn.Conn) - p.UserConnList.Remove(element) + p.userConnList.Remove(element) } else { cliConn.Close() p.Unlock() @@ -104,21 +112,19 @@ func (p *ProxyServer) Start() (err error) { func (p *ProxyServer) Close() { p.Lock() p.Status = consts.Idle - p.CtlMsgChan = make(chan int64) - p.CliConnChan = make(chan *conn.Conn) - p.UserConnList = list.New() + p.listener.Close() + close(p.ctlMsgChan) + close(p.CliConnChan) + p.userConnList = list.New() p.Unlock() } -func (p *ProxyServer) WaitUserConn() (res int64, isStop bool) { - select { - case res = <-p.CtlMsgChan: - return res, false - case <-p.StopBlockChan: - return 0, true - } -} +func (p *ProxyServer) WaitUserConn() (closeFlag bool) { + closeFlag = false -func (p *ProxyServer) StopWaitUserConn() { - p.StopBlockChan <- 1 + _, ok := <-p.ctlMsgChan + if !ok { + closeFlag = true + } + return } diff --git a/utils/conn/conn.go b/utils/conn/conn.go index 7607ff4..4cf6762 100644 --- a/utils/conn/conn.go +++ b/utils/conn/conn.go @@ -11,33 +11,87 @@ import ( ) type Listener struct { - Addr net.Addr - Conns chan *Conn + addr net.Addr + l *net.TCPListener + conns chan *Conn + closeFlag bool } -// wait util get one +func Listen(bindAddr string, bindPort int64) (l *Listener, err error) { + tcpAddr, err := net.ResolveTCPAddr("tcp4", fmt.Sprintf("%s:%d", bindAddr, bindPort)) + listener, err := net.ListenTCP("tcp", tcpAddr) + if err != nil { + return l, err + } + + l = &Listener{ + addr: listener.Addr(), + l: listener, + conns: make(chan *Conn), + closeFlag: false, + } + + go func() { + for { + conn, err := l.l.AcceptTCP() + if err != nil { + if l.closeFlag { + return + } + continue + } + + c := &Conn{ + TcpConn: conn, + closeFlag: false, + } + c.Reader = bufio.NewReader(c.TcpConn) + l.conns <- c + } + }() + return l, err +} + +// wait util get one new connection or close +// if listener is closed, return nil func (l *Listener) GetConn() (conn *Conn) { - conn = <-l.Conns + var ok bool + conn, ok = <-l.conns + if !ok { + return nil + } return conn } +func (l *Listener) Close() { + if l.l != nil && l.closeFlag == false { + l.closeFlag = true + l.l.Close() + close(l.conns) + } +} + +// wrap for TCPConn type Conn struct { - TcpConn *net.TCPConn - Reader *bufio.Reader + TcpConn *net.TCPConn + Reader *bufio.Reader + closeFlag bool } -func (c *Conn) ConnectServer(host string, port int64) (err error) { +func ConnectServer(host string, port int64) (c *Conn, err error) { + c = &Conn{} servertAddr, err := net.ResolveTCPAddr("tcp4", fmt.Sprintf("%s:%d", host, port)) if err != nil { - return err + return } conn, err := net.DialTCP("tcp", nil, servertAddr) if err != nil { - return err + return } c.TcpConn = conn c.Reader = bufio.NewReader(c.TcpConn) - return nil + c.closeFlag = false + return c, nil } func (c *Conn) GetRemoteAddr() (addr string) { @@ -50,6 +104,9 @@ func (c *Conn) GetLocalAddr() (addr string) { func (c *Conn) ReadLine() (buff string, err error) { buff, err = c.Reader.ReadString('\n') + if err == io.EOF { + c.closeFlag = true + } return buff, err } @@ -60,40 +117,16 @@ func (c *Conn) Write(content string) (err error) { func (c *Conn) Close() { if c.TcpConn != nil { + c.closeFlag = true c.TcpConn.Close() } } -func Listen(bindAddr string, bindPort int64) (l *Listener, err error) { - tcpAddr, err := net.ResolveTCPAddr("tcp4", fmt.Sprintf("%s:%d", bindAddr, bindPort)) - listener, err := net.ListenTCP("tcp", tcpAddr) - if err != nil { - return l, err - } - - l = &Listener{ - Addr: listener.Addr(), - Conns: make(chan *Conn), - } - - go func() { - for { - conn, err := listener.AcceptTCP() - if err != nil { - continue - } - - c := &Conn{ - TcpConn: conn, - } - c.Reader = bufio.NewReader(c.TcpConn) - l.Conns <- c - } - }() - return l, err +func (c *Conn) IsClosed() bool { + return c.closeFlag } -// will block until conn close +// will block until connection close func Join(c1 *Conn, c2 *Conn) { var wait sync.WaitGroup pipe := func(to *Conn, from *Conn) {