udp: add heartbeat in udp work connection

pull/316/head
fatedier 8 years ago
parent a84dd05351
commit 69b09eb8a2

@ -27,3 +27,6 @@ app:
env CGO_ENABLED=0 GOOS=linux GOARCH=mips go build -o -ldflags "$(LDFLAGS)" ./frps_linux_mips ./cmd/frps env CGO_ENABLED=0 GOOS=linux GOARCH=mips go build -o -ldflags "$(LDFLAGS)" ./frps_linux_mips ./cmd/frps
env CGO_ENABLED=0 GOOS=linux GOARCH=mipsle go build -o -ldflags "$(LDFLAGS)" ./frpc_linux_mipsle ./cmd/frpc env CGO_ENABLED=0 GOOS=linux GOARCH=mipsle go build -o -ldflags "$(LDFLAGS)" ./frpc_linux_mipsle ./cmd/frpc
env CGO_ENABLED=0 GOOS=linux GOARCH=mipsle go build -o -ldflags "$(LDFLAGS)" ./frps_linux_mipsle ./cmd/frps env CGO_ENABLED=0 GOOS=linux GOARCH=mipsle go build -o -ldflags "$(LDFLAGS)" ./frps_linux_mipsle ./cmd/frps
temp:
env CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -ldflags "$(LDFLAGS)" -o ./frps_linux_amd64 ./cmd/frps

