diff --git a/apps/ops/ws.py b/apps/ops/ws.py index d6bff86a7..cd2fa33bb 100644 --- a/apps/ops/ws.py +++ b/apps/ops/ws.py @@ -23,40 +23,46 @@ class CeleryLogWebsocket(JsonWebsocketConsumer): if task_id: self.handle_task(task_id) + def wait_util_log_path_exist(self, task_id): + log_path = get_celery_task_log_path(task_id) + while not self.disconnected: + if not os.path.exists(log_path): + self.send_json({'message': '.', 'task': task_id}) + time.sleep(0.5) + continue + self.send_json({'message': '\r\n'}) + try: + logger.debug('Task log path: {}'.format(log_path)) + task_log_f = open(log_path, 'rb') + return task_log_f + except OSError: + return None + + def read_log_file(self, task_id): + task_log_f = self.wait_util_log_path_exist(task_id) + if not task_log_f: + return + + task_end_mark = [] + while not self.disconnected: + data = task_log_f.read(4096) + if data: + data = data.replace(b'\n', b'\r\n') + self.send_json( + {'message': data.decode(errors='ignore'), 'task': task_id}) + if data.find(b'succeeded in') != -1: + task_end_mark.append(1) + if data.find(bytes(task_id, 'utf8')) != -1: + task_end_mark.append(1) + elif len(task_end_mark) == 2: + logger.debug('Task log end: {}'.format(task_id)) + break + time.sleep(0.2) + task_log_f.close() + def handle_task(self, task_id): logger.info("Task id: {}".format(task_id)) - log_path = get_celery_task_log_path(task_id) - - def func(): - task_log_f = None - - while not self.disconnected: - if not os.path.exists(log_path): - self.send_json({'message': '.', 'task': task_id}) - time.sleep(0.5) - continue - self.send_json({'message': '\r\n'}) - try: - logger.debug('Task log path: {}'.format(log_path)) - task_log_f = open(log_path, 'rb') - break - except OSError: - return - - if not task_log_f: - return - - while not self.disconnected: - data = task_log_f.readline() - - if data: - data = data.replace(b'\n', b'\r\n') - self.send_json({'message': data.decode(errors='ignore'), 'task': task_id}) - if data.startswith(b'Task') and data.find(b'succeeded'): - break - time.sleep(0.1) - task_log_f.close() - thread = threading.Thread(target=func) + thread = threading.Thread(target=self.read_log_file, args=(task_id,)) thread.start() def disconnect(self, close_code):