健康检查

pull/122/head
刘河 2019-03-07 18:07:53 +08:00
parent f81fb7760e
commit f78e81b452
18 changed files with 301 additions and 46 deletions

View File

@ -2,6 +2,7 @@ package client
import (
"github.com/cnlh/nps/lib/common"
"github.com/cnlh/nps/lib/config"
"github.com/cnlh/nps/lib/conn"
"github.com/cnlh/nps/lib/mux"
"github.com/cnlh/nps/vender/github.com/astaxie/beego/logs"
@ -19,16 +20,18 @@ type TRPClient struct {
vKey string
tunnel *mux.Mux
signal *conn.Conn
cnf *config.Config
}
//new client
func NewRPClient(svraddr string, vKey string, bridgeConnType string, proxyUrl string) *TRPClient {
func NewRPClient(svraddr string, vKey string, bridgeConnType string, proxyUrl string, cnf *config.Config) *TRPClient {
return &TRPClient{
svrAddr: svraddr,
vKey: vKey,
bridgeConnType: bridgeConnType,
stop: make(chan bool),
proxyUrl: proxyUrl,
cnf: cnf,
}
}
@ -55,6 +58,7 @@ func (s *TRPClient) Close() {
func (s *TRPClient) processor(c *conn.Conn) {
s.signal = c
go s.dealChan()
go heathCheck(s.cnf, c)
for {
flags, err := c.ReadFlag()
if err != nil {

104
client/health.go Normal file
View File

@ -0,0 +1,104 @@
package client
import (
"container/heap"
"github.com/cnlh/nps/lib/config"
"github.com/cnlh/nps/lib/file"
"github.com/cnlh/nps/lib/sheap"
"net"
"net/http"
"strings"
"time"
)
func heathCheck(cnf *config.Config, c net.Conn) {
var hosts []*file.Host
var tunnels []*file.Tunnel
h := &sheap.IntHeap{}
for _, v := range cnf.Hosts {
if v.HealthMaxFail > 0 && v.HealthCheckTimeout > 0 && v.HealthCheckInterval > 0 {
v.HealthNextTime = time.Now().Add(time.Duration(v.HealthCheckInterval))
heap.Push(h, v.HealthNextTime.Unix())
v.HealthMap = make(map[string]int)
hosts = append(hosts, v)
}
}
for _, v := range cnf.Tasks {
if v.HealthMaxFail > 0 && v.HealthCheckTimeout > 0 && v.HealthCheckInterval > 0 {
v.HealthNextTime = time.Now().Add(time.Duration(v.HealthCheckInterval))
heap.Push(h, v.HealthNextTime.Unix())
v.HealthMap = make(map[string]int)
tunnels = append(tunnels, v)
}
}
if len(hosts) == 0 && len(tunnels) == 0 {
return
}
for {
rs := heap.Pop(h).(int64) - time.Now().Unix()
if rs < 0 {
continue
}
timer := time.NewTicker(time.Duration(rs))
select {
case <-timer.C:
for _, v := range hosts {
if v.HealthNextTime.Before(time.Now()) {
v.HealthNextTime = time.Now().Add(time.Duration(v.HealthCheckInterval))
//check
go checkHttp(v, c)
//reset time
heap.Push(h, v.HealthNextTime.Unix())
}
}
for _, v := range tunnels {
if v.HealthNextTime.Before(time.Now()) {
v.HealthNextTime = time.Now().Add(time.Duration(v.HealthCheckInterval))
//check
go checkTcp(v, c)
//reset time
heap.Push(h, v.HealthNextTime.Unix())
}
}
}
}
}
func checkTcp(t *file.Tunnel, c net.Conn) {
arr := strings.Split(t.Target, "\n")
for _, v := range arr {
if _, err := net.DialTimeout("tcp", v, time.Duration(t.HealthCheckTimeout)); err != nil {
t.HealthMap[v] += 1
}
if t.HealthMap[v] > t.HealthMaxFail {
t.HealthMap[v] += 1
if t.HealthMap[v] == t.HealthMaxFail {
//send fail remove
ch <- file.NewHealthInfo("tcp", v, true)
}
} else if t.HealthMap[v] >= t.HealthMaxFail {
//send recovery add
ch <- file.NewHealthInfo("tcp", v, false)
t.HealthMap[v] = 0
}
}
}
func checkHttp(h *file.Host, ch chan *file.HealthInfo) {
arr := strings.Split(h.Target, "\n")
client := &http.Client{}
client.Timeout = time.Duration(h.HealthCheckTimeout) * time.Second
for _, v := range arr {
if _, err := client.Get(v + h.HttpHealthUrl); err != nil {
h.HealthMap[v] += 1
if h.HealthMap[v] == h.HealthMaxFail {
//send fail remove
ch <- file.NewHealthInfo("http", v, true)
}
} else if h.HealthMap[v] >= h.HealthMaxFail {
//send recovery add
h.HealthMap[v] = 0
ch <- file.NewHealthInfo("http", v, false)
}
}
}

View File

@ -13,7 +13,7 @@ import (
var (
serverAddr = flag.String("server", "", "Server addr (ip:port)")
configPath = flag.String("config", "npc.conf", "Configuration file path")
configPath = flag.String("config", "", "Configuration file path")
verifyKey = flag.String("vkey", "", "Authentication key")
logType = flag.String("log", "stdout", "Log output modestdout|file")
connType = flag.String("type", "tcp", "Connection type with the serverkcp|tcp")
@ -21,6 +21,7 @@ var (
logLevel = flag.String("log_level", "7", "log level 0~7")
registerTime = flag.Int("time", 2, "register time long /h")
)
func main() {
flag.Parse()
if len(os.Args) > 2 {
@ -41,13 +42,23 @@ func main() {
} else {
logs.SetLogger(logs.AdapterFile, `{"level":`+*logLevel+`,"filename":"npc_log.log"}`)
}
if *verifyKey != "" && *serverAddr != "" {
env := common.GetEnvMap()
if *serverAddr == "" {
*serverAddr, _ = env["NPS_SERVER_ADDR"]
}
if *verifyKey == "" {
*verifyKey, _ = env["NPS_SERVER_VKEY"]
}
if *verifyKey != "" && *serverAddr != "" && *configPath == "" {
for {
client.NewRPClient(*serverAddr, *verifyKey, *connType, *proxyUrl).Start()
logs.Info("It will be reconnected in five seconds")
time.Sleep(time.Second * 5)
}
} else {
if *configPath == "" {
*configPath = "npc.conf"
}
client.StartFromFile(*configPath)
}
}

View File

@ -1 +1,2 @@
a.o.com,127.0.0.1:8082,2,,,111,/,3,5290945,30260,all
a.o.com,127.0.0.1:8082,2,,,111,/,3,5290945,32285,http
a.o.com,127.0.0.1:8080,2,,,,/,4,0,0,https

1 a.o.com 127.0.0.1:8082 2 111 / 3 5290945 30260 32285 all http
2 a.o.com 127.0.0.1:8080 2 / 4 0 0 https

View File

@ -1,16 +1,25 @@
[common]
server=127.0.0.1:8024
server={{.NPS_SERVER_ADDR}}
tp=tcp
vkey=123
vkey={{.NPS_SERVER_VKEY}}
auto_reconnection=true
[web]
host=b.o.com
target=127.0.0.1:8082
health_check_timeout = 3
health_check_max_failed = 3
health_check_interval = 10
health_http_url=/
[tcp]
mode=tcp
target=8006-8010,8012
port=9006-9010,9012
targetAddr=123.206.77.88
health_check_timeout = 3
health_check_max_failed = 3
health_check_interval = 10
health_http_url=/
[socks5]
mode=socks5

View File

@ -2,13 +2,11 @@ appname = nps
#Boot mode(dev|pro)
runmode = dev
#Web API unauthenticated IP address
#auth_key=test
#auth_crypt_key =1234567812345678
#HTTP(S) proxy port, no startup if empty
http_proxy_port=80
https_proxy_port=445
https_proxy_port=443
#certFile absolute path
pem_path=conf/server.pem
#KeyFile absolute path
@ -43,5 +41,8 @@ web_username=admin
web_password=123
web_port = 8080
web_ip=0.0.0.0
#Web API unauthenticated IP address
auth_key=test
auth_crypt_key =1234567812345678
#allow_ports=9001-9009,10001,11000-12000

View File

@ -1,2 +1,4 @@
8025,socks5,,1,1,2,,0,0,
8026,httpProxy,,1,2,2,,0,0,
8001,tcp,"127.0.0.1:8080
127.0.0.1:8082",1,3,5,,0,0,

1 8025 socks5 1 1 2 0 0
2 8026 httpProxy 1 2 2 0 0
3 8001 tcp 127.0.0.1:8080 127.0.0.1:8082 1 3 5 0 0
4

View File

@ -6,6 +6,7 @@ import (
"encoding/binary"
"github.com/cnlh/nps/lib/crypt"
"github.com/cnlh/nps/lib/pool"
"html/template"
"io"
"io/ioutil"
"net"
@ -276,3 +277,29 @@ func GetLocalUdpAddr() (net.Conn, error) {
}
return tmpConn, tmpConn.Close()
}
func ParseStr(str string) (string, error) {
tmp := template.New("npc")
var err error
w := new(bytes.Buffer)
if tmp, err = tmp.Parse(str); err != nil {
return "", err
}
if err = tmp.Execute(w, GetEnvMap()); err != nil {
return "", err
}
return w.String(), nil
}
//get env
func GetEnvMap() map[string]string {
m := make(map[string]string)
environ := os.Environ()
for i := range environ {
tmp := strings.Split(environ[i], "=")
if len(tmp) == 2 {
m[tmp[0]] = tmp[1]
}
}
return m
}

View File

@ -38,7 +38,9 @@ func NewConfig(path string) (c *Config, err error) {
if b, err = common.ReadAllFromFile(path); err != nil {
return
} else {
c.content = string(b)
if c.content, err = common.ParseStr(string(b)); err != nil {
return nil, err
}
if c.title, err = getAllTitle(c.content); err != nil {
return
}
@ -153,6 +155,14 @@ func dealHost(s string) *file.Host {
h.HostChange = item[1]
case "location":
h.Location = item[1]
case "health_check_timeout":
h.HealthCheckTimeout = common.GetIntNoErrByStr(item[1])
case "health_check_max_failed":
h.HealthMaxFail = common.GetIntNoErrByStr(item[1])
case "health_check_interval":
h.HealthCheckInterval = common.GetIntNoErrByStr(item[1])
case "health_http_url":
h.HttpHealthUrl = item[1]
default:
if strings.Contains(item[0], "header") {
headerChange += strings.Replace(item[0], "header_", "", -1) + ":" + item[1] + "\n"
@ -178,7 +188,7 @@ func dealTunnel(s string) *file.Tunnel {
case "mode":
t.Mode = item[1]
case "target":
t.Target = item[1]
t.Target = strings.Replace(item[1], ",", "\n", -1)
case "targetAddr":
t.TargetAddr = item[1]
case "password":
@ -187,6 +197,12 @@ func dealTunnel(s string) *file.Tunnel {
t.LocalPath = item[1]
case "strip_pre":
t.StripPre = item[1]
case "health_check_timeout":
t.HealthCheckTimeout = common.GetIntNoErrByStr(item[1])
case "health_check_max_failed":
t.HealthMaxFail = common.GetIntNoErrByStr(item[1])
case "health_check_interval":
t.HealthCheckInterval = common.GetIntNoErrByStr(item[1])
}
}
return t

View File

@ -4,6 +4,7 @@ import (
"github.com/cnlh/nps/lib/rate"
"strings"
"sync"
"time"
)
type Flow struct {
@ -96,14 +97,15 @@ func (s *Client) HasHost(h *Host) bool {
}
type Tunnel struct {
Id int //Id
Port int //服务端监听端口
Mode string //启动方式
Target string //目标
Status bool //设置是否开启
RunStatus bool //当前运行状态
Client *Client //所属客户端id
Ports string //客户端与服务端传递
Id int //Id
Port int //服务端监听端口
Mode string //启动方式
Target string //目标
TargetArr []string //目标
Status bool //设置是否开启
RunStatus bool //当前运行状态
Client *Client //所属客户端id
Ports string //客户端与服务端传递
Flow *Flow
Password string //私密模式密码,唯一
Remark string //备注
@ -111,6 +113,35 @@ type Tunnel struct {
NoStore bool
LocalPath string
StripPre string
NowIndex int
Health
sync.RWMutex
}
type Health struct {
HealthCheckTimeout int
HealthMaxFail int
HealthCheckInterval int
HealthNextTime time.Time
HealthMap map[string]int
HttpHealthUrl string
HealthRemoveArr []string
}
func (s *Tunnel) GetRandomTarget() string {
if s.TargetArr == nil {
s.TargetArr = strings.Split(s.Target, "\n")
}
if len(s.TargetArr) == 1 {
return s.TargetArr[0]
}
s.Lock()
defer s.Unlock()
if s.NowIndex >= len(s.TargetArr)-1 {
s.NowIndex = -1
}
s.NowIndex++
return s.TargetArr[s.NowIndex]
}
type Config struct {
@ -133,7 +164,8 @@ type Host struct {
NowIndex int
TargetArr []string
NoStore bool
Scheme string //http https all
Scheme string //http https all
Health
sync.RWMutex
}
@ -141,10 +173,13 @@ func (s *Host) GetRandomTarget() string {
if s.TargetArr == nil {
s.TargetArr = strings.Split(s.Target, "\n")
}
if len(s.TargetArr) == 1 {
return s.TargetArr[0]
}
s.Lock()
defer s.Unlock()
if s.NowIndex >= len(s.TargetArr)-1 {
s.NowIndex = 0
s.NowIndex = -1
} else {
s.NowIndex++
}

View File

@ -11,18 +11,20 @@ import (
"net"
"strconv"
"strings"
"time"
)
const (
HTTP_GET = 716984
HTTP_POST = 807983
HTTP_HEAD = 726965
HTTP_PUT = 808585
HTTP_DELETE = 686976
HTTP_CONNECT = 677978
HTTP_OPTIONS = 798084
HTTP_TRACE = 848265
CLIENT = 848384
HTTP_GET = 716984
HTTP_POST = 807983
HTTP_HEAD = 726965
HTTP_PUT = 808585
HTTP_DELETE = 686976
HTTP_CONNECT = 677978
HTTP_OPTIONS = 798084
HTTP_TRACE = 848265
CLIENT = 848384
ACCEPT_TIME_OUT = 10
)
type PortMux struct {
@ -122,7 +124,11 @@ func (pMux *PortMux) process(conn net.Conn) {
if len(rs) == 0 {
rs = buf
}
ch <- newPortConn(conn, rs)
timer := time.NewTimer(ACCEPT_TIME_OUT)
select {
case <-timer.C:
case ch <- newPortConn(conn, rs):
}
}
func (pMux *PortMux) Close() error {

21
lib/sheap/heap.go Normal file
View File

@ -0,0 +1,21 @@
package sheap
type IntHeap []int64
func (h IntHeap) Len() int { return len(h) }
func (h IntHeap) Less(i, j int) bool { return h[i] < h[j] }
func (h IntHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *IntHeap) Push(x interface{}) {
// Push and Pop use pointer receivers because they modify the slice's length,
// not just its contents.
*h = append(*h, x.(int64))
}
func (h *IntHeap) Pop() interface{} {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}

View File

@ -159,7 +159,7 @@ func (s *httpServer) process(c *conn.Conn, r *http.Request) {
logs.Warn("auth error", err, r.RemoteAddr)
break
}
lk := conn.NewLink(common.CONN_TCP, host.Target, host.Client.Cnf.Crypt, host.Client.Cnf.Compress, r.RemoteAddr)
lk := conn.NewLink(common.CONN_TCP, host.GetRandomTarget(), host.Client.Cnf.Crypt, host.Client.Cnf.Compress, r.RemoteAddr)
if target, err = s.bridge.SendLinkInfo(host.Client.Id, lk, c.Conn.RemoteAddr().String(), nil); err != nil {
logs.Notice("connect to target %s error %s", lk.Host, err)
break
@ -174,10 +174,10 @@ func (s *httpServer) process(c *conn.Conn, r *http.Request) {
}()
} else {
r, err = http.ReadRequest(bufio.NewReader(c))
r.URL.Scheme = scheme
if err != nil {
break
}
r.URL.Scheme = scheme
//What happened Why one character less???
if r.Method == "ET" {
r.Method = "GET"
@ -190,10 +190,14 @@ func (s *httpServer) process(c *conn.Conn, r *http.Request) {
logs.Notice("the url %s %s %s can't be parsed!", r.URL.Scheme, r.Host, r.RequestURI)
break
} else if host != lastHost {
host.Client.AddConn()
if !hostTmp.Client.GetConn() {
break
}
host = hostTmp
lastHost = host
isConn = true
host.Client.AddConn()
goto start
}
}
@ -204,7 +208,7 @@ func (s *httpServer) process(c *conn.Conn, r *http.Request) {
break
}
host.Flow.Add(int64(len(b)), 0)
logs.Trace("http(s) request, method %s, host %s, url %s, remote address %s, target %s", r.Method, r.Host, r.RequestURI, r.RemoteAddr, host.Target)
logs.Trace("%s request, method %s, host %s, url %s, remote address %s, target %s", r.URL.Scheme, r.Method, r.Host, r.RequestURI, r.RemoteAddr, host.Target)
//write
connClient.Write(b)
}

View File

@ -109,7 +109,7 @@ type process func(c *conn.Conn, s *TunnelModeServer) error
//tcp隧道模式
func ProcessTunnel(c *conn.Conn, s *TunnelModeServer) error {
return s.DealClient(c, s.task.Target, nil, common.CONN_TCP)
return s.DealClient(c, s.task.GetRandomTarget(), nil, common.CONN_TCP)
}
//http代理模式

View File

@ -26,8 +26,8 @@ func (s *BaseController) Prepare() {
// param 2 is timestamp (It's limited to 20 seconds.)
md5Key := s.GetString("auth_key")
timestamp := s.GetIntNoErr("timestamp")
configKey := beego.AppConfig.String("authKey")
if !(time.Now().Unix()-int64(timestamp) < 20 && time.Now().Unix()-int64(timestamp) > 0 && crypt.Md5(configKey+strconv.Itoa(timestamp)) == md5Key) {
configKey := beego.AppConfig.String("auth_key")
if !(time.Now().Unix()-int64(timestamp) <= 20 && time.Now().Unix()-int64(timestamp) >= 0 && crypt.Md5(configKey+strconv.Itoa(timestamp)) == md5Key) {
if s.GetSession("auth") != true {
s.Redirect("/login/index", 302)
}

View File

@ -38,9 +38,11 @@
<div class="form-group" id="target">
<label class="col-sm-2 control-label">target of Intranet(ip:port)</label>
<div class="col-sm-10">
<input class="form-control" type="text" name="target"
placeholder="such as 10.1.50.203:22 ">
<span class="help-block m-b-none">can only fill in ports if it is local machine proxy</span>
<textarea class="form-control" name="target" rows="4" placeholder="10.1.50.203:22
10.1.50.202:22"></textarea>
<span class="help-block m-b-none">can only fill in ports if it is local machine proxy, only tcp supports load balancing
</span>
</div>
</div>

View File

@ -39,10 +39,11 @@
<div class="form-group" id="target">
<label class="col-sm-2 control-label">target of Intranet(ip:port)</label>
<div class="col-sm-10">
<input value="{{.t.Target}}" class="form-control" type="text" name="target"
placeholder="such as 10.1.50.203:22 ">
<span class="help-block m-b-none">can only fill in ports if it is local machine proxy</span>
</div>
<textarea class="form-control" name="target" rows="4" placeholder="10.1.50.203:22
10.1.50.202:22">{{.t.Target}}</textarea>
<span class="help-block m-b-none">can only fill in ports if it is local machine proxy, only tcp supports load balancing
</span> </div>
</div>
<div class="form-group" id="client_id">

View File

@ -120,4 +120,15 @@
</body>
</html>
<script>
// googleTranslateElementInit()
//
// function googleTranslateElementInit() {
// new google.translate.TranslateElement({
// layout: google.translate.TranslateElement.InlineLayout.HORIZONTAL
// }, 'wrapper');
// }
</script>
{{/*<script src="http://translate.google.com/translate_a/element.js?cb=googleTranslateElementInit"></script>*/}}