From f78e81b4524f3e64436f9527adf57cf5424559ed Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=88=98=E6=B2=B3?= Date: Thu, 7 Mar 2019 18:07:53 +0800 Subject: [PATCH] =?UTF-8?q?=E5=81=A5=E5=BA=B7=E6=A3=80=E6=9F=A5?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client/client.go | 6 +- client/health.go | 104 +++++++++++++++++++++++++++++++++++ cmd/npc/npc.go | 15 ++++- conf/hosts.csv | 3 +- conf/npc.conf | 13 ++++- conf/nps.conf | 9 +-- conf/tasks.csv | 2 + lib/common/util.go | 27 +++++++++ lib/config/config.go | 20 ++++++- lib/file/obj.go | 55 ++++++++++++++---- lib/mux/pmux.go | 26 +++++---- lib/sheap/heap.go | 21 +++++++ server/proxy/http.go | 12 ++-- server/proxy/tcp.go | 2 +- web/controllers/base.go | 4 +- web/views/index/add.html | 8 ++- web/views/index/edit.html | 9 +-- web/views/public/layout.html | 11 ++++ 18 files changed, 301 insertions(+), 46 deletions(-) create mode 100644 client/health.go create mode 100644 lib/sheap/heap.go diff --git a/client/client.go b/client/client.go index 5a755b0..56e9a93 100755 --- a/client/client.go +++ b/client/client.go @@ -2,6 +2,7 @@ package client import ( "github.com/cnlh/nps/lib/common" + "github.com/cnlh/nps/lib/config" "github.com/cnlh/nps/lib/conn" "github.com/cnlh/nps/lib/mux" "github.com/cnlh/nps/vender/github.com/astaxie/beego/logs" @@ -19,16 +20,18 @@ type TRPClient struct { vKey string tunnel *mux.Mux signal *conn.Conn + cnf *config.Config } //new client -func NewRPClient(svraddr string, vKey string, bridgeConnType string, proxyUrl string) *TRPClient { +func NewRPClient(svraddr string, vKey string, bridgeConnType string, proxyUrl string, cnf *config.Config) *TRPClient { return &TRPClient{ svrAddr: svraddr, vKey: vKey, bridgeConnType: bridgeConnType, stop: make(chan bool), proxyUrl: proxyUrl, + cnf: cnf, } } @@ -55,6 +58,7 @@ func (s *TRPClient) Close() { func (s *TRPClient) processor(c *conn.Conn) { s.signal = c go s.dealChan() + go heathCheck(s.cnf, c) for { flags, err := c.ReadFlag() if err != nil { diff --git a/client/health.go b/client/health.go new file mode 100644 index 0000000..1e09376 --- /dev/null +++ b/client/health.go @@ -0,0 +1,104 @@ +package client + +import ( + "container/heap" + "github.com/cnlh/nps/lib/config" + "github.com/cnlh/nps/lib/file" + "github.com/cnlh/nps/lib/sheap" + "net" + "net/http" + "strings" + "time" +) + +func heathCheck(cnf *config.Config, c net.Conn) { + var hosts []*file.Host + var tunnels []*file.Tunnel + h := &sheap.IntHeap{} + for _, v := range cnf.Hosts { + if v.HealthMaxFail > 0 && v.HealthCheckTimeout > 0 && v.HealthCheckInterval > 0 { + v.HealthNextTime = time.Now().Add(time.Duration(v.HealthCheckInterval)) + heap.Push(h, v.HealthNextTime.Unix()) + v.HealthMap = make(map[string]int) + hosts = append(hosts, v) + } + } + for _, v := range cnf.Tasks { + if v.HealthMaxFail > 0 && v.HealthCheckTimeout > 0 && v.HealthCheckInterval > 0 { + v.HealthNextTime = time.Now().Add(time.Duration(v.HealthCheckInterval)) + heap.Push(h, v.HealthNextTime.Unix()) + v.HealthMap = make(map[string]int) + tunnels = append(tunnels, v) + } + } + if len(hosts) == 0 && len(tunnels) == 0 { + return + } + for { + rs := heap.Pop(h).(int64) - time.Now().Unix() + if rs < 0 { + continue + } + timer := time.NewTicker(time.Duration(rs)) + select { + case <-timer.C: + for _, v := range hosts { + if v.HealthNextTime.Before(time.Now()) { + v.HealthNextTime = time.Now().Add(time.Duration(v.HealthCheckInterval)) + //check + go checkHttp(v, c) + //reset time + heap.Push(h, v.HealthNextTime.Unix()) + } + } + for _, v := range tunnels { + if v.HealthNextTime.Before(time.Now()) { + v.HealthNextTime = time.Now().Add(time.Duration(v.HealthCheckInterval)) + //check + go checkTcp(v, c) + //reset time + heap.Push(h, v.HealthNextTime.Unix()) + } + } + } + } +} + +func checkTcp(t *file.Tunnel, c net.Conn) { + arr := strings.Split(t.Target, "\n") + for _, v := range arr { + if _, err := net.DialTimeout("tcp", v, time.Duration(t.HealthCheckTimeout)); err != nil { + t.HealthMap[v] += 1 + } + if t.HealthMap[v] > t.HealthMaxFail { + t.HealthMap[v] += 1 + if t.HealthMap[v] == t.HealthMaxFail { + //send fail remove + ch <- file.NewHealthInfo("tcp", v, true) + } + } else if t.HealthMap[v] >= t.HealthMaxFail { + //send recovery add + ch <- file.NewHealthInfo("tcp", v, false) + t.HealthMap[v] = 0 + } + } +} + +func checkHttp(h *file.Host, ch chan *file.HealthInfo) { + arr := strings.Split(h.Target, "\n") + client := &http.Client{} + client.Timeout = time.Duration(h.HealthCheckTimeout) * time.Second + for _, v := range arr { + if _, err := client.Get(v + h.HttpHealthUrl); err != nil { + h.HealthMap[v] += 1 + if h.HealthMap[v] == h.HealthMaxFail { + //send fail remove + ch <- file.NewHealthInfo("http", v, true) + } + } else if h.HealthMap[v] >= h.HealthMaxFail { + //send recovery add + h.HealthMap[v] = 0 + ch <- file.NewHealthInfo("http", v, false) + } + } +} diff --git a/cmd/npc/npc.go b/cmd/npc/npc.go index b7d4e30..7131694 100644 --- a/cmd/npc/npc.go +++ b/cmd/npc/npc.go @@ -13,7 +13,7 @@ import ( var ( serverAddr = flag.String("server", "", "Server addr (ip:port)") - configPath = flag.String("config", "npc.conf", "Configuration file path") + configPath = flag.String("config", "", "Configuration file path") verifyKey = flag.String("vkey", "", "Authentication key") logType = flag.String("log", "stdout", "Log output mode(stdout|file)") connType = flag.String("type", "tcp", "Connection type with the server(kcp|tcp)") @@ -21,6 +21,7 @@ var ( logLevel = flag.String("log_level", "7", "log level 0~7") registerTime = flag.Int("time", 2, "register time long /h") ) + func main() { flag.Parse() if len(os.Args) > 2 { @@ -41,13 +42,23 @@ func main() { } else { logs.SetLogger(logs.AdapterFile, `{"level":`+*logLevel+`,"filename":"npc_log.log"}`) } - if *verifyKey != "" && *serverAddr != "" { + env := common.GetEnvMap() + if *serverAddr == "" { + *serverAddr, _ = env["NPS_SERVER_ADDR"] + } + if *verifyKey == "" { + *verifyKey, _ = env["NPS_SERVER_VKEY"] + } + if *verifyKey != "" && *serverAddr != "" && *configPath == "" { for { client.NewRPClient(*serverAddr, *verifyKey, *connType, *proxyUrl).Start() logs.Info("It will be reconnected in five seconds") time.Sleep(time.Second * 5) } } else { + if *configPath == "" { + *configPath = "npc.conf" + } client.StartFromFile(*configPath) } } diff --git a/conf/hosts.csv b/conf/hosts.csv index c56b455..c091bd7 100644 --- a/conf/hosts.csv +++ b/conf/hosts.csv @@ -1 +1,2 @@ -a.o.com,127.0.0.1:8082,2,,,111,/,3,5290945,30260,all +a.o.com,127.0.0.1:8082,2,,,111,/,3,5290945,32285,http +a.o.com,127.0.0.1:8080,2,,,,/,4,0,0,https diff --git a/conf/npc.conf b/conf/npc.conf index ac61982..ca98a9c 100644 --- a/conf/npc.conf +++ b/conf/npc.conf @@ -1,16 +1,25 @@ [common] -server=127.0.0.1:8024 +server={{.NPS_SERVER_ADDR}} tp=tcp -vkey=123 +vkey={{.NPS_SERVER_VKEY}} auto_reconnection=true [web] host=b.o.com target=127.0.0.1:8082 +health_check_timeout = 3 +health_check_max_failed = 3 +health_check_interval = 10 +health_http_url=/ + [tcp] mode=tcp target=8006-8010,8012 port=9006-9010,9012 targetAddr=123.206.77.88 +health_check_timeout = 3 +health_check_max_failed = 3 +health_check_interval = 10 +health_http_url=/ [socks5] mode=socks5 diff --git a/conf/nps.conf b/conf/nps.conf index 1422445..290b993 100755 --- a/conf/nps.conf +++ b/conf/nps.conf @@ -2,13 +2,11 @@ appname = nps #Boot mode(dev|pro) runmode = dev -#Web API unauthenticated IP address -#auth_key=test -#auth_crypt_key =1234567812345678 + #HTTP(S) proxy port, no startup if empty http_proxy_port=80 -https_proxy_port=445 +https_proxy_port=443 #certFile absolute path pem_path=conf/server.pem #KeyFile absolute path @@ -43,5 +41,8 @@ web_username=admin web_password=123 web_port = 8080 web_ip=0.0.0.0 +#Web API unauthenticated IP address +auth_key=test +auth_crypt_key =1234567812345678 #allow_ports=9001-9009,10001,11000-12000 \ No newline at end of file diff --git a/conf/tasks.csv b/conf/tasks.csv index a06ff67..fbe8834 100644 --- a/conf/tasks.csv +++ b/conf/tasks.csv @@ -1,2 +1,4 @@ 8025,socks5,,1,1,2,,0,0, 8026,httpProxy,,1,2,2,,0,0, +8001,tcp,"127.0.0.1:8080 +127.0.0.1:8082",1,3,5,,0,0, diff --git a/lib/common/util.go b/lib/common/util.go index a47cd45..31c94e3 100755 --- a/lib/common/util.go +++ b/lib/common/util.go @@ -6,6 +6,7 @@ import ( "encoding/binary" "github.com/cnlh/nps/lib/crypt" "github.com/cnlh/nps/lib/pool" + "html/template" "io" "io/ioutil" "net" @@ -276,3 +277,29 @@ func GetLocalUdpAddr() (net.Conn, error) { } return tmpConn, tmpConn.Close() } + +func ParseStr(str string) (string, error) { + tmp := template.New("npc") + var err error + w := new(bytes.Buffer) + if tmp, err = tmp.Parse(str); err != nil { + return "", err + } + if err = tmp.Execute(w, GetEnvMap()); err != nil { + return "", err + } + return w.String(), nil +} + +//get env +func GetEnvMap() map[string]string { + m := make(map[string]string) + environ := os.Environ() + for i := range environ { + tmp := strings.Split(environ[i], "=") + if len(tmp) == 2 { + m[tmp[0]] = tmp[1] + } + } + return m +} diff --git a/lib/config/config.go b/lib/config/config.go index d54d011..b75b1d5 100644 --- a/lib/config/config.go +++ b/lib/config/config.go @@ -38,7 +38,9 @@ func NewConfig(path string) (c *Config, err error) { if b, err = common.ReadAllFromFile(path); err != nil { return } else { - c.content = string(b) + if c.content, err = common.ParseStr(string(b)); err != nil { + return nil, err + } if c.title, err = getAllTitle(c.content); err != nil { return } @@ -153,6 +155,14 @@ func dealHost(s string) *file.Host { h.HostChange = item[1] case "location": h.Location = item[1] + case "health_check_timeout": + h.HealthCheckTimeout = common.GetIntNoErrByStr(item[1]) + case "health_check_max_failed": + h.HealthMaxFail = common.GetIntNoErrByStr(item[1]) + case "health_check_interval": + h.HealthCheckInterval = common.GetIntNoErrByStr(item[1]) + case "health_http_url": + h.HttpHealthUrl = item[1] default: if strings.Contains(item[0], "header") { headerChange += strings.Replace(item[0], "header_", "", -1) + ":" + item[1] + "\n" @@ -178,7 +188,7 @@ func dealTunnel(s string) *file.Tunnel { case "mode": t.Mode = item[1] case "target": - t.Target = item[1] + t.Target = strings.Replace(item[1], ",", "\n", -1) case "targetAddr": t.TargetAddr = item[1] case "password": @@ -187,6 +197,12 @@ func dealTunnel(s string) *file.Tunnel { t.LocalPath = item[1] case "strip_pre": t.StripPre = item[1] + case "health_check_timeout": + t.HealthCheckTimeout = common.GetIntNoErrByStr(item[1]) + case "health_check_max_failed": + t.HealthMaxFail = common.GetIntNoErrByStr(item[1]) + case "health_check_interval": + t.HealthCheckInterval = common.GetIntNoErrByStr(item[1]) } } return t diff --git a/lib/file/obj.go b/lib/file/obj.go index d6154bb..80a6198 100644 --- a/lib/file/obj.go +++ b/lib/file/obj.go @@ -4,6 +4,7 @@ import ( "github.com/cnlh/nps/lib/rate" "strings" "sync" + "time" ) type Flow struct { @@ -96,14 +97,15 @@ func (s *Client) HasHost(h *Host) bool { } type Tunnel struct { - Id int //Id - Port int //服务端监听端口 - Mode string //启动方式 - Target string //目标 - Status bool //设置是否开启 - RunStatus bool //当前运行状态 - Client *Client //所属客户端id - Ports string //客户端与服务端传递 + Id int //Id + Port int //服务端监听端口 + Mode string //启动方式 + Target string //目标 + TargetArr []string //目标 + Status bool //设置是否开启 + RunStatus bool //当前运行状态 + Client *Client //所属客户端id + Ports string //客户端与服务端传递 Flow *Flow Password string //私密模式密码,唯一 Remark string //备注 @@ -111,6 +113,35 @@ type Tunnel struct { NoStore bool LocalPath string StripPre string + NowIndex int + Health + sync.RWMutex +} + +type Health struct { + HealthCheckTimeout int + HealthMaxFail int + HealthCheckInterval int + HealthNextTime time.Time + HealthMap map[string]int + HttpHealthUrl string + HealthRemoveArr []string +} + +func (s *Tunnel) GetRandomTarget() string { + if s.TargetArr == nil { + s.TargetArr = strings.Split(s.Target, "\n") + } + if len(s.TargetArr) == 1 { + return s.TargetArr[0] + } + s.Lock() + defer s.Unlock() + if s.NowIndex >= len(s.TargetArr)-1 { + s.NowIndex = -1 + } + s.NowIndex++ + return s.TargetArr[s.NowIndex] } type Config struct { @@ -133,7 +164,8 @@ type Host struct { NowIndex int TargetArr []string NoStore bool - Scheme string //http https all + Scheme string //http https all + Health sync.RWMutex } @@ -141,10 +173,13 @@ func (s *Host) GetRandomTarget() string { if s.TargetArr == nil { s.TargetArr = strings.Split(s.Target, "\n") } + if len(s.TargetArr) == 1 { + return s.TargetArr[0] + } s.Lock() defer s.Unlock() if s.NowIndex >= len(s.TargetArr)-1 { - s.NowIndex = 0 + s.NowIndex = -1 } else { s.NowIndex++ } diff --git a/lib/mux/pmux.go b/lib/mux/pmux.go index ff4f5d4..340497e 100644 --- a/lib/mux/pmux.go +++ b/lib/mux/pmux.go @@ -11,18 +11,20 @@ import ( "net" "strconv" "strings" + "time" ) const ( - HTTP_GET = 716984 - HTTP_POST = 807983 - HTTP_HEAD = 726965 - HTTP_PUT = 808585 - HTTP_DELETE = 686976 - HTTP_CONNECT = 677978 - HTTP_OPTIONS = 798084 - HTTP_TRACE = 848265 - CLIENT = 848384 + HTTP_GET = 716984 + HTTP_POST = 807983 + HTTP_HEAD = 726965 + HTTP_PUT = 808585 + HTTP_DELETE = 686976 + HTTP_CONNECT = 677978 + HTTP_OPTIONS = 798084 + HTTP_TRACE = 848265 + CLIENT = 848384 + ACCEPT_TIME_OUT = 10 ) type PortMux struct { @@ -122,7 +124,11 @@ func (pMux *PortMux) process(conn net.Conn) { if len(rs) == 0 { rs = buf } - ch <- newPortConn(conn, rs) + timer := time.NewTimer(ACCEPT_TIME_OUT) + select { + case <-timer.C: + case ch <- newPortConn(conn, rs): + } } func (pMux *PortMux) Close() error { diff --git a/lib/sheap/heap.go b/lib/sheap/heap.go new file mode 100644 index 0000000..c892c39 --- /dev/null +++ b/lib/sheap/heap.go @@ -0,0 +1,21 @@ +package sheap + +type IntHeap []int64 + +func (h IntHeap) Len() int { return len(h) } +func (h IntHeap) Less(i, j int) bool { return h[i] < h[j] } +func (h IntHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } + +func (h *IntHeap) Push(x interface{}) { + // Push and Pop use pointer receivers because they modify the slice's length, + // not just its contents. + *h = append(*h, x.(int64)) +} + +func (h *IntHeap) Pop() interface{} { + old := *h + n := len(old) + x := old[n-1] + *h = old[0 : n-1] + return x +} diff --git a/server/proxy/http.go b/server/proxy/http.go index d6049ab..54be9ca 100644 --- a/server/proxy/http.go +++ b/server/proxy/http.go @@ -159,7 +159,7 @@ func (s *httpServer) process(c *conn.Conn, r *http.Request) { logs.Warn("auth error", err, r.RemoteAddr) break } - lk := conn.NewLink(common.CONN_TCP, host.Target, host.Client.Cnf.Crypt, host.Client.Cnf.Compress, r.RemoteAddr) + lk := conn.NewLink(common.CONN_TCP, host.GetRandomTarget(), host.Client.Cnf.Crypt, host.Client.Cnf.Compress, r.RemoteAddr) if target, err = s.bridge.SendLinkInfo(host.Client.Id, lk, c.Conn.RemoteAddr().String(), nil); err != nil { logs.Notice("connect to target %s error %s", lk.Host, err) break @@ -174,10 +174,10 @@ func (s *httpServer) process(c *conn.Conn, r *http.Request) { }() } else { r, err = http.ReadRequest(bufio.NewReader(c)) - r.URL.Scheme = scheme if err != nil { break } + r.URL.Scheme = scheme //What happened ,Why one character less??? if r.Method == "ET" { r.Method = "GET" @@ -190,10 +190,14 @@ func (s *httpServer) process(c *conn.Conn, r *http.Request) { logs.Notice("the url %s %s %s can't be parsed!", r.URL.Scheme, r.Host, r.RequestURI) break } else if host != lastHost { + host.Client.AddConn() + if !hostTmp.Client.GetConn() { + break + } host = hostTmp lastHost = host isConn = true - host.Client.AddConn() + goto start } } @@ -204,7 +208,7 @@ func (s *httpServer) process(c *conn.Conn, r *http.Request) { break } host.Flow.Add(int64(len(b)), 0) - logs.Trace("http(s) request, method %s, host %s, url %s, remote address %s, target %s", r.Method, r.Host, r.RequestURI, r.RemoteAddr, host.Target) + logs.Trace("%s request, method %s, host %s, url %s, remote address %s, target %s", r.URL.Scheme, r.Method, r.Host, r.RequestURI, r.RemoteAddr, host.Target) //write connClient.Write(b) } diff --git a/server/proxy/tcp.go b/server/proxy/tcp.go index eeb4b55..8efb074 100755 --- a/server/proxy/tcp.go +++ b/server/proxy/tcp.go @@ -109,7 +109,7 @@ type process func(c *conn.Conn, s *TunnelModeServer) error //tcp隧道模式 func ProcessTunnel(c *conn.Conn, s *TunnelModeServer) error { - return s.DealClient(c, s.task.Target, nil, common.CONN_TCP) + return s.DealClient(c, s.task.GetRandomTarget(), nil, common.CONN_TCP) } //http代理模式 diff --git a/web/controllers/base.go b/web/controllers/base.go index 91be786..a52ae28 100755 --- a/web/controllers/base.go +++ b/web/controllers/base.go @@ -26,8 +26,8 @@ func (s *BaseController) Prepare() { // param 2 is timestamp (It's limited to 20 seconds.) md5Key := s.GetString("auth_key") timestamp := s.GetIntNoErr("timestamp") - configKey := beego.AppConfig.String("authKey") - if !(time.Now().Unix()-int64(timestamp) < 20 && time.Now().Unix()-int64(timestamp) > 0 && crypt.Md5(configKey+strconv.Itoa(timestamp)) == md5Key) { + configKey := beego.AppConfig.String("auth_key") + if !(time.Now().Unix()-int64(timestamp) <= 20 && time.Now().Unix()-int64(timestamp) >= 0 && crypt.Md5(configKey+strconv.Itoa(timestamp)) == md5Key) { if s.GetSession("auth") != true { s.Redirect("/login/index", 302) } diff --git a/web/views/index/add.html b/web/views/index/add.html index 63a1d39..693c286 100755 --- a/web/views/index/add.html +++ b/web/views/index/add.html @@ -38,9 +38,11 @@
- - can only fill in ports if it is local machine proxy + + can only fill in ports if it is local machine proxy, only tcp supports load balancing + +
diff --git a/web/views/index/edit.html b/web/views/index/edit.html index a11c33e..0091d61 100755 --- a/web/views/index/edit.html +++ b/web/views/index/edit.html @@ -39,10 +39,11 @@
- - can only fill in ports if it is local machine proxy -
+ + can only fill in ports if it is local machine proxy, only tcp supports load balancing + +
diff --git a/web/views/public/layout.html b/web/views/public/layout.html index 2c2661a..8156cda 100755 --- a/web/views/public/layout.html +++ b/web/views/public/layout.html @@ -120,4 +120,15 @@ + +{{/**/}} +