From a4fece3f51b2082d1e3cb2a36a7ea18f39bc436b Mon Sep 17 00:00:00 2001 From: fatedier Date: Thu, 23 Mar 2017 02:01:25 +0800 Subject: [PATCH] api: add server web api for statistics --- client/process_udp.go.bak | 153 ---------------------- cmd/frps/main.go | 1 + models/config/proxy.go | 28 ++-- models/consts/consts.go | 4 +- models/metric/server.go | 221 -------------------------------- server/control.go | 18 +++ server/dashboard.go | 24 ++-- server/dashboard_api.go | 203 +++++++++++++++++++++++++---- server/manager.go | 14 ++ server/metric.go | 241 +++++++++++++++++++++++++++++++++++ server/proxy.go | 21 ++- server/service.go | 19 +++ utils/metric/counter.go | 60 +++++++++ utils/metric/date_counter.go | 157 +++++++++++++++++++++++ 14 files changed, 738 insertions(+), 426 deletions(-) delete mode 100644 client/process_udp.go.bak delete mode 100644 models/metric/server.go create mode 100644 server/metric.go create mode 100644 utils/metric/counter.go create mode 100644 utils/metric/date_counter.go diff --git a/client/process_udp.go.bak b/client/process_udp.go.bak deleted file mode 100644 index d1dbcc21..00000000 --- a/client/process_udp.go.bak +++ /dev/null @@ -1,153 +0,0 @@ -// Copyright 2016 fatedier, fatedier@gmail.com -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package client - -import ( - "fmt" - "io" - "net" - "sync" - "time" - - "github.com/fatedier/frp/src/models/msg" - "github.com/fatedier/frp/src/utils/conn" - "github.com/fatedier/frp/src/utils/pool" -) - -type UdpProcesser struct { - tcpConn *conn.Conn - closeCh chan struct{} - - localAddr string - - // cache local udp connections - // key is remoteAddr - localUdpConns map[string]*net.UDPConn - mutex sync.RWMutex - tcpConnMutex sync.RWMutex -} - -func NewUdpProcesser(c *conn.Conn, localIp string, localPort int64) *UdpProcesser { - return &UdpProcesser{ - tcpConn: c, - closeCh: make(chan struct{}), - localAddr: fmt.Sprintf("%s:%d", localIp, localPort), - localUdpConns: make(map[string]*net.UDPConn), - } -} - -func (up *UdpProcesser) UpdateTcpConn(c *conn.Conn) { - up.tcpConnMutex.Lock() - defer up.tcpConnMutex.Unlock() - up.tcpConn = c -} - -func (up *UdpProcesser) Run() { - go up.ReadLoop() -} - -func (up *UdpProcesser) ReadLoop() { - var ( - buf string - err error - ) - for { - udpPacket := &msg.UdpPacket{} - - // read udp package from frps - buf, err = up.tcpConn.ReadLine() - if err != nil { - if err == io.EOF { - return - } else { - continue - } - } - err = udpPacket.UnPack([]byte(buf)) - if err != nil { - continue - } - - // write to local udp port - sendConn, ok := up.GetUdpConn(udpPacket.SrcStr) - if !ok { - dstAddr, err := net.ResolveUDPAddr("udp", up.localAddr) - if err != nil { - continue - } - sendConn, err = net.DialUDP("udp", nil, dstAddr) - if err != nil { - continue - } - - up.SetUdpConn(udpPacket.SrcStr, sendConn) - } - - _, err = sendConn.Write(udpPacket.Content) - if err != nil { - sendConn.Close() - continue - } - - if !ok { - go up.Forward(udpPacket, sendConn) - } - } -} - -func (up *UdpProcesser) Forward(udpPacket *msg.UdpPacket, singleConn *net.UDPConn) { - addr := udpPacket.SrcStr - defer up.RemoveUdpConn(addr) - - buf := pool.GetBuf(2048) - for { - singleConn.SetReadDeadline(time.Now().Add(120 * time.Second)) - n, remoteAddr, err := singleConn.ReadFromUDP(buf) - if err != nil { - return - } - - // forward to frps - forwardPacket := msg.NewUdpPacket(buf[0:n], remoteAddr, udpPacket.Src) - up.tcpConnMutex.RLock() - err = up.tcpConn.WriteString(string(forwardPacket.Pack()) + "\n") - up.tcpConnMutex.RUnlock() - if err != nil { - return - } - } -} - -func (up *UdpProcesser) GetUdpConn(addr string) (singleConn *net.UDPConn, ok bool) { - up.mutex.RLock() - defer up.mutex.RUnlock() - singleConn, ok = up.localUdpConns[addr] - return -} - -func (up *UdpProcesser) SetUdpConn(addr string, conn *net.UDPConn) { - up.mutex.Lock() - defer up.mutex.Unlock() - up.localUdpConns[addr] = conn -} - -func (up *UdpProcesser) RemoveUdpConn(addr string) { - up.mutex.Lock() - defer up.mutex.Unlock() - if c, ok := up.localUdpConns[addr]; ok { - c.Close() - } - delete(up.localUdpConns, addr) -} diff --git a/cmd/frps/main.go b/cmd/frps/main.go index b8ebfe78..fc5d6436 100644 --- a/cmd/frps/main.go +++ b/cmd/frps/main.go @@ -113,5 +113,6 @@ func main() { if config.ServerCommonCfg.PrivilegeMode == true { log.Info("PrivilegeMode is enabled, you should pay more attention to security issues") } + server.ServerService = svr svr.Run() } diff --git a/models/config/proxy.go b/models/config/proxy.go index 4213c426..247b2b09 100644 --- a/models/config/proxy.go +++ b/models/config/proxy.go @@ -88,11 +88,11 @@ func NewProxyConfFromFile(name string, section ini.Section) (cfg ProxyConf, err // BaseProxy info type BaseProxyConf struct { - ProxyName string - ProxyType string + ProxyName string `json:"proxy_name"` + ProxyType string `json:"proxy_type"` - UseEncryption bool - UseCompression bool + UseEncryption bool `json:"use_encryption"` + UseCompression bool `json:"use_compression"` } func (cfg *BaseProxyConf) GetName() string { @@ -139,8 +139,8 @@ func (cfg *BaseProxyConf) UnMarshalToMsg(pMsg *msg.NewProxy) { // Bind info type BindInfoConf struct { - BindAddr string - RemotePort int64 + BindAddr string `json:"bind_addr"` + RemotePort int64 `json:"remote_port"` } func (cfg *BindInfoConf) LoadFromMsg(pMsg *msg.NewProxy) { @@ -178,8 +178,8 @@ func (cfg *BindInfoConf) check() (err error) { // Domain info type DomainConf struct { - CustomDomains []string - SubDomain string + CustomDomains []string `json:"custom_domains"` + SubDomain string `json:"sub_domain"` } func (cfg *DomainConf) LoadFromMsg(pMsg *msg.NewProxy) { @@ -235,8 +235,8 @@ func (cfg *DomainConf) check() (err error) { } type LocalSvrConf struct { - LocalIp string - LocalPort int + LocalIp string `json:"-"` + LocalPort int `json:"-"` } func (cfg *LocalSvrConf) LoadFromFile(name string, section ini.Section) (err error) { @@ -333,10 +333,10 @@ type HttpProxyConf struct { LocalSvrConf - Locations []string - HostHeaderRewrite string - HttpUser string - HttpPwd string + Locations []string `json:"locations"` + HostHeaderRewrite string `json:"host_header_rewrite"` + HttpUser string `json:"-"` + HttpPwd string `json:"-"` } func (cfg *HttpProxyConf) LoadFromMsg(pMsg *msg.NewProxy) { diff --git a/models/consts/consts.go b/models/consts/consts.go index db163ca9..170bd240 100644 --- a/models/consts/consts.go +++ b/models/consts/consts.go @@ -15,10 +15,12 @@ package consts var ( - // server status + // proxy status Idle string = "idle" Working string = "working" Closed string = "closed" + Online string = "online" + Offline string = "offline" // proxy type TcpProxy string = "tcp" diff --git a/models/metric/server.go b/models/metric/server.go deleted file mode 100644 index 8300f23a..00000000 --- a/models/metric/server.go +++ /dev/null @@ -1,221 +0,0 @@ -// Copyright 2016 fatedier, fatedier@gmail.com -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package metric - -/* -var ( - DailyDataKeepDays int = 7 - ServerMetricInfoMap map[string]*ServerMetric - smMutex sync.RWMutex -) - -type ServerMetric struct { - Name string `json:"name"` - Type string `json:"type"` - BindAddr string `json:"bind_addr"` - ListenPort int64 `json:"listen_port"` - CustomDomains []string `json:"custom_domains"` - Locations []string `json:"locations"` - Status string `json:"status"` - UseEncryption bool `json:"use_encryption"` - UseGzip bool `json:"use_gzip"` - PrivilegeMode bool `json:"privilege_mode"` - - // statistics - CurrentConns int64 `json:"current_conns"` - Daily []*DailyServerStats `json:"daily"` - mutex sync.RWMutex -} - -type DailyServerStats struct { - Time string `json:"time"` - FlowIn int64 `json:"flow_in"` - FlowOut int64 `json:"flow_out"` - TotalAcceptConns int64 `json:"total_accept_conns"` -} - -// for sort -type ServerMetricList []*ServerMetric - -func (l ServerMetricList) Len() int { return len(l) } -func (l ServerMetricList) Less(i, j int) bool { return l[i].Name < l[j].Name } -func (l ServerMetricList) Swap(i, j int) { l[i], l[j] = l[j], l[i] } - -func init() { - ServerMetricInfoMap = make(map[string]*ServerMetric) -} - -func (s *ServerMetric) clone() *ServerMetric { - copy := *s - copy.CustomDomains = make([]string, len(s.CustomDomains)) - var i int - for i = range copy.CustomDomains { - copy.CustomDomains[i] = s.CustomDomains[i] - } - - copy.Daily = make([]*DailyServerStats, len(s.Daily)) - for i = range copy.Daily { - tmpDaily := *s.Daily[i] - copy.Daily[i] = &tmpDaily - } - return © -} - -func GetAllProxyMetrics() []*ServerMetric { - result := make(ServerMetricList, 0) - smMutex.RLock() - for _, metric := range ServerMetricInfoMap { - metric.mutex.RLock() - tmpMetric := metric.clone() - metric.mutex.RUnlock() - result = append(result, tmpMetric) - } - smMutex.RUnlock() - - // sort for result by proxy name - sort.Sort(result) - return result -} - -// if proxyName isn't exist, return nil -func GetProxyMetrics(proxyName string) *ServerMetric { - smMutex.RLock() - defer smMutex.RUnlock() - metric, ok := ServerMetricInfoMap[proxyName] - if ok { - metric.mutex.RLock() - tmpMetric := metric.clone() - metric.mutex.RUnlock() - return tmpMetric - } else { - return nil - } -} - -func SetProxyInfo(proxyName string, proxyType, bindAddr string, - useEncryption, useGzip, privilegeMode bool, customDomains []string, - locations []string, listenPort int64) { - smMutex.Lock() - info, ok := ServerMetricInfoMap[proxyName] - if !ok { - info = &ServerMetric{} - info.Daily = make([]*DailyServerStats, 0) - } - info.Name = proxyName - info.Type = proxyType - info.UseEncryption = useEncryption - info.UseGzip = useGzip - info.PrivilegeMode = privilegeMode - info.BindAddr = bindAddr - info.ListenPort = listenPort - info.CustomDomains = customDomains - info.Locations = locations - ServerMetricInfoMap[proxyName] = info - smMutex.Unlock() -} - -func SetStatus(proxyName string, status int64) { - smMutex.RLock() - metric, ok := ServerMetricInfoMap[proxyName] - smMutex.RUnlock() - if ok { - metric.mutex.Lock() - metric.Status = consts.StatusStr[status] - metric.mutex.Unlock() - } -} - -type DealFuncType func(*DailyServerStats) - -func DealDailyData(dailyData []*DailyServerStats, fn DealFuncType) (newDailyData []*DailyServerStats) { - now := time.Now().Format("20060102") - dailyLen := len(dailyData) - if dailyLen == 0 { - daily := &DailyServerStats{} - daily.Time = now - fn(daily) - dailyData = append(dailyData, daily) - } else { - daily := dailyData[dailyLen-1] - if daily.Time == now { - fn(daily) - } else { - newDaily := &DailyServerStats{} - newDaily.Time = now - fn(newDaily) - if dailyLen == DailyDataKeepDays { - for i := 0; i < dailyLen-1; i++ { - dailyData[i] = dailyData[i+1] - } - dailyData[dailyLen-1] = newDaily - } else { - dailyData = append(dailyData, newDaily) - } - } - } - return dailyData -} - -func OpenConnection(proxyName string) { - smMutex.RLock() - metric, ok := ServerMetricInfoMap[proxyName] - smMutex.RUnlock() - if ok { - metric.mutex.Lock() - metric.CurrentConns++ - metric.Daily = DealDailyData(metric.Daily, func(stats *DailyServerStats) { - stats.TotalAcceptConns++ - }) - metric.mutex.Unlock() - } -} - -func CloseConnection(proxyName string) { - smMutex.RLock() - metric, ok := ServerMetricInfoMap[proxyName] - smMutex.RUnlock() - if ok { - metric.mutex.Lock() - metric.CurrentConns-- - metric.mutex.Unlock() - } -} - -func AddFlowIn(proxyName string, value int64) { - smMutex.RLock() - metric, ok := ServerMetricInfoMap[proxyName] - smMutex.RUnlock() - if ok { - metric.mutex.Lock() - metric.Daily = DealDailyData(metric.Daily, func(stats *DailyServerStats) { - stats.FlowIn += value - }) - metric.mutex.Unlock() - } -} - -func AddFlowOut(proxyName string, value int64) { - smMutex.RLock() - metric, ok := ServerMetricInfoMap[proxyName] - smMutex.RUnlock() - if ok { - metric.mutex.Lock() - metric.Daily = DealDailyData(metric.Daily, func(stats *DailyServerStats) { - stats.FlowOut += value - }) - metric.mutex.Unlock() - } -} -*/ diff --git a/server/control.go b/server/control.go index d022c74c..b2cf0abc 100644 --- a/server/control.go +++ b/server/control.go @@ -1,3 +1,17 @@ +// Copyright 2017 fatedier, fatedier@gmail.com +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package server import ( @@ -253,10 +267,13 @@ func (ctl *Control) stoper() { for _, pxy := range ctl.proxies { pxy.Close() ctl.svr.DelProxy(pxy.GetName()) + StatsCloseProxy(pxy.GetConf().GetBaseInfo().ProxyType) } ctl.allShutdown.Done() ctl.conn.Info("client exit success") + + StatsCloseClient() } func (ctl *Control) manager() { @@ -296,6 +313,7 @@ func (ctl *Control) manager() { ctl.conn.Warn("new proxy [%s] error: %v", m.ProxyName, err) } else { ctl.conn.Info("new proxy [%s] success", m.ProxyName) + StatsNewProxy(m.ProxyName, m.ProxyType) } ctl.sendCh <- resp case *msg.Ping: diff --git a/server/dashboard.go b/server/dashboard.go index aca9495c..d6e290b1 100644 --- a/server/dashboard.go +++ b/server/dashboard.go @@ -1,4 +1,4 @@ -// Copyright 2016 fatedier, fatedier@gmail.com +// Copyright 2017 fatedier, fatedier@gmail.com // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -22,6 +22,8 @@ import ( "github.com/fatedier/frp/assets" "github.com/fatedier/frp/models/config" + + "github.com/julienschmidt/httprouter" ) var ( @@ -31,20 +33,27 @@ var ( func RunDashboardServer(addr string, port int64) (err error) { // url router - mux := http.NewServeMux() + router := httprouter.New() + // api, see dashboard_api.go //mux.HandleFunc("/api/reload", use(apiReload, basicAuth)) - //mux.HandleFunc("/api/proxies", apiProxies) + router.GET("/api/serverinfo", apiServerInfo) + router.GET("/api/proxy/tcp", apiProxyTcp) + router.GET("/api/proxy/udp", apiProxyUdp) + router.GET("/api/proxy/http", apiProxyHttp) + router.GET("/api/proxy/https", apiProxyHttps) + router.GET("/api/proxy/flow/:name", apiProxyFlow) // view, see dashboard_view.go - mux.Handle("/favicon.ico", http.FileServer(assets.FileSystem)) - mux.Handle("/static/", http.StripPrefix("/static/", http.FileServer(assets.FileSystem))) - //mux.HandleFunc("/", use(viewDashboard, basicAuth)) + //router.GET("/favicon.ico", http.FileServer(assets.FileSystem)) + router.Handler("GET", "/favicon.ico", http.FileServer(assets.FileSystem)) + router.Handler("GET", "/static", http.StripPrefix("/static/", http.FileServer(assets.FileSystem))) + //router.GET("/", use(viewDashboard, basicAuth)) address := fmt.Sprintf("%s:%d", addr, port) server := &http.Server{ Addr: address, - Handler: mux, + Handler: router, ReadTimeout: httpServerReadTimeout, WriteTimeout: httpServerWriteTimeout, } @@ -64,7 +73,6 @@ func use(h http.HandlerFunc, middleware ...func(http.HandlerFunc) http.HandlerFu for _, m := range middleware { h = m(h) } - return h } diff --git a/server/dashboard_api.go b/server/dashboard_api.go index 83204b6e..226a4824 100644 --- a/server/dashboard_api.go +++ b/server/dashboard_api.go @@ -1,4 +1,4 @@ -// Copyright 2016 fatedier, fatedier@gmail.com +// Copyright 2017 fatedier, fatedier@gmail.com // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -14,14 +14,15 @@ package server -/* import ( "encoding/json" - "fmt" "net/http" - "github.com/fatedier/frp/src/models/metric" - "github.com/fatedier/frp/src/utils/log" + "github.com/fatedier/frp/models/config" + "github.com/fatedier/frp/models/consts" + "github.com/fatedier/frp/utils/log" + + "github.com/julienschmidt/httprouter" ) type GeneralResponse struct { @@ -29,41 +30,187 @@ type GeneralResponse struct { Msg string `json:"msg"` } -func apiReload(w http.ResponseWriter, r *http.Request) { - var buf []byte - res := &GeneralResponse{} +// api/serverinfo +type ServerInfoResp struct { + GeneralResponse + + VhostHttpPort int64 `json:"vhost_http_port"` + VhostHttpsPort int64 `json:"vhost_https_port"` + AuthTimeout int64 `json:"auth_timeout"` + SubdomainHost string `json:"subdomain_host"` + MaxPoolCount int64 `json:"max_pool_count"` + HeartBeatTimeout int64 `json:"heart_beat_timeout"` + + TotalFlowIn int64 `json:"total_flow_in"` + TotalFlowOut int64 `json:"total_flow_out"` + CurConns int64 `json:"cur_conns"` + ClientCounts int64 `json:"client_counts"` + ProxyTypeCounts map[string]int64 `json:"proxy_type_count"` +} + +func apiServerInfo(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { + var ( + buf []byte + res ServerInfoResp + ) defer func() { - log.Info("Http response [/api/reload]: %s", string(buf)) + log.Info("Http response [/api/serverinfo]: code [%d]", res.Code) }() - log.Info("Http request: [/api/reload]") - err := ReloadConf(ConfigFile) - if err != nil { - res.Code = 2 - res.Msg = fmt.Sprintf("%v", err) - log.Error("frps reload error: %v", err) + log.Info("Http request: [/api/serverinfo]") + cfg := config.ServerCommonCfg + serverStats := StatsGetServer() + res = ServerInfoResp{ + VhostHttpPort: cfg.VhostHttpPort, + VhostHttpsPort: cfg.VhostHttpsPort, + AuthTimeout: cfg.AuthTimeout, + SubdomainHost: cfg.SubDomainHost, + MaxPoolCount: cfg.MaxPoolCount, + HeartBeatTimeout: cfg.HeartBeatTimeout, + + TotalFlowIn: serverStats.TotalFlowIn, + TotalFlowOut: serverStats.TotalFlowOut, + CurConns: serverStats.CurConns, + ClientCounts: serverStats.ClientCounts, + ProxyTypeCounts: serverStats.ProxyTypeCounts, } - buf, _ = json.Marshal(res) + buf, _ = json.Marshal(&res) w.Write(buf) } -type ProxiesResponse struct { - Code int64 `json:"code"` - Msg string `json:"msg"` - Proxies []*metric.ServerMetric `json:"proxies"` +// Get proxy info. +type ProxyStatsInfo struct { + Conf config.ProxyConf `json:"conf"` + TodayFlowIn int64 `json:"today_flow_in"` + TodayFlowOut int64 `json:"today_flow_out"` + CurConns int64 `json:"cur_conns"` + Status string `json:"status"` } -func apiProxies(w http.ResponseWriter, r *http.Request) { - var buf []byte - res := &ProxiesResponse{} +type GetProxyInfoResp struct { + GeneralResponse + Proxies []*ProxyStatsInfo `json:"proxies"` +} + +// api/proxy/tcp +func apiProxyTcp(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { + var ( + buf []byte + res GetProxyInfoResp + ) defer func() { - log.Info("Http response [/api/proxies]: code [%d]", res.Code) + log.Info("Http response [/api/proxy/tcp]: code [%d]", res.Code) }() + log.Info("Http request: [/api/proxy/tcp]") - log.Info("Http request: [/api/proxies]") - res.Proxies = metric.GetAllProxyMetrics() - buf, _ = json.Marshal(res) + res.Proxies = getProxyStatsByType(consts.TcpProxy) + + buf, _ = json.Marshal(&res) + w.Write(buf) +} + +// api/proxy/udp +func apiProxyUdp(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { + var ( + buf []byte + res GetProxyInfoResp + ) + defer func() { + log.Info("Http response [/api/proxy/udp]: code [%d]", res.Code) + }() + log.Info("Http request: [/api/proxy/udp]") + + res.Proxies = getProxyStatsByType(consts.UdpProxy) + + buf, _ = json.Marshal(&res) + w.Write(buf) +} + +// api/proxy/http +func apiProxyHttp(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { + var ( + buf []byte + res GetProxyInfoResp + ) + defer func() { + log.Info("Http response [/api/proxy/http]: code [%d]", res.Code) + }() + log.Info("Http request: [/api/proxy/http]") + + res.Proxies = getProxyStatsByType(consts.HttpProxy) + + buf, _ = json.Marshal(&res) + w.Write(buf) +} + +// api/proxy/https +func apiProxyHttps(w http.ResponseWriter, r *http.Request, _ httprouter.Params) { + var ( + buf []byte + res GetProxyInfoResp + ) + defer func() { + log.Info("Http response [/api/proxy/https]: code [%d]", res.Code) + }() + log.Info("Http request: [/api/proxy/https]") + + res.Proxies = getProxyStatsByType(consts.HttpsProxy) + + buf, _ = json.Marshal(&res) + w.Write(buf) +} + +func getProxyStatsByType(proxyType string) (proxyInfos []*ProxyStatsInfo) { + proxyStats := StatsGetProxiesByType(proxyType) + proxyInfos = make([]*ProxyStatsInfo, 0, len(proxyStats)) + for _, ps := range proxyStats { + proxyInfo := &ProxyStatsInfo{} + if pxy, ok := ServerService.pxyManager.GetByName(ps.Name); ok { + proxyInfo.Conf = pxy.GetConf() + proxyInfo.Status = consts.Online + } else { + proxyInfo.Status = consts.Offline + } + proxyInfo.TodayFlowIn = ps.TodayFlowIn + proxyInfo.TodayFlowOut = ps.TodayFlowOut + proxyInfo.CurConns = ps.CurConns + proxyInfos = append(proxyInfos, proxyInfo) + } + return +} + +// api/proxy/:name/flow +type GetProxyFlowResp struct { + GeneralResponse + + Name string `json:"name"` + FlowIn []int64 `json:"flow_in"` + FlowOut []int64 `json:"flow_out"` +} + +func apiProxyFlow(w http.ResponseWriter, r *http.Request, params httprouter.Params) { + var ( + buf []byte + res GetProxyFlowResp + ) + name := params.ByName("name") + + defer func() { + log.Info("Http response [/api/proxy/flow/:name]: code [%d]", res.Code) + }() + log.Info("Http request: [/api/proxy/flow/:name]") + + res.Name = name + proxyFlowInfo := StatsGetProxyFlow(name) + if proxyFlowInfo == nil { + res.Code = 1 + res.Msg = "no proxy info found" + } else { + res.FlowIn = proxyFlowInfo.FlowIn + res.FlowOut = proxyFlowInfo.FlowOut + } + + buf, _ = json.Marshal(&res) w.Write(buf) } -*/ diff --git a/server/manager.go b/server/manager.go index 81845198..47456c00 100644 --- a/server/manager.go +++ b/server/manager.go @@ -1,3 +1,17 @@ +// Copyright 2017 fatedier, fatedier@gmail.com +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package server import ( diff --git a/server/metric.go b/server/metric.go new file mode 100644 index 00000000..d3771274 --- /dev/null +++ b/server/metric.go @@ -0,0 +1,241 @@ +// Copyright 2017 fatedier, fatedier@gmail.com +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package server + +import ( + "sync" + + "github.com/fatedier/frp/models/config" + "github.com/fatedier/frp/utils/metric" +) + +const ( + ReserveDays = 7 +) + +var globalStats *ServerStatistics + +type ServerStatistics struct { + TotalFlowIn metric.DateCounter + TotalFlowOut metric.DateCounter + CurConns metric.Counter + + ClientCounts metric.Counter + ProxyTypeCounts map[string]metric.Counter + + ProxyStatistics map[string]*ProxyStatistics + + mu sync.Mutex +} + +type ProxyStatistics struct { + ProxyType string + FlowIn metric.DateCounter + FlowOut metric.DateCounter + CurConns metric.Counter +} + +func init() { + globalStats = &ServerStatistics{ + TotalFlowIn: metric.NewDateCounter(ReserveDays), + TotalFlowOut: metric.NewDateCounter(ReserveDays), + CurConns: metric.NewCounter(), + + ClientCounts: metric.NewCounter(), + ProxyTypeCounts: make(map[string]metric.Counter), + + ProxyStatistics: make(map[string]*ProxyStatistics), + } +} + +func StatsNewClient() { + if config.ServerCommonCfg.DashboardPort != 0 { + globalStats.ClientCounts.Inc(1) + } +} + +func StatsCloseClient() { + if config.ServerCommonCfg.DashboardPort != 0 { + globalStats.ClientCounts.Dec(1) + } +} + +func StatsNewProxy(name string, proxyType string) { + if config.ServerCommonCfg.DashboardPort != 0 { + globalStats.mu.Lock() + defer globalStats.mu.Unlock() + counter, ok := globalStats.ProxyTypeCounts[proxyType] + if !ok { + counter = metric.NewCounter() + } + counter.Inc(1) + globalStats.ProxyTypeCounts[proxyType] = counter + + proxyStats, ok := globalStats.ProxyStatistics[name] + if !ok { + proxyStats = &ProxyStatistics{ + ProxyType: proxyType, + CurConns: metric.NewCounter(), + FlowIn: metric.NewDateCounter(ReserveDays), + FlowOut: metric.NewDateCounter(ReserveDays), + } + globalStats.ProxyStatistics[name] = proxyStats + } + } +} + +func StatsCloseProxy(proxyType string) { + if config.ServerCommonCfg.DashboardPort != 0 { + globalStats.mu.Lock() + defer globalStats.mu.Unlock() + if counter, ok := globalStats.ProxyTypeCounts[proxyType]; ok { + counter.Dec(1) + } + } +} + +func StatsOpenConnection(name string) { + if config.ServerCommonCfg.DashboardPort != 0 { + globalStats.CurConns.Inc(1) + + globalStats.mu.Lock() + defer globalStats.mu.Unlock() + proxyStats, ok := globalStats.ProxyStatistics[name] + if ok { + proxyStats.CurConns.Inc(1) + globalStats.ProxyStatistics[name] = proxyStats + } + } +} + +func StatsCloseConnection(name string) { + if config.ServerCommonCfg.DashboardPort != 0 { + globalStats.CurConns.Dec(1) + + globalStats.mu.Lock() + defer globalStats.mu.Unlock() + proxyStats, ok := globalStats.ProxyStatistics[name] + if ok { + proxyStats.CurConns.Dec(1) + globalStats.ProxyStatistics[name] = proxyStats + } + } +} + +func StatsAddFlowIn(name string, flowIn int64) { + if config.ServerCommonCfg.DashboardPort != 0 { + globalStats.TotalFlowIn.Inc(flowIn) + + globalStats.mu.Lock() + defer globalStats.mu.Unlock() + + proxyStats, ok := globalStats.ProxyStatistics[name] + if ok { + proxyStats.FlowIn.Inc(flowIn) + globalStats.ProxyStatistics[name] = proxyStats + } + } +} + +func StatsAddFlowOut(name string, flowOut int64) { + if config.ServerCommonCfg.DashboardPort != 0 { + globalStats.TotalFlowOut.Inc(flowOut) + + globalStats.mu.Lock() + defer globalStats.mu.Unlock() + + proxyStats, ok := globalStats.ProxyStatistics[name] + if ok { + proxyStats.FlowOut.Inc(flowOut) + globalStats.ProxyStatistics[name] = proxyStats + } + } +} + +// Functions for getting server stats. +type ServerStats struct { + TotalFlowIn int64 + TotalFlowOut int64 + CurConns int64 + ClientCounts int64 + ProxyTypeCounts map[string]int64 +} + +func StatsGetServer() *ServerStats { + globalStats.mu.Lock() + defer globalStats.mu.Unlock() + s := &ServerStats{ + TotalFlowIn: globalStats.TotalFlowIn.TodayCount(), + TotalFlowOut: globalStats.TotalFlowOut.TodayCount(), + CurConns: globalStats.CurConns.Count(), + ClientCounts: globalStats.ClientCounts.Count(), + ProxyTypeCounts: make(map[string]int64), + } + for k, v := range globalStats.ProxyTypeCounts { + s.ProxyTypeCounts[k] = v.Count() + } + return s +} + +type ProxyStats struct { + Name string + Type string + TodayFlowIn int64 + TodayFlowOut int64 + CurConns int64 +} + +func StatsGetProxiesByType(proxyType string) []*ProxyStats { + res := make([]*ProxyStats, 0) + globalStats.mu.Lock() + defer globalStats.mu.Unlock() + + for name, proxyStats := range globalStats.ProxyStatistics { + if proxyStats.ProxyType != proxyType { + continue + } + + ps := &ProxyStats{ + Name: name, + Type: proxyStats.ProxyType, + TodayFlowIn: proxyStats.FlowIn.TodayCount(), + TodayFlowOut: proxyStats.FlowOut.TodayCount(), + CurConns: proxyStats.CurConns.Count(), + } + res = append(res, ps) + } + return res +} + +type ProxyFlowInfo struct { + Name string + FlowIn []int64 + FlowOut []int64 +} + +func StatsGetProxyFlow(name string) (res *ProxyFlowInfo) { + globalStats.mu.Lock() + defer globalStats.mu.Unlock() + + proxyStats, ok := globalStats.ProxyStatistics[name] + if ok { + res = &ProxyFlowInfo{ + Name: name, + } + res.FlowIn = proxyStats.FlowIn.GetLastDaysCount(ReserveDays) + res.FlowOut = proxyStats.FlowOut.GetLastDaysCount(ReserveDays) + } + return +} diff --git a/server/proxy.go b/server/proxy.go index c6d96091..54a31dbc 100644 --- a/server/proxy.go +++ b/server/proxy.go @@ -1,3 +1,17 @@ +// Copyright 2017 fatedier, fatedier@gmail.com +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package server import ( @@ -402,6 +416,11 @@ func HandleUserTcpConnection(pxy Proxy, userConn frpNet.Conn) { } pxy.Debug("join connections, workConn(l[%s] r[%s]) userConn(l[%s] r[%s])", workConn.LocalAddr().String(), workConn.RemoteAddr().String(), userConn.LocalAddr().String(), userConn.RemoteAddr().String()) - tcp.Join(local, userConn) + + StatsOpenConnection(pxy.GetName()) + inCount, outCount := tcp.Join(local, userConn) + StatsCloseConnection(pxy.GetName()) + StatsAddFlowIn(pxy.GetName(), inCount) + StatsAddFlowOut(pxy.GetName(), outCount) pxy.Debug("join connections closed") } diff --git a/server/service.go b/server/service.go index a07506dc..b11c44c9 100644 --- a/server/service.go +++ b/server/service.go @@ -1,3 +1,17 @@ +// Copyright 2017 fatedier, fatedier@gmail.com +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package server import ( @@ -14,6 +28,8 @@ import ( "github.com/fatedier/frp/utils/vhost" ) +var ServerService *Service + // Server service. type Service struct { // Accept connections from client. @@ -172,6 +188,9 @@ func (svr *Service) RegisterControl(ctlConn net.Conn, loginMsg *msg.Login) (err ctlConn.AddLogPrefix(loginMsg.RunId) ctl.Start() + + // for statistics + StatsNewClient() return } diff --git a/utils/metric/counter.go b/utils/metric/counter.go new file mode 100644 index 00000000..4b9c7a66 --- /dev/null +++ b/utils/metric/counter.go @@ -0,0 +1,60 @@ +// Copyright 2017 fatedier, fatedier@gmail.com +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metric + +import ( + "sync/atomic" +) + +type Counter interface { + Count() int64 + Inc(int64) + Dec(int64) + Snapshot() Counter + Clear() +} + +func NewCounter() Counter { + return &StandardCounter{ + count: 0, + } +} + +type StandardCounter struct { + count int64 +} + +func (c *StandardCounter) Count() int64 { + return atomic.LoadInt64(&c.count) +} + +func (c *StandardCounter) Inc(count int64) { + atomic.AddInt64(&c.count, count) +} + +func (c *StandardCounter) Dec(count int64) { + atomic.AddInt64(&c.count, -count) +} + +func (c *StandardCounter) Snapshot() Counter { + tmp := &StandardCounter{ + count: atomic.LoadInt64(&c.count), + } + return tmp +} + +func (c *StandardCounter) Clear() { + atomic.StoreInt64(&c.count, 0) +} diff --git a/utils/metric/date_counter.go b/utils/metric/date_counter.go new file mode 100644 index 00000000..ce32f4f3 --- /dev/null +++ b/utils/metric/date_counter.go @@ -0,0 +1,157 @@ +// Copyright 2017 fatedier, fatedier@gmail.com +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metric + +import ( + "sync" + "time" +) + +type DateCounter interface { + TodayCount() int64 + GetLastDaysCount(lastdays int64) []int64 + Inc(int64) + Dec(int64) + Snapshot() DateCounter + Clear() + Close() +} + +func NewDateCounter(reserveDays int64) DateCounter { + if reserveDays <= 0 { + reserveDays = 1 + } + return newStandardDateCounter(reserveDays) +} + +type StandardDateCounter struct { + reserveDays int64 + counts []int64 + + closeCh chan struct{} + closed bool + mu sync.Mutex +} + +func newStandardDateCounter(reserveDays int64) *StandardDateCounter { + s := &StandardDateCounter{ + reserveDays: reserveDays, + counts: make([]int64, reserveDays), + closeCh: make(chan struct{}), + } + s.startRotateWorker() + return s +} + +func (c *StandardDateCounter) TodayCount() int64 { + c.mu.Lock() + defer c.mu.Unlock() + return c.counts[0] +} + +func (c *StandardDateCounter) GetLastDaysCount(lastdays int64) []int64 { + if lastdays > c.reserveDays { + lastdays = c.reserveDays + } + counts := make([]int64, lastdays) + + c.mu.Lock() + defer c.mu.Unlock() + for i := 0; i < int(lastdays); i++ { + counts[i] = c.counts[i] + } + return counts +} + +func (c *StandardDateCounter) Inc(count int64) { + c.mu.Lock() + defer c.mu.Unlock() + c.counts[0] += count +} + +func (c *StandardDateCounter) Dec(count int64) { + c.mu.Lock() + defer c.mu.Unlock() + c.counts[0] -= count +} + +func (c *StandardDateCounter) Snapshot() DateCounter { + c.mu.Lock() + defer c.mu.Unlock() + tmp := &StandardDateCounter{ + reserveDays: c.reserveDays, + counts: make([]int64, c.reserveDays), + } + for i := 0; i < int(c.reserveDays); i++ { + tmp.counts[i] = c.counts[i] + } + return tmp +} + +func (c *StandardDateCounter) Clear() { + c.mu.Lock() + defer c.mu.Unlock() + for i := 0; i < int(c.reserveDays); i++ { + c.counts[i] = 0 + } +} + +func (c *StandardDateCounter) Close() { + c.mu.Lock() + defer c.mu.Unlock() + if !c.closed { + close(c.closeCh) + c.closed = true + } +} + +func (c *StandardDateCounter) rotate() { + c.mu.Lock() + defer c.mu.Unlock() + newCounts := make([]int64, c.reserveDays) + + for i := 1; i < int(c.reserveDays-1); i++ { + newCounts[i] = c.counts[i+1] + } + c.counts = newCounts +} + +func (c *StandardDateCounter) startRotateWorker() { + now := time.Now() + nextDayTimeStr := now.Add(24 * time.Hour).Format("20060102") + nextDay, _ := time.Parse("20060102", nextDayTimeStr) + d := nextDay.Sub(now) + + firstTimer := time.NewTimer(d) + rotateTicker := time.NewTicker(24 * time.Hour) + + go func() { + for { + select { + case <-firstTimer.C: + firstTimer.Stop() + rotateTicker.Stop() + rotateTicker = time.NewTicker(24 * time.Hour) + c.rotate() + case <-rotateTicker.C: + c.rotate() + case <-c.closeCh: + break + } + } + firstTimer.Stop() + rotateTicker.Stop() + }() +}