nps/client/client.go

235 lines
5.5 KiB
Go
Executable File
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

package client
import (
"github.com/cnlh/nps/lib/common"
"github.com/cnlh/nps/lib/config"
"github.com/cnlh/nps/lib/conn"
"github.com/cnlh/nps/lib/mux"
"github.com/cnlh/nps/vender/github.com/astaxie/beego/logs"
"github.com/cnlh/nps/vender/github.com/xtaci/kcp"
"net"
"os"
"time"
)
type TRPClient struct {
svrAddr string
bridgeConnType string
proxyUrl string
vKey string
tunnel *mux.Mux
signal *conn.Conn
ticker *time.Ticker
cnf *config.Config
}
//new client
func NewRPClient(svraddr string, vKey string, bridgeConnType string, proxyUrl string, cnf *config.Config) *TRPClient {
return &TRPClient{
svrAddr: svraddr,
vKey: vKey,
bridgeConnType: bridgeConnType,
proxyUrl: proxyUrl,
cnf: cnf,
}
}
//start
func (s *TRPClient) Start() {
retry:
c, err := NewConn(s.bridgeConnType, s.vKey, s.svrAddr, common.WORK_MAIN, s.proxyUrl)
if err != nil {
logs.Error("The connection server failed and will be reconnected in five seconds")
time.Sleep(time.Second * 5)
goto retry
}
logs.Info("Successful connection with server %s", s.svrAddr)
go s.ping()
s.processor(c)
}
func (s *TRPClient) Close() {
s.tunnel.Close()
s.signal.Close()
s.ticker.Stop()
}
//处理
func (s *TRPClient) processor(c *conn.Conn) {
s.signal = c
go s.dealChan()
if s.cnf != nil && len(s.cnf.Healths) > 0 {
go heathCheck(s.cnf.Healths, s.signal)
}
for {
flags, err := c.ReadFlag()
if err != nil {
logs.Error("Accept server data error %s, end this service", err.Error())
break
}
switch flags {
case common.VERIFY_EER:
logs.Error("VKey:%s is incorrect, the server refuses to connect, please check", s.vKey)
os.Exit(0)
case common.RES_CLOSE:
logs.Error("The authentication key is connected by another client or the server closes the client.")
os.Exit(0)
case common.RES_MSG:
logs.Error("Server-side return error")
break
case common.NEW_UDP_CONN:
//读取服务端地址、密钥 继续做处理
if lAddr, err := c.GetShortLenContent(); err != nil {
logs.Warn(err)
return
} else if pwd, err := c.GetShortLenContent(); err == nil {
go s.newUdpConn(string(lAddr), string(pwd))
}
default:
logs.Warn("The error could not be resolved")
break
}
}
c.Close()
s.Close()
}
func (s *TRPClient) newUdpConn(rAddr string, md5Password string) {
tmpConn, err := common.GetLocalUdpAddr()
if err != nil {
logs.Error(err)
return
}
localAddr, _ := net.ResolveUDPAddr("udp", tmpConn.LocalAddr().String())
localConn, err := net.ListenUDP("udp", localAddr)
if err != nil {
logs.Error(err)
return
}
localKcpConn, err := kcp.NewConn(rAddr, nil, 150, 3, localConn)
if err != nil {
logs.Error(err)
return
}
conn.SetUdpSession(localKcpConn)
localToolConn := conn.NewConn(localKcpConn)
//写入密钥、provider身份
if _, err := localToolConn.Write([]byte(md5Password)); err != nil {
logs.Error(err)
return
}
if _, err := localToolConn.Write([]byte(common.WORK_P2P_PROVIDER)); err != nil {
logs.Error(err)
return
}
//接收服务端传的visitor地址
var b []byte
if b, err = localToolConn.GetShortLenContent(); err != nil {
logs.Error(err)
return
}
//向visitor地址发送测试消息
visitorAddr, err := net.ResolveUDPAddr("udp", string(b))
if err != nil {
logs.Error(err)
return
}
//向目标IP发送探测包
if _, err := localConn.WriteTo([]byte("test"), visitorAddr); err != nil {
logs.Error(err)
return
}
//给服务端发反馈
if _, err := localToolConn.Write([]byte(common.VERIFY_SUCCESS)); err != nil {
logs.Error(err)
return
}
//关闭与服务端的连接
localConn.Close()
//关闭与服务端udp conn建立新的监听
if localConn, err = net.ListenUDP("udp", localAddr); err != nil {
logs.Error(err)
return
}
l, err := kcp.ServeConn(nil, 150, 3, localConn)
if err != nil {
logs.Error(err)
return
}
//接收新的监听得到conn
for {
udpTunnel, err := l.AcceptKCP()
if err != nil {
logs.Error(err)
l.Close()
return
}
if udpTunnel.RemoteAddr().String() == string(b) {
conn.SetUdpSession(udpTunnel)
//读取link,设置msgCh 设置msgConn消息回传响应机制
l := mux.NewMux(udpTunnel, s.bridgeConnType)
for {
connMux, err := l.Accept()
if err != nil {
continue
}
go s.srcProcess(connMux)
}
}
}
}
//mux tunnel
func (s *TRPClient) dealChan() {
tunnel, err := NewConn(s.bridgeConnType, s.vKey, s.svrAddr, common.WORK_CHAN, s.proxyUrl)
if err != nil {
logs.Error("connect to ", s.svrAddr, "error:", err)
return
}
s.tunnel = mux.NewMux(tunnel.Conn, s.bridgeConnType)
for {
src, err := s.tunnel.Accept()
if err != nil {
logs.Warn(err)
s.Close()
break
}
go s.srcProcess(src)
}
}
func (s *TRPClient) srcProcess(src net.Conn) {
lk, err := conn.NewConn(src).GetLinkInfo()
if err != nil {
src.Close()
logs.Error("get connection info from server error ", err)
return
}
//host for target processing
lk.Host = common.FormatAddress(lk.Host)
//connect to target
if targetConn, err := net.Dial(lk.ConnType, lk.Host); err != nil {
logs.Warn("connect to %s error %s", lk.Host, err.Error())
src.Close()
} else {
logs.Trace("new %s connection with the goal of %s, remote address:%s", lk.ConnType, lk.Host, lk.RemoteAddr)
conn.CopyWaitGroup(src, targetConn, lk.Crypt, lk.Compress, nil, nil, false)
}
}
func (s *TRPClient) ping() {
s.ticker = time.NewTicker(time.Second * 5)
loop:
for {
select {
case <-s.ticker.C:
if s.tunnel.IsClose {
s.Close()
s.ticker.Stop()
break loop
}
}
}
}