diff --git a/client/control.go b/client/control.go index 5a094dac..49dfb290 100644 --- a/client/control.go +++ b/client/control.go @@ -150,7 +150,7 @@ func (ctl *Control) NewWorkConn() { workConn = net.WrapConn(stream) } else { - workConn, err = net.ConnectTcpServerByHttpProxy(config.ClientCommonCfg.HttpProxy, + workConn, err = net.ConnectServerByHttpProxy(config.ClientCommonCfg.HttpProxy, config.ClientCommonCfg.Protocol, fmt.Sprintf("%s:%d", config.ClientCommonCfg.ServerAddr, config.ClientCommonCfg.ServerPort)) if err != nil { ctl.Warn("start new work connection error: %v", err) @@ -199,7 +199,7 @@ func (ctl *Control) login() (err error) { ctl.session.Close() } - conn, err := net.ConnectTcpServerByHttpProxy(config.ClientCommonCfg.HttpProxy, + conn, err := net.ConnectServerByHttpProxy(config.ClientCommonCfg.HttpProxy, config.ClientCommonCfg.Protocol, fmt.Sprintf("%s:%d", config.ClientCommonCfg.ServerAddr, config.ClientCommonCfg.ServerPort)) if err != nil { return err diff --git a/client/proxy.go b/client/proxy.go index 3dbedf8e..50b99563 100644 --- a/client/proxy.go +++ b/client/proxy.go @@ -294,7 +294,7 @@ func HandleTcpWorkConnection(localInfo *config.LocalSvrConf, proxyPlugin plugin. workConn.Debug("handle by plugin finished") return } else { - localConn, err := frpNet.ConnectTcpServer(fmt.Sprintf("%s:%d", localInfo.LocalIp, localInfo.LocalPort)) + localConn, err := frpNet.ConnectServer("tcp", fmt.Sprintf("%s:%d", localInfo.LocalIp, localInfo.LocalPort)) if err != nil { workConn.Error("connect to local service [%s:%d] error: %v", localInfo.LocalIp, localInfo.LocalPort, err) return diff --git a/conf/frpc_full.ini b/conf/frpc_full.ini index 93fe53df..dab0d891 100644 --- a/conf/frpc_full.ini +++ b/conf/frpc_full.ini @@ -32,6 +32,10 @@ user = your_name # default is true login_fail_exit = true +# communication protocol used to connect to server +# now it supports tcp and kcp, default is tcp +protocol = tcp + # proxy names you want to start divided by ',' # default is empty, means all proxies # start = ssh,dns diff --git a/conf/frps_full.ini b/conf/frps_full.ini index d28f8f33..bc70cc20 100644 --- a/conf/frps_full.ini +++ b/conf/frps_full.ini @@ -49,3 +49,6 @@ subdomain_host = frps.com # if tcp stream multiplexing is used, default is true tcp_mux = true + +# if server support client using kcp protocol, default is true +support_kcp = true diff --git a/models/config/client_common.go b/models/config/client_common.go index 449103a6..8ec6cd89 100644 --- a/models/config/client_common.go +++ b/models/config/client_common.go @@ -41,6 +41,7 @@ type ClientCommonConf struct { User string LoginFailExit bool Start map[string]struct{} + Protocol string HeartBeatInterval int64 HeartBeatTimeout int64 } @@ -61,6 +62,7 @@ func GetDeaultClientCommonConf() *ClientCommonConf { User: "", LoginFailExit: true, Start: make(map[string]struct{}), + Protocol: "tcp", HeartBeatInterval: 30, HeartBeatTimeout: 90, } @@ -154,6 +156,15 @@ func LoadClientCommonConf(conf ini.File) (cfg *ClientCommonConf, err error) { cfg.LoginFailExit = true } + tmpStr, ok = conf.Get("common", "protocol") + if ok { + // Now it only support tcp and kcp. + if tmpStr != "kcp" { + tmpStr = "tcp" + } + cfg.Protocol = tmpStr + } + tmpStr, ok = conf.Get("common", "heartbeat_timeout") if ok { v, err = strconv.ParseInt(tmpStr, 10, 64) diff --git a/models/config/server_common.go b/models/config/server_common.go index 35b158b5..780fef2d 100644 --- a/models/config/server_common.go +++ b/models/config/server_common.go @@ -51,6 +51,7 @@ type ServerCommonConf struct { AuthTimeout int64 SubDomainHost string TcpMux bool + SupportKcp bool // if PrivilegeAllowPorts is not nil, tcp proxies which remote port exist in this map can be connected PrivilegeAllowPorts [][2]int64 @@ -79,6 +80,7 @@ func GetDefaultServerCommonConf() *ServerCommonConf { AuthTimeout: 900, SubDomainHost: "", TcpMux: true, + SupportKcp: true, MaxPoolCount: 5, HeartBeatTimeout: 90, UserConnTimeout: 10, @@ -231,6 +233,13 @@ func LoadServerCommonConf(conf ini.File) (cfg *ServerCommonConf, err error) { cfg.TcpMux = true } + tmpStr, ok = conf.Get("common", "support_kcp") + if ok && tmpStr == "false" { + cfg.SupportKcp = false + } else { + cfg.SupportKcp = true + } + tmpStr, ok = conf.Get("common", "heartbeat_timeout") if ok { v, errRet := strconv.ParseInt(tmpStr, 10, 64) diff --git a/server/service.go b/server/service.go index 8cd23b6c..b3d71646 100644 --- a/server/service.go +++ b/server/service.go @@ -41,6 +41,9 @@ type Service struct { // Accept connections from client. listener frpNet.Listener + // Accept connections using kcp. + kcpListener frpNet.Listener + // For http proxies, route requests to different clients by hostname and other infomation. VhostHttpMuxer *vhost.HttpMuxer @@ -73,6 +76,17 @@ func NewService() (svr *Service, err error) { err = fmt.Errorf("Create server listener error, %v", err) return } + log.Info("frps tcp listen on %s:%d", config.ServerCommonCfg.BindAddr, config.ServerCommonCfg.BindPort) + + // Listen for accepting connections from client using kcp protocol. + if config.ServerCommonCfg.SupportKcp { + svr.kcpListener, err = frpNet.ListenKcp(config.ServerCommonCfg.BindAddr, config.ServerCommonCfg.BindPort) + if err != nil { + err = fmt.Errorf("Listen on kcp address [%s:%d] error: %v", config.ServerCommonCfg.BindAddr, config.ServerCommonCfg.BindPort, err) + return + } + log.Info("frps kcp listen on %s:%d", config.ServerCommonCfg.BindAddr, config.ServerCommonCfg.BindPort) + } // Create http vhost muxer. if config.ServerCommonCfg.VhostHttpPort != 0 { @@ -87,6 +101,7 @@ func NewService() (svr *Service, err error) { err = fmt.Errorf("Create vhost httpMuxer error, %v", err) return } + log.Info("http service listen on %s:%d", config.ServerCommonCfg.BindAddr, config.ServerCommonCfg.VhostHttpPort) } // Create https vhost muxer. @@ -102,6 +117,7 @@ func NewService() (svr *Service, err error) { err = fmt.Errorf("Create vhost httpsMuxer error, %v", err) return } + log.Info("https service listen on %s:%d", config.ServerCommonCfg.BindAddr, config.ServerCommonCfg.VhostHttpsPort) } // Create dashboard web server. @@ -117,9 +133,17 @@ func NewService() (svr *Service, err error) { } func (svr *Service) Run() { + if config.ServerCommonCfg.SupportKcp { + go svr.HandleListener(svr.kcpListener) + } + svr.HandleListener(svr.listener) + +} + +func (svr *Service) HandleListener(l frpNet.Listener) { // Listen for incoming connections from client. for { - c, err := svr.listener.Accept() + c, err := l.Accept() if err != nil { log.Warn("Listener for incoming connections from client closed") return @@ -131,7 +155,7 @@ func (svr *Service) Run() { var rawMsg msg.Message conn.SetReadDeadline(time.Now().Add(connReadTimeout)) if rawMsg, err = msg.ReadMsg(conn); err != nil { - log.Warn("Failed to read message: %v", err) + log.Trace("Failed to read message: %v", err) conn.Close() return } diff --git a/utils/net/conn.go b/utils/net/conn.go index 5f172bca..94ff5307 100644 --- a/utils/net/conn.go +++ b/utils/net/conn.go @@ -15,11 +15,14 @@ package net import ( + "fmt" "io" "net" "time" "github.com/fatedier/frp/utils/log" + + kcp "github.com/xtaci/kcp-go" ) // Conn is the interface of connections used in frp. @@ -77,3 +80,59 @@ type Listener interface { Close() error log.Logger } + +type LogListener struct { + l net.Listener + net.Listener + log.Logger +} + +func WrapLogListener(l net.Listener) Listener { + return &LogListener{ + l: l, + Listener: l, + Logger: log.NewPrefixLogger(""), + } +} + +func (logL *LogListener) Accept() (Conn, error) { + c, err := logL.l.Accept() + return WrapConn(c), err +} + +func ConnectServer(protocol string, addr string) (c Conn, err error) { + switch protocol { + case "tcp": + return ConnectTcpServer(addr) + case "kcp": + kcpConn, errRet := kcp.DialWithOptions(addr, nil, 10, 3) + if errRet != nil { + err = errRet + return + } + kcpConn.SetStreamMode(true) + kcpConn.SetWriteDelay(true) + kcpConn.SetNoDelay(1, 20, 2, 1) + kcpConn.SetWindowSize(128, 512) + kcpConn.SetMtu(1350) + kcpConn.SetACKNoDelay(false) + kcpConn.SetReadBuffer(4194304) + kcpConn.SetWriteBuffer(4194304) + c = WrapConn(kcpConn) + return + default: + return nil, fmt.Errorf("unsupport protocol: %s", protocol) + } +} + +func ConnectServerByHttpProxy(httpProxy string, protocol string, addr string) (c Conn, err error) { + switch protocol { + case "tcp": + return ConnectTcpServerByHttpProxy(httpProxy, addr) + case "kcp": + // http proxy is not supported for kcp + return ConnectServer(protocol, addr) + default: + return nil, fmt.Errorf("unsupport protocol: %s", protocol) + } +} diff --git a/utils/net/kcp.go b/utils/net/kcp.go new file mode 100644 index 00000000..862d3846 --- /dev/null +++ b/utils/net/kcp.go @@ -0,0 +1,87 @@ +// Copyright 2017 fatedier, fatedier@gmail.com +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package net + +import ( + "fmt" + "net" + + "github.com/fatedier/frp/utils/log" + + kcp "github.com/xtaci/kcp-go" +) + +type KcpListener struct { + net.Addr + listener net.Listener + accept chan Conn + closeFlag bool + log.Logger +} + +func ListenKcp(bindAddr string, bindPort int64) (l *KcpListener, err error) { + listener, err := kcp.ListenWithOptions(fmt.Sprintf("%s:%d", bindAddr, bindPort), nil, 10, 3) + if err != nil { + return l, err + } + listener.SetReadBuffer(4194304) + listener.SetWriteBuffer(4194304) + + l = &KcpListener{ + Addr: listener.Addr(), + listener: listener, + accept: make(chan Conn), + closeFlag: false, + Logger: log.NewPrefixLogger(""), + } + + go func() { + for { + conn, err := listener.AcceptKCP() + if err != nil { + if l.closeFlag { + close(l.accept) + return + } + continue + } + conn.SetStreamMode(true) + conn.SetWriteDelay(true) + conn.SetNoDelay(1, 20, 2, 1) + conn.SetMtu(1350) + conn.SetWindowSize(1024, 1024) + conn.SetACKNoDelay(false) + + l.accept <- WrapConn(conn) + } + }() + return l, err +} + +func (l *KcpListener) Accept() (Conn, error) { + conn, ok := <-l.accept + if !ok { + return conn, fmt.Errorf("channel for kcp listener closed") + } + return conn, nil +} + +func (l *KcpListener) Close() error { + if !l.closeFlag { + l.closeFlag = true + l.listener.Close() + } + return nil +}