@ -19,6 +19,7 @@ import (
"io" "io"
"net" "net"
"sync" "sync"
"time"
"github.com/fatedier/frp/models/config" "github.com/fatedier/frp/models/config"
"github.com/fatedier/frp/models/msg" "github.com/fatedier/frp/models/msg"
@ -141,8 +142,10 @@ type UdpProxy struct {
localAddr *net.UDPAddr localAddr *net.UDPAddr
readCh chan *msg.UdpPacket readCh chan *msg.UdpPacket
sendCh chan *msg.UdpPacket
workConn frpNet.Conn // include msg.UdpPacket and msg.Ping
sendCh chan msg.Message
workConn frpNet.Conn
} }
func (pxy *UdpProxy) Run() (err error) { func (pxy *UdpProxy) Run() (err error) {
@ -172,18 +175,18 @@ func (pxy *UdpProxy) Close() {
} }
func (pxy *UdpProxy) InWorkConn(conn frpNet.Conn) { func (pxy *UdpProxy) InWorkConn(conn frpNet.Conn) {
pxy.Info("incoming a new work connection for udp proxy") pxy.Info("incoming a new work connection for udp proxy, %s", conn.RemoteAddr().String())
// close resources releated with old workConn // close resources releated with old workConn
pxy.Close() pxy.Close()
pxy.mu.Lock() pxy.mu.Lock()
pxy.workConn = conn pxy.workConn = conn
pxy.readCh = make(chan *msg.UdpPacket, 64) pxy.readCh = make(chan *msg.UdpPacket, 1024)
pxy.sendCh = make(chan *msg.UdpPacket, 64) pxy.sendCh = make(chan msg.Message, 1024)
pxy.closed = false pxy.closed = false
pxy.mu.Unlock() pxy.mu.Unlock()
workConnReaderFn := func(conn net.Conn) { workConnReaderFn := func(conn net.Conn, readCh chan *msg.UdpPacket) {
for { for {
var udpMsg msg.UdpPacket var udpMsg msg.UdpPacket
if errRet := msg.ReadMsgInto(conn, &udpMsg); errRet != nil { if errRet := msg.ReadMsgInto(conn, &udpMsg); errRet != nil {
@ -192,26 +195,46 @@ func (pxy *UdpProxy) InWorkConn(conn frpNet.Conn) {
} }
if errRet := errors.PanicToError(func() { if errRet := errors.PanicToError(func() {
pxy.Trace("get udp package from workConn: %s", udpMsg.Content) pxy.Trace("get udp package from workConn: %s", udpMsg.Content)
pxy.readCh <- &udpMsg readCh <- &udpMsg
}); errRet != nil { }); errRet != nil {
pxy.Info("reader goroutine for udp work connection closed: %v", errRet) pxy.Info("reader goroutine for udp work connection closed: %v", errRet)
return return
} }
} }
} }
workConnSenderFn := func(conn net.Conn) { workConnSenderFn := func(conn net.Conn, sendCh chan msg.Message) {
defer func() {
pxy.Info("writer goroutine for udp work connection closed")
}()
var errRet error var errRet error
for udpMsg := range pxy.sendCh { for rawMsg := range sendCh {
pxy.Trace("send udp package to workConn: %s", udpMsg.Content) switch m := rawMsg.(type) {
if errRet = msg.WriteMsg(conn, udpMsg); errRet != nil { case *msg.UdpPacket:
pxy.Info("sender goroutine for udp work connection closed") pxy.Trace("send udp package to workConn: %s", m.Content)
case *msg.Ping:
pxy.Trace("send ping message to udp workConn")
}
if errRet = msg.WriteMsg(conn, rawMsg); errRet != nil {
pxy.Error("udp work write error: %v", errRet)
return return
} }
} }
} }
heartbeatFn := func(conn net.Conn, sendCh chan msg.Message) {
var errRet error
for {
time.Sleep(time.Duration(30) * time.Second)
if errRet = errors.PanicToError(func() {
sendCh <- &msg.Ping{}
}); errRet != nil {
pxy.Trace("heartbeat goroutine for udp work connection closed")
}
}
}
go workConnSenderFn(pxy.workConn) go workConnSenderFn(pxy.workConn, pxy.sendCh)
go workConnReaderFn(pxy.workConn) go workConnReaderFn(pxy.workConn, pxy.readCh)
go heartbeatFn(pxy.workConn, pxy.sendCh)
udp.Forwarder(pxy.localAddr, pxy.readCh, pxy.sendCh) udp.Forwarder(pxy.localAddr, pxy.readCh, pxy.sendCh)
} }

@ -26,9 +26,9 @@ pool_count = 5
user = your_name user = your_name
# heartbeat configure, it's not recommended to modify the default value # heartbeat configure, it's not recommended to modify the default value
# the default value of heartbeat_interval is 10 and heartbeat_timeout is 30 # the default value of heartbeat_interval is 10 and heartbeat_timeout is 90
# heartbeat_interval = 10 # heartbeat_interval = 30
# heartbeat_timeout = 30 # heartbeat_timeout = 90
# ssh is the proxy name same as server's configuration # ssh is the proxy name same as server's configuration
# if user in [common] section is not empty, it will be changed to {user}.{proxy} such as your_name.ssh # if user in [common] section is not empty, it will be changed to {user}.{proxy} such as your_name.ssh

@ -30,8 +30,8 @@ log_max_days = 3
privilege_token = 12345678 privilege_token = 12345678
# heartbeat configure, it's not recommended to modify the default value # heartbeat configure, it's not recommended to modify the default value
# the default value of heartbeat_timeout is 30 # the default value of heartbeat_timeout is 90
# heartbeat_timeout = 30 # heartbeat_timeout = 90
# only allow frpc to bind ports you list, if you set nothing, there won't be any limit # only allow frpc to bind ports you list, if you set nothing, there won't be any limit
privilege_allow_ports = 2000-3000,3001,3003,4000-50000 privilege_allow_ports = 2000-3000,3001,3003,4000-50000

@ -54,8 +54,8 @@ func GetDeaultClientCommonConf() *ClientCommonConf {
PrivilegeToken: "", PrivilegeToken: "",
PoolCount: 1, PoolCount: 1,
User: "", User: "",
HeartBeatInterval: 10, HeartBeatInterval: 30,
HeartBeatTimeout: 30, HeartBeatTimeout: 90,
} }
} }

@ -77,7 +77,7 @@ func GetDefaultServerCommonConf() *ServerCommonConf {
AuthTimeout: 900, AuthTimeout: 900,
SubDomainHost: "", SubDomainHost: "",
MaxPoolCount: 10, MaxPoolCount: 10,
HeartBeatTimeout: 30, HeartBeatTimeout: 90,
UserConnTimeout: 10, UserConnTimeout: 10,
} }
} }

@ -69,7 +69,7 @@ func ForwardUserConn(udpConn *net.UDPConn, readCh <-chan *msg.UdpPacket, sendCh
return return
} }
func Forwarder(dstAddr *net.UDPAddr, readCh <-chan *msg.UdpPacket, sendCh chan<- *msg.UdpPacket) { func Forwarder(dstAddr *net.UDPAddr, readCh <-chan *msg.UdpPacket, sendCh chan<- msg.Message) {
var ( var (
mu sync.RWMutex mu sync.RWMutex
) )

@ -310,16 +310,21 @@ func (pxy *UdpProxy) Run() (err error) {
pxy.Info("udp proxy listen port [%d]", pxy.cfg.RemotePort) pxy.Info("udp proxy listen port [%d]", pxy.cfg.RemotePort)
pxy.udpConn = udpConn pxy.udpConn = udpConn
pxy.sendCh = make(chan *msg.UdpPacket, 64) pxy.sendCh = make(chan *msg.UdpPacket, 1024)
pxy.readCh = make(chan *msg.UdpPacket, 64) pxy.readCh = make(chan *msg.UdpPacket, 1024)
pxy.checkCloseCh = make(chan int) pxy.checkCloseCh = make(chan int)
// read message from workConn, if it returns any error, notify proxy to start a new workConn // read message from workConn, if it returns any error, notify proxy to start a new workConn
workConnReaderFn := func(conn net.Conn) { workConnReaderFn := func(conn net.Conn) {
for { for {
var udpMsg msg.UdpPacket var (
rawMsg msg.Message
errRet error
)
pxy.Trace("loop waiting message from udp workConn") pxy.Trace("loop waiting message from udp workConn")
if errRet := msg.ReadMsgInto(conn, &udpMsg); errRet != nil { // client will send heartbeat in workConn for keeping alive
conn.SetReadDeadline(time.Now().Add(time.Duration(60) * time.Second))
if rawMsg, errRet = msg.ReadMsg(conn); errRet != nil {
pxy.Warn("read from workConn for udp error: %v", errRet) pxy.Warn("read from workConn for udp error: %v", errRet)
conn.Close() conn.Close()
// notify proxy to start a new work connection // notify proxy to start a new work connection
@ -329,14 +334,21 @@ func (pxy *UdpProxy) Run() (err error) {
}) })
return return
} }
if errRet := errors.PanicToError(func() { conn.SetReadDeadline(time.Time{})
pxy.Trace("get udp message from workConn: %s", udpMsg.Content) switch m := rawMsg.(type) {
pxy.readCh <- &udpMsg case *msg.Ping:
StatsAddTrafficOut(pxy.GetName(), int64(len(udpMsg.Content))) pxy.Trace("udp work conn get ping message")
}); errRet != nil { continue
conn.Close() case *msg.UdpPacket:
pxy.Info("reader goroutine for udp work connection closed") if errRet := errors.PanicToError(func() {
return pxy.Trace("get udp message from workConn: %s", m.Content)
pxy.readCh <- m
StatsAddTrafficOut(pxy.GetName(), int64(len(m.Content)))
}); errRet != nil {
conn.Close()
pxy.Info("reader goroutine for udp work connection closed")
return
}
} }
} }
} }
@ -348,7 +360,7 @@ func (pxy *UdpProxy) Run() (err error) {
select { select {
case udpMsg, ok := <-pxy.sendCh: case udpMsg, ok := <-pxy.sendCh:
if !ok { if !ok {
pxy.Info("sender goroutine for udp work condition closed") pxy.Info("sender goroutine for udp work connection closed")
return return
} }
if errRet = msg.WriteMsg(conn, udpMsg); errRet != nil { if errRet = msg.WriteMsg(conn, udpMsg); errRet != nil {

Loading…
Cancel
Save