diff --git a/server/www/teleport/app/eom_app/app/core.py b/server/www/teleport/app/eom_app/app/core.py index fcb89ef..a452e79 100644 --- a/server/www/teleport/app/eom_app/app/core.py +++ b/server/www/teleport/app/eom_app/app/core.py @@ -119,12 +119,9 @@ class WebServerCore: # 启动session超时管理 web_session().start() - # 启动数据库定时事务(例如MySQL防丢失连接) - get_db().start_keep_alive() tornado.ioloop.IOLoop.instance().start() - get_db().stop_keep_alive() web_session().stop() return 0 diff --git a/server/www/teleport/app/eom_app/app/db.py b/server/www/teleport/app/eom_app/app/db.py index 67cd325..7279cab 100644 --- a/server/www/teleport/app/eom_app/app/db.py +++ b/server/www/teleport/app/eom_app/app/db.py @@ -46,10 +46,6 @@ class TPDatabase: self._table_prefix = '' self._conn_pool = None - self._stop_flag = False - self._thread_keep_alive_handle = None - self._thread_keep_alive_cond = threading.Condition() - @property def table_prefix(self): return self._table_prefix @@ -95,30 +91,6 @@ class TPDatabase: return True - def start_keep_alive(self): - self._thread_keep_alive_handle = threading.Thread(target=self._thread_keep_alive) - self._thread_keep_alive_handle.start() - - def stop_keep_alive(self): - self._stop_flag = True - self._thread_keep_alive_cond.acquire() - self._thread_keep_alive_cond.notify() - self._thread_keep_alive_cond.release() - if self._thread_keep_alive_handle is not None: - self._thread_keep_alive_handle.join() - log.v('database-keep-alive-thread stopped.\n') - - def _thread_keep_alive(self): - while True: - self._thread_keep_alive_cond.acquire() - # 每一小时醒来执行一次查询,避免连接丢失 - self._thread_keep_alive_cond.wait(3600) - self._thread_keep_alive_cond.release() - if self._stop_flag: - break - - self.query('SELECT `value` FROM `{}config` WHERE `name`="db_ver";'.format(self._table_prefix)) - def _init_sqlite(self, db_file): self.db_type = self.DB_TYPE_SQLITE self.auto_increment = 'AUTOINCREMENT' @@ -462,51 +434,118 @@ class TPMysqlPool(TPDatabasePool): autocommit=False, connect_timeout=3.0, charset='utf8') + except pymysql.err.OperationalError as e: + errno, _ = e.args + if 2003 == errno: + log.e('[mysql] connect [{}:{}] failed: {}\n'.format(self._host, self._port, e.__str__())) + return None except Exception as e: log.e('[mysql] connect [{}:{}] failed: {}\n'.format(self._host, self._port, e.__str__())) return None def _do_query(self, conn, sql): - cursor = conn.cursor() - try: - cursor.execute(sql) - db_ret = cursor.fetchall() - conn.commit() - return db_ret - except Exception as e: - log.v('[mysql] SQL={}\n'.format(sql)) - log.e('[mysql] _do_query() failed: {}\n'.format(e.__str__())) - return None - finally: - cursor.close() + for retry in range(2): + cursor = conn.cursor() + try: + cursor.execute(sql) + db_ret = cursor.fetchall() + conn.commit() + return db_ret + except pymysql.err.OperationalError as e: + errno, _ = e.args + if retry == 1 or errno not in [2006, 2013]: + log.v('[mysql] SQL={}\n'.format(sql)) + log.e('[mysql] _do_query() failed: {}\n'.format(e.__str__())) + return None + + log.w('[mysql] lost connection, reconnect.\n') + with self._locker: + thread_id = threading.get_ident() + if thread_id not in self._connections: + log.e('[mysql] database pool internal error.\n') + return None + _conn = self._do_connect() + if _conn is not None: + self._connections[thread_id] = _conn + conn = _conn + else: + return None + except Exception as e: + log.v('[mysql] SQL={}\n'.format(sql)) + log.e('[mysql] _do_query() failed: {}\n'.format(e.__str__())) + return None + finally: + cursor.close() def _do_exec(self, conn, sql): - cursor = conn.cursor() - try: - cursor.execute(sql) - conn.commit() - return True - except Exception as e: - log.e('[mysql] _do_exec() failed: {}\n'.format(e.__str__())) - log.e('[mysql] SQL={}'.format(sql)) - return None - finally: - cursor.close() + for retry in range(2): + cursor = conn.cursor() + try: + cursor.execute(sql) + conn.commit() + return True + except pymysql.err.OperationalError as e: + errno, _ = e.args + if retry == 1 or errno not in [2006, 2013]: + log.v('[mysql] SQL={}\n'.format(sql)) + log.e('[mysql] _do_exec() failed: {}\n'.format(e.__str__())) + return None + + log.w('[mysql] lost connection, reconnect.\n') + with self._locker: + thread_id = threading.get_ident() + if thread_id not in self._connections: + log.e('[mysql] database pool internal error.\n') + return None + _conn = self._do_connect() + if _conn is not None: + self._connections[thread_id] = _conn + conn = _conn + else: + return None + + except Exception as e: + log.e('[mysql] _do_exec() failed: {}\n'.format(e.__str__())) + log.e('[mysql] SQL={}'.format(sql)) + return None + finally: + cursor.close() def _do_transaction(self, conn, sql_list): - cursor = conn.cursor() - try: - conn.begin() - for sql in sql_list: - cursor.execute(sql) - conn.commit() - return True - except Exception as e: - conn.rollback() - log.e('[mysql] _do_transaction() failed: {}\n'.format(e.__str__())) - return False - finally: - cursor.close() + for retry in range(2): + cursor = conn.cursor() + try: + conn.begin() + for sql in sql_list: + cursor.execute(sql) + conn.commit() + return True + except pymysql.err.OperationalError as e: + errno, _ = e.args + if retry == 1 or errno not in [2006, 2013]: + log.v('[mysql] SQL={}\n'.format(sql)) + log.e('[mysql] _do_transaction() failed: {}\n'.format(e.__str__())) + return False + + log.w('[mysql] lost connection, reconnect.\n') + with self._locker: + thread_id = threading.get_ident() + if thread_id not in self._connections: + log.e('[mysql] database pool internal error.\n') + return False + _conn = self._do_connect() + if _conn is not None: + self._connections[thread_id] = _conn + conn = _conn + else: + return False + + except Exception as e: + conn.rollback() + log.e('[mysql] _do_transaction() failed: {}\n'.format(e.__str__())) + return False + finally: + cursor.close() def _last_insert_id(self, conn): cursor = conn.cursor()