From 8bc95e5c176cb97e15fe718d4e59a78a42025183 Mon Sep 17 00:00:00 2001 From: Apex Liu Date: Thu, 14 Dec 2017 02:30:26 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4dashboard=EF=BC=8C=E7=94=A8?= =?UTF-8?q?=E6=88=B7=E6=95=B0=E3=80=81=E4=B8=BB=E6=9C=BA=E6=95=B0=E3=80=81?= =?UTF-8?q?=E8=B4=A6=E5=8F=B7=E6=95=B0=E5=92=8C=E8=BF=9E=E6=8E=A5=E6=95=B0?= =?UTF-8?q?=E4=B9=9F=E9=83=BD=E4=BB=8Ewebsocket=E6=96=B9=E5=BC=8F=E8=BF=9B?= =?UTF-8?q?=E8=A1=8C=E8=8E=B7=E5=8F=96=E5=B9=B6=E8=AE=A2=E9=98=85=E9=80=9A?= =?UTF-8?q?=E7=9F=A5=EF=BC=8C=E4=BB=A5=E5=BE=97=E5=88=B0=E5=AE=9E=E6=97=B6?= =?UTF-8?q?=E5=8F=8D=E9=A6=88=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../teleport/static/js/dashboard/dashboard.js | 91 ++++++---- server/www/teleport/webroot/app/base/stats.py | 160 ++++++++++++++++++ .../www/teleport/webroot/app/base/status.py | 107 ------------ .../www/teleport/webroot/app/base/webapp.py | 4 +- server/www/teleport/webroot/app/base/wss.py | 94 ++++++++++ .../webroot/app/controller/__init__.py | 2 - .../webroot/app/controller/account.py | 29 ---- .../webroot/app/controller/dashboard.py | 21 +-- .../teleport/webroot/app/controller/rpc.py | 20 ++- .../www/teleport/webroot/app/controller/ws.py | 92 +--------- .../www/teleport/webroot/app/model/account.py | 5 + server/www/teleport/webroot/app/model/host.py | 5 + server/www/teleport/webroot/app/model/stat.py | 88 ---------- .../www/teleport/webroot/app/model/stats.py | 86 ++++++++++ server/www/teleport/webroot/app/model/user.py | 5 + 15 files changed, 435 insertions(+), 374 deletions(-) create mode 100644 server/www/teleport/webroot/app/base/stats.py delete mode 100644 server/www/teleport/webroot/app/base/status.py create mode 100644 server/www/teleport/webroot/app/base/wss.py delete mode 100644 server/www/teleport/webroot/app/model/stat.py create mode 100644 server/www/teleport/webroot/app/model/stats.py diff --git a/server/www/teleport/static/js/dashboard/dashboard.js b/server/www/teleport/static/js/dashboard/dashboard.js index e3f56ca..2ab1da1 100644 --- a/server/www/teleport/static/js/dashboard/dashboard.js +++ b/server/www/teleport/static/js/dashboard/dashboard.js @@ -13,10 +13,14 @@ $app.on_init = function (cb_stack) { bar_disk: $('#bar-disk') }; - window.onresize = $app.on_screen_resize; + $app.stat_counter = { + user: -1, + host: -1, + acc: -1, + conn: -1 + }; - // refresh basic info every 1m. - $app.load_basic_info(); + window.onresize = $app.on_screen_resize; $app.ws = null; $app.init_ws(); @@ -24,27 +28,6 @@ $app.on_init = function (cb_stack) { cb_stack.exec(); }; -$app.load_basic_info = function () { - $tp.ajax_post_json('/dashboard/do-get-basic', {}, - function (ret) { - if (ret.code === TPE_OK) { - $app.dom.count_user.text(ret.data.count_user); - $app.dom.count_host.text(ret.data.count_host); - $app.dom.count_acc.text(ret.data.count_acc); - $app.dom.count_conn.text(ret.data.count_conn); - } else { - console.log('do-get-basic failed:' + tp_error_msg(ret.code, ret.message)); - } - - }, - function () { - console.log('can not connect to server.'); - } - ); - - setTimeout($app.load_basic_info, 60 * 1000); -}; - $app.init_sys_status_info = function (data) { var i = 0; @@ -101,8 +84,6 @@ $app.init_sys_status_info = function (data) { } }; - console.log(axis_size_cfg); - //===================================== // CPU //===================================== @@ -405,6 +386,48 @@ $app.init_sys_status_info = function (data) { }; +$app.update_stat_counter = function (data) { + if ($app.stat_counter.user === -1) { + $app.stat_counter.user = data.user; + $app.dom.count_user.text(data.user); + } else if ($app.stat_counter.user !== data.user) { + $app.stat_counter.user = data.user; + $app.dom.count_user.fadeOut(300, function () { + $app.dom.count_user.text(data.user).fadeIn(400); + }); + } + + if ($app.stat_counter.host === -1) { + $app.stat_counter.host = data.host; + $app.dom.count_host.text(data.host); + } else if ($app.stat_counter.host !== data.host) { + $app.stat_counter.host = data.host; + $app.dom.count_host.fadeOut(300, function () { + $app.dom.count_host.text(data.host).fadeIn(400); + }); + } + + if ($app.stat_counter.acc === -1) { + $app.stat_counter.acc = data.acc; + $app.dom.count_acc.text(data.acc); + } else if ($app.stat_counter.acc !== data.acc) { + $app.stat_counter.acc = data.acc; + $app.dom.count_acc.fadeOut(300, function () { + $app.dom.count_acc.text(data.acc).fadeIn(400); + }); + } + + if ($app.stat_counter.conn === -1) { + $app.stat_counter.conn = data.conn; + $app.dom.count_conn.text(data.conn); + } else if ($app.stat_counter.conn !== data.conn) { + $app.stat_counter.conn = data.conn; + $app.dom.count_conn.fadeOut(300, function () { + $app.dom.count_conn.text(data.conn).fadeIn(400); + }); + } +}; + $app.init_ws = function () { if ($app.ws !== null) delete $app.ws; @@ -414,16 +437,13 @@ $app.init_ws = function () { $app.ws.onopen = function (e) { $app.ws.send('{"method": "request", "param": "sys_status"}'); - // 订阅: - // $app.ws.send('{"method": "subscribe", "params": ["sys_status"]}'); + $app.ws.send('{"method": "request", "param": "stat_counter"}'); }; $app.ws.onclose = function (e) { - // console.log('[ws] ws-on-close', e); setTimeout($app.init_ws, 5000); }; $app.ws.onmessage = function (e) { var t = JSON.parse(e.data); - // console.log(t); if (t.method === 'request' && t.param === 'sys_status') { $app.init_sys_status_info(t.data); @@ -432,6 +452,13 @@ $app.init_ws = function () { return; } + if (t.method === 'request' && t.param === 'stat_counter') { + $app.update_stat_counter(t.data); + // 订阅数量信息 + $app.ws.send('{"method": "subscribe", "params": ["stat_counter"]}'); + return; + } + if (t.method === 'subscribe' && t.param === 'sys_status') { $app.bar_cpu_user.shift(); $app.bar_cpu_user.push({name: tp_format_datetime_ms(tp_utc2local_ms(t.data.t), 'HH:mm:ss'), value: [t.data.t, t.data.cpu.u]}); @@ -463,6 +490,10 @@ $app.init_ws = function () { {series: [{data: $app.bar_disk_read}, {data: $app.bar_disk_write}]} ); } + + if (t.method === 'subscribe' && t.param === 'stat_counter') { + $app.update_stat_counter(t.data); + } }; }; diff --git a/server/www/teleport/webroot/app/base/stats.py b/server/www/teleport/webroot/app/base/stats.py new file mode 100644 index 0000000..3a9b5d3 --- /dev/null +++ b/server/www/teleport/webroot/app/base/stats.py @@ -0,0 +1,160 @@ +# -*- coding: utf-8 -*- + +import psutil +from app.base.utils import tp_utc_timestamp_ms +from app.const import * +from app.base.wss import tp_wss +from app.base.cron import tp_corn +from app.model import stats + + +class TPStats(object): + _INTERVAL = 5 # seconds + + def __init__(self): + super().__init__() + + import builtins + if '__tp_stats__' in builtins.__dict__: + raise RuntimeError('TPStats object exists, you can not create more than one instance.') + + # 实时数据我们在内存中保留最近10分钟的数据,每5秒收集一次,共 10*60/5 = 120 条记录 + self._sys_stats = list() + + # 网络流量和磁盘IO是递增的数据,因此要记下上一次采集的数据,以便计算间隔时间内的增量 + self._net_recv = 0 + self._net_sent = 0 + self._disk_read = 0 + self._disk_write = 0 + + self._counter_stats = { + 'user': 0, + 'host': 0, + 'acc': 0, + 'conn': 0 + } + + def init(self): + t = tp_utc_timestamp_ms() - 10 * 60 * 1000 + cnt = int((10 * 60 + self._INTERVAL - 1) / self._INTERVAL) + for i in range(cnt): + val = { + 't': t, + 'cpu': {'u': 0, 's': 0}, + 'mem': {'u': 1, 't': 100}, + 'disk': {'r': 0, 'w': 0}, + 'net': {'r': 0, 's': 0} + } + self._sys_stats.append(val) + t += self._INTERVAL * 1000 + + psutil.cpu_times_percent() + net = psutil.net_io_counters(pernic=False) + self._net_recv = net.bytes_recv + self._net_sent = net.bytes_sent + disk = psutil.disk_io_counters(perdisk=False) + self._disk_read = disk.read_bytes + self._disk_write = disk.write_bytes + + err, c = stats.get_basic_stats() + if TPE_OK == err: + self._counter_stats = c + + # 每 5秒 采集一次系统状态统计数据 + tp_corn().add_job('sys_status', self._check_sys_stats, first_interval_seconds=self._INTERVAL, interval_seconds=self._INTERVAL) + # 每 一小时 重新查询一次数据库,得到用户数/主机数/账号数/连接数,避免统计数量出现偏差 + tp_corn().add_job('query_counter', self._query_counter, first_interval_seconds=60 * 60, interval_seconds=60 * 60) + tp_wss().register_get_sys_status_callback(self.get_sys_stats) + tp_wss().register_get_stat_counter_callback(self.get_counter_stats) + + return True + + def _check_sys_stats(self): + val = {'t': tp_utc_timestamp_ms()} + + cpu = psutil.cpu_times_percent() + val['cpu'] = {'u': cpu.user, 's': cpu.system} + + mem = psutil.virtual_memory() + val['mem'] = {'u': mem.used, 't': mem.total} + + disk = psutil.disk_io_counters(perdisk=False) + _read = disk.read_bytes - self._disk_read + _write = disk.write_bytes - self._disk_write + self._disk_read = disk.read_bytes + self._disk_write = disk.write_bytes + + if _read < 0: + _read = 0 + if _write < 0: + _write = 0 + val['disk'] = {'r': int(_read / self._INTERVAL), 'w': int(_write / self._INTERVAL)} + + net = psutil.net_io_counters(pernic=False) + _recv = net.bytes_recv - self._net_recv + _sent = net.bytes_sent - self._net_sent + self._net_recv = net.bytes_recv + self._net_sent = net.bytes_sent + + # On some systems such as Linux, on a very busy or long-lived system, the numbers + # returned by the kernel may overflow and wrap (restart from zero) + if _recv < 0: + _recv = 0 + if _sent < 0: + _sent = 0 + val['net'] = {'r': int(_recv / self._INTERVAL), 's': int(_sent / self._INTERVAL)} + + self._sys_stats.pop(0) + self._sys_stats.append(val) + + tp_wss().send_message('sys_status', val) + + def _query_counter(self): + # 直接从数据库中查询数据,避免长时间运行后计数不准确 + err, c = stats.get_basic_stats() + if TPE_OK == err: + self._counter_stats = c + tp_wss().send_message('stat_counter', self._counter_stats) + + def get_sys_stats(self): + return self._sys_stats + + def get_counter_stats(self): + return self._counter_stats + + def user_counter_change(self, alt_count): + self._counter_stats['user'] += alt_count + if self._counter_stats['user'] < 0: + self._counter_stats['user'] = 0 + tp_wss().send_message('stat_counter', self._counter_stats) + + def host_counter_change(self, alt_count): + self._counter_stats['host'] += alt_count + if self._counter_stats['host'] < 0: + self._counter_stats['host'] = 0 + tp_wss().send_message('stat_counter', self._counter_stats) + + def acc_counter_change(self, alt_count): + self._counter_stats['acc'] += alt_count + if self._counter_stats['acc'] < 0: + self._counter_stats['acc'] = 0 + tp_wss().send_message('stat_counter', self._counter_stats) + + def conn_counter_change(self, alt_count): + self._counter_stats['conn'] += alt_count + if self._counter_stats['conn'] < 0: + self._counter_stats['conn'] = 0 + tp_wss().send_message('stat_counter', self._counter_stats) + + +def tp_stats(): + """ + 取得 TPSysStatus 的唯一实例 + + :rtype : TPStats + """ + + import builtins + if '__tp_stats__' not in builtins.__dict__: + builtins.__dict__['__tp_stats__'] = TPStats() + return builtins.__dict__['__tp_stats__'] diff --git a/server/www/teleport/webroot/app/base/status.py b/server/www/teleport/webroot/app/base/status.py deleted file mode 100644 index 54a4f3e..0000000 --- a/server/www/teleport/webroot/app/base/status.py +++ /dev/null @@ -1,107 +0,0 @@ -# -*- coding: utf-8 -*- - -import psutil -from app.base.utils import tp_timestamp_utc_now, tp_utc_timestamp_ms -from app.controller.ws import tp_wss -from app.base.cron import tp_corn - - -class TPSysStatus(object): - _INTERVAL = 5 # seconds - - def __init__(self): - super().__init__() - - import builtins - if '__tp_sys_status__' in builtins.__dict__: - raise RuntimeError('TPSysStatus object exists, you can not create more than one instance.') - - # 实时数据我们在内存中保留最近10分钟的数据,每5秒收集一次,共 10*60/5 = 120 条记录 - self._history = list() - - self._disk_read = 0 - self._disk_write = 0 - self._net_recv = 0 - self._net_sent = 0 - - def init(self): - t = tp_utc_timestamp_ms() - 10 * 60 * 1000 - cnt = int((10 * 60 + self._INTERVAL - 1) / self._INTERVAL) - for i in range(cnt): - val = { - 't': t, - 'cpu': {'u': 0, 's': 0}, - 'mem': {'u': 1, 't': 100}, - 'disk': {'r': 0, 'w': 0}, - 'net': {'r': 0, 's': 0} - } - self._history.append(val) - t += self._INTERVAL*1000 - - psutil.cpu_times_percent() - net = psutil.net_io_counters(pernic=False) - self._net_recv = net.bytes_recv - self._net_sent = net.bytes_sent - disk = psutil.disk_io_counters(perdisk=False) - self._disk_read = disk.read_bytes - self._disk_write = disk.write_bytes - - tp_corn().add_job('sys_status', self._check_status, first_interval_seconds=self._INTERVAL, interval_seconds=self._INTERVAL) - tp_wss().register_get_sys_status_callback(self.get_status) - return True - - def _check_status(self): - val = {'t': tp_utc_timestamp_ms()} - - cpu = psutil.cpu_times_percent() - val['cpu'] = {'u': cpu.user, 's': cpu.system} - - mem = psutil.virtual_memory() - val['mem'] = {'u': mem.used, 't': mem.total} - - disk = psutil.disk_io_counters(perdisk=False) - _read = disk.read_bytes - self._disk_read - _write = disk.write_bytes - self._disk_write - self._disk_read = disk.read_bytes - self._disk_write = disk.write_bytes - - if _read < 0: - _read = 0 - if _write < 0: - _write = 0 - val['disk'] = {'r': int(_read / self._INTERVAL), 'w': int(_write / self._INTERVAL)} - - net = psutil.net_io_counters(pernic=False) - _recv = net.bytes_recv - self._net_recv - _sent = net.bytes_sent - self._net_sent - self._net_recv = net.bytes_recv - self._net_sent = net.bytes_sent - - # On some systems such as Linux, on a very busy or long-lived system, the numbers - # returned by the kernel may overflow and wrap (restart from zero) - if _recv < 0: - _recv = 0 - if _sent < 0: - _sent = 0 - val['net'] = {'r': int(_recv / self._INTERVAL), 's': int(_sent / self._INTERVAL)} - - self._history.pop(0) - self._history.append(val) - - tp_wss().send_message('sys_status', val) - - def get_status(self): - return self._history - - -def tp_sys_status(): - """ - 取得TPSysStatus管理器的唯一实例 - - :rtype : TPSysStatus - """ - - import builtins - if '__tp_sys_status__' not in builtins.__dict__: - builtins.__dict__['__tp_sys_status__'] = TPSysStatus() - return builtins.__dict__['__tp_sys_status__'] diff --git a/server/www/teleport/webroot/app/base/webapp.py b/server/www/teleport/webroot/app/base/webapp.py index 87f6c1b..ffb80ed 100644 --- a/server/www/teleport/webroot/app/base/webapp.py +++ b/server/www/teleport/webroot/app/base/webapp.py @@ -17,7 +17,7 @@ from app.base.db import get_db from app.base.logger import log from app.base.session import tp_session from app.base.cron import tp_corn -from app.base.status import tp_sys_status +from app.base.stats import tp_stats class WebApp: @@ -94,7 +94,7 @@ class WebApp: if not tp_session().init(): log.e('can not initialize session manager.\n') return 0 - if not tp_sys_status().init(): + if not tp_stats().init(): log.e('can not initialize system status collector.\n') return 0 diff --git a/server/www/teleport/webroot/app/base/wss.py b/server/www/teleport/webroot/app/base/wss.py new file mode 100644 index 0000000..38d120b --- /dev/null +++ b/server/www/teleport/webroot/app/base/wss.py @@ -0,0 +1,94 @@ +# -*- coding: utf-8 -*- + +import json +import threading + +from app.base.logger import * + + +class TPWebSocketServer(object): + _clients = {} + _lock = threading.RLock() + + def __init__(self): + super().__init__() + + import builtins + if '__tp_websocket_server__' in builtins.__dict__: + raise RuntimeError('TPWebSocketServer object exists, you can not create more than one instance.') + + self._cb_get_sys_status = None + self._cb_get_stat_counter = None + + def register_get_sys_status_callback(self, cb): + self._cb_get_sys_status = cb + + def register_get_stat_counter_callback(self, cb): + self._cb_get_stat_counter = cb + + def have_callbacker(self, callbacker): + return callbacker in self._clients + + def register(self, callbacker): + # 记录客户端连接实例 + with self._lock: + if not self.have_callbacker(callbacker): + self._clients[callbacker] = {'subscribe': []} + + def unregister(self, callbacker): + with self._lock: + # 删除客户端连接实例 + try: + del self._clients[callbacker] + except: + # print('when unregister, not exists.') + pass + + def on_message(self, callbacker, message): + print('got message', message) + try: + req = json.loads(message) + except: + log.e('need json-format request.\n') + return + + if req['method'] == 'subscribe': + for p in req['params']: + if p not in self._clients[callbacker]['subscribe']: + self._clients[callbacker]['subscribe'].append(p) + elif req['method'] == 'request': + if req['param'] == 'sys_status': + if self._cb_get_sys_status is not None: + message = self._cb_get_sys_status() + msg = {'method': 'request', 'param': 'sys_status', 'data': message} + s = json.dumps(msg, separators=(',', ':')) + callbacker.write_message(s) + if req['param'] == 'stat_counter': + if self._cb_get_stat_counter is not None: + message = self._cb_get_stat_counter() + msg = {'method': 'request', 'param': 'stat_counter', 'data': message} + s = json.dumps(msg, separators=(',', ':')) + callbacker.write_message(s) + + def send_message(self, subscribe, message): + s = None + with self._lock: + for c in self._clients: + if subscribe in self._clients[c]['subscribe']: + if s is None: + msg = {'method': 'subscribe', 'param': subscribe, 'data': message} + s = json.dumps(msg, separators=(',', ':')) + c.write_message(s) + + +def tp_wss(): + """ + 取得 TPWebSocketServer 管理器的唯一实例 + + :rtype : TPWebSocketServer + """ + + import builtins + if '__tp_websocket_server__' not in builtins.__dict__: + builtins.__dict__['__tp_websocket_server__'] = TPWebSocketServer() + return builtins.__dict__['__tp_websocket_server__'] diff --git a/server/www/teleport/webroot/app/controller/__init__.py b/server/www/teleport/webroot/app/controller/__init__.py index 1317c02..31d84b7 100644 --- a/server/www/teleport/webroot/app/controller/__init__.py +++ b/server/www/teleport/webroot/app/controller/__init__.py @@ -25,8 +25,6 @@ controllers = [ # ==================================================== # - 控制台页面 (r'/dashboard', dashboard.IndexHandler), - # - [json] 获取基本信息 - (r'/dashboard/do-get-basic', dashboard.DoGetBasicHandler), # ==================================================== # 外部调用接口 diff --git a/server/www/teleport/webroot/app/controller/account.py b/server/www/teleport/webroot/app/controller/account.py index 8b70c64..ddf8db9 100644 --- a/server/www/teleport/webroot/app/controller/account.py +++ b/server/www/teleport/webroot/app/controller/account.py @@ -12,8 +12,6 @@ from app.const import * from app.model import account from app.model import group -# cfg = tp_cfg() - # 临时认证ID的基数,每次使用时均递减 tmp_auth_id_base = -1 tmp_auth_id_lock = threading.RLock() @@ -293,30 +291,3 @@ class DoUpdateAccountsHandler(TPBaseJsonHandler): return self.write_json(err) else: return self.write_json(TPE_PARAM) - -# class DoRemoveAccountHandler(TPBaseJsonHandler): -# def post(self): -# ret = self.check_privilege(TP_PRIVILEGE_ACCOUNT | TP_PRIVILEGE_ACCOUNT_GROUP) -# if ret != TPE_OK: -# return -# -# args = self.get_argument('args', None) -# if args is None: -# return self.write_json(TPE_PARAM) -# try: -# args = json.loads(args) -# except: -# return self.write_json(TPE_JSON_FORMAT) -# -# try: -# host_id = int(args['host_id']) -# acc_id = int(args['acc_id']) -# except: -# log.e('\n') -# return self.write_json(TPE_PARAM) -# -# if host_id <= 0 or acc_id <= 0: -# return self.write_json(TPE_PARAM) -# -# err = account.delete_account(self, host_id, acc_id) -# self.write_json(err) diff --git a/server/www/teleport/webroot/app/controller/dashboard.py b/server/www/teleport/webroot/app/controller/dashboard.py index c3e9df0..65a5f2e 100644 --- a/server/www/teleport/webroot/app/controller/dashboard.py +++ b/server/www/teleport/webroot/app/controller/dashboard.py @@ -2,7 +2,7 @@ from app.const import * from app.base.controller import TPBaseHandler, TPBaseJsonHandler -from app.model import stat +from app.model import stats class IndexHandler(TPBaseHandler): @@ -12,22 +12,3 @@ class IndexHandler(TPBaseHandler): return self.render('dashboard/index.mako') - - -class DoGetBasicHandler(TPBaseJsonHandler): - def post(self): - ret = self.check_privilege(TP_PRIVILEGE_LOGIN_WEB) - if ret != TPE_OK: - return - - err, info = stat.get_basic() - if err != TPE_OK: - return self.write_json(err) - - # ret = dict() - # ret['count_user'] = 5 - # ret['count_host'] = 5 - # ret['count_acc'] = 5 - # ret['count_conn'] = 5 - self.write_json(TPE_OK, data=info) - diff --git a/server/www/teleport/webroot/app/controller/rpc.py b/server/www/teleport/webroot/app/controller/rpc.py index aece3cb..d306af5 100644 --- a/server/www/teleport/webroot/app/controller/rpc.py +++ b/server/www/teleport/webroot/app/controller/rpc.py @@ -9,6 +9,7 @@ from app.base.configs import tp_cfg from app.base.session import tp_session from app.base.core_server import core_service_async_post_http from app.model import record +from app.base.stats import tp_stats from app.base.logger import * from app.base.controller import TPBaseJsonHandler @@ -63,7 +64,10 @@ class RpcHandler(TPBaseJsonHandler): conn_id = param['conn_id'] x = tp_session().taken('tmp-conn-info-{}'.format(conn_id), None) - return self.write_json(0, data=x) + if x is None: + return self.write_json(TPE_NOT_EXISTS) + else: + return self.write_json(TPE_OK, data=x) def _session_begin(self, param): try: @@ -87,6 +91,7 @@ class RpcHandler(TPBaseJsonHandler): if err != TPE_OK: return self.write_json(err, message='can not write database.') else: + tp_stats().conn_counter_change(1) return self.write_json(TPE_OK, data={'rid': record_id}) def _session_update(self, param): @@ -106,19 +111,20 @@ class RpcHandler(TPBaseJsonHandler): def _session_end(self, param): if 'rid' not in param or 'code' not in param: - return self.write_json(-1, message='invalid request.') + return self.write_json(TPE_PARAM, message='invalid request.') if not record.session_end(param['rid'], param['code']): - return self.write_json(-1, 'can not write database.') + return self.write_json(TPE_DATABASE, 'can not write database.') else: - return self.write_json(0) + tp_stats().conn_counter_change(-1) + return self.write_json(TPE_OK) def _register_core(self, param): # 因为core服务启动了(之前可能非正常终止了),做一下数据库中会话状态的修复操作 record.session_fix() if 'rpc' not in param: - return self.write_json(-1, 'invalid param.') + return self.write_json(TPE_PARAM, 'invalid param.') tp_cfg().common.core_server_rpc = param['rpc'] @@ -132,8 +138,8 @@ class RpcHandler(TPBaseJsonHandler): log.d('update base server config info.\n') tp_cfg().update_core(ret_data) - return self.write_json(0) + return self.write_json(TPE_OK) def _exit(self): # set exit flag. - return self.write_json(0) + return self.write_json(TPE_OK) diff --git a/server/www/teleport/webroot/app/controller/ws.py b/server/www/teleport/webroot/app/controller/ws.py index a4332de..5bf1956 100644 --- a/server/www/teleport/webroot/app/controller/ws.py +++ b/server/www/teleport/webroot/app/controller/ws.py @@ -1,101 +1,15 @@ # -*- coding: utf-8 -*- import json -# import urllib.parse -import threading -# import tornado.gen -from app.const import * -# from app.base.configs import tp_cfg -from app.base.session import tp_session -# from app.base.core_server import core_service_async_post_http -# from app.model import record -from app.base.logger import * -# from app.base.controller import TPBaseJsonHandler import tornado.websocket - - -class TPWebSocketServer(object): - _clients = {} - _lock = threading.RLock() - - def __init__(self): - super().__init__() - - import builtins - if '__tp_websocket_server__' in builtins.__dict__: - raise RuntimeError('TPWebSocketServer object exists, you can not create more than one instance.') - - self._cb_get_sys_status = None - - def register_get_sys_status_callback(self, cb): - self._cb_get_sys_status = cb - - def have_callbacker(self, callbacker): - return callbacker in self._clients - - def register(self, callbacker): - # 记录客户端连接实例 - with self._lock: - if not self.have_callbacker(callbacker): - self._clients[callbacker] = {'subscribe': []} - - def unregister(self, callbacker): - with self._lock: - # 删除客户端连接实例 - try: - del self._clients[callbacker] - except: - # print('when unregister, not exists.') - pass - - def on_message(self, callbacker, message): - print('got message', message) - try: - req = json.loads(message) - except: - log.e('need json-format request.\n') - return - - if req['method'] == 'subscribe': - for p in req['params']: - if p not in self._clients[callbacker]['subscribe']: - self._clients[callbacker]['subscribe'].append(p) - elif req['method'] == 'request': - if req['param'] == 'sys_status': - if self._cb_get_sys_status is not None: - message = self._cb_get_sys_status() - msg = {'method': 'request', 'param': 'sys_status', 'data': message} - s = json.dumps(msg, separators=(',', ':')) - callbacker.write_message(s) - - def send_message(self, subscribe, message): - s = None - with self._lock: - for c in self._clients: - if subscribe in self._clients[c]['subscribe']: - if s is None: - msg = {'method': 'subscribe', 'param': subscribe, 'data': message} - s = json.dumps(msg, separators=(',', ':')) - c.write_message(s) - - -def tp_wss(): - """ - 取得 TPWebSocketServer 管理器的唯一实例 - - :rtype : TPWebSocketServer - """ - - import builtins - if '__tp_websocket_server__' not in builtins.__dict__: - builtins.__dict__['__tp_websocket_server__'] = TPWebSocketServer() - return builtins.__dict__['__tp_websocket_server__'] +from app.base.session import tp_session +from app.base.wss import tp_wss +from app.const import * class WebSocketHandler(tornado.websocket.WebSocketHandler): def check_origin(self, origin): # 针对websocket处理类重写同源检查的方法 - # print('check_origin: ', origin) return True # 接受websocket链接,保存链接实例 diff --git a/server/www/teleport/webroot/app/model/account.py b/server/www/teleport/webroot/app/model/account.py index 145c56c..7aa5ebe 100644 --- a/server/www/teleport/webroot/app/model/account.py +++ b/server/www/teleport/webroot/app/model/account.py @@ -5,6 +5,7 @@ from app.base.logger import log from app.base.db import get_db, SQL from . import syslog from app.base.utils import tp_timestamp_utc_now +from app.base.stats import tp_stats def get_account_info(acc_id): @@ -215,6 +216,8 @@ def add_account(handler, host_id, args): # if not db_ret: # return TPE_DATABASE, 0 + tp_stats().acc_counter_change(1) + return TPE_OK, _id @@ -349,6 +352,8 @@ def remove_accounts(handler, host_id, acc_ids): syslog.sys_log(handler.get_current_user(), handler.request.remote_ip, TPE_OK, "删除账号:{}".format(','.join(acc_names))) + tp_stats().acc_counter_change(-1) + return TPE_OK diff --git a/server/www/teleport/webroot/app/model/host.py b/server/www/teleport/webroot/app/model/host.py index 80bb3fa..85d55d1 100644 --- a/server/www/teleport/webroot/app/model/host.py +++ b/server/www/teleport/webroot/app/model/host.py @@ -7,6 +7,7 @@ from app.const import * from app.base.logger import log from app.base.db import get_db, SQL from . import syslog +from app.base.stats import tp_stats from app.base.utils import tp_timestamp_utc_now @@ -113,6 +114,7 @@ def add_host(handler, args): if len(args['router_ip']) > 0: h_name += '(由{}:{}路由)'.format(args['router_ip'], args['router_port']) syslog.sys_log(handler.get_current_user(), handler.request.remote_ip, TPE_OK, "创建主机:{}".format(h_name)) + tp_stats().host_counter_change(1) return TPE_OK, _id @@ -203,8 +205,10 @@ def remove_hosts(handler, hosts): if len(acc_names) > 0: syslog.sys_log(handler.get_current_user(), handler.request.remote_ip, TPE_OK, "删除账号:{}".format(','.join(acc_names))) + tp_stats().acc_counter_change(0 - len(acc_names)) if len(host_names) > 0: syslog.sys_log(handler.get_current_user(), handler.request.remote_ip, TPE_OK, "删除主机:{}".format(','.join(host_names))) + tp_stats().host_counter_change(0 - len(host_names)) return TPE_OK @@ -266,6 +270,7 @@ def update_hosts_state(handler, host_ids, state): else: return TPE_DATABASE + # # def unlock_hosts(handler, host_ids): # db = get_db() diff --git a/server/www/teleport/webroot/app/model/stat.py b/server/www/teleport/webroot/app/model/stat.py deleted file mode 100644 index 40460db..0000000 --- a/server/www/teleport/webroot/app/model/stat.py +++ /dev/null @@ -1,88 +0,0 @@ -# -*- coding: utf-8 -*- - -from app.const import * -from app.base.db import get_db, SQL -from app.base.logger import log -from app.base.utils import tp_timestamp_utc_now - - -def get_basic(): - db = get_db() - - ret = {'count_user': 0, - 'count_host': 0, - 'count_acc': 0, - 'count_conn': 0 - } - - sql = 'SELECT COUNT(*) FROM `{tpdb}user`;'.format(tpdb=db.table_prefix) - db_ret = db.query(sql) - if not db_ret or len(db_ret) == 0: - pass - else: - ret['count_user'] = db_ret[0][0] - - sql = 'SELECT COUNT(*) FROM `{tpdb}host`;'.format(tpdb=db.table_prefix) - db_ret = db.query(sql) - if not db_ret or len(db_ret) == 0: - pass - else: - ret['count_host'] = db_ret[0][0] - - sql = 'SELECT COUNT(*) FROM `{tpdb}acc`;'.format(tpdb=db.table_prefix) - db_ret = db.query(sql) - if not db_ret or len(db_ret) == 0: - pass - else: - ret['count_acc'] = db_ret[0][0] - - sql = 'SELECT COUNT(*) FROM `{tpdb}record` WHERE `state` IN ({sess_running},{sess_started});'.format(tpdb=db.table_prefix, sess_running=TP_SESS_STAT_RUNNING, sess_started=TP_SESS_STAT_STARTED) - db_ret = db.query(sql) - if not db_ret or len(db_ret) == 0: - pass - else: - ret['count_conn'] = db_ret[0][0] - - return TPE_OK, ret - - -def get_logs(sql_filter, sql_order, sql_limit): - s = SQL(get_db()) - s.select_from('syslog', ['id', 'user_name', 'user_surname', 'client_ip', 'code', 'log_time', 'message'], alt_name='l') - - str_where = '' - _where = list() - - if len(sql_filter) > 0: - for k in sql_filter: - if k == 'log_user_name': - _where.append('l.user_name="{}"'.format(sql_filter[k])) - # elif k == 'search_record': - # _where.append('(h.name LIKE "%{}%" OR h.ip LIKE "%{}%" OR h.router_addr LIKE "%{}%" OR h.desc LIKE "%{}%" OR h.cid LIKE "%{}%")'.format(sql_filter[k], sql_filter[k], sql_filter[k], sql_filter[k], sql_filter[k])) - - if len(_where) > 0: - str_where = '( {} )'.format(' AND '.join(_where)) - - s.where(str_where) - - if sql_order is not None: - _sort = False if not sql_order['asc'] else True - if 'log_time' == sql_order['name']: - s.order_by('l.log_time', _sort) - # elif 'name' == sql_order['name']: - # s.order_by('h.name', _sort) - # elif 'os_type' == sql_order['name']: - # s.order_by('h.os_type', _sort) - # elif 'cid' == sql_order['name']: - # s.order_by('h.cid', _sort) - # elif 'state' == sql_order['name']: - # s.order_by('h.state', _sort) - else: - log.e('unknown order field: {}\n'.format(sql_order['name'])) - return TPE_PARAM, s.total_count, s.recorder - - if len(sql_limit) > 0: - s.limit(sql_limit['page_index'], sql_limit['per_page']) - - err = s.query() - return err, s.total_count, s.recorder diff --git a/server/www/teleport/webroot/app/model/stats.py b/server/www/teleport/webroot/app/model/stats.py new file mode 100644 index 0000000..4109337 --- /dev/null +++ b/server/www/teleport/webroot/app/model/stats.py @@ -0,0 +1,86 @@ +# -*- coding: utf-8 -*- + +from app.const import * +from app.base.db import get_db + + +def get_basic_stats(): + db = get_db() + + ret = {'user': 0, + 'host': 0, + 'acc': 0, + 'conn': 0 + } + + sql = 'SELECT COUNT(*) FROM `{tpdb}user`;'.format(tpdb=db.table_prefix) + db_ret = db.query(sql) + if not db_ret or len(db_ret) == 0: + pass + else: + ret['user'] = db_ret[0][0] + + sql = 'SELECT COUNT(*) FROM `{tpdb}host`;'.format(tpdb=db.table_prefix) + db_ret = db.query(sql) + if not db_ret or len(db_ret) == 0: + pass + else: + ret['host'] = db_ret[0][0] + + sql = 'SELECT COUNT(*) FROM `{tpdb}acc`;'.format(tpdb=db.table_prefix) + db_ret = db.query(sql) + if not db_ret or len(db_ret) == 0: + pass + else: + ret['acc'] = db_ret[0][0] + + sql = 'SELECT COUNT(*) FROM `{tpdb}record` WHERE `state` IN ({sess_running},{sess_started});'.format(tpdb=db.table_prefix, sess_running=TP_SESS_STAT_RUNNING, sess_started=TP_SESS_STAT_STARTED) + db_ret = db.query(sql) + if not db_ret or len(db_ret) == 0: + pass + else: + ret['conn'] = db_ret[0][0] + + return TPE_OK, ret + + +# def get_logs(sql_filter, sql_order, sql_limit): +# s = SQL(get_db()) +# s.select_from('syslog', ['id', 'user_name', 'user_surname', 'client_ip', 'code', 'log_time', 'message'], alt_name='l') +# +# str_where = '' +# _where = list() +# +# if len(sql_filter) > 0: +# for k in sql_filter: +# if k == 'log_user_name': +# _where.append('l.user_name="{}"'.format(sql_filter[k])) +# # elif k == 'search_record': +# # _where.append('(h.name LIKE "%{}%" OR h.ip LIKE "%{}%" OR h.router_addr LIKE "%{}%" OR h.desc LIKE "%{}%" OR h.cid LIKE "%{}%")'.format(sql_filter[k], sql_filter[k], sql_filter[k], sql_filter[k], sql_filter[k])) +# +# if len(_where) > 0: +# str_where = '( {} )'.format(' AND '.join(_where)) +# +# s.where(str_where) +# +# if sql_order is not None: +# _sort = False if not sql_order['asc'] else True +# if 'log_time' == sql_order['name']: +# s.order_by('l.log_time', _sort) +# # elif 'name' == sql_order['name']: +# # s.order_by('h.name', _sort) +# # elif 'os_type' == sql_order['name']: +# # s.order_by('h.os_type', _sort) +# # elif 'cid' == sql_order['name']: +# # s.order_by('h.cid', _sort) +# # elif 'state' == sql_order['name']: +# # s.order_by('h.state', _sort) +# else: +# log.e('unknown order field: {}\n'.format(sql_order['name'])) +# return TPE_PARAM, s.total_count, s.recorder +# +# if len(sql_limit) > 0: +# s.limit(sql_limit['page_index'], sql_limit['per_page']) +# +# err = s.query() +# return err, s.total_count, s.recorder diff --git a/server/www/teleport/webroot/app/model/user.py b/server/www/teleport/webroot/app/model/user.py index 6c3becb..c063e9b 100644 --- a/server/www/teleport/webroot/app/model/user.py +++ b/server/www/teleport/webroot/app/model/user.py @@ -8,6 +8,7 @@ from app.base.logger import log from app.base.utils import tp_timestamp_utc_now, tp_generate_random from app.const import * from app.model import syslog +from app.base.stats import tp_stats from app.logic.auth.password import tp_password_verify from app.logic.auth.oath import tp_oath_verify_code @@ -204,6 +205,7 @@ def create_users(handler, user_list, success, failed): if len(name_list) > 0: syslog.sys_log(operator, handler.request.remote_ip, TPE_OK, "创建用户:{}".format(','.join(name_list))) + tp_stats().user_counter_change(len(name_list)) def create_user(handler, args): @@ -240,6 +242,7 @@ def create_user(handler, args): _id = db.last_insert_id() syslog.sys_log(operator, handler.request.remote_ip, TPE_OK, "创建用户:{}".format(args['username'])) + tp_stats().user_counter_change(1) return TPE_OK, _id @@ -512,6 +515,8 @@ def remove_users(handler, users): syslog.sys_log(handler.get_current_user(), handler.request.remote_ip, TPE_OK, "删除用户:{}".format(','.join(name_list))) + tp_stats().user_counter_change(0 - len(name_list)) + return TPE_OK