新增:使用PING的方式检查主机在线情况。

dev
Apex Liu 2020-12-16 02:18:41 +08:00
parent 3a42600b73
commit f6cc313f87
9 changed files with 404 additions and 18 deletions

View File

@ -85,6 +85,15 @@ $app.create_controls = function (cb_stack) {
render: 'account',
fields: {count: 'acc_count'}
},
{
title: "在线",
key: "_alive",
sort: false,
width: 90,
align: 'center',
render: 'host_alive',
fields: {id: 'id', alive: '_alive', alive_info: '_alive_info'}
},
{
title: "状态",
key: "state",
@ -232,6 +241,12 @@ $app.on_table_host_cell_created = function (tbl, row_id, col_key, cell_obj) {
cell_obj.find('[data-action="edit-account"]').click(function () {
$app.dlg_accounts.show(row_id);
});
} else if (col_key === '_alive') {
cell_obj.find('[data-toggle="popover"]').popover({trigger: 'hover'});
// } else if (col_key === 'account') {
// cell_obj.find('[data-action="add-account"]').click(function () {
// $app.dlg_accounts.show(row_id);
// });
}
};
@ -337,6 +352,41 @@ $app.on_table_host_render_created = function (render) {
return '<span class="label label-sm label-' + _style + '">' + _state + '</span>'
};
render.host_alive = function (row_id, fields) {
    // Renders the "online" cell of the host table: a coloured dot whose hover
    // popover explains the state. `fields.alive` selects the branch:
    //   0 = check still pending, 1 = online, 2 = maybe offline, 3 = offline;
    // any other value is shown like a pending check.
    var css_class = 'alive-unknown';
    var tip;

    switch (fields.alive) {
        case 0:
            tip = '正在检测,请稍后刷新页面';
            break;
        case 1:
            css_class = 'alive-online';
            tip = '在线<hr/>最后检测:' + tp_second2str(fields.alive_info.last_check) + '前';
            break;
        case 2:
            css_class = 'alive-warning';
            tip = '可能离线<hr/>最后检测:' + tp_second2str(fields.alive_info.last_check) + '前';
            break;
        case 3:
            css_class = 'alive-offline';
            // last_online === 0 means the host has never been seen online.
            if (fields.alive_info.last_online === 0) {
                tip = '离线<hr/>最后在线:' + '未发现曾经上线';
            } else {
                tip = '离线<hr/>最后在线:' + tp_format_datetime(fields.alive_info.last_online);
            }
            break;
        default:
            tip = '正在检测';
    }

    // data-html="true" lets the popover render the <hr/> inside the tip text.
    return '<div><a class="alive ' + css_class + '" data-toggle="popover" data-placement="left"' +
        ' data-html="true"' +
        ' data-content="' + tip + '"' +
        '><i class="fa fa-circle"></i></a>' +
        '</div>';
};
render.make_host_action_btn = function (row_id, fields) {
var h = [];
h.push('<div class="btn-group btn-group-sm">');
@ -1505,8 +1555,7 @@ $app.create_dlg_edit_account = function () {
dlg.dom.protocol_port.val(23);
dlg.dom.prompt_username.val('ogin:');
dlg.dom.prompt_password.val('assword:');
}
else {
} else {
dlg.dom.protocol_port.val(dlg.account.protocol_port);
dlg.dom.prompt_username.val(dlg.account.username_prompt);
dlg.dom.prompt_password.val(dlg.account.password_prompt);
@ -1520,7 +1569,7 @@ $app.create_dlg_edit_account = function () {
dlg.dom.auth_type.empty().append($(html.join('')));
if(!_.isNull(dlg.account))
if (!_.isNull(dlg.account))
dlg.dom.auth_type.val(dlg.account.auth_type);
dlg.on_auth_change();

View File

@ -18,6 +18,25 @@
<%block name="embed_css">
<style>
.alive {
font-size:24px;
}
.popover hr {
padding:0;
margin:3px 0;
}
.alive-online, a.alive-online:hover {
color: #3c763d;
}
.alive-unknown, a.alive-unknown:hover {
color: #cdcdcd;
}
.alive-warning, a.alive-warning:hover {
color: #eabc61;
}
.alive-offline, a.alive-offline:hover {
color: #d65a5a;
}
</style>
</%block>

View File

@ -31,7 +31,7 @@ class TPCron(threading.Thread):
if name in self._jobs:
return False
self._jobs[name] = {'f': func, 't': 0, 'i': interval_seconds}
_now = int(datetime.datetime.utcnow().timestamp())
_now = int(datetime.datetime.now().timestamp())
if first_interval_seconds is not None:
self._jobs[name]['t'] = _now + first_interval_seconds - interval_seconds
@ -45,7 +45,7 @@ class TPCron(threading.Thread):
time.sleep(1)
with self._lock:
_now = int(datetime.datetime.utcnow().timestamp())
_now = int(datetime.datetime.now().timestamp())
for j in self._jobs:
# log.v('--now: {}, job-name: {}, job-t: {}, job-i: {}\n'.format(_now, j, self._jobs[j]['t'], self._jobs[j]['i']))
if _now - self._jobs[j]['t'] >= self._jobs[j]['i']:

View File

@ -228,9 +228,9 @@ class TPDatabase:
args = ()
# log.d('[db] {}, {}\n'.format(sql, args))
# _start = datetime.datetime.utcnow().timestamp()
# _start = datetime.datetime.now().timestamp()
ret = self._conn_pool.query(sql, args)
# _end = datetime.datetime.utcnow().timestamp()
# _end = datetime.datetime.now().timestamp()
# log.d('[db] cost {} seconds.\n'.format(_end - _start))
return ret
@ -239,9 +239,9 @@ class TPDatabase:
args = ()
# log.d('[db] {}\n'.format(sql, args))
# print('[db]', sql, args)
# _start = datetime.datetime.utcnow().timestamp()
# _start = datetime.datetime.now().timestamp()
ret = self._conn_pool.exec(sql, args)
# _end = datetime.datetime.utcnow().timestamp()
# _end = datetime.datetime.now().timestamp()
# log.d('[db] cost {} seconds.\n'.format(_end - _start))
return ret
@ -250,9 +250,9 @@ class TPDatabase:
# for sql in sql_list:
# log.d('[db] * {}\n'.format(sql))
# _start = datetime.datetime.utcnow().timestamp()
# _start = datetime.datetime.now().timestamp()
ret = self._conn_pool.transaction(sql_list)
# _end = datetime.datetime.utcnow().timestamp()
# _end = datetime.datetime.now().timestamp()
# log.d('[db] transaction\n')
# log.d('[db] cost {} seconds.\n'.format(_end - _start))
return ret

View File

@ -0,0 +1,263 @@
# -*- coding: utf-8 -*-
"""
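Determine whether remote hosts are alive by means such as PING.

By default every host in the system is PINGed once per minute, and each PING
times out after 5 seconds.
For each host, the time of the last successful PING is recorded and compared
with the current time:
    within 1 minute: normal, shown with a green icon in the UI;
    more than 1 minute but less than 2 minutes: faulty, shown with a yellow icon;
    more than 2 minutes: offline, shown with a red icon.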
"""
import time
import threading
import socket
import select
import struct
from app.base.configs import tp_cfg
from app.base.cron import tp_cron
from app.base.logger import log
from app.model import host
# import app.model.host
def calc_icmp_checksum(data):
    """Return the 16-bit one's-complement checksum of *data*.

    The bytes are summed as little-endian 16-bit words (a trailing odd byte is
    added as-is), the carries are folded back into the low 16 bits, the sum is
    inverted, and finally the two bytes of the result are swapped so that the
    value can be packed with a big-endian (network order) format.

    :param data: bytes-like object holding the packet with a zeroed checksum field
    :return: checksum as an int in the range 0..0xffff
    """
    total = 0
    # Pair up even-indexed (low) and odd-indexed (high) bytes; zip() drops the
    # unpaired last byte of odd-length input, which is handled just below.
    for low, high in zip(data[0::2], data[1::2]):
        total += low + (high << 8)
    if len(data) % 2:
        total += data[-1]

    # Fold the upper 16 bits into the lower 16 bits ...
    total = (total >> 16) + (total & 0xffff)
    # ... and fold once more in case the first fold produced a carry.
    total += total >> 16

    # One's complement, truncated to an unsigned 16-bit value.
    folded = ~total & 0xffff

    # Swap the two bytes (little-endian word -> big-endian word).
    return (folded >> 8) | ((folded & 0xff) << 8)
class HostAlive(object):
    """Keep track of which hosts answer ICMP echo requests (PING).

    A cron job sends one echo request per registered host every
    ``PING_INTERVAL`` seconds; a background thread reads the replies from a raw
    ICMP socket and records, per host, when it last answered in time.
    ``get_states()`` turns those timestamps into one of the ``STATE_*`` values.

    Only one instance may exist per process; it is stored in ``builtins`` under
    the key ``__host_alive__``.
    """

    PING_INTERVAL = 60  # seconds between two check rounds
    PING_TIMEOUT = 5  # a reply later than this many seconds is ignored

    METHOD_PING = 0
    METHOD_HTTP_GET = 1  # declared, but add_host() rejects it for now

    ICMP_ECHO_REQUEST = 8
    ICMP_ECHO_REPLY = 0

    STATE_UNKNOWN = 0  # unknown (not checked yet)
    STATE_ONLINE = 1  # online
    STATE_WARNING = 2  # maybe offline
    STATE_OFFLINE = 3  # offline (last seen online more than two minutes ago)

    def __init__(self):
        """Create the checker; raises RuntimeError if one is already registered."""
        super().__init__()

        import builtins
        if '__host_alive__' in builtins.__dict__:
            raise RuntimeError('HostAlive object exists, you can not create more than one instance.')

        # Host state table, indexed by host IP. Every value is a dict:
        # {
        #     'last_online': TIMESTAMP,   # 0 = never seen online
        #     'last_check': TIMESTAMP,    # 0 = never checked
        #     'method': 0=PING, 1=HTTP-GET
        #     'param': {}
        # }
        self._states = dict()

        self._need_stop = False
        self._socket_ping = None
        self._base_ping_pkg_id = 0
        self._thread_recv_ping_result = None
        # Maps the ICMP identifier of an outstanding request to its target IP.
        self._ping_pkg_id_list = dict()

        self._lock = threading.RLock()

    def init(self):
        """Open the raw ICMP socket, load all hosts and start checking.

        :return: True on success; False when the raw socket can not be created
                 (PermissionError) or the host list can not be loaded.
        """
        icmp_protocol = socket.getprotobyname('icmp')
        try:
            self._socket_ping = socket.socket(socket.AF_INET, socket.SOCK_RAW, icmp_protocol)
        except PermissionError:
            print('To use PING to check host state, must run as root.')
            log.e('To use PING to check host state, must run as root.\n')
            return False

        # Load the IP of every host. The query helper returns None when the
        # database query fails; iterating None would raise TypeError.
        hosts = host.get_all_hosts_for_check_state()
        if hosts is None:
            log.e('can not load hosts for checking alive state.\n')
            self._socket_ping.close()
            self._socket_ping = None
            return False

        for h in hosts:
            # A host behind a router is checked through the router address.
            if h['router_ip'] != '':
                self.add_host(h['router_ip'], HostAlive.METHOD_PING)
            else:
                self.add_host(h['ip'], HostAlive.METHOD_PING)

        self._thread_recv_ping_result = threading.Thread(target=self._thread_func_recv_ping_result)
        self._thread_recv_ping_result.start()

        tp_cron().add_job('host_check_alive', self._check_alive, first_interval_seconds=10, interval_seconds=HostAlive.PING_INTERVAL)

        return True

    def stop(self):
        """Ask the receiver thread to exit, wait for it and close the socket."""
        self._need_stop = True
        if self._thread_recv_ping_result is not None:
            self._thread_recv_ping_result.join()
        # Close only after the receiver has exited so it never select()s on a
        # closed socket. Later send attempts fail softly in _send_icmp_request().
        if self._socket_ping is not None:
            self._socket_ping.close()
            self._socket_ping = None

    def add_host(self, host_ip, method=0, param=None, check_now=False):
        """Register a host (or update the check method of a known one).

        :param host_ip: address to check
        :param method: check method; only ``METHOD_PING`` is supported
        :param param: optional method-specific parameters (stored, not used here)
        :param check_now: send an echo request immediately instead of waiting
                          for the next scheduled round
        :return: True when the host is registered, False for an unsupported method
        """
        if param is None:
            param = {}

        # now we support PING only
        if method != HostAlive.METHOD_PING:
            log.e('Unknown method for check host state: {}\n'.format(method))
            return False

        with self._lock:
            state = self._states.get(host_ip)
            if state is None:
                self._states[host_ip] = {'last_online': 0, 'last_check': 0, 'method': method, 'param': param}
            else:
                state['method'] = method
                state['param'] = param

        if check_now:
            self._ping(host_ip)

        return True

    def remove_host(self, host_ip):
        """Forget a host and any echo request still outstanding for it."""
        with self._lock:
            self._states.pop(host_ip, None)
            stale = [pkg_id for pkg_id, ip in self._ping_pkg_id_list.items() if ip == host_ip]
            for pkg_id in stale:
                del self._ping_pkg_id_list[pkg_id]

    def get_states(self, host_ip_list):
        """Return the current state of each requested host.

        :param host_ip_list: iterable of host IPs
        :return: dict mapping each IP to
                 ``{'state': STATE_*, 'last_online': timestamp, 'last_check': seconds}``
                 where ``last_check`` is the number of seconds since the last
                 request was sent (0 for hosts that are not registered).
        """
        ret = dict()
        with self._lock:
            time_now = int(time.time())
            for k in host_ip_list:
                state = self._states.get(k)
                if state is None:
                    ret[k] = {'state': HostAlive.STATE_UNKNOWN, 'last_online': 0, 'last_check': 0}
                    continue

                since_check = time_now - state['last_check']
                if state['last_check'] == 0:
                    ret[k] = {'state': HostAlive.STATE_UNKNOWN, 'last_online': 0, 'last_check': since_check}
                    continue

                # A host that was checked but never answered has last_online == 0,
                # so the age below is huge and it is reported as OFFLINE with
                # last_online 0.
                since_online = time_now - state['last_online']
                if since_online > 2 * 60:
                    _state = HostAlive.STATE_OFFLINE
                elif since_online > 60:
                    _state = HostAlive.STATE_WARNING
                else:
                    _state = HostAlive.STATE_ONLINE
                ret[k] = {'state': _state, 'last_online': state['last_online'], 'last_check': since_check}

        return ret

    def _check_alive(self):
        """Cron callback: start a new round by pinging every registered host."""
        with self._lock:
            # Requests of the previous round that got no reply are abandoned.
            self._ping_pkg_id_list.clear()
            targets = [ip for ip, st in self._states.items() if st['method'] == HostAlive.METHOD_PING]
            for ip in targets:
                self._ping(ip)

    def _ping(self, host_ip):
        """Build and send one echo request; returns True if it was sent."""
        pkg_data, pkg_id = self._make_ping_packet()
        return self._send_icmp_request(host_ip, pkg_id, pkg_data)

    def _send_icmp_request(self, target_ip, pkg_id, icmp_pkg):
        """Record the request as outstanding and send it.

        :return: True if the packet was handed to the socket; False if the host
                 is not registered, no socket is open, or sending failed.
        """
        with self._lock:
            if target_ip not in self._states:
                return False
            sock = self._socket_ping
            if sock is None:
                return False
            self._states[target_ip]['last_check'] = int(time.time())
            self._ping_pkg_id_list[pkg_id] = target_ip

        try:
            # The port number is only a placeholder required by the address tuple.
            sock.sendto(icmp_pkg, (target_ip, 1))
        except OSError as e:
            # One bad address must not abort the whole round or the caller.
            log.w('can not send PING request to {}: {}\n'.format(target_ip, e))
            with self._lock:
                self._ping_pkg_id_list.pop(pkg_id, None)
            return False
        return True

    def _make_ping_packet(self):
        """Return ``(packet_bytes, packet_id)`` for a new ICMP echo request."""
        # The counter is shared by every thread that can trigger a ping.
        with self._lock:
            self._base_ping_pkg_id += 1
            if self._base_ping_pkg_id > 65530:
                self._base_ping_pkg_id = 1
            pkg_id = self._base_ping_pkg_id

        pkg_type = HostAlive.ICMP_ECHO_REQUEST
        pkg_code = 0  # must be zero
        pkg_sequence = 1  # sequence number
        pkg_payload = b'12345678901234567890123456789012'  # 32B payload data

        # type(1B), code(1B), checksum(2B), id(2B), sequence(2B), payload
        # Pack once with a zero checksum to compute it, then again with the result.
        icmp_checksum = 0
        icmp_packet = struct.pack('>BBHHH32s', pkg_type, pkg_code, icmp_checksum, pkg_id, pkg_sequence, pkg_payload)
        icmp_checksum = calc_icmp_checksum(icmp_packet)
        icmp_packet = struct.pack('>BBHHH32s', pkg_type, pkg_code, icmp_checksum, pkg_id, pkg_sequence, pkg_payload)
        return icmp_packet, pkg_id

    def _thread_func_recv_ping_result(self):
        """Receiver thread body: read replies until stop() is requested."""
        while not self._need_stop:
            # Wake up at least once per second to notice the stop request.
            event = select.select([self._socket_ping], [], [], 1)
            if not event[0]:
                continue

            data, _ = self._socket_ping.recvfrom(128)  # data-length=60
            self._handle_ping_reply(data)

        log.v('thread for receive PING result stopped.\n')

    def _handle_ping_reply(self, data):
        """Process one received datagram (IP header followed by ICMP message).

        :return: True if the datagram was a timely echo reply for a registered
                 host and that host was marked online; False otherwise.
        """
        if len(data) < 20:
            return False
        # The low nibble of the first byte is the IP header length in 32-bit
        # words; the ICMP header starts right after it.
        ip_header_len = (data[0] & 0x0f) * 4
        if ip_header_len < 20 or len(data) < ip_header_len + 8:
            return False

        _type, _code, _checksum, _pkg_id, _sequence = struct.unpack_from('>BBHHH', data, ip_header_len)
        if _type != HostAlive.ICMP_ECHO_REPLY or _sequence != 1:
            return False

        time_now = int(time.time())
        with self._lock:
            target_host = self._ping_pkg_id_list.pop(_pkg_id, None)
            if target_host is None:
                return False

            # The host may have been removed while the request was in flight.
            state = self._states.get(target_host)
            if state is None:
                return False

            if time_now - state['last_check'] <= HostAlive.PING_TIMEOUT:
                state['last_online'] = time_now
                return True

        return False
def tp_host_alive():
    """
    Return the process-wide host alive checker, creating it on first use.

    The instance is kept in the ``builtins`` namespace under the key
    ``__host_alive__``.

    :rtype : HostAlive
    """
    import builtins

    registry = builtins.__dict__
    try:
        return registry['__host_alive__']
    except KeyError:
        registry['__host_alive__'] = HostAlive()
        return registry['__host_alive__']

View File

@ -23,8 +23,8 @@ class SessionManager(object):
self._expire = 0
self._lock = threading.RLock()
self._stop_flag = False
self._timer_cond = threading.Condition()
# self._stop_flag = False
# self._timer_cond = threading.Condition()
def init(self):
self.update_default_expire()
@ -35,7 +35,7 @@ class SessionManager(object):
self._expire = tp_cfg().sys.login.session_timeout * 60
def _check_expire(self):
_now = int(datetime.datetime.utcnow().timestamp())
_now = int(datetime.datetime.now().timestamp())
with self._lock:
_keys = [k for k in self._session_dict]
for k in _keys:
@ -61,7 +61,7 @@ class SessionManager(object):
if s_id in self._session_dict:
del self._session_dict[s_id]
else:
self._session_dict[s_id] = {'v': value, 't': int(datetime.datetime.utcnow().timestamp()), 'e': expire}
self._session_dict[s_id] = {'v': value, 't': int(datetime.datetime.now().timestamp()), 'e': expire}
def get(self, s_id, _default=None):
# 从session中获取一个数据读取并更新最后访问时间
@ -70,11 +70,11 @@ class SessionManager(object):
if self._session_dict[s_id]['e'] == 0:
return self._session_dict[s_id]['v']
else:
if int(datetime.datetime.utcnow().timestamp()) - self._session_dict[s_id]['t'] > self._session_dict[s_id]['e']:
if int(datetime.datetime.now().timestamp()) - self._session_dict[s_id]['t'] > self._session_dict[s_id]['e']:
del self._session_dict[s_id]
return _default
else:
self._session_dict[s_id]['t'] = int(datetime.datetime.utcnow().timestamp())
self._session_dict[s_id]['t'] = int(datetime.datetime.now().timestamp())
return self._session_dict[s_id]['v']
else:

View File

@ -21,6 +21,7 @@ from app.base.logger import log
from app.base.session import tp_session
from app.base.cron import tp_cron
from app.base.stats import tp_stats
from app.base.host_alive import tp_host_alive
from app.app_ver import TP_SERVER_VER
@ -131,6 +132,9 @@ class WebApp:
if not tp_stats().init():
log.e('can not initialize system status collector.\n')
return 0
if not tp_host_alive().init():
log.e('can not initialize host state inspector.\n')
return 0
settings = {
#
@ -185,6 +189,7 @@ class WebApp:
except:
log.e('\n')
tp_host_alive().stop()
tp_cron().stop()
return 0

View File

@ -10,6 +10,7 @@ import tornado.gen
import tornado.httpclient
from app.base.configs import tp_cfg
from app.base.host_alive import tp_host_alive
from app.const import *
from app.model import host
from app.model import account
@ -102,10 +103,27 @@ class DoGetHostsHandler(TPBaseJsonHandler):
err, total_count, page_index, row_data = \
host.get_hosts(sql_filter, sql_order, sql_limit, sql_restrict, sql_exclude)
ip_list = list()
for x in range(len(row_data)):
if row_data[x]['router_ip'] != '':
ip_list.append(row_data[x]['router_ip'])
else:
ip_list.append(row_data[x]['ip'])
ip_list = list(set(ip_list))
host_states = tp_host_alive().get_states(ip_list)
for x in range(len(row_data)):
if row_data[x]['router_ip'] != '':
row_data[x]['_alive_info'] = host_states[row_data[x]['router_ip']]
else:
row_data[x]['_alive_info'] = host_states[row_data[x]['ip']]
row_data[x]['_alive'] = row_data[x]['_alive_info']['state']
ret = dict()
ret['page_index'] = page_index
ret['total'] = total_count
ret['data'] = row_data
self.write_json(err, data=ret)

View File

@ -9,6 +9,7 @@ from app.base.db import get_db, SQL
from . import syslog
from app.base.stats import tp_stats
from app.base.utils import tp_timestamp_sec
import app.base.host_alive
def get_host_info(host_id):
@ -141,6 +142,11 @@ def add_host(handler, args):
if not db_ret:
return TPE_DATABASE, 0
if len(args['router_ip']) > 0:
app.base.host_alive.tp_host_alive().add_host(args['router_ip'], check_now=True)
else:
app.base.host_alive.tp_host_alive().add_host(args['ip'], check_now=True)
_id = db.last_insert_id()
h_name = args['ip']
@ -215,6 +221,9 @@ def remove_hosts(handler, hosts):
h_name = h['ip']
if len(h['router_ip']) > 0:
h_name += '(由{}:{}路由)'.format(h['router_ip'], h['router_port'])
app.base.host_alive.tp_host_alive().remove_host(h['router_ip'])
else:
app.base.host_alive.tp_host_alive().remove_host(h['ip'])
if h_name not in host_names:
host_names.append(h_name)
@ -253,11 +262,23 @@ def update_host(handler, args):
db = get_db()
# 1. 判断是否存在
sql = 'SELECT `id` FROM `{}host` WHERE `id`={};'.format(db.table_prefix, args['id'])
sql = 'SELECT `id`,`ip`,`router_ip` FROM `{}host` WHERE `id`={};'.format(db.table_prefix, args['id'])
db_ret = db.query(sql)
if db_ret is None or len(db_ret) == 0:
return TPE_NOT_EXISTS
old_ip = db_ret[0][1]
old_router_ip = db_ret[0][2]
if len(old_router_ip) > 0:
app.base.host_alive.tp_host_alive().remove_host(old_router_ip)
else:
app.base.host_alive.tp_host_alive().remove_host(old_ip)
if len(args['router_ip']) > 0:
app.base.host_alive.tp_host_alive().add_host(args['router_ip'], check_now=True)
else:
app.base.host_alive.tp_host_alive().add_host(args['ip'], check_now=True)
sql_list = list()
sql_s = 'UPDATE `{tp}host` SET `os_type`={ph},`name`={ph},`ip`={ph},`router_ip`={ph}, ' \
'`router_port`={ph},`cid`={ph},`desc`={ph} WHERE `id`={ph};' \
@ -485,3 +506,14 @@ def api_v1_get_host(hosts_ip):
ret[ip]['account'].append({'id': a['id'], 'name': a['username'], 'protocol': a['protocol_type']})
return TPE_OK, ret
def get_all_hosts_for_check_state():
    """Query the `ip` and `router_ip` columns of every row in the host table.

    :return: the query's ``recorder`` result on success, or None when the
             query does not return TPE_OK.
    """
    query = SQL(get_db())
    query.select_from('host', ['ip', 'router_ip'], alt_name='h')

    if query.query() != TPE_OK:
        return None

    return query.recorder