Fix: when using a MySQL database, starting the TP service before the MySQL service caused the "first install" page to be shown. The fix: if the MySQL server cannot be reached at startup, retry the connection every 5 seconds; only after the connection succeeds do we check whether the table schema needs to be created, and therefore whether to show the "first install" page.

pull/105/head
Apex Liu 2017-12-13 19:28:07 +08:00
parent 7336219cc5
commit 1a025d3434
8 changed files with 96 additions and 120 deletions
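In essence, the web server's startup sequence now blocks until the database answers, along these lines (a minimal sketch of the behaviour described in the commit message, assuming a database object exposing connect() and a connected flag as TPDatabase does; wait_for_database is an illustrative helper, not part of the patch):

import time

def wait_for_database(db, retry_interval=5):
    # Block until the database connection succeeds, retrying every few seconds.
    # This avoids mistaking a not-yet-started MySQL server for a fresh install.
    db.connect()
    while not db.connected:
        print('database not connected, retry after {} seconds.'.format(retry_interval))
        time.sleep(retry_interval)
        db.connect()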

View File

@ -41,8 +41,6 @@ class TPCron(threading.Thread):
log.v('{} stopped.\n'.format(self.name))
def run(self):
pass
while not self._stop_flag:
time.sleep(1)

View File

@ -716,27 +716,27 @@ class DatabaseInit:
'CREATE TABLE `{}syslog` ({});'.format(self.db.table_prefix, ','.join(f))
)
def _create_sys_state(self):
""" 系统运行状态记录用于dashboard页面展示
系统每5分钟记录一次当前cpu负载/磁盘IO负载/网络IO负载/远程连接数用来显示曲线图
然后系统定时每天做一次清理操作将超过30天的数据清除掉
"""
f = list()
# id: auto-increment primary key
f.append('`id` integer PRIMARY KEY {}'.format(self.db.auto_increment))
# type: data type, 1=cpu, 2=disk-io, 3=net-io, 4=remote-count
f.append('`type` int(11) DEFAULT 0')
# val: recorded value
f.append('`val` int(11) DEFAULT 0')
# ts: record timestamp
f.append('`ts` int(11) DEFAULT 0')
self._db_exec(
'创建系统运行状态记录表...',
'CREATE TABLE `{}sys_state` ({});'.format(self.db.table_prefix, ','.join(f))
)
# def _create_sys_state(self):
# """ 系统运行状态记录用于dashboard页面展示
# 系统每5分钟记录一次当前cpu负载/磁盘IO负载/网络IO负载/远程连接数,用来显示曲线图
# 然后系统定时每天做一次清理操作将超过30天的数据清除掉
# """
# f = list()
#
# # id: auto-increment primary key
# f.append('`id` integer PRIMARY KEY {}'.format(self.db.auto_increment))
#
# # type: data type, 1=cpu, 2=disk-io, 3=net-io, 4=remote-count
# f.append('`type` int(11) DEFAULT 0')
# # val: recorded value
# f.append('`val` int(11) DEFAULT 0')
# # ts: record timestamp
# f.append('`ts` int(11) DEFAULT 0')
#
# self._db_exec(
# '创建系统运行状态记录表...',
# 'CREATE TABLE `{}sys_state` ({});'.format(self.db.table_prefix, ','.join(f))
# )
def _create_record(self):
""" 运维录像日志 """

View File

@ -41,6 +41,7 @@ class TPDatabase:
self.mysql_user = ''
self.mysql_password = ''
self.connected = False  # whether the database connection has been established
self.need_create = False  # database does not exist yet and needs to be created
self.need_upgrade = False  # database exists but its version is old and needs an upgrade
self.current_ver = 0
@ -62,8 +63,8 @@ class TPDatabase:
cfg.set_default('database::sqlite-file', os.path.join(cfg.data_path, 'db', 'teleport.db'))
if not self._init_sqlite(cfg.database.sqlite_file):
return False
if self.need_create:
return True
# if self.need_create:
# return True
elif 'mysql' == cfg.database.type:
if not self._init_mysql(cfg.database.mysql_host, cfg.database.mysql_port,
cfg.database.mysql_db, cfg.database.mysql_prefix,
@ -73,13 +74,20 @@ class TPDatabase:
log.e('unknown database type `{}`, support sqlite/mysql now.\n'.format(cfg.database.type))
return False
return True
def connect(self):
if self._conn_pool.connect():
self.connected = True
def check_status(self):
# Check whether the expected data table exists (if not, this may be an empty database file, i.e. a freshly installed system).
# ret = self.query('SELECT COUNT(*) FROM `sqlite_master` WHERE `type`="table" AND `name`="{}account";'.format(self._table_prefix))
ret = self.is_table_exists('{}config'.format(self._table_prefix))
if ret is None or not ret:
log.w('database need create.\n')
self.need_create = True
return True
return
# Try to read the current database version from the config table (if it is missing, this is an older schema).
ret = self.query('SELECT `value` FROM `{}config` WHERE `name`="db_ver";'.format(self._table_prefix))
@ -91,12 +99,7 @@ class TPDatabase:
if self.current_ver < self.DB_VERSION:
log.w('database need upgrade.\n')
self.need_upgrade = True
return True
# DO TEST
# self.alter_table('ts_account', [['account_id', 'id'], ['account_type', 'type']])
return True
return
def load_system_config(self):
sys_cfg = dict()
@ -180,7 +183,6 @@ class TPDatabase:
return False
elif self.db_type == self.DB_TYPE_MYSQL:
ret = self.query('DESC `{}` `{}`;'.format(table_name, field_name))
print(ret)
if ret is None:
return None
if len(ret) == 0:
@ -325,6 +327,9 @@ class TPDatabasePool:
self._locker = threading.RLock()
self._connections = dict()
def connect(self):
return False if self._get_connect() is None else True
def query(self, sql, args):
_conn = self._get_connect()
if _conn is None:
@ -471,6 +476,21 @@ class TPMysqlPool(TPDatabasePool):
log.e('[mysql] connect [{}:{}] failed: {}\n'.format(self._host, self._port, e.__str__()))
return None
def _reconnect(self):
log.w('[mysql] lost connection, reconnect.\n')
with self._locker:
thread_id = threading.get_ident()
if thread_id not in self._connections:
log.e('[mysql] database pool internal error.\n')
return None
_conn = self._do_connect()
if _conn is not None:
self._connections[thread_id] = _conn
return _conn
else:
del self._connections[thread_id]
return None
def _do_query(self, conn, sql, args):
for retry in range(2):
cursor = conn.cursor()
@ -485,19 +505,18 @@ class TPMysqlPool(TPDatabasePool):
log.v('[mysql] SQL={}\n'.format(sql))
log.e('[mysql] _do_query() failed: {}\n'.format(e.__str__()))
return None
conn = self._reconnect()
if conn is None:
return None
except pymysql.err.InterfaceError as e:
if retry == 1:
log.v('[mysql] SQL={}\n'.format(sql))
log.e('[mysql] _do_query() failed: {}\n'.format(e.__str__()))
return None
conn = self._reconnect()
if conn is None:
return None
log.w('[mysql] lost connection, reconnect.\n')
with self._locker:
thread_id = threading.get_ident()
if thread_id not in self._connections:
log.e('[mysql] database pool internal error.\n')
return None
_conn = self._do_connect()
if _conn is not None:
self._connections[thread_id] = _conn
conn = _conn
else:
return None
except Exception as e:
log.v('[mysql] SQL={}\n'.format(sql))
log.e('[mysql] _do_query() failed: {}\n'.format(e.__str__()))
@ -518,19 +537,18 @@ class TPMysqlPool(TPDatabasePool):
log.v('[mysql] SQL={}\n'.format(sql))
log.e('[mysql] _do_exec() failed: {}\n'.format(e.__str__()))
return None
conn = self._reconnect()
if conn is None:
return None
log.w('[mysql] lost connection, reconnect.\n')
with self._locker:
thread_id = threading.get_ident()
if thread_id not in self._connections:
log.e('[mysql] database pool internal error.\n')
return None
_conn = self._do_connect()
if _conn is not None:
self._connections[thread_id] = _conn
conn = _conn
else:
return None
except pymysql.err.InterfaceError as e:
if retry == 1:
log.v('[mysql] SQL={}\n'.format(sql))
log.e('[mysql] _do_exec() failed: {}\n'.format(e.__str__()))
return None
conn = self._reconnect()
if conn is None:
return None
except Exception as e:
log.e('[mysql] _do_exec() failed: {}\n'.format(e.__str__()))
@ -553,19 +571,17 @@ class TPMysqlPool(TPDatabasePool):
if retry == 1 or errno not in [2006, 2013]:
log.e('[mysql] _do_transaction() failed: {}\n'.format(e.__str__()))
return False
conn = self._reconnect()
if conn is None:
return None
log.w('[mysql] lost connection, reconnect.\n')
with self._locker:
thread_id = threading.get_ident()
if thread_id not in self._connections:
log.e('[mysql] database pool internal error.\n')
return False
_conn = self._do_connect()
if _conn is not None:
self._connections[thread_id] = _conn
conn = _conn
else:
return False
except pymysql.err.InterfaceError as e:
if retry == 1:
log.e('[mysql] _do_transaction() failed: {}\n'.format(e.__str__()))
return None
conn = self._reconnect()
if conn is None:
return None
except Exception as e:
conn.rollback()
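The duplicated lost-connection handling in _do_query() / _do_exec() / _do_transaction() is consolidated into the new _reconnect(). Condensed, the pattern looks like this (a sketch only, assuming pymysql and a per-thread connection dict as in TPMysqlPool; connection parameters are placeholders, not the real configuration):

import threading
import pymysql

class MysqlPoolSketch(object):
    def __init__(self):
        self._locker = threading.RLock()
        self._connections = dict()  # thread-id -> live connection

    def _do_connect(self):
        # the real pool builds this from the configured host/port/db/user/password
        return pymysql.connect(host='127.0.0.1', port=3306, user='teleport',
                               password='secret', database='teleport')

    def _reconnect(self):
        # drop this thread's broken connection and open a fresh one
        with self._locker:
            thread_id = threading.get_ident()
            if thread_id not in self._connections:
                return None  # pool internal error: nothing to replace
            conn = self._do_connect()
            if conn is not None:
                self._connections[thread_id] = conn
                return conn
            del self._connections[thread_id]
            return None

    def _do_query(self, conn, sql, args):
        # retry exactly once after a lost connection, then give up
        for retry in range(2):
            try:
                with conn.cursor() as cursor:
                    cursor.execute(sql, args)
                    return cursor.fetchall()
            except (pymysql.err.OperationalError, pymysql.err.InterfaceError):
                if retry == 1:
                    return None
                conn = self._reconnect()
                if conn is None:
                    return None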

View File

@ -3,18 +3,15 @@
import datetime
import threading
from app.base.logger import log
from app.base.configs import tp_cfg
from app.base.cron import tp_corn
# class SessionManager(threading.Thread):
class SessionManager(object):
# SESSION_EXPIRE = 3600  # 60*60, default timeout is one hour
_expire = 3600
def __init__(self):
# super().__init__(name='session-manager-thread')
super().__init__()
import builtins
@ -31,38 +28,13 @@ class SessionManager(object):
def init(self):
self.update_default_expire()
tp_corn().add_job('session_expire', self._check_expire, first_interval_seconds=None, interval_seconds=60)
return True
def update_default_expire(self):
self._expire = tp_cfg().sys.login.session_timeout * 60
# def stop(self):
# self._stop_flag = True
# self._timer_cond.acquire()
# self._timer_cond.notify()
# self._timer_cond.release()
# self.join()
# log.v('{} stopped.\n'.format(self.name))
# def run(self):
# while True:
# self._timer_cond.acquire()
# # wake up once a minute to check for expired sessions
# self._timer_cond.wait(60)
# self._timer_cond.release()
# if self._stop_flag:
# break
#
# _now = int(datetime.datetime.utcnow().timestamp())
# with self._lock:
# _keys = [k for k in self._session_dict]
# for k in _keys:
# if self._session_dict[k]['e'] == 0:
# continue
# if _now - self._session_dict[k]['t'] > self._session_dict[k]['e']:
# del self._session_dict[k]
def check_expire(self):
def _check_expire(self):
_now = int(datetime.datetime.utcnow().timestamp())
with self._lock:
_keys = [k for k in self._session_dict]
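SessionManager no longer runs its own thread; expiry checking is handed to the cron scheduler instead. A self-contained sketch of that arrangement (the scheduler argument stands in for tp_corn(), whose add_job() signature is taken from the hunk above; the session-dict layout follows the commented-out code):

import datetime
import threading

class SessionManagerSketch(object):
    def __init__(self):
        self._lock = threading.RLock()
        self._session_dict = dict()  # key -> {'v': value, 't': last-touch ts, 'e': expire seconds}
        self._expire = 3600          # default: one hour

    def init(self, scheduler):
        # run the expiry check once a minute instead of owning a dedicated thread
        scheduler.add_job('session_expire', self._check_expire,
                          first_interval_seconds=None, interval_seconds=60)
        return True

    def _check_expire(self):
        _now = int(datetime.datetime.utcnow().timestamp())
        with self._lock:
            for k in list(self._session_dict):
                if self._session_dict[k]['e'] == 0:
                    continue  # e == 0 means the session never expires
                if _now - self._session_dict[k]['t'] > self._session_dict[k]['e']:
                    del self._session_dict[k]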

View File

@ -2,6 +2,7 @@
import json
import os
import time
import urllib.parse
import urllib.request
@ -64,6 +65,8 @@ class WebApp:
log.i('###############################################################\n')
log.i('Web Server starting ...\n')
tp_corn().init()
# Try to fetch the core service configuration via CORE-JSON-RPC, mainly the ssh/rdp/telnet ports and the recording-file storage path.
self._get_core_server_config()
@ -72,8 +75,15 @@ class WebApp:
log.e('can not initialize database interface.\n')
return 0
_db.connect()
while not _db.connected:
log.w('database not connected, retry after 5 seconds.\n')
time.sleep(5)
_db.connect()
cfg = tp_cfg()
_db.check_status()
if _db.need_create or _db.need_upgrade:
cfg.app_mode = APP_MODE_MAINTENANCE
tp_cfg().update_sys(None)
@ -134,20 +144,7 @@ class WebApp:
return 0
# start the scheduled-job (cron) scheduler
tp_corn().init()
tp_corn().start()
# start session timeout management
# tp_session().start()
# def job():
# log.v('---job--\n')
# tp_corn().add_job('test', job, first_interval_seconds=None, interval_seconds=10)
# tp_sys_status().init()
# tp_sys_status().start()
tp_corn().add_job('session_expire', tp_session().check_expire, first_interval_seconds=None, interval_seconds=60)
# tp_corn().add_job('sys_status', tp_sys_status().check_status, first_interval_seconds=5, interval_seconds=5)
try:
tornado.ioloop.IOLoop.instance().start()
@ -155,9 +152,6 @@ class WebApp:
log.e('\n')
tp_corn().stop()
# tp_sys_status().stop()
# tp_session().stop()
return 0

View File

@ -103,7 +103,6 @@ class WebSocketHandler(tornado.websocket.WebSocketHandler):
# handle a new connection
k = '{}-{}'.format('user', sid)
_user = tp_session().get(k, None)
print(_user)
if _user is None:
ret = {'code': TPE_NEED_LOGIN, 'message': '需要登录'}
self.write_message(json.dumps(ret))

View File

@ -275,8 +275,6 @@ def make_group_map(gtype, gm):
def get_groups(sql_filter, sql_order, sql_limit, sql_restrict, sql_exclude):
print(sql_filter)
dbtp = get_db().table_prefix
s = SQL(get_db())
s.select_from('group', ['id', 'state', 'name', 'desc'], alt_name='g')

View File

@ -118,7 +118,6 @@ def add_host(handler, args):
def remove_hosts(handler, hosts):
print('----', hosts)
db = get_db()
host_ids = ','.join([str(i) for i in hosts])