perf(celery-task): 优化检查节点资产数量的 Celery 任务

pull/5063/head
xinwen 2020-11-20 16:12:24 +08:00 committed by 老广
parent 21993b0d89
commit a7c704bea3
5 changed files with 26 additions and 23 deletions

View File

@ -1,14 +1,13 @@
from celery import shared_task
from ops.celery.decorator import register_as_period_task
from assets.utils import check_node_assets_amount
from common.utils import get_logger
from common.utils.timezone import now
logger = get_logger(__file__)
@shared_task()
@register_as_period_task(crontab='0 2 * * *')
@shared_task(queue='node_assets_amount')
def check_node_assets_amount_celery_task():
logger.info(f'>>> {now()} begin check_node_assets_amount_celery_task ...')
check_node_assets_amount()
logger.info(f'>>> {now()} end check_node_assets_amount_celery_task ...')

View File

@ -1,5 +1,7 @@
# ~*~ coding: utf-8 ~*~
#
import time
from django.db.models import Q
from common.utils import get_logger, dict_get_any, is_uuid, get_object_or_none
@ -12,15 +14,18 @@ logger = get_logger(__file__)
def check_node_assets_amount():
for node in Node.objects.all():
logger.info(f'Check node assets amount: {node}')
assets_amount = Asset.objects.filter(
Q(nodes__key__istartswith=f'{node.key}:') | Q(nodes=node)
).distinct().count()
if node.assets_amount != assets_amount:
print(f'>>> <Node:{node.key}> wrong assets amount '
logger.warn(f'Node wrong assets amount <Node:{node.key}> '
f'{node.assets_amount} right is {assets_amount}')
node.assets_amount = assets_amount
node.save()
# 防止自检程序给数据库的压力太大
time.sleep(2)
def is_asset_exists_in_node(asset_pk, node_key):

View File

@ -29,16 +29,3 @@ configs["CELERY_ROUTES"] = {
app.namespace = 'CELERY'
app.conf.update(configs)
app.autodiscover_tasks(lambda: [app_config.split('.')[0] for app_config in settings.INSTALLED_APPS])
app.conf.beat_schedule = {
'check-asset-permission-expired': {
'task': 'perms.tasks.check_asset_permission_expired',
'schedule': settings.PERM_EXPIRED_CHECK_PERIODIC,
'args': ()
},
'check-node-assets-amount': {
'task': 'assets.tasks.nodes_amount.check_node_assets_amount_celery_task',
'schedule': crontab(minute=0, hour=0),
'args': ()
},
}

View File

@ -5,10 +5,12 @@ from datetime import timedelta
from django.db import transaction
from django.db.models import Q
from django.db.transaction import atomic
from django.conf import settings
from celery import shared_task
from common.utils import get_logger
from common.utils.timezone import now, dt_formater, dt_parser
from users.models import User
from ops.celery.decorator import register_as_period_task
from assets.models import Node
from perms.models import RebuildUserTreeTask, AssetPermission
from perms.utils.asset.user_permission import rebuild_user_mapping_nodes_if_need_with_lock, lock
@ -33,7 +35,8 @@ def dispatch_mapping_node_tasks():
rebuild_user_mapping_nodes_celery_task.delay(id)
@shared_task(queue='check_asset_perm_expired')
@register_as_period_task(interval=settings.PERM_EXPIRED_CHECK_PERIODIC)
@shared_task(queue='celery_check_asset_perm_expired')
@atomic()
def check_asset_permission_expired():
"""

15
jms
View File

@ -156,7 +156,10 @@ def is_running(s, unlink=True):
def parse_service(s):
web_services = ['gunicorn', 'flower', 'daphne']
celery_services = ["celery_ansible", "celery_default", "celery_node_tree", "check_asset_perm_expired"]
celery_services = [
"celery_ansible", "celery_default", "celery_node_tree",
"celery_check_asset_perm_expired", "celery_node_assets_amount"
]
task_services = celery_services + ['beat']
all_services = web_services + task_services
if s == 'all':
@ -225,9 +228,14 @@ def get_start_celery_node_tree_kwargs():
return get_start_worker_kwargs('node_tree', 2)
def get_start_celery_node_assets_amount_kwargs():
print("\n- Start Celery as Distributed Task Queue: NodeAssetsAmount")
return get_start_worker_kwargs('celery_node_assets_amount', 1)
def get_start_celery_check_asset_perm_expired_kwargs():
print("\n- Start Celery as Distributed Task Queue: CheckAseetPermissionExpired")
return get_start_worker_kwargs('check_asset_perm_expired', 1)
return get_start_worker_kwargs('celery_check_asset_perm_expired', 1)
def get_start_worker_kwargs(queue, num):
@ -366,7 +374,8 @@ def start_service(s):
"celery_ansible": get_start_celery_ansible_kwargs,
"celery_default": get_start_celery_default_kwargs,
"celery_node_tree": get_start_celery_node_tree_kwargs,
"check_asset_perm_expired": get_start_celery_check_asset_perm_expired_kwargs,
"celery_node_assets_amount": get_start_celery_node_assets_amount_kwargs,
"celery_check_asset_perm_expired": get_start_celery_check_asset_perm_expired_kwargs,
"beat": get_start_beat_kwargs,
"flower": get_start_flower_kwargs,
"daphne": get_start_daphne_kwargs,