From 534d428c6d21e6ff28868b2b0ff0885d04a96604 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=88=98=E6=B2=B3?= Date: Tue, 26 Feb 2019 22:40:28 +0800 Subject: [PATCH] P2p first version --- bridge/bridge.go | 32 +++++- client/client.go | 133 +++++++++++++++++++----- client/local.go | 81 +++++++++++++-- conf/npc.conf | 12 +-- lib/common/const.go | 7 +- lib/common/util.go | 26 ++++- lib/config/config.go | 11 +- lib/conn/conn.go | 25 ++++- lib/conn/link.go | 26 ++++- lib/file/file.go | 2 +- lib/file/obj.go | 1 - lib/mux/bytes.go | 32 ++++++ lib/mux/conn.go | 148 +++++++++++++++++++++++++++ lib/mux/map.go | 64 ++++++++++++ lib/mux/mux.go | 226 +++++++++++++++++++++++++++++++++++++++++ lib/mux/mux_test.go | 96 +++++++++++++++++ lib/pool/pool.go | 1 + server/proxy/base.go | 25 +---- server/proxy/http.go | 2 +- server/proxy/p2p.go | 104 +++++++++++++++++++ server/proxy/socks5.go | 2 +- server/proxy/udp.go | 2 +- server/server.go | 5 +- 23 files changed, 986 insertions(+), 77 deletions(-) create mode 100644 lib/mux/bytes.go create mode 100644 lib/mux/conn.go create mode 100644 lib/mux/map.go create mode 100644 lib/mux/mux.go create mode 100644 lib/mux/mux_test.go create mode 100644 server/proxy/p2p.go diff --git a/bridge/bridge.go b/bridge/bridge.go index d4df082..562bf7b 100755 --- a/bridge/bridge.go +++ b/bridge/bridge.go @@ -11,6 +11,7 @@ import ( "github.com/cnlh/nps/lib/pool" "github.com/cnlh/nps/lib/version" "github.com/cnlh/nps/server/tool" + "github.com/cnlh/nps/vender/github.com/astaxie/beego" "github.com/cnlh/nps/vender/github.com/astaxie/beego/logs" "github.com/cnlh/nps/vender/github.com/xtaci/kcp" "net" @@ -71,7 +72,6 @@ func NewTunnel(tunnelPort int, tunnelType string, ipVerify bool, runList map[int } func (s *Bridge) StartTunnel() error { - go s.linkCleanSession() var err error if s.tunnelType == "kcp" { s.kcpListener, err = kcp.ListenWithOptions(":"+strconv.Itoa(s.TunnelPort), nil, 150, 3) @@ -209,10 +209,35 @@ func (s *Bridge) typeDeal(typeVal string, c *conn.Conn, id int) { go s.GetConfig(c) case common.WORK_REGISTER: go s.register(c) - case common.WORD_SECRET: + case common.WORK_SECRET: if b, err := c.ReadLen(32); err == nil { s.SecretChan <- conn.NewSecret(string(b), c) } + case common.WORK_P2P: + //读取md5密钥 + if b, err := c.ReadLen(32); err != nil { + return + } else if t := file.GetCsvDb().GetTaskByMd5Password(string(b)); t == nil { + return + } else { + s.clientLock.Lock() + if v, ok := s.Client[t.Client.Id]; !ok { + logs.Error("未获取到对应客户端") + s.clientLock.Unlock() + return + } else { + logs.Warn("获取到对应客户端") + s.clientLock.Unlock() + //向密钥对应的客户端发送与服务端udp建立连接信息,地址,密钥 + logs.Warn(v.signal.Write([]byte(common.NEW_UDP_CONN))) + svrAddr := beego.AppConfig.String("serverIp") + ":" + beego.AppConfig.String("p2pPort") + logs.Warn(svrAddr) + logs.Warn(v.signal.WriteLenContent([]byte(svrAddr))) + logs.Warn(string(b), v.signal.WriteLenContent(b)) + //向该请求者发送建立连接请求,服务器地址 + c.WriteLenContent([]byte(svrAddr)) + } + } case common.WORK_SEND_STATUS: s.clientLock.Lock() if v, ok := s.Client[id]; ok { @@ -511,6 +536,7 @@ func (s *Bridge) clientCopy(clientId int) { } } +//TODO 清除有一个未知bug待处理 func (s *Bridge) linkCleanSession() { ticker := time.NewTicker(time.Minute * 5) for { @@ -526,7 +552,7 @@ func (s *Bridge) linkCleanSession() { } v.Unlock() } - s.clientLock.RUnlock() + s.clientLock.Unlock() } } } diff --git a/client/client.go b/client/client.go index 916bb1a..071e65f 100755 --- a/client/client.go +++ b/client/client.go @@ -5,6 +5,7 @@ import ( "github.com/cnlh/nps/lib/conn" "github.com/cnlh/nps/lib/pool" "github.com/cnlh/nps/vender/github.com/astaxie/beego/logs" + "github.com/cnlh/nps/vender/github.com/xtaci/kcp" "net" "os" "sync" @@ -38,7 +39,6 @@ func NewRPClient(svraddr string, vKey string, bridgeConnType string, proxyUrl st //start func (s *TRPClient) Start() { - go s.linkCleanSession() retry: c, err := NewConn(s.bridgeConnType, s.vKey, s.svrAddr, common.WORK_MAIN, s.proxyUrl) if err != nil { @@ -82,8 +82,8 @@ func (s *TRPClient) processor(c *conn.Conn) { s.linkMap[link.Id] = link s.Unlock() link.MsgConn = s.msgTunnel - go s.linkProcess(link, c) - link.Run(false) + go linkProcess(link, c, s.tunnel) + link.RunWrite() } case common.RES_CLOSE: logs.Error("The authentication key is connected by another client or the server closes the client.") @@ -91,6 +91,14 @@ func (s *TRPClient) processor(c *conn.Conn) { case common.RES_MSG: logs.Error("Server-side return error") break + case common.NEW_UDP_CONN: + //读取服务端地址、密钥 继续做处理 + if lAddr, err := c.GetLenContent(); err != nil { + return + } else if pwd, err := c.GetLenContent(); err == nil { + logs.Warn(string(lAddr), string(pwd)) + go s.newUdpConn(string(lAddr), string(pwd)) + } default: logs.Warn("The error could not be resolved") break @@ -100,37 +108,112 @@ func (s *TRPClient) processor(c *conn.Conn) { s.Close() } -func (s *TRPClient) linkProcess(link *conn.Link, c *conn.Conn) { +func (s *TRPClient) newUdpConn(rAddr string, md5Password string) { + tmpConn, err := net.Dial("udp", "114.114.114.114:53") + if err != nil { + logs.Warn(err) + return + } + tmpConn.Close() + //与服务端建立udp连接 + localAddr, _ := net.ResolveUDPAddr("udp", tmpConn.LocalAddr().String()) + localConn, err := net.ListenUDP("udp", localAddr) + if err != nil { + logs.Warn(err) + return + } + localKcpConn, err := kcp.NewConn(rAddr, nil, 150, 3, localConn) + logs.Warn(localConn.RemoteAddr(), rAddr) + conn.SetUdpSession(localKcpConn) + if err != nil { + logs.Warn(err) + return + } + localToolConn := conn.NewConn(localKcpConn) + //写入密钥、provider身份 + if _, err := localToolConn.Write([]byte(md5Password)); err != nil { + logs.Warn(err) + return + } + if _, err := localToolConn.Write([]byte(common.WORK_P2P_PROVIDER)); err != nil { + logs.Warn(err) + return + } + //接收服务端传的visitor地址 + if b, err := localToolConn.GetLenContent(); err != nil { + logs.Warn(err) + return + } else { + logs.Warn("收到服务端回传地址", string(b)) + //向visitor地址发送测试消息 + visitorAddr, err := net.ResolveUDPAddr("udp", string(b)) + if err != nil { + logs.Warn(err) + } + logs.Warn(visitorAddr.String()) + if n, err := localConn.WriteTo([]byte("test"), visitorAddr); err != nil { + logs.Warn(err) + } else { + logs.Warn("write", n) + } + //给服务端发反馈 + if _, err := localToolConn.Write([]byte(common.VERIFY_SUCCESS)); err != nil { + logs.Warn(err) + } + //关闭与服务端的连接 + localConn.Close() + //关闭与服务端udp conn,建立新的监听 + localConn, err = net.ListenUDP("udp", localAddr) + + if err != nil { + logs.Warn(err) + } + l, err := kcp.ServeConn(nil, 150, 3, localConn) + if err != nil { + logs.Warn(err) + return + } + for { + //接收新的监听,得到conn, + udpTunnel, err := l.AcceptKCP() + logs.Warn(udpTunnel.RemoteAddr(), udpTunnel.LocalAddr()) + if err != nil { + logs.Warn(err) + l.Close() + return + } + conn.SetUdpSession(udpTunnel) + if udpTunnel.RemoteAddr().String() == string(b) { + //读取link,设置msgCh 设置msgConn消息回传响应机制 + c, e := net.Dial("tcp", "123.206.77.88:22") + if e != nil { + logs.Warn(e) + return + } + + go common.CopyBuffer(c, udpTunnel) + common.CopyBuffer(udpTunnel, c) + //读取flag ping/new/msg/msgConn//分别对于不同的做法 + break + } + } + + } +} + +func linkProcess(link *conn.Link, statusConn, msgConn *conn.Conn) { link.Host = common.FormatAddress(link.Host) //与目标建立连接 server, err := net.DialTimeout(link.ConnType, link.Host, time.Second*3) if err != nil { - c.WriteFail(link.Id) + statusConn.WriteFail(link.Id) logs.Warn("connect to ", link.Host, "error:", err) return } - c.WriteSuccess(link.Id) + statusConn.WriteSuccess(link.Id) link.Conn = conn.NewConn(server) - buf := pool.BufPoolCopy.Get().([]byte) - for { - if n, err := server.Read(buf); err != nil { - s.tunnel.SendMsg([]byte(common.IO_EOF), link) - break - } else { - if _, err := s.tunnel.SendMsg(buf[:n], link); err != nil { - c.Close() - break - } - if link.ConnType == common.CONN_UDP { - break - } - } - <-link.StatusCh - } - pool.PutBufPoolCopy(buf) - s.Lock() - s.Unlock() + link.RunRead(msgConn) } func (s *TRPClient) getMsgStatus() { diff --git a/client/local.go b/client/local.go index 69a2d90..cf90113 100644 --- a/client/local.go +++ b/client/local.go @@ -3,8 +3,10 @@ package client import ( "github.com/cnlh/nps/lib/common" "github.com/cnlh/nps/lib/config" + "github.com/cnlh/nps/lib/conn" "github.com/cnlh/nps/lib/crypt" "github.com/cnlh/nps/vender/github.com/astaxie/beego/logs" + "github.com/cnlh/nps/vender/github.com/xtaci/kcp" "net" "strings" ) @@ -39,16 +41,81 @@ func StartLocalServer(l *config.LocalServer, config *config.CommonConfig) error return nil } -func process(conn net.Conn, config *config.CommonConfig, l *config.LocalServer) { - c, err := NewConn(config.Tp, config.VKey, config.Server, common.WORD_SECRET, config.ProxyUrl) +func process(localTcpConn net.Conn, config *config.CommonConfig, l *config.LocalServer) { + var workType string + if l.Type == "secret" { + workType = common.WORK_SECRET + } else { + workType = common.WORK_P2P + } + remoteConn, err := NewConn(config.Tp, config.VKey, config.Server, workType, config.ProxyUrl) if err != nil { logs.Error("Local connection server failed ", err.Error()) } - if _, err := c.Write([]byte(crypt.Md5(l.Password))); err != nil { + if _, err := remoteConn.Write([]byte(crypt.Md5(l.Password))); err != nil { logs.Error("Local connection server failed ", err.Error()) } - go common.CopyBuffer(c, conn) - common.CopyBuffer(conn, c) - c.Close() - conn.Close() + if l.Type == "secret" { + go common.CopyBuffer(remoteConn, localTcpConn) + common.CopyBuffer(localTcpConn, remoteConn) + remoteConn.Close() + localTcpConn.Close() + } else { + //读取服务端地址、密钥 继续做处理 + logs.Warn(111) + if rAddr, err := remoteConn.GetLenContent(); err != nil { + return + } else { + logs.Warn(222) + //与服务端udp建立连接 + tmpConn, err := net.Dial("udp", "114.114.114.114:53") + if err != nil { + logs.Warn(err) + } + tmpConn.Close() + //与服务端建立udp连接 + localAddr, _ := net.ResolveUDPAddr("udp", tmpConn.LocalAddr().String()) + localConn, err := net.ListenUDP("udp", localAddr) + if err != nil { + return + } + logs.Warn(333) + localKcpConn, err := kcp.NewConn(string(rAddr), nil, 150, 3, localConn) + conn.SetUdpSession(localKcpConn) + if err != nil { + logs.Warn(err) + } + localToolConn := conn.NewConn(localKcpConn) + //写入密钥、provider身份 + if _, err := localToolConn.Write([]byte(crypt.Md5(l.Password))); err != nil { + return + } + if _, err := localToolConn.Write([]byte(common.WORK_P2P_VISITOR)); err != nil { + return + } + logs.Warn(444) + //接收服务端传的visitor地址 + if b, err := localToolConn.GetLenContent(); err != nil { + logs.Warn(err) + return + } else { + logs.Warn("收到服务回传地址", string(b)) + //关闭与服务端连接 + localConn.Close() + //建立新的连接 + localConn, err = net.ListenUDP("udp", localAddr) + udpTunnel, err := kcp.NewConn(string(b), nil, 150, 3, localConn) + if err != nil || udpTunnel == nil { + logs.Warn(err) + return + } + conn.SetUdpSession(udpTunnel) + logs.Warn(udpTunnel.RemoteAddr(), string(b), udpTunnel.LocalAddr()) + + go common.CopyBuffer(udpTunnel, localTcpConn) + common.CopyBuffer(localTcpConn, udpTunnel) + } + } + + } } diff --git a/conf/npc.conf b/conf/npc.conf index 6f14ae7..b1d2a7d 100644 --- a/conf/npc.conf +++ b/conf/npc.conf @@ -1,5 +1,5 @@ [common] -server=127.0.0.1:8284 +server=123.206.77.88:8284 tp=tcp vkey=123 auto_reconnection=true @@ -16,10 +16,6 @@ host_change=www.proxy.com target=127.0.0.1:8080 location=/cdn -[ssh_1118] -mode=secretServer -password=1111 -target=123.206.77.88:22 [tcp] mode=tcpServer @@ -37,4 +33,8 @@ port=9004 [udp] mode=udpServer port=9003 -target=114.114.114.53 \ No newline at end of file +target=114.114.114.53 + +[p2p_ssh] +port=2000 +password=p2pssh \ No newline at end of file diff --git a/lib/common/const.go b/lib/common/const.go index cc21265..ece421f 100644 --- a/lib/common/const.go +++ b/lib/common/const.go @@ -13,12 +13,16 @@ const ( WORK_SEND_STATUS = "sdst" WORK_CONFIG = "conf" WORK_REGISTER = "rgst" - WORD_SECRET = "sert" + WORK_SECRET = "sert" + WORK_P2P = "p2pm" + WORK_P2P_VISITOR = "p2pv" + WORK_P2P_PROVIDER = "p2pp" WORK_STATUS = "stus" RES_SIGN = "sign" RES_MSG = "msg0" RES_CLOSE = "clse" NEW_CONN = "conn" //新连接标志 + NEW_UDP_CONN = "udpc" //p2p udp conn NEW_TASK = "task" //新连接标志 NEW_CONF = "conf" //新连接标志 NEW_HOST = "host" //新连接标志 @@ -33,4 +37,5 @@ WWW-Authenticate: Basic realm="easyProxy" ConnectionFailBytes = `HTTP/1.1 404 Not Found ` + ) diff --git a/lib/common/util.go b/lib/common/util.go index 12e2db7..e88bfa3 100755 --- a/lib/common/util.go +++ b/lib/common/util.go @@ -251,7 +251,29 @@ func GetIpByAddr(addr string) string { func CopyBuffer(dst io.Writer, src io.Reader) (written int64, err error) { buf := pool.BufPoolCopy.Get().([]byte) - io.CopyBuffer(dst, src, buf) - pool.PutBufPoolCopy(buf) + for { + nr, er := src.Read(buf) + if nr > 0 { + nw, ew := dst.Write(buf[0:nr]) + if nw > 0 { + written += int64(nw) + } + if ew != nil { + err = ew + break + } + if nr != nw { + err = io.ErrShortWrite + break + } + } + if er != nil { + if er != io.EOF { + err = er + } + break + } + } + defer pool.PutBufPoolCopy(buf) return written, err } diff --git a/lib/config/config.go b/lib/config/config.go index b010072..86423f3 100644 --- a/lib/config/config.go +++ b/lib/config/config.go @@ -18,6 +18,7 @@ type CommonConfig struct { Client *file.Client } type LocalServer struct { + Type string Port int Password string } @@ -53,7 +54,15 @@ func NewConfig(path string) (c *Config, err error) { nowContent = c.content[nowIndex:nextIndex] if strings.Index(getTitleContent(c.title[i]), "secret") == 0 { - c.LocalServer = append(c.LocalServer, delLocalService(nowContent)) + local := delLocalService(nowContent) + local.Type = "secret" + c.LocalServer = append(c.LocalServer, local) + continue + } + if strings.Index(getTitleContent(c.title[i]), "p2p") == 0 { + local := delLocalService(nowContent) + local.Type = "p2p" + c.LocalServer = append(c.LocalServer, local) continue } switch c.title[i] { diff --git a/lib/conn/conn.go b/lib/conn/conn.go index 2ea92d0..c5ea901 100755 --- a/lib/conn/conn.go +++ b/lib/conn/conn.go @@ -69,6 +69,15 @@ func (s *Conn) GetHost() (method, address string, rb []byte, err error, r *http. return } +func (s *Conn) GetLenContent() (b []byte, err error) { + var l int + if l, err = s.GetLen(); err != nil { + return + } + b, err = s.ReadLen(l) + return +} + //读取指定长度内容 func (s *Conn) ReadLen(cLen int) ([]byte, error) { if cLen > pool.PoolSize { @@ -77,10 +86,11 @@ func (s *Conn) ReadLen(cLen int) ([]byte, error) { var buf []byte if cLen < pool.PoolSizeSmall { buf = pool.BufPoolSmall.Get().([]byte)[:cLen] - defer pool.PutBufPoolSmall(buf) + //TODO 回收 + //defer pool.PutBufPoolSmall(buf) } else { buf = pool.BufPoolMax.Get().([]byte)[:cLen] - defer pool.PutBufPoolMax(buf) + //defer pool.PutBufPoolMax(buf) } if n, err := io.ReadFull(s, buf); err != nil || n != cLen { return buf, errors.New("Error reading specified length " + err.Error()) @@ -95,6 +105,14 @@ func (s *Conn) GetLen() (int, error) { return int(l), err } +func (s *Conn) WriteLenContent(buf []byte) (err error) { + var b []byte + if b, err = GetLenBytes(buf); err != nil { + return + } + return binary.Write(s.Conn, binary.LittleEndian, b) +} + //read flag func (s *Conn) ReadFlag() (string, error) { val, err := s.ReadLen(4) @@ -477,7 +495,6 @@ func (s *Conn) WriteChan() (int, error) { defer s.Unlock() return s.Write([]byte(common.WORK_CHAN)) } - //获取长度+内容 func GetLenBytes(buf []byte) (b []byte, err error) { raw := bytes.NewBuffer([]byte{}) @@ -491,6 +508,7 @@ func GetLenBytes(buf []byte) (b []byte, err error) { return } + //解析出长度 func GetLenByBytes(buf []byte) (int, error) { nlen := binary.LittleEndian.Uint32(buf) @@ -508,4 +526,5 @@ func SetUdpSession(sess *kcp.UDPSession) { sess.SetNoDelay(1, 10, 2, 1) sess.SetMtu(1600) sess.SetACKNoDelay(true) + sess.SetWriteDelay(false) } diff --git a/lib/conn/link.go b/lib/conn/link.go index 6fd9d8f..83b83e3 100644 --- a/lib/conn/link.go +++ b/lib/conn/link.go @@ -56,7 +56,7 @@ func NewLink(id int, connType string, host string, en, de int, crypt bool, c *Co } } -func (s *Link) Run(flow bool) { +func (s *Link) RunWrite() { go func() { for { select { @@ -76,7 +76,7 @@ func (s *Link) Run(flow bool) { } else { s.Conn.Write(content) } - if flow { + if s.Flow != nil { s.Flow.Add(0, len(content)) } if s.ConnType == common.CONN_UDP { @@ -89,3 +89,25 @@ func (s *Link) Run(flow bool) { } }() } +func (s *Link) RunRead(msgConn *Conn) { + buf := pool.BufPoolCopy.Get().([]byte) + for { + if n, err := s.Conn.Read(buf); err != nil { + msgConn.SendMsg([]byte(common.IO_EOF), s) + break + } else { + if _, err := msgConn.SendMsg(buf[:n], s); err != nil { + msgConn.Close() + break + } + if s.ConnType == common.CONN_UDP { + break + } + if s.Flow != nil { + s.Flow.Add(n, 0) + } + } + <-s.StatusCh + } + pool.PutBufPoolCopy(buf) +} diff --git a/lib/file/file.go b/lib/file/file.go index fee8348..fca6d6d 100644 --- a/lib/file/file.go +++ b/lib/file/file.go @@ -181,7 +181,7 @@ func (s *Csv) DelTask(id int) error { } //md5 password -func (s *Csv) GetSecretTask(p string) *Tunnel { +func (s *Csv) GetTaskByMd5Password(p string) *Tunnel { for _, v := range s.Tasks { if crypt.Md5(v.Password) == p { return v diff --git a/lib/file/obj.go b/lib/file/obj.go index b24e62b..a15824b 100644 --- a/lib/file/obj.go +++ b/lib/file/obj.go @@ -53,7 +53,6 @@ func NewClient(vKey string, noStore bool, noDisplay bool) *Client { Flow: new(Flow), Rate: nil, NoStore: noStore, - id: GetCsvDb().GetClientId(), RWMutex: sync.RWMutex{}, NoDisplay: noDisplay, } diff --git a/lib/mux/bytes.go b/lib/mux/bytes.go new file mode 100644 index 0000000..a7e17f7 --- /dev/null +++ b/lib/mux/bytes.go @@ -0,0 +1,32 @@ +package mux + +import ( + "bytes" + "encoding/binary" + "io" +) + +//write bytes with int32 length +func WriteLenBytes(buf []byte, w io.Writer) (int, error) { + raw := bytes.NewBuffer([]byte{}) + if err := binary.Write(raw, binary.LittleEndian, int32(len(buf))); err != nil { + return 0, err + } + if err := binary.Write(raw, binary.LittleEndian, buf); err != nil { + return 0, err + } + return w.Write(raw.Bytes()) +} + +//read bytes by length +func ReadLenBytes(buf []byte, r io.Reader) (int, error) { + var l int32 + var err error + if binary.Read(r, binary.LittleEndian, &l) != nil { + return 0, err + } + if _, err = io.ReadFull(r, buf[:l]); err != nil { + return 0, err + } + return int(l), nil +} diff --git a/lib/mux/conn.go b/lib/mux/conn.go new file mode 100644 index 0000000..a443934 --- /dev/null +++ b/lib/mux/conn.go @@ -0,0 +1,148 @@ +package mux + +import ( + "errors" + "github.com/cnlh/nps/lib/pool" + "io" + "net" + "time" +) + +type conn struct { + net.Conn + readMsgCh chan []byte + getStatusCh chan struct{} + connStatusOkCh chan struct{} + connStatusFailCh chan struct{} + readTimeOut time.Time + writeTimeOut time.Time + sendMsgCh chan *msg //mux + sendStatusCh chan int32 //mux + connId int32 + isClose bool + mux *Mux +} + +type msg struct { + connId int32 + content []byte +} + +func NewMsg(connId int32, content []byte) *msg { + return &msg{ + connId: connId, + content: content, + } +} + +func NewConn(connId int32, mux *Mux, sendMsgCh chan *msg, sendStatusCh chan int32) *conn { + return &conn{ + readMsgCh: make(chan []byte), + getStatusCh: make(chan struct{}), + connStatusOkCh: make(chan struct{}), + connStatusFailCh: make(chan struct{}), + readTimeOut: time.Time{}, + writeTimeOut: time.Time{}, + sendMsgCh: sendMsgCh, + sendStatusCh: sendStatusCh, + connId: connId, + isClose: false, + mux: mux, + } +} + +func (s *conn) Read(buf []byte) (int, error) { + if s.isClose { + return 0, errors.New("the conn has closed") + } + var b []byte + if t := s.readTimeOut.Sub(time.Now()); t > 0 { + timer := time.NewTimer(t) + select { + case <-timer.C: + s.Close() + return 0, errors.New("read timeout") + case b = <-s.readMsgCh: + } + } else { + b = <-s.readMsgCh + } + defer pool.PutBufPoolCopy(b) + if s.isClose { + return 0, io.EOF + } + s.sendStatusCh <- s.connId + return copy(buf, b), nil +} + +func (s *conn) Write(buf []byte) (int, error) { + if s.isClose { + return 0, errors.New("the conn has closed") + } + + if t := s.writeTimeOut.Sub(time.Now()); t > 0 { + timer := time.NewTimer(t) + select { + case <-timer.C: + s.Close() + return 0, errors.New("write timeout") + case s.sendMsgCh <- NewMsg(s.connId, buf): + } + } else { + s.sendMsgCh <- NewMsg(s.connId, buf) + } + + if t := s.writeTimeOut.Sub(time.Now()); t > 0 { + timer := time.NewTimer(t) + select { + case <-timer.C: + s.Close() + return 0, errors.New("write timeout") + case <-s.getStatusCh: + } + } else { + <-s.getStatusCh + } + + if s.isClose { + return 0, io.EOF + } + return len(buf), nil +} + +func (s *conn) Close() error { + if s.isClose { + return errors.New("the conn has closed") + } + s.isClose = true + close(s.getStatusCh) + close(s.readMsgCh) + close(s.connStatusOkCh) + close(s.connStatusFailCh) + s.sendMsgCh <- NewMsg(s.connId, nil) + return nil +} + +func (s *conn) LocalAddr() net.Addr { + return s.mux.conn.LocalAddr() +} + +func (s *conn) RemoteAddr() net.Addr { + return s.mux.conn.RemoteAddr() +} + +func (s *conn) SetDeadline(t time.Time) error { + s.readTimeOut = t + s.writeTimeOut = t + return nil +} + +func (s *conn) SetReadDeadline(t time.Time) error { + s.readTimeOut = t + return nil +} + +func (s *conn) SetWriteDeadline(t time.Time) error { + s.writeTimeOut = t + return nil +} diff --git a/lib/mux/map.go b/lib/mux/map.go new file mode 100644 index 0000000..69b7304 --- /dev/null +++ b/lib/mux/map.go @@ -0,0 +1,64 @@ +package mux + +import ( + "sync" + "time" +) + +type connMap struct { + connMap map[int32]*conn + closeCh chan struct{} + sync.RWMutex +} + +func NewConnMap() *connMap { + connMap := &connMap{ + connMap: make(map[int32]*conn), + closeCh: make(chan struct{}), + } + go connMap.clean() + return connMap +} + +func (s *connMap) Get(id int32) (*conn, bool) { + s.Lock() + defer s.Unlock() + if v, ok := s.connMap[id]; ok { + return v, true + } + return nil, false +} + +func (s *connMap) Set(id int32, v *conn) { + s.Lock() + defer s.Unlock() + s.connMap[id] = v +} + +func (s *connMap) Close() { + s.Lock() + defer s.Unlock() + for _, v := range s.connMap { + v.isClose = true + } + s.closeCh <- struct{}{} +} + +func (s *connMap) clean() { + ticker := time.NewTimer(time.Minute * 1) + for { + select { + case <-ticker.C: + s.Lock() + for _, v := range s.connMap { + if v.isClose { + delete(s.connMap, v.connId) + } + } + s.Unlock() + case <-s.closeCh: + ticker.Stop() + return + } + } +} diff --git a/lib/mux/mux.go b/lib/mux/mux.go new file mode 100644 index 0000000..dd344c0 --- /dev/null +++ b/lib/mux/mux.go @@ -0,0 +1,226 @@ +package mux + +import ( + "bytes" + "encoding/binary" + "errors" + "github.com/cnlh/nps/lib/pool" + "math" + "net" + "sync" + "sync/atomic" + "time" +) + +const ( + MUX_PING_FLAG int32 = iota + MUX_NEW_CONN_OK + MUX_NEW_CONN_Fail + MUX_NEW_MSG + MUX_MSG_SEND_OK + MUX_NEW_CONN + MUX_PING + MUX_CONN_CLOSE +) + +type Mux struct { + net.Listener + conn net.Conn + connMap *connMap + sendMsgCh chan *msg //write msg chan + sendStatusCh chan int32 //write read ok chan + newConnCh chan *conn + id int32 + closeChan chan struct{} + isClose bool + sync.Mutex +} + +func NewMux(c net.Conn) *Mux { + m := &Mux{ + conn: c, + connMap: NewConnMap(), + sendMsgCh: make(chan *msg), + sendStatusCh: make(chan int32), + id: 0, + closeChan: make(chan struct{}), + newConnCh: make(chan *conn), + isClose: false, + } + //read session by flag + go m.readSession() + //write session + go m.writeSession() + //ping + go m.ping() + return m +} + +func (s *Mux) NewConn() (*conn, error) { + if s.isClose { + return nil, errors.New("the mux has closed") + } + conn := NewConn(s.getId(), s, s.sendMsgCh, s.sendStatusCh) + raw := bytes.NewBuffer([]byte{}) + if err := binary.Write(raw, binary.LittleEndian, MUX_NEW_CONN); err != nil { + return nil, err + } + if err := binary.Write(raw, binary.LittleEndian, conn.connId); err != nil { + return nil, err + } + //it must be set before send + s.connMap.Set(conn.connId, conn) + if _, err := s.conn.Write(raw.Bytes()); err != nil { + return nil, err + } + select { + case <-conn.connStatusOkCh: + return conn, nil + case <-conn.connStatusFailCh: + } + return nil, errors.New("create connection fail,the server refused the connection") +} + +func (s *Mux) Accept() (net.Conn, error) { + if s.isClose { + return nil, errors.New("accpet error,the conn has closed") + } + return <-s.newConnCh, nil +} + +func (s *Mux) Addr() net.Addr { + return s.conn.LocalAddr() +} + +func (s *Mux) ping() { + go func() { + ticker := time.NewTicker(time.Second * 5) + raw := bytes.NewBuffer([]byte{}) + for { + select { + case <-ticker.C: + } + //Avoid going beyond the scope + if (math.MaxInt32 - s.id) < 10000 { + s.id = 0 + } + raw.Reset() + binary.Write(raw, binary.LittleEndian, MUX_PING_FLAG) + binary.Write(raw, binary.LittleEndian, MUX_PING) + if _, err := s.conn.Write(raw.Bytes()); err != nil { + s.Close() + break + } + } + }() + select { + case <-s.closeChan: + } +} + +func (s *Mux) writeSession() { + go func() { + raw := bytes.NewBuffer([]byte{}) + for { + raw.Reset() + select { + case msg := <-s.sendMsgCh: + if msg.content == nil { //close + binary.Write(raw, binary.LittleEndian, MUX_CONN_CLOSE) + binary.Write(raw, binary.LittleEndian, msg.connId) + break + } + binary.Write(raw, binary.LittleEndian, MUX_NEW_MSG) + binary.Write(raw, binary.LittleEndian, msg.connId) + binary.Write(raw, binary.LittleEndian, int32(len(msg.content))) + binary.Write(raw, binary.LittleEndian, msg.content) + case connId := <-s.sendStatusCh: + binary.Write(raw, binary.LittleEndian, MUX_MSG_SEND_OK) + binary.Write(raw, binary.LittleEndian, connId) + } + if _, err := s.conn.Write(raw.Bytes()); err != nil { + s.Close() + break + } + } + }() + select { + case <-s.closeChan: + } +} + +func (s *Mux) readSession() { + go func() { + raw := bytes.NewBuffer([]byte{}) + for { + var flag, i int32 + if binary.Read(s.conn, binary.LittleEndian, &flag) == nil { + if binary.Read(s.conn, binary.LittleEndian, &i) != nil { + break + } + switch flag { + case MUX_NEW_CONN: //new conn + conn := NewConn(i, s, s.sendMsgCh, s.sendStatusCh) + s.connMap.Set(i, conn) //it has been set before send ok + s.newConnCh <- conn + raw.Reset() + binary.Write(raw, binary.LittleEndian, MUX_NEW_CONN_OK) + binary.Write(raw, binary.LittleEndian, i) + s.conn.Write(raw.Bytes()) + continue + case MUX_PING_FLAG: //ping + continue + } + if conn, ok := s.connMap.Get(i); ok { + switch flag { + case MUX_NEW_MSG: //new msg from remote conn + buf := pool.BufPoolCopy.Get().([]byte) + if n, err := ReadLenBytes(buf, s.conn); err == nil { + if !conn.isClose { + conn.readMsgCh <- buf[:n] + } else { + pool.PutBufPoolCopy(buf) + } + } else { //read len bytes error,the mux has broken + break + } + case MUX_MSG_SEND_OK: //the remote has read + conn.getStatusCh <- struct{}{} + case MUX_NEW_CONN_OK: //conn ok + conn.connStatusOkCh <- struct{}{} + case MUX_NEW_CONN_Fail: + conn.connStatusFailCh <- struct{}{} + case MUX_CONN_CLOSE: //close the connection + conn.Close() + } + } + } else { + break + } + } + s.Close() + }() + select { + case <-s.closeChan: + } +} + +func (s *Mux) Close() error { + if s.isClose { + return errors.New("the mux has closed") + } + s.isClose = true + s.connMap.Close() + s.closeChan <- struct{}{} + s.closeChan <- struct{}{} + s.closeChan <- struct{}{} + close(s.closeChan) + close(s.sendMsgCh) + close(s.sendStatusCh) + return s.conn.Close() +} + +//get new connId as unique flag +func (s *Mux) getId() int32 { + return atomic.AddInt32(&s.id, 1) +} diff --git a/lib/mux/mux_test.go b/lib/mux/mux_test.go new file mode 100644 index 0000000..038cf8a --- /dev/null +++ b/lib/mux/mux_test.go @@ -0,0 +1,96 @@ +package mux + +import ( + "github.com/cnlh/nps/lib/common" + "github.com/cnlh/nps/vender/github.com/astaxie/beego/logs" + "log" + "net" + "net/http" + _ "net/http/pprof" + "testing" + "time" +) + +var conn1 net.Conn +var conn2 net.Conn + +func TestNewMux(t *testing.T) { + go func() { + http.ListenAndServe("0.0.0.0:8899", nil) + }() + logs.EnableFuncCallDepth(true) + logs.SetLogFuncCallDepth(3) + server() + client() + time.Sleep(time.Second * 3) + go func() { + m2 := NewMux(conn2) + for { + c, err := m2.Accept() + if err != nil { + log.Fatalln(err) + } + go func(c net.Conn) { + c2, err := net.Dial("tcp", "127.0.0.1:8080") + if err != nil { + log.Fatalln(err) + } + go common.CopyBuffer(c2, c) + common.CopyBuffer(c, c2) + c.Close() + c2.Close() + }(c) + } + }() + + go func() { + m1 := NewMux(conn1) + l, err := net.Listen("tcp", "127.0.0.1:7777") + if err != nil { + log.Fatalln(err) + } + for { + conn, err := l.Accept() + if err != nil { + log.Fatalln(err) + } + go func(conn net.Conn) { + tmpCpnn, err := m1.NewConn() + if err != nil { + log.Fatalln(err) + } + go common.CopyBuffer(tmpCpnn, conn) + common.CopyBuffer(conn, tmpCpnn) + conn.Close() + tmpCpnn.Close() + }(conn) + } + }() + + for { + time.Sleep(time.Second * 5) + } +} + +func server() { + var err error + l, err := net.Listen("tcp", "127.0.0.1:9999") + if err != nil { + log.Fatalln(err) + } + go func() { + conn1, err = l.Accept() + if err != nil { + log.Fatalln(err) + } + }() + return +} + +func client() { + var err error + conn2, err = net.Dial("tcp", "127.0.0.1:9999") + if err != nil { + log.Fatalln(err) + } +} diff --git a/lib/pool/pool.go b/lib/pool/pool.go index 6fbb77d..1f58217 100644 --- a/lib/pool/pool.go +++ b/lib/pool/pool.go @@ -42,6 +42,7 @@ func PutBufPoolUdp(buf []byte) { } } + func PutBufPoolCopy(buf []byte) { if cap(buf) == PoolSizeCopy { BufPoolCopy.Put(buf[:PoolSizeCopy]) diff --git a/server/proxy/base.go b/server/proxy/base.go index 6660777..94a631f 100644 --- a/server/proxy/base.go +++ b/server/proxy/base.go @@ -6,7 +6,6 @@ import ( "github.com/cnlh/nps/lib/common" "github.com/cnlh/nps/lib/conn" "github.com/cnlh/nps/lib/file" - "github.com/cnlh/nps/lib/pool" "net" "net/http" "sync" @@ -58,27 +57,11 @@ func (s *BaseServer) linkCopy(link *conn.Link, c *conn.Conn, rb []byte, tunnel * flow.Add(len(rb), 0) <-link.StatusCh } - - buf := pool.BufPoolCopy.Get().([]byte) - for { - if err := s.checkFlow(); err != nil { - c.Close() - break - } - if n, err := c.Read(buf); err != nil { - tunnel.SendMsg([]byte(common.IO_EOF), link) - break - } else { - if _, err := tunnel.SendMsg(buf[:n], link); err != nil { - c.Close() - break - } - flow.Add(n, 0) - } - <-link.StatusCh + if err := s.checkFlow(); err != nil { + c.Close() } + link.RunRead(tunnel) s.task.Client.AddConn() - pool.PutBufPoolCopy(buf) } func (s *BaseServer) writeConnFail(c net.Conn) { @@ -111,7 +94,7 @@ func (s *BaseServer) DealClient(c *conn.Conn, addr string, rb []byte) error { c.Close() return err } else { - link.Run(true) + link.RunWrite() s.linkCopy(link, c, rb, tunnel, s.task.Flow) } return nil diff --git a/server/proxy/http.go b/server/proxy/http.go index 6b0cf8f..42e2291 100644 --- a/server/proxy/http.go +++ b/server/proxy/http.go @@ -150,7 +150,7 @@ func (s *httpServer) process(c *conn.Conn, r *http.Request) { logs.Notice(err) break } - lk.Run(true) + lk.RunWrite() isConn = false } else { r, err = http.ReadRequest(bufio.NewReader(c)) diff --git a/server/proxy/p2p.go b/server/proxy/p2p.go new file mode 100644 index 0000000..6dc0559 --- /dev/null +++ b/server/proxy/p2p.go @@ -0,0 +1,104 @@ +package proxy + +import ( + "github.com/cnlh/nps/lib/common" + "github.com/cnlh/nps/lib/conn" + "github.com/cnlh/nps/vender/github.com/astaxie/beego/logs" + "github.com/cnlh/nps/vender/github.com/xtaci/kcp" + "strconv" + "time" +) + +type P2PServer struct { + BaseServer + p2pPort int + p2p map[string]*p2p +} + +type p2p struct { + provider *conn.Conn + visitor *conn.Conn + visitorAddr string + providerAddr string +} + +func NewP2PServer(p2pPort int) *P2PServer { + return &P2PServer{ + p2pPort: p2pPort, + p2p: make(map[string]*p2p), + } +} + +func (s *P2PServer) Start() error { + kcpListener, err := kcp.ListenWithOptions(":"+strconv.Itoa(s.p2pPort), nil, 150, 3) + if err != nil { + logs.Error(err) + return err + } + for { + c, err := kcpListener.AcceptKCP() + conn.SetUdpSession(c) + if err != nil { + logs.Warn(err) + continue + } + go s.p2pProcess(conn.NewConn(c)) + } + return nil +} + +func (s *P2PServer) p2pProcess(c *conn.Conn) { + logs.Warn("new link", c.Conn.RemoteAddr()) + //获取密钥 + var ( + f string + b []byte + err error + v *p2p + ok bool + ) + if b, err = c.ReadLen(32); err != nil { + return + } + //获取角色 + if f, err = c.ReadFlag(); err != nil { + return + } + logs.Warn("收到", string(b), f) + if v, ok = s.p2p[string(b)]; !ok { + v = new(p2p) + s.p2p[string(b)] = v + } + logs.Warn(f, c.Conn.RemoteAddr().String()) + //存储 + if f == common.WORK_P2P_VISITOR { + v.visitorAddr = c.Conn.RemoteAddr().String() + v.visitor = c + for { + time.Sleep(time.Second) + if v.provider != nil { + break + } + } + logs.Warn("等待确认") + if _, err := v.provider.ReadFlag(); err == nil { + v.visitor.WriteLenContent([]byte(v.providerAddr)) + logs.Warn("收到确认") + delete(s.p2p, string(b)) + } else { + logs.Warn("收到确认失败", err) + } + } else { + v.providerAddr = c.Conn.RemoteAddr().String() + v.provider = c + for { + time.Sleep(time.Second) + if v.visitor != nil { + v.provider.WriteLenContent([]byte(v.visitorAddr)) + break + } + } + } + //假设是连接者、等待对应的被连接者连上后,发送被连接者信息 + //假设是被连接者,等待对应的连接者脸上后,发送连接者信息 +} diff --git a/server/proxy/socks5.go b/server/proxy/socks5.go index 8e01fb3..1d77883 100755 --- a/server/proxy/socks5.go +++ b/server/proxy/socks5.go @@ -148,7 +148,7 @@ func (s *Sock5ModeServer) doConnect(c net.Conn, command uint8) { return } else { s.sendReply(c, succeeded) - link.Run(true) + link.RunWrite() s.linkCopy(link, conn.NewConn(c), nil, tunnel, s.task.Flow) } return diff --git a/server/proxy/udp.go b/server/proxy/udp.go index 42efae7..9716e1e 100755 --- a/server/proxy/udp.go +++ b/server/proxy/udp.go @@ -58,7 +58,7 @@ func (s *UdpModeServer) process(addr *net.UDPAddr, data []byte) { s.task.Flow.Add(len(data), 0) tunnel.SendMsg(data, link) pool.PutBufPoolUdp(data) - link.Run(true) + link.RunWrite() } } diff --git a/server/server.go b/server/server.go index ae4d2b7..4c05ec0 100644 --- a/server/server.go +++ b/server/server.go @@ -48,7 +48,7 @@ func DealBridgeTask() { file.GetCsvDb().DelClient(id) case s := <-Bridge.SecretChan: logs.Trace("New secret connection, addr", s.Conn.Conn.RemoteAddr()) - if t := file.GetCsvDb().GetSecretTask(s.Password); t != nil { + if t := file.GetCsvDb().GetTaskByMd5Password(s.Password); t != nil { if !t.Client.GetConn() { logs.Info("Connections exceed the current client %d limit", t.Client.Id) s.Conn.Close() @@ -75,6 +75,9 @@ func StartNewServer(bridgePort int, cnf *file.Tunnel, bridgeType string) { } else { logs.Info("Server startup, the bridge type is %s, the bridge port is %d", bridgeType, bridgePort) } + if p, err := beego.AppConfig.Int("p2pPort"); err == nil { + go proxy.NewP2PServer(p).Start() + } go DealBridgeTask() if svr := NewMode(Bridge, cnf); svr != nil { if err := svr.Start(); err != nil {