web服务停止时关闭所有打开的助手websocket连接。

feature/assist-websocket
Apex Liu 2022-05-16 22:54:52 +08:00
parent bffefe8965
commit f817b79289
3 changed files with 19 additions and 34 deletions

View File

@ -4,7 +4,7 @@ from typing import Optional, Dict
import json
import threading
import app.controller.ws
# import app.controller.ws
from app.const import *
from app.base.utils import tp_unique_id, tp_timestamp_sec
from app.base.logger import *
@ -39,8 +39,10 @@ class AssistMessage(object):
self.method: str = method
# 命令发送给被调用端时的时间戳,错过一定时间未完结的命令,将会被扔掉
self.start_time: int = tp_timestamp_sec()
self.caller: app.controller.ws.AssistHandler = caller
self.callee: Optional[app.controller.ws.AssistHandler] = None
# self.caller: app.controller.ws.AssistHandler = caller
# self.callee: Optional[app.controller.ws.AssistHandler] = None
self.caller = caller
self.callee = None
def send_request(self, callee, param=None):
self.callee = callee
@ -92,6 +94,12 @@ class TPAssistBridge(object):
# 未完结的命令
self._commands: Dict[int, AssistMessage] = dict()
def finalize(self):
# stop all websocket when stop web-server.
with self._lock:
for caller in self._ws_web:
caller.close()
def get_assist_bridge(self, s_id):
with self._lock:
assist_id = self._sid_to_assist[s_id] if s_id in self._sid_to_assist else 0
@ -182,7 +190,7 @@ class TPAssistBridge(object):
caller.send_response(assist_msg, TPE_OK, data=param)
def on_disconnect(self, caller):
print('assist-ws-disconnect:', caller)
log.d('assist-ws-disconnect:', caller.assist_id)
with self._lock:
if caller.client_type == AssistInfo.WS_CLIENT_WEB:
if caller in self._ws_web:
@ -241,7 +249,7 @@ class TPAssistBridge(object):
msg_req.caller.send_response(msg_req, msg['code'], msg['message'], msg['data'])
# remove finished message
log.v('remove message, cmd_id={}\n'.format(msg_req.cmd_id))
log.d('remove message, cmd_id={}\n'.format(msg_req.cmd_id))
del self._commands[msg_req.cmd_id]

View File

@ -25,6 +25,7 @@ from app.base.stats import tp_stats
from app.base.host_alive import tp_host_alive
from app.base.utils import tp_generate_random
from app.app_ver import TP_SERVER_VER
from app.base.assist_bridge import tp_assist_bridge
class WebApp:
@ -214,6 +215,8 @@ class WebApp:
except:
log.e('\n')
tp_assist_bridge().finalize()
if tp_cfg().common.check_host_alive:
tp_host_alive().stop()
tp_cron().stop()

View File

@ -47,12 +47,11 @@ class AssistHandler(tornado.websocket.WebSocketHandler):
self.client_type: int = AssistInfo.WS_CLIENT_UNKNOWN
self.assist_id: int = 0
self.close()
def check_origin(self, origin): # 针对websocket处理类重写同源检查的方法
# print('ws-assist origin:', origin)
return True
# todo: send_request()/send_response()...
def send_request(self, msg: AssistMessage, param=None):
if param is None:
param = {}
@ -82,27 +81,6 @@ class AssistHandler(tornado.websocket.WebSocketHandler):
def set_assist_id(self, assist_id: int) -> None:
self.assist_id = assist_id
# def write_json(self, code, message='', data=None):
# if not isinstance(code, int):
# raise RuntimeError('`code` must be a integer.')
# if not isinstance(message, str):
# raise RuntimeError('`msg` must be a string.')
#
# if data is None:
# data = dict()
#
# _ret = {
# 'type': AssistMessage.MESSAGE_TYPE_RESPONSE,
# 'command_id': 0,
# 'method': '',
# 'code': code,
# 'message': message,
# 'data': data
# }
# log.w('send ws message: {}\n'.format(json_encode(_ret)))
# self.write_message(json_encode(_ret))
# 接受websocket连接保存连接实例
def open(self, message):
log.w('message on_open: {}\n'.format(message))
self.on_message(message)
@ -111,7 +89,7 @@ class AssistHandler(tornado.websocket.WebSocketHandler):
tp_assist_bridge().on_disconnect(self)
def on_message(self, message):
log.w('raw on_message: {}\n'.format(message))
log.d('raw on_message: {}\n'.format(message))
msg_req = AssistMessage(self, 'UNKNOWN')
try:
@ -152,14 +130,13 @@ class AssistHandler(tornado.websocket.WebSocketHandler):
return self.send_response(msg_req, TPE_PARAM)
else:
# return self.write_json(TPE_PARAM, message='未知的操作:{}'.format(param['method']))
tp_assist_bridge().on_request(msg_req, param)
elif msg['type'] == AssistMessage.MESSAGE_TYPE_RESPONSE:
if 'command_id' not in msg or msg['command_id'] == 0:
log.e('invalid response, need `command_id`: {}\n'.format(message))
return
log.v('forward: {}\n'.format(message))
log.d('forward: {}\n'.format(message))
tp_assist_bridge().forward_response(self, msg)
else:
log.e('unknown `type` field in message: {}\n'.format(message))
@ -168,14 +145,11 @@ class AssistHandler(tornado.websocket.WebSocketHandler):
def _on_web_client_connected(self, msg_req, param):
s_id = self.get_cookie('_sid')
if s_id is None:
# return self.write_json(TPE_NEED_LOGIN, '需要登录')
return self.send_response(msg_req, TPE_NEED_LOGIN, '需要登录')
k = 'user-{}'.format(s_id)
user_info = tp_session().get(k, None)
if user_info is None or not user_info['_is_login']:
# return self.write_json(TPE_NEED_LOGIN, '需要登录')
return self.send_response(msg_req, TPE_NEED_LOGIN, '需要登录')
# print('user-info:', user_info)
tp_assist_bridge().on_web_client_connect(msg_req, s_id, param)