|
|
|
@ -23,47 +23,46 @@ class CeleryLogWebsocket(JsonWebsocketConsumer):
|
|
|
|
|
if task_id: |
|
|
|
|
self.handle_task(task_id) |
|
|
|
|
|
|
|
|
|
def handle_task(self, task_id): |
|
|
|
|
logger.info("Task id: {}".format(task_id)) |
|
|
|
|
def wait_util_log_path_exist(self, task_id): |
|
|
|
|
log_path = get_celery_task_log_path(task_id) |
|
|
|
|
while not self.disconnected: |
|
|
|
|
if not os.path.exists(log_path): |
|
|
|
|
self.send_json({'message': '.', 'task': task_id}) |
|
|
|
|
time.sleep(0.5) |
|
|
|
|
continue |
|
|
|
|
self.send_json({'message': '\r\n'}) |
|
|
|
|
try: |
|
|
|
|
logger.debug('Task log path: {}'.format(log_path)) |
|
|
|
|
task_log_f = open(log_path, 'rb') |
|
|
|
|
return task_log_f |
|
|
|
|
except OSError: |
|
|
|
|
return None |
|
|
|
|
|
|
|
|
|
def read_log_file(self, task_id): |
|
|
|
|
task_log_f = self.wait_util_log_path_exist(task_id) |
|
|
|
|
if not task_log_f: |
|
|
|
|
return |
|
|
|
|
|
|
|
|
|
task_end_mark = [] |
|
|
|
|
while not self.disconnected: |
|
|
|
|
data = task_log_f.read(4096) |
|
|
|
|
if data: |
|
|
|
|
data = data.replace(b'\n', b'\r\n') |
|
|
|
|
self.send_json( |
|
|
|
|
{'message': data.decode(errors='ignore'), 'task': task_id}) |
|
|
|
|
if data.find(b'succeeded in') != -1: |
|
|
|
|
task_end_mark.append(1) |
|
|
|
|
if data.find(bytes(task_id, 'utf8')) != -1: |
|
|
|
|
task_end_mark.append(1) |
|
|
|
|
elif len(task_end_mark) == 2: |
|
|
|
|
logger.debug('Task log end: {}'.format(task_id)) |
|
|
|
|
break |
|
|
|
|
time.sleep(0.2) |
|
|
|
|
task_log_f.close() |
|
|
|
|
|
|
|
|
|
def func(): |
|
|
|
|
task_log_f = None |
|
|
|
|
|
|
|
|
|
while not self.disconnected: |
|
|
|
|
if not os.path.exists(log_path): |
|
|
|
|
self.send_json({'message': '.', 'task': task_id}) |
|
|
|
|
time.sleep(0.5) |
|
|
|
|
continue |
|
|
|
|
self.send_json({'message': '\r\n'}) |
|
|
|
|
try: |
|
|
|
|
logger.debug('Task log path: {}'.format(log_path)) |
|
|
|
|
task_log_f = open(log_path, 'rb') |
|
|
|
|
break |
|
|
|
|
except OSError: |
|
|
|
|
return |
|
|
|
|
|
|
|
|
|
if not task_log_f: |
|
|
|
|
return |
|
|
|
|
|
|
|
|
|
task_end_mark = [] |
|
|
|
|
|
|
|
|
|
while not self.disconnected: |
|
|
|
|
data = task_log_f.read(4096) |
|
|
|
|
|
|
|
|
|
if data: |
|
|
|
|
data = data.replace(b'\n', b'\r\n') |
|
|
|
|
self.send_json({'message': data.decode(errors='ignore'), 'task': task_id}) |
|
|
|
|
if data.find(b'succeeded in') != -1: |
|
|
|
|
task_end_mark.append(1) |
|
|
|
|
if data.find(bytes(task_id, 'utf8')) != -1: |
|
|
|
|
task_end_mark.append(1) |
|
|
|
|
if len(task_end_mark) == 2: |
|
|
|
|
logger.debug('Task log end: {}'.format(task_id)) |
|
|
|
|
break |
|
|
|
|
time.sleep(0.1) |
|
|
|
|
task_log_f.close() |
|
|
|
|
thread = threading.Thread(target=func) |
|
|
|
|
def handle_task(self, task_id): |
|
|
|
|
logger.info("Task id: {}".format(task_id)) |
|
|
|
|
thread = threading.Thread(target=self.read_log_file, args=(task_id,)) |
|
|
|
|
thread.start() |
|
|
|
|
|
|
|
|
|
def disconnect(self, close_code): |
|
|
|
|