diff --git a/apps/common/utils/encode.py b/apps/common/utils/encode.py index fc9467cba..d4e9c4cbe 100644 --- a/apps/common/utils/encode.py +++ b/apps/common/utils/encode.py @@ -182,3 +182,7 @@ def encrypt_password(password, salt=None): def get_signer(): signer = Signer(settings.SECRET_KEY) return signer + + +def ensure_last_char_is_ascii(data): + remain = '' diff --git a/apps/ops/ws.py b/apps/ops/ws.py index 1d2bea3c5..d6bff86a7 100644 --- a/apps/ops/ws.py +++ b/apps/ops/ws.py @@ -3,11 +3,13 @@ import os import threading import json -from celery.result import AsyncResult +from common.utils import get_logger from .celery.utils import get_celery_task_log_path from channels.generic.websocket import JsonWebsocketConsumer +logger = get_logger(__name__) + class CeleryLogWebsocket(JsonWebsocketConsumer): disconnected = False @@ -22,6 +24,7 @@ class CeleryLogWebsocket(JsonWebsocketConsumer): self.handle_task(task_id) def handle_task(self, task_id): + logger.info("Task id: {}".format(task_id)) log_path = get_celery_task_log_path(task_id) def func(): @@ -34,19 +37,24 @@ class CeleryLogWebsocket(JsonWebsocketConsumer): continue self.send_json({'message': '\r\n'}) try: - task_log_f = open(log_path) + logger.debug('Task log path: {}'.format(log_path)) + task_log_f = open(log_path, 'rb') break except OSError: return + if not task_log_f: + return + while not self.disconnected: data = task_log_f.readline() + if data: - data = data.replace('\n', '\r\n') - self.send_json({'message': data, 'task': task_id}) - if data.startswith('Task') and data.find('succeeded'): + data = data.replace(b'\n', b'\r\n') + self.send_json({'message': data.decode(errors='ignore'), 'task': task_id}) + if data.startswith(b'Task') and data.find(b'succeeded'): break - time.sleep(0.2) + time.sleep(0.1) task_log_f.close() thread = threading.Thread(target=func) thread.start()