diff --git a/.travis.yml b/.travis.yml index 68fda64..8fdbf99 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,7 +3,7 @@ language: go go: - 1.4.2 - - 1.5.2 + - 1.5.1 install: - make diff --git a/Makefile b/Makefile index 0151542..797e11f 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,5 @@ export PATH := $(GOPATH)/bin:$(PATH) +export NEW_GOPATH := $(shell pwd) all: build @@ -9,13 +10,13 @@ godep: godep restore fmt: - @godep go fmt ./... + @GOPATH=$(NEW_GOPATH) godep go fmt ./... frps: - godep go build -o bin/frps ./cmd/frps + GOPATH=$(NEW_GOPATH) godep go build -o bin/frps ./src/frp/cmd/frps frpc: - godep go build -o bin/frpc ./cmd/frpc + GOPATH=$(NEW_GOPATH) godep go build -o bin/frpc ./src/frp/cmd/frpc test: - @godep go test ./... + @GOPATH=$(NEW_GOPATH) godep go test ./... diff --git a/conf/frpc.ini b/conf/frpc.ini index f6df4b6..447cdc8 100644 --- a/conf/frpc.ini +++ b/conf/frpc.ini @@ -1,7 +1,7 @@ # common是必须的section [common] server_addr = 127.0.0.1 -bind_port = 7000 +server_port = 7000 log_file = ./frpc.log # debug, info, warn, error log_level = debug diff --git a/cmd/frpc/control.go b/src/frp/cmd/frpc/control.go similarity index 90% rename from cmd/frpc/control.go rename to src/frp/cmd/frpc/control.go index c328555..4cc3eca 100644 --- a/cmd/frpc/control.go +++ b/src/frp/cmd/frpc/control.go @@ -7,15 +7,13 @@ import ( "sync" "time" - "github.com/fatedier/frp/models/client" - "github.com/fatedier/frp/models/consts" - "github.com/fatedier/frp/models/msg" - "github.com/fatedier/frp/utils/conn" - "github.com/fatedier/frp/utils/log" + "frp/models/client" + "frp/models/consts" + "frp/models/msg" + "frp/utils/conn" + "frp/utils/log" ) -var isHeartBeatContinue bool = true - func ControlProcess(cli *client.ProxyClient, wait *sync.WaitGroup) { defer wait.Done() @@ -30,9 +28,10 @@ func ControlProcess(cli *client.ProxyClient, wait *sync.WaitGroup) { // ignore response content now _, err := c.ReadLine() if err == io.EOF { - isHeartBeatContinue = false log.Debug("ProxyName [%s], server close this control conn", cli.Name) var sleepTime time.Duration = 1 + + // loop until connect to server for { log.Debug("ProxyName [%s], try to reconnect to server[%s:%d]...", cli.Name, client.ServerAddr, client.ServerPort) tmpConn, err := loginToServer(cli) @@ -114,5 +113,5 @@ func startHeartBeat(c *conn.Conn) { break } } - log.Info("heartbeat exit") + log.Debug("heartbeat exit") } diff --git a/cmd/frpc/main.go b/src/frp/cmd/frpc/main.go similarity index 85% rename from cmd/frpc/main.go rename to src/frp/cmd/frpc/main.go index c17f3e7..0177234 100644 --- a/cmd/frpc/main.go +++ b/src/frp/cmd/frpc/main.go @@ -4,8 +4,8 @@ import ( "os" "sync" - "github.com/fatedier/frp/models/client" - "github.com/fatedier/frp/utils/log" + "frp/models/client" + "frp/utils/log" ) func main() { diff --git a/cmd/frps/control.go b/src/frp/cmd/frps/control.go similarity index 89% rename from cmd/frps/control.go rename to src/frp/cmd/frps/control.go index 02ff86e..6154054 100644 --- a/cmd/frps/control.go +++ b/src/frp/cmd/frps/control.go @@ -6,16 +6,19 @@ import ( "io" "time" - "github.com/fatedier/frp/models/consts" - "github.com/fatedier/frp/models/msg" - "github.com/fatedier/frp/models/server" - "github.com/fatedier/frp/utils/conn" - "github.com/fatedier/frp/utils/log" + "frp/models/consts" + "frp/models/msg" + "frp/models/server" + "frp/utils/conn" + "frp/utils/log" ) func ProcessControlConn(l *conn.Listener) { for { - c := l.GetConn() + c, err := l.GetConn() + if err != nil { + return + } log.Debug("Get one new conn, %v", c.GetRemoteAddr()) go controlWorker(c) } @@ -47,7 +50,6 @@ func controlWorker(c *conn.Conn) { } if needRes { - // control conn defer c.Close() buf, _ := json.Marshal(clientCtlRes) @@ -62,7 +64,7 @@ func controlWorker(c *conn.Conn) { return } - // others is from server to client + // other messages is from server to client s, ok := server.ProxyServers[clientCtlReq.ProxyName] if !ok { log.Warn("ProxyName [%s] is not exist", clientCtlReq.ProxyName) @@ -138,7 +140,7 @@ func checkProxy(req *msg.ClientCtlReq, c *conn.Conn) (succ bool, info string, ne return } - s.CliConnChan <- c + s.GetNewCliConn(c) } else { info = fmt.Sprintf("ProxyName [%s], type [%d] unsupport", req.ProxyName, req.Type) log.Warn(info) @@ -153,8 +155,8 @@ func readControlMsgFromClient(s *server.ProxyServer, c *conn.Conn) { isContinueRead := true f := func() { isContinueRead = false - c.Close() s.Close() + log.Error("ProxyName [%s], client heartbeat timeout", s.Name) } timer := time.AfterFunc(time.Duration(server.HeartBeatTimeout)*time.Second, f) defer timer.Stop() @@ -164,13 +166,17 @@ func readControlMsgFromClient(s *server.ProxyServer, c *conn.Conn) { if err != nil { if err == io.EOF { log.Warn("ProxyName [%s], client is dead!", s.Name) - c.Close() s.Close() break + } else if c.IsClosed() { + log.Warn("ProxyName [%s], client connection is closed", s.Name) + break } + log.Error("ProxyName [%s], read error: %v", s.Name, err) continue } + log.Debug("ProxyName [%s], get heartbeat", s.Name) timer.Reset(time.Duration(server.HeartBeatTimeout) * time.Second) } diff --git a/cmd/frps/main.go b/src/frp/cmd/frps/main.go similarity index 76% rename from cmd/frps/main.go rename to src/frp/cmd/frps/main.go index e21f927..c4ce4d7 100644 --- a/cmd/frps/main.go +++ b/src/frp/cmd/frps/main.go @@ -3,9 +3,9 @@ package main import ( "os" - "github.com/fatedier/frp/models/server" - "github.com/fatedier/frp/utils/conn" - "github.com/fatedier/frp/utils/log" + "frp/models/server" + "frp/utils/conn" + "frp/utils/log" ) func main() { diff --git a/models/client/client.go b/src/frp/models/client/client.go similarity index 90% rename from models/client/client.go rename to src/frp/models/client/client.go index 81a0448..69d256d 100644 --- a/models/client/client.go +++ b/src/frp/models/client/client.go @@ -3,10 +3,10 @@ package client import ( "encoding/json" - "github.com/fatedier/frp/models/consts" - "github.com/fatedier/frp/models/msg" - "github.com/fatedier/frp/utils/conn" - "github.com/fatedier/frp/utils/log" + "frp/models/consts" + "frp/models/msg" + "frp/utils/conn" + "frp/utils/log" ) type ProxyClient struct { diff --git a/models/client/config.go b/src/frp/models/client/config.go similarity index 100% rename from models/client/config.go rename to src/frp/models/client/config.go diff --git a/models/consts/consts.go b/src/frp/models/consts/consts.go similarity index 100% rename from models/consts/consts.go rename to src/frp/models/consts/consts.go diff --git a/models/msg/msg.go b/src/frp/models/msg/msg.go similarity index 100% rename from models/msg/msg.go rename to src/frp/models/msg/msg.go diff --git a/models/server/config.go b/src/frp/models/server/config.go similarity index 98% rename from models/server/config.go rename to src/frp/models/server/config.go index f9e974e..ec70071 100644 --- a/models/server/config.go +++ b/src/frp/models/server/config.go @@ -15,6 +15,7 @@ var ( LogLevel string = "warn" LogWay string = "file" HeartBeatTimeout int64 = 30 + UserConnTimeout int64 = 10 ) var ProxyServers map[string]*ProxyServer = make(map[string]*ProxyServer) diff --git a/models/server/server.go b/src/frp/models/server/server.go similarity index 63% rename from models/server/server.go rename to src/frp/models/server/server.go index 889b2d7..e8d6d81 100644 --- a/models/server/server.go +++ b/src/frp/models/server/server.go @@ -3,29 +3,30 @@ package server import ( "container/list" "sync" + "time" - "github.com/fatedier/frp/models/consts" - "github.com/fatedier/frp/utils/conn" - "github.com/fatedier/frp/utils/log" + "frp/models/consts" + "frp/utils/conn" + "frp/utils/log" ) type ProxyServer struct { - Name string - Passwd string - BindAddr string - ListenPort int64 - Status int64 - CliConnChan chan *conn.Conn // get client conns from control goroutine - - listener *conn.Listener // accept new connection from remote users - ctlMsgChan chan int64 // every time accept a new user conn, put "1" to the channel - userConnList *list.List // store user conns + Name string + Passwd string + BindAddr string + ListenPort int64 + Status int64 + + listener *conn.Listener // accept new connection from remote users + ctlMsgChan chan int64 // every time accept a new user conn, put "1" to the channel + cliConnChan chan *conn.Conn // get client conns from control goroutine + userConnList *list.List // store user conns mutex sync.Mutex } func (p *ProxyServer) Init() { p.Status = consts.Idle - p.CliConnChan = make(chan *conn.Conn) + p.cliConnChan = make(chan *conn.Conn) p.ctlMsgChan = make(chan int64) p.userConnList = list.New() } @@ -48,13 +49,13 @@ func (p *ProxyServer) Start() (err error) { p.Status = consts.Working - // start a goroutine for listener + // start a goroutine for listener to accept user connection go func() { for { // block - // if listener is closed, get nil - c := p.listener.GetConn() - if c == nil { + // if listener is closed, err returned + c, err := p.listener.GetConn() + if err != nil { log.Info("ProxyName [%s], listener is closed", p.Name) return } @@ -73,13 +74,28 @@ func (p *ProxyServer) Start() (err error) { // put msg to control conn p.ctlMsgChan <- 1 + + // set timeout + time.AfterFunc(time.Duration(UserConnTimeout)*time.Second, func() { + p.Lock() + defer p.Unlock() + element := p.userConnList.Front() + if element == nil { + return + } + + userConn := element.Value.(*conn.Conn) + if userConn == c { + log.Warn("ProxyName [%s], user conn [%s] timeout", p.Name, c.GetRemoteAddr()) + } + }) } }() // start another goroutine for join two conns from client and user go func() { for { - cliConn, ok := <-p.CliConnChan + cliConn, ok := <-p.cliConnChan if !ok { return } @@ -114,7 +130,7 @@ func (p *ProxyServer) Close() { p.Status = consts.Idle p.listener.Close() close(p.ctlMsgChan) - close(p.CliConnChan) + close(p.cliConnChan) p.userConnList = list.New() p.Unlock() } @@ -128,3 +144,7 @@ func (p *ProxyServer) WaitUserConn() (closeFlag bool) { } return } + +func (p *ProxyServer) GetNewCliConn(c *conn.Conn) { + p.cliConnChan <- c +} diff --git a/utils/broadcast/broadcast.go b/src/frp/utils/broadcast/broadcast.go similarity index 100% rename from utils/broadcast/broadcast.go rename to src/frp/utils/broadcast/broadcast.go diff --git a/utils/broadcast/broadcast_test.go b/src/frp/utils/broadcast/broadcast_test.go similarity index 100% rename from utils/broadcast/broadcast_test.go rename to src/frp/utils/broadcast/broadcast_test.go diff --git a/utils/conn/conn.go b/src/frp/utils/conn/conn.go similarity index 89% rename from utils/conn/conn.go rename to src/frp/utils/conn/conn.go index 4cf6762..dceabd9 100644 --- a/utils/conn/conn.go +++ b/src/frp/utils/conn/conn.go @@ -7,7 +7,7 @@ import ( "net" "sync" - "github.com/fatedier/frp/utils/log" + "frp/utils/log" ) type Listener struct { @@ -52,15 +52,15 @@ func Listen(bindAddr string, bindPort int64) (l *Listener, err error) { return l, err } -// wait util get one new connection or close -// if listener is closed, return nil -func (l *Listener) GetConn() (conn *Conn) { +// wait util get one new connection or listener is closed +// if listener is closed, err returned +func (l *Listener) GetConn() (conn *Conn, err error) { var ok bool conn, ok = <-l.conns if !ok { - return nil + return conn, fmt.Errorf("channel close") } - return conn + return conn, nil } func (l *Listener) Close() { @@ -116,7 +116,7 @@ func (c *Conn) Write(content string) (err error) { } func (c *Conn) Close() { - if c.TcpConn != nil { + if c.TcpConn != nil && c.closeFlag == false { c.closeFlag = true c.TcpConn.Close() } diff --git a/utils/log/log.go b/src/frp/utils/log/log.go similarity index 100% rename from utils/log/log.go rename to src/frp/utils/log/log.go diff --git a/utils/pcrypto/pcrypto.go b/src/frp/utils/pcrypto/pcrypto.go similarity index 100% rename from utils/pcrypto/pcrypto.go rename to src/frp/utils/pcrypto/pcrypto.go diff --git a/utils/pcrypto/pcrypto_test.go b/src/frp/utils/pcrypto/pcrypto_test.go similarity index 100% rename from utils/pcrypto/pcrypto_test.go rename to src/frp/utils/pcrypto/pcrypto_test.go