diff --git a/apps/ops/celery/beat/__init__.py b/apps/ops/celery/beat/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/apps/ops/celery/beat/schedulers.py b/apps/ops/celery/beat/schedulers.py new file mode 100644 index 000000000..7be8d1cda --- /dev/null +++ b/apps/ops/celery/beat/schedulers.py @@ -0,0 +1,80 @@ +import logging +from celery.utils.log import get_logger +from django.db import close_old_connections +from django.core.exceptions import ObjectDoesNotExist +from django.db.utils import DatabaseError, InterfaceError + +from django_celery_beat.schedulers import DatabaseScheduler as DJDatabaseScheduler + +logger = get_logger(__name__) +debug, info, warning = logger.debug, logger.info, logger.warning + + +__all__ = ['DatabaseScheduler'] + + +class DatabaseScheduler(DJDatabaseScheduler): + + def sync(self): + if logger.isEnabledFor(logging.DEBUG): + debug('Writing entries...') + _tried = set() + _failed = set() + try: + close_old_connections() + + while self._dirty: + name = self._dirty.pop() + try: + # 源码 + # self.schedule[name].save() + # _tried.add(name) + + """ + ::Debug Description (2023.07.10):: + + 如果调用 self.schedule 可能会导致 self.save() 方法之前重新获取数据库中的数据, 而不是临时设置的 last_run_at 数据 + + 如果这里调用 self.schedule + 那么可能会导致调用 save 的 self.schedule[name] 的 last_run_at 是从数据库中获取回来的老数据 + 而不是任务执行后临时设置的 last_run_at (在 __next__() 方法中设置的) + 当 `max_interval` 间隔之后, 下一个任务检测周期还是会再次执行任务 + + ::Demo:: + 任务信息: + beat config: max_interval = 60s + + 任务名称: cap + 任务执行周期: 每 3 分钟执行一次 + 任务最后执行时间: 18:00 + + 任务第一次执行: 18:03 (执行时设置 last_run_at = 18:03, 此时在内存中) + + 任务执行完成后, + 检测到需要 sync, sync 中调用了 self.schedule, + self.schedule 中发现 schedule_changed() 为 True, 需要调用 all_as_schedule() + 此时,sync 中调用的 self.schedule[name] 的 last_run_at 是 18:00 + 这时候在 self.sync() 进行 self.save() + + + beat: Waking up 60s ... + + 任务第二次执行: 18:04 (因为获取回来的 last_run_at 是 18:00, entry.is_due() == True) + + ::解决方法:: + 所以这里为了避免从数据库中获取,直接使用 _schedule # + """ + self._schedule[name].save() + _tried.add(name) + except (KeyError, ObjectDoesNotExist): + _failed.add(name) + except DatabaseError as exc: + logger.exception('Database error while sync: %r', exc) + except InterfaceError: + warning( + 'DatabaseScheduler: InterfaceError in sync(), ' + 'waiting to retry in next call...' + ) + finally: + # retry later, only for the failed ones + self._dirty |= _failed diff --git a/utils/start_celery_beat.py b/utils/start_celery_beat.py index 23aa1cb83..19d578e70 100644 --- a/utils/start_celery_beat.py +++ b/utils/start_celery_beat.py @@ -54,7 +54,7 @@ else: connection_params['port'] = settings.REDIS_PORT redis_client = Redis(**connection_params) -scheduler = "django_celery_beat.schedulers:DatabaseScheduler" +scheduler = "ops.celery.beat.schedulers:DatabaseScheduler" processes = [] cmd = [ 'celery',