diff --git a/apps/assets/tasks/nodes_amount.py b/apps/assets/tasks/nodes_amount.py index 4d53be525..0ec0810a0 100644 --- a/apps/assets/tasks/nodes_amount.py +++ b/apps/assets/tasks/nodes_amount.py @@ -1,14 +1,13 @@ from celery import shared_task +from ops.celery.decorator import register_as_period_task from assets.utils import check_node_assets_amount from common.utils import get_logger -from common.utils.timezone import now logger = get_logger(__file__) -@shared_task() +@register_as_period_task(crontab='0 2 * * *') +@shared_task(queue='node_assets_amount') def check_node_assets_amount_celery_task(): - logger.info(f'>>> {now()} begin check_node_assets_amount_celery_task ...') check_node_assets_amount() - logger.info(f'>>> {now()} end check_node_assets_amount_celery_task ...') diff --git a/apps/assets/utils.py b/apps/assets/utils.py index 7fc5a19ac..f04c06d0b 100644 --- a/apps/assets/utils.py +++ b/apps/assets/utils.py @@ -1,5 +1,7 @@ # ~*~ coding: utf-8 ~*~ # +import time + from django.db.models import Q from common.utils import get_logger, dict_get_any, is_uuid, get_object_or_none @@ -12,15 +14,18 @@ logger = get_logger(__file__) def check_node_assets_amount(): for node in Node.objects.all(): + logger.info(f'Check node assets amount: {node}') assets_amount = Asset.objects.filter( Q(nodes__key__istartswith=f'{node.key}:') | Q(nodes=node) ).distinct().count() if node.assets_amount != assets_amount: - print(f'>>> wrong assets amount ' - f'{node.assets_amount} right is {assets_amount}') + logger.warn(f'Node wrong assets amount ' + f'{node.assets_amount} right is {assets_amount}') node.assets_amount = assets_amount node.save() + # 防止自检程序给数据库的压力太大 + time.sleep(2) def is_asset_exists_in_node(asset_pk, node_key): diff --git a/apps/ops/celery/__init__.py b/apps/ops/celery/__init__.py index b8ed56be1..0ded6bc52 100644 --- a/apps/ops/celery/__init__.py +++ b/apps/ops/celery/__init__.py @@ -29,16 +29,3 @@ configs["CELERY_ROUTES"] = { app.namespace = 'CELERY' app.conf.update(configs) app.autodiscover_tasks(lambda: [app_config.split('.')[0] for app_config in settings.INSTALLED_APPS]) - -app.conf.beat_schedule = { - 'check-asset-permission-expired': { - 'task': 'perms.tasks.check_asset_permission_expired', - 'schedule': settings.PERM_EXPIRED_CHECK_PERIODIC, - 'args': () - }, - 'check-node-assets-amount': { - 'task': 'assets.tasks.nodes_amount.check_node_assets_amount_celery_task', - 'schedule': crontab(minute=0, hour=0), - 'args': () - }, -} diff --git a/apps/perms/tasks.py b/apps/perms/tasks.py index 7e940d594..fbf2ce8be 100644 --- a/apps/perms/tasks.py +++ b/apps/perms/tasks.py @@ -5,10 +5,12 @@ from datetime import timedelta from django.db import transaction from django.db.models import Q from django.db.transaction import atomic +from django.conf import settings from celery import shared_task from common.utils import get_logger from common.utils.timezone import now, dt_formater, dt_parser from users.models import User +from ops.celery.decorator import register_as_period_task from assets.models import Node from perms.models import RebuildUserTreeTask, AssetPermission from perms.utils.asset.user_permission import rebuild_user_mapping_nodes_if_need_with_lock, lock @@ -33,7 +35,8 @@ def dispatch_mapping_node_tasks(): rebuild_user_mapping_nodes_celery_task.delay(id) -@shared_task(queue='check_asset_perm_expired') +@register_as_period_task(interval=settings.PERM_EXPIRED_CHECK_PERIODIC) +@shared_task(queue='celery_check_asset_perm_expired') @atomic() def check_asset_permission_expired(): """ diff --git a/jms b/jms index 3969b5418..d3ede99ff 100755 --- a/jms +++ b/jms @@ -156,7 +156,10 @@ def is_running(s, unlink=True): def parse_service(s): web_services = ['gunicorn', 'flower', 'daphne'] - celery_services = ["celery_ansible", "celery_default", "celery_node_tree", "check_asset_perm_expired"] + celery_services = [ + "celery_ansible", "celery_default", "celery_node_tree", + "celery_check_asset_perm_expired", "celery_node_assets_amount" + ] task_services = celery_services + ['beat'] all_services = web_services + task_services if s == 'all': @@ -225,9 +228,14 @@ def get_start_celery_node_tree_kwargs(): return get_start_worker_kwargs('node_tree', 2) +def get_start_celery_node_assets_amount_kwargs(): + print("\n- Start Celery as Distributed Task Queue: NodeAssetsAmount") + return get_start_worker_kwargs('celery_node_assets_amount', 1) + + def get_start_celery_check_asset_perm_expired_kwargs(): print("\n- Start Celery as Distributed Task Queue: CheckAseetPermissionExpired") - return get_start_worker_kwargs('check_asset_perm_expired', 1) + return get_start_worker_kwargs('celery_check_asset_perm_expired', 1) def get_start_worker_kwargs(queue, num): @@ -366,7 +374,8 @@ def start_service(s): "celery_ansible": get_start_celery_ansible_kwargs, "celery_default": get_start_celery_default_kwargs, "celery_node_tree": get_start_celery_node_tree_kwargs, - "check_asset_perm_expired": get_start_celery_check_asset_perm_expired_kwargs, + "celery_node_assets_amount": get_start_celery_node_assets_amount_kwargs, + "celery_check_asset_perm_expired": get_start_celery_check_asset_perm_expired_kwargs, "beat": get_start_beat_kwargs, "flower": get_start_flower_kwargs, "daphne": get_start_daphne_kwargs,