From 0dbf2ab2e5ef5a97003b1a5aa5e3cdff1b45680d Mon Sep 17 00:00:00 2001 From: ibuler Date: Tue, 22 Oct 2019 19:45:03 +0800 Subject: [PATCH 1/2] =?UTF-8?q?[Update]=20=E4=BF=AE=E6=94=B9websocket?= =?UTF-8?q?=E8=AF=BB=E5=8F=96=E6=97=A5=E5=BF=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/ops/ws.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/apps/ops/ws.py b/apps/ops/ws.py index d6bff86a7..7d318bf85 100644 --- a/apps/ops/ws.py +++ b/apps/ops/ws.py @@ -46,13 +46,20 @@ class CeleryLogWebsocket(JsonWebsocketConsumer): if not task_log_f: return + task_end_mark = [] + while not self.disconnected: - data = task_log_f.readline() + data = task_log_f.read(4096) if data: data = data.replace(b'\n', b'\r\n') self.send_json({'message': data.decode(errors='ignore'), 'task': task_id}) - if data.startswith(b'Task') and data.find(b'succeeded'): + if data.find(b'succeeded in') != -1: + task_end_mark.append(1) + if data.find(bytes(task_id, 'utf8')) != -1: + task_end_mark.append(1) + if len(task_end_mark) == 2: + logger.debug('Task log end: {}'.format(task_id)) break time.sleep(0.1) task_log_f.close() From c995050ed7328d56389d4e2849b9afd8817620ce Mon Sep 17 00:00:00 2001 From: ibuler Date: Wed, 23 Oct 2019 10:14:07 +0800 Subject: [PATCH 2/2] =?UTF-8?q?[Update]=20=E4=BF=AE=E6=94=B9ws=20read=20ce?= =?UTF-8?q?lery=20log=20file?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/ops/ws.py | 77 +++++++++++++++++++++++++------------------------- 1 file changed, 38 insertions(+), 39 deletions(-) diff --git a/apps/ops/ws.py b/apps/ops/ws.py index 7d318bf85..cd2fa33bb 100644 --- a/apps/ops/ws.py +++ b/apps/ops/ws.py @@ -23,47 +23,46 @@ class CeleryLogWebsocket(JsonWebsocketConsumer): if task_id: self.handle_task(task_id) + def wait_util_log_path_exist(self, task_id): + log_path = get_celery_task_log_path(task_id) + while not self.disconnected: + if not os.path.exists(log_path): + self.send_json({'message': '.', 'task': task_id}) + time.sleep(0.5) + continue + self.send_json({'message': '\r\n'}) + try: + logger.debug('Task log path: {}'.format(log_path)) + task_log_f = open(log_path, 'rb') + return task_log_f + except OSError: + return None + + def read_log_file(self, task_id): + task_log_f = self.wait_util_log_path_exist(task_id) + if not task_log_f: + return + + task_end_mark = [] + while not self.disconnected: + data = task_log_f.read(4096) + if data: + data = data.replace(b'\n', b'\r\n') + self.send_json( + {'message': data.decode(errors='ignore'), 'task': task_id}) + if data.find(b'succeeded in') != -1: + task_end_mark.append(1) + if data.find(bytes(task_id, 'utf8')) != -1: + task_end_mark.append(1) + elif len(task_end_mark) == 2: + logger.debug('Task log end: {}'.format(task_id)) + break + time.sleep(0.2) + task_log_f.close() + def handle_task(self, task_id): logger.info("Task id: {}".format(task_id)) - log_path = get_celery_task_log_path(task_id) - - def func(): - task_log_f = None - - while not self.disconnected: - if not os.path.exists(log_path): - self.send_json({'message': '.', 'task': task_id}) - time.sleep(0.5) - continue - self.send_json({'message': '\r\n'}) - try: - logger.debug('Task log path: {}'.format(log_path)) - task_log_f = open(log_path, 'rb') - break - except OSError: - return - - if not task_log_f: - return - - task_end_mark = [] - - while not self.disconnected: - data = task_log_f.read(4096) - - if data: - data = data.replace(b'\n', b'\r\n') - self.send_json({'message': data.decode(errors='ignore'), 'task': task_id}) - if data.find(b'succeeded in') != -1: - task_end_mark.append(1) - if data.find(bytes(task_id, 'utf8')) != -1: - task_end_mark.append(1) - if len(task_end_mark) == 2: - logger.debug('Task log end: {}'.format(task_id)) - break - time.sleep(0.1) - task_log_f.close() - thread = threading.Thread(target=func) + thread = threading.Thread(target=self.read_log_file, args=(task_id,)) thread.start() def disconnect(self, close_code):