pull/418/head
vapao 2021-12-08 15:26:55 +08:00
parent 71776a4228
commit 975c632c98
1 changed files with 4 additions and 3 deletions

View File

@ -35,7 +35,6 @@ class Worker:
while True: while True:
time.sleep((counter or 1) ** 3 * 10) time.sleep((counter or 1) ** 3 * 10)
qsize = self._executor._work_queue.qsize() qsize = self._executor._work_queue.qsize()
logging.warning(f'do check: {counter} qsize: {qsize}')
if qsize > 0: if qsize > 0:
if counter > 0: if counter > 0:
content = '请检查监控、任务计划或批量执行等避免长耗时任务,必要时可重启服务清空队列。' content = '请检查监控、任务计划或批量执行等避免长耗时任务,必要时可重启服务清空队列。'
@ -52,7 +51,7 @@ class Worker:
def run(self): def run(self):
logging.warning('Running worker') logging.warning('Running worker')
Thread(target=self.queue_monitor).start() Thread(target=self.queue_monitor, daemon=True).start()
self.rds.delete(EXEC_WORKER_KEY, MONITOR_WORKER_KEY, SCHEDULE_WORKER_KEY) self.rds.delete(EXEC_WORKER_KEY, MONITOR_WORKER_KEY, SCHEDULE_WORKER_KEY)
while True: while True:
key, job = self.rds.blpop([EXEC_WORKER_KEY, SCHEDULE_WORKER_KEY, MONITOR_WORKER_KEY]) key, job = self.rds.blpop([EXEC_WORKER_KEY, SCHEDULE_WORKER_KEY, MONITOR_WORKER_KEY])
@ -61,8 +60,10 @@ class Worker:
future = self._executor.submit(schedule_worker_handler, job) future = self._executor.submit(schedule_worker_handler, job)
elif key == MONITOR_WORKER_KEY: elif key == MONITOR_WORKER_KEY:
future = self._executor.submit(monitor_worker_handler, job) future = self._executor.submit(monitor_worker_handler, job)
else: elif key == EXEC_WORKER_KEY:
future = self._executor.submit(exec_worker_handler, job) future = self._executor.submit(exec_worker_handler, job)
else:
continue
future.add_done_callback(self.job_done) future.add_done_callback(self.job_done)