From 7ac9681f0dfb8ccfd108d1921772ca54447d90f9 Mon Sep 17 00:00:00 2001 From: Eric Date: Wed, 16 Nov 2022 14:27:50 +0800 Subject: [PATCH] perf: asyncio ws task log --- apps/ops/ws.py | 107 +++++++++++++++------------------- requirements/requirements.txt | 1 + 2 files changed, 49 insertions(+), 59 deletions(-) diff --git a/apps/ops/ws.py b/apps/ops/ws.py index 94d71d90d..473093ba2 100644 --- a/apps/ops/ws.py +++ b/apps/ops/ws.py @@ -1,18 +1,18 @@ -import time +import asyncio import os -import threading -import json -from channels.generic.websocket import JsonWebsocketConsumer -from common.utils import get_logger +import aiofiles +from channels.generic.websocket import AsyncJsonWebsocketConsumer + from common.db.utils import close_old_connections -from .celery.utils import get_celery_task_log_path +from common.utils import get_logger from .ansible.utils import get_ansible_task_log_path +from .celery.utils import get_celery_task_log_path logger = get_logger(__name__) -class TaskLogWebsocket(JsonWebsocketConsumer): +class TaskLogWebsocket(AsyncJsonWebsocketConsumer): disconnected = False log_types = { @@ -20,70 +20,59 @@ class TaskLogWebsocket(JsonWebsocketConsumer): 'ansible': get_ansible_task_log_path } - def connect(self): + async def connect(self): user = self.scope["user"] if user.is_authenticated: - self.accept() + await self.accept() else: - self.close() + await self.close() - def get_log_path(self, task_id): - func = self.log_types.get(self.log_type) + def get_log_path(self, task_id, log_type): + func = self.log_types.get(log_type) if func: return func(task_id) - def receive(self, text_data=None, bytes_data=None, **kwargs): - data = json.loads(text_data) - task_id = data.get('task') - self.log_type = data.get('type', 'celery') - if task_id: - self.handle_task(task_id) + async def receive_json(self, content, **kwargs): + task_id = content.get('task') + task_typ = content.get('type', 'celery') + log_path = self.get_log_path(task_id, task_typ) + await self.async_handle_task(task_id, log_path) - def wait_util_log_path_exist(self, task_id): - log_path = self.get_log_path(task_id) + async def async_handle_task(self, task_id, log_path): + logger.info("Task id: {}".format(task_id)) while not self.disconnected: if not os.path.exists(log_path): - self.send_json({'message': '.', 'task': task_id}) - time.sleep(0.5) - continue - self.send_json({'message': '\r\n'}) - try: - logger.debug('Task log path: {}'.format(log_path)) - task_log_f = open(log_path, 'rb') - return task_log_f - except OSError: - return None - - def read_log_file(self, task_id): - task_log_f = self.wait_util_log_path_exist(task_id) - if not task_log_f: - logger.debug('Task log file is None: {}'.format(task_id)) - return - - task_end_mark = [] - while not self.disconnected: - data = task_log_f.read(4096) - if data: - data = data.replace(b'\n', b'\r\n') - self.send_json( - {'message': data.decode(errors='ignore'), 'task': task_id} - ) - if data.find(b'succeeded in') != -1: - task_end_mark.append(1) - if data.find(bytes(task_id, 'utf8')) != -1: - task_end_mark.append(1) - elif len(task_end_mark) == 2: - logger.debug('Task log end: {}'.format(task_id)) + await self.send_json({'message': '.', 'task': task_id}) + await asyncio.sleep(0.5) + else: + await self.send_task_log(task_id, log_path) break - time.sleep(0.2) - task_log_f.close() - def handle_task(self, task_id): - logger.info("Task id: {}".format(task_id)) - thread = threading.Thread(target=self.read_log_file, args=(task_id,)) - thread.start() + async def send_task_log(self, task_id, log_path): + await self.send_json({'message': '\r\n'}) + try: + logger.debug('Task log path: {}'.format(log_path)) + task_end_mark = [] + async with aiofiles.open(log_path, 'rb') as task_log_f: + while not self.disconnected: + data = await task_log_f.read(4096) + if data: + data = data.replace(b'\n', b'\r\n') + await self.send_json( + {'message': data.decode(errors='ignore'), 'task': task_id} + ) + if data.find(b'succeeded in') != -1: + task_end_mark.append(1) + if data.find(bytes(task_id, 'utf8')) != -1: + task_end_mark.append(1) + elif len(task_end_mark) == 2: + logger.debug('Task log end: {}'.format(task_id)) + break + await asyncio.sleep(0.2) + except OSError as e: + logger.warn('Task log path open failed: {}'.format(e)) - def disconnect(self, close_code): + async def disconnect(self, close_code): self.disconnected = True - self.close() + await self.close() close_old_connections() diff --git a/requirements/requirements.txt b/requirements/requirements.txt index 8af2fcb0f..659add611 100644 --- a/requirements/requirements.txt +++ b/requirements/requirements.txt @@ -1,3 +1,4 @@ +aiofiles==22.1.0 amqp==5.0.9 ansible==6.4.0 ansible-runner==2.2.1