perf: modify activity log (#9571)

* perf: modify activity log

* perf: optimize activity

* perf: modify activity

* fix: fix some runtime issues

* fix: add tasks import in app.py

* fix: add activity_callback

* fix: add activity_callback for execute_account_backup_plan

* fix: add activity_callback -> gather_asset_accounts

* fix: add activity_callback to celery tasks

* fix: update translations

---------

Co-authored-by: ibuler <ibuler@qq.com>
Co-authored-by: jiangweidong <weidong.jiang@fit2cloud.com>
Co-authored-by: Bai <baijiangjie@gmail.com>
fit2bot 2023-02-17 17:14:53 +08:00 committed by GitHub
parent 01c2e7128d
commit 9a62a7aaab
46 changed files with 700 additions and 653 deletions
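The common thread in these diffs is a new activity_callback hook: a Celery task can carry a callback that derives (resource_ids, org_id) from the task's own arguments, so an activity log entry can be written when the task is published, before it ever runs. Below is a minimal sketch of the pattern, not the project's exact code; demo_task, demo_activity_callback, and record_activity are illustrative names, while the activity_callback attribute and the after_task_publish signal usage mirror the diffs that follow.

from celery import current_app, shared_task, signals


def demo_activity_callback(self, asset_ids, org_id, *args, **kwargs):
    # Compute (resource_ids, org_id) from the task arguments alone.
    return asset_ids, org_id


# Extra keyword options passed to the task decorator become attributes of
# the generated task class, which is how activity_callback travels with it.
@shared_task(queue='ansible', activity_callback=demo_activity_callback)
def demo_task(asset_ids, org_id):
    pass


@signals.after_task_publish.connect
def record_activity(headers=None, body=None, **kwargs):
    # Task message protocol v2: body is (args, kwargs, embed).
    task = current_app.tasks.get(headers.get('task'))
    callback = getattr(task, 'activity_callback', None)
    if not callable(callback):
        return
    args, kw = body[:2]
    resource_ids, org_id = callback(task, *args, **kw)
    # ...create one ActivityLog row per resource id, keyed by headers['id']...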

View File

@@ -110,6 +110,6 @@ class AutomationExecutionViewSet(
serializer.is_valid(raise_exception=True)
automation = serializer.validated_data.get('automation')
task = execute_automation.delay(
pid=str(automation.pk), trigger=Trigger.manual, tp=self.tp
automation=automation, trigger=Trigger.manual
)
return Response({'task': task.id}, status=status.HTTP_201_CREATED)

View File

@@ -7,4 +7,5 @@ class AccountsConfig(AppConfig):
def ready(self):
from . import signal_handlers
from . import tasks
__all__ = signal_handlers

View File

@@ -77,15 +77,10 @@ class AssetAccountHandler(BaseAccountHandler):
return filename
@classmethod
def create_data_map(cls, types: list):
def create_data_map(cls, accounts):
data_map = defaultdict(list)
# TODO: this query could be optimized by caching category on the account, avoiding table joins when the data volume is large
qs = Account.objects.filter(
asset__platform__type__in=types
).annotate(type=F('asset__platform__type'))
if not qs.exists():
if not accounts.exists():
return data_map
type_dict = {}
@@ -93,18 +88,18 @@ class AssetAccountHandler(BaseAccountHandler):
for j in i['children']:
type_dict[j['value']] = j['display_name']
header_fields = cls.get_header_fields(AccountSecretSerializer(qs.first()))
header_fields = cls.get_header_fields(AccountSecretSerializer(accounts.first()))
account_type_map = defaultdict(list)
for account in qs:
for account in accounts:
account_type_map[account.type].append(account)
data_map = {}
for tp, accounts in account_type_map.items():
for tp, _accounts in account_type_map.items():
sheet_name = type_dict.get(tp, tp)
data = AccountSecretSerializer(accounts, many=True).data
data = AccountSecretSerializer(_accounts, many=True).data
data_map.update(cls.add_rows(data, header_fields, sheet_name))
logger.info('\n\033[33m- Collected {} accounts in total\033[0m'.format(qs.count()))
logger.info('\n\033[33m- Backed up {} accounts in total\033[0m'.format(accounts.count()))
return data_map
@@ -123,9 +118,8 @@ class AccountBackupHandler:
# Print task start date
time_start = time.time()
files = []
types = self.execution.types
data_map = AssetAccountHandler.create_data_map(types)
accounts = self.execution.backup_accounts
data_map = AssetAccountHandler.create_data_map(accounts)
if not data_map:
return files

View File

@@ -5,7 +5,9 @@ import uuid
from celery import current_task
from django.db import models
from django.db.models import F
from django.utils.translation import ugettext_lazy as _
from common.utils import lazyproperty
from common.const.choices import Trigger
from common.db.encoder import ModelJSONFieldEncoder
@@ -70,6 +72,10 @@ class AccountBackupAutomation(PeriodTaskModelMixin, JMSOrgBaseModel):
)
return execution.start()
@lazyproperty
def latest_execution(self):
return self.execution.first()
class AccountBackupExecution(OrgModelMixin):
id = models.UUIDField(default=uuid.uuid4, primary_key=True)
@@ -112,6 +118,15 @@ class AccountBackupExecution(OrgModelMixin):
return []
return recipients.values()
@lazyproperty
def backup_accounts(self):
from accounts.models import Account
# TODO: this query could be optimized by caching category on the account, avoiding table joins when the data volume is large
qs = Account.objects.filter(
asset__platform__type__in=self.types
).annotate(type=F('asset__platform__type'))
return qs
@property
def manager_type(self):
return 'backup_account'

View File

@@ -1,20 +1,21 @@
from celery import shared_task
from django.utils.translation import gettext_lazy as _
from orgs.utils import tmp_to_root_org, tmp_to_org
from common.utils import get_logger, get_object_or_none
from accounts.const import AutomationTypes
from common.utils import get_logger
from orgs.utils import tmp_to_org
logger = get_logger(__file__)
@shared_task(queue='ansible', verbose_name=_('Account execute automation'))
def execute_automation(pid, trigger, tp):
model = AutomationTypes.get_type_model(tp)
with tmp_to_root_org():
instance = get_object_or_none(model, pk=pid)
if not instance:
logger.error("No automation task found: {}".format(pid))
return
def task_activity_callback(self, instance, *args):
asset_ids = instance.get_all_asset_ids()
return asset_ids, instance.org_id
@shared_task(
queue='ansible', verbose_name=_('Account execute automation'),
activity_callback=task_activity_callback
)
def execute_automation(instance, trigger):
with tmp_to_org(instance.org):
instance.execute(trigger)

View File

@@ -9,7 +9,19 @@ from orgs.utils import tmp_to_org, tmp_to_root_org
logger = get_logger(__file__)
@shared_task(verbose_name=_('Execute account backup plan'))
def task_activity_callback(self, pid, trigger):
with tmp_to_root_org():
plan = get_object_or_none(AccountBackupAutomation, pk=pid)
if not plan:
return
if not plan.latest_execution:
return
resource_ids = plan.latest_execution.backup_accounts
org_id = plan.org_id
return resource_ids, org_id
@shared_task(verbose_name=_('Execute account backup plan'), activity_callback=task_activity_callback)
def execute_account_backup_plan(pid, trigger):
from accounts.models import AccountBackupAutomation
with tmp_to_root_org():

View File

@@ -25,7 +25,10 @@ def gather_asset_accounts_util(nodes, task_name):
automation_execute_start(task_name, tp, task_snapshot)
@shared_task(queue="ansible", verbose_name=_('Gather asset accounts'))
@shared_task(
queue="ansible", verbose_name=_('Gather asset accounts'),
activity_callback=lambda self, node_ids, task_name=None: (node_ids, None)
)
def gather_asset_accounts(node_ids, task_name=None):
if task_name is None:
task_name = gettext_noop("Gather assets accounts")
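Note: the lambda above returns None as the org id; in the audits signal handler later in this commit, a None org id falls back to current_org.org_id at publish time, so these node-scoped tasks are logged under whichever organization was active when they were queued.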

View File

@@ -33,7 +33,10 @@ def push_accounts_to_assets_util(accounts, assets):
push_util(account, assets, task_name)
@shared_task(queue="ansible", verbose_name=_('Push accounts to assets'))
@shared_task(
queue="ansible", verbose_name=_('Push accounts to assets'),
activity_callback=lambda self, account_ids, asset_ids: (account_ids, None)
)
def push_accounts_to_assets(account_ids, asset_ids):
from assets.models import Asset
from accounts.models import Account

View File

@@ -38,7 +38,10 @@ def verify_accounts_connectivity_util(accounts, assets, task_name):
)
@shared_task(queue="ansible", verbose_name=_('Verify asset account availability'))
@shared_task(
queue="ansible", verbose_name=_('Verify asset account availability'),
activity_callback=lambda self, account_ids, asset_ids: (account_ids, None)
)
def verify_accounts_connectivity(account_ids, asset_ids):
from assets.models import Asset
from accounts.models import Account, VerifyAccountAutomation

View File

@@ -151,12 +151,11 @@ class AssetsTaskMixin:
def perform_assets_task(self, serializer):
data = serializer.validated_data
assets = data.get("assets", [])
asset_ids = [asset.id for asset in assets]
if data["action"] == "refresh":
task = update_assets_hardware_info_manual(asset_ids)
task = update_assets_hardware_info_manual(assets)
else:
task = test_assets_connectivity_manual(asset_ids)
task = test_assets_connectivity_manual(assets)
return task
def perform_create(self, serializer):

View File

@@ -11,7 +11,6 @@ from rest_framework.response import Response
from rest_framework.serializers import ValidationError
from assets.models import Asset
from rbac.permissions import RBACPermission
from common.api import SuggestionMixin
from common.const.http import POST
from common.const.signals import PRE_REMOVE, POST_REMOVE
@@ -20,6 +19,7 @@ from common.utils import get_logger
from orgs.mixins import generics
from orgs.mixins.api import OrgBulkModelViewSet
from orgs.utils import current_org
from rbac.permissions import RBACPermission
from .. import serializers
from ..models import Node
from ..tasks import (
@@ -28,7 +28,6 @@ from ..tasks import (
check_node_assets_amount_task
)
logger = get_logger(__file__)
__all__ = [
'NodeViewSet', 'NodeAssetsApi', 'NodeAddAssetsApi',
@@ -224,7 +223,7 @@ class NodeTaskCreateApi(generics.CreateAPIView):
return
if action == "refresh":
task = update_node_assets_hardware_info_manual(node.id)
task = update_node_assets_hardware_info_manual(node)
else:
task = test_node_assets_connectivity_manual(node.id)
task = test_node_assets_connectivity_manual(node)
self.set_serializer_data(serializer, task)

View File

@@ -14,3 +14,5 @@ class AssetsConfig(AppConfig):
def ready(self):
super().ready()
from . import signal_handlers
from . import tasks

View File

@@ -80,6 +80,10 @@ class BaseAutomation(PeriodTaskModelMixin, JMSOrgBaseModel):
def executed_amount(self):
return self.executions.count()
@property
def latest_execution(self):
return self.executions.first()
def execute(self, trigger=Trigger.manual):
try:
eid = current_task.request.id
@@ -125,12 +129,16 @@ class AutomationExecution(OrgModelMixin):
def manager_type(self):
return self.snapshot['type']
def get_all_assets(self):
def get_all_asset_ids(self):
node_ids = self.snapshot['nodes']
asset_ids = self.snapshot['assets']
nodes = Node.objects.filter(id__in=node_ids)
node_asset_ids = Node.get_nodes_all_assets(*nodes).values_list('id', flat=True)
asset_ids = set(list(asset_ids) + list(node_asset_ids))
return asset_ids
def get_all_assets(self):
asset_ids = self.get_all_asset_ids()
return Asset.objects.filter(id__in=asset_ids)
def all_assets_group_by_platform(self):

View File

@@ -68,8 +68,8 @@ class AssetAccountSerializer(
class Meta:
model = Account
fields_mini = [
'id', 'name', 'username', 'privileged', 'is_active',
'version', 'secret_type',
'id', 'name', 'username', 'privileged',
'is_active', 'version', 'secret_type',
]
fields_write_only = [
'secret', 'push_now', 'template'
@@ -259,8 +259,6 @@ class AssetSerializer(BulkOrgResourceModelSerializer, WritableNestedModelSeriali
def accounts_create(accounts_data, asset):
for data in accounts_data:
data['asset'] = asset
secret = data.get('secret')
data['secret'] = decrypt_password(secret) if secret else secret
AssetAccountSerializer().create(data)
@atomic
@@ -274,8 +272,6 @@ class AssetSerializer(BulkOrgResourceModelSerializer, WritableNestedModelSeriali
@atomic
def update(self, instance, validated_data):
if not validated_data.get('accounts'):
validated_data.pop('accounts', None)
nodes_display = validated_data.pop('nodes_display', '')
instance = super().update(instance, validated_data)
self.perform_nodes_display_create(instance, nodes_display)

View File

@@ -11,6 +11,7 @@ from assets.tasks import test_assets_connectivity_task, gather_assets_facts_task
from common.const.signals import POST_REMOVE, PRE_REMOVE
from common.decorators import on_transaction_commit, merge_delay_run, key_by_org
from common.utils import get_logger
from orgs.utils import current_org
logger = get_logger(__file__)
@@ -23,7 +24,7 @@ def on_node_pre_save(sender, instance: Node, **kwargs):
@merge_delay_run(ttl=5, key=key_by_org)
def test_assets_connectivity_handler(assets=()):
task_name = gettext_noop("Test assets connectivity ")
test_assets_connectivity_task.delay(assets, task_name)
test_assets_connectivity_task.delay(assets, str(current_org.id), task_name)
@merge_delay_run(ttl=5, key=key_by_org)
@@ -32,7 +33,7 @@ def gather_assets_facts_handler(assets=()):
logger.info("No assets to update hardware info")
return
name = gettext_noop("Gather asset hardware info")
gather_assets_facts_task.delay(assets=assets, task_name=name)
gather_assets_facts_task.delay(assets, str(current_org.id), task_name=name)
@merge_delay_run(ttl=5, key=key_by_org)

View File

@@ -8,7 +8,22 @@ from assets.const import AutomationTypes
logger = get_logger(__file__)
@shared_task(queue='ansible', verbose_name=_('Asset execute automation'))
def task_activity_callback(self, pid, trigger, tp):
model = AutomationTypes.get_type_model(tp)
with tmp_to_root_org():
instance = get_object_or_none(model, pk=pid)
if not instance:
return
if not instance.latest_execution:
return
resource_ids = instance.latest_execution.get_all_asset_ids()
return resource_ids, instance.org_id
@shared_task(
queue='ansible', verbose_name=_('Asset execute automation'),
activity_callback=task_activity_callback
)
def execute_automation(pid, trigger, tp):
model = AutomationTypes.get_type_model(tp)
with tmp_to_root_org():

View File

@@ -1,13 +1,12 @@
# -*- coding: utf-8 -*-
#
from itertools import chain
from celery import shared_task
from django.utils.translation import gettext_noop, gettext_lazy as _
from assets.const import AutomationTypes
from common.utils import get_logger
from orgs.utils import tmp_to_org
from orgs.utils import tmp_to_org, current_org
from .common import quickstart_automation
logger = get_logger(__file__)
@@ -18,22 +17,17 @@ __all__ = [
]
@shared_task(queue="ansible", verbose_name=_('Gather assets facts'))
def gather_assets_facts_task(assets=None, nodes=None, task_name=None):
@shared_task(
queue="ansible", verbose_name=_('Gather assets facts'),
activity_callback=lambda self, asset_ids, org_id, *args, **kwargs: (asset_ids, org_id)
)
def gather_assets_facts_task(asset_ids, org_id, task_name=None):
from assets.models import GatherFactsAutomation
if task_name is None:
task_name = gettext_noop("Gather assets facts")
task_name = GatherFactsAutomation.generate_unique_name(task_name)
nodes = nodes or []
assets = assets or []
resources = chain(assets, nodes)
if not resources:
raise ValueError("nodes or assets must be given")
org_id = list(resources)[0].org_id
task_snapshot = {
'assets': [str(asset.id) for asset in assets],
'nodes': [str(node.id) for node in nodes],
'assets': asset_ids,
}
tp = AutomationTypes.gather_facts
@@ -41,15 +35,14 @@ def gather_assets_facts_task(assets=None, nodes=None, task_name=None):
quickstart_automation(task_name, tp, task_snapshot)
def update_assets_hardware_info_manual(asset_ids):
from assets.models import Asset
assets = Asset.objects.filter(id__in=asset_ids)
def update_assets_hardware_info_manual(assets):
task_name = gettext_noop("Update assets hardware info: ")
return gather_assets_facts_task.delay(assets=assets, task_name=task_name)
asset_ids = [str(i.id) for i in assets]
return gather_assets_facts_task.delay(asset_ids, str(current_org.id), task_name=task_name)
def update_node_assets_hardware_info_manual(node_id):
from assets.models import Node
node = Node.objects.get(id=node_id)
def update_node_assets_hardware_info_manual(node):
asset_ids = node.get_all_asset_ids()
asset_ids = [str(i) for i in asset_ids]
task_name = gettext_noop("Update node asset hardware information: ")
return gather_assets_facts_task.delay(nodes=[node], task_name=task_name)
return gather_assets_facts_task.delay(asset_ids, str(current_org.id), task_name=task_name)

View File

@@ -30,7 +30,7 @@ def check_node_assets_amount_task(org_id=None):
logger.error(error)
@register_as_period_task(crontab=CRONTAB_AT_AM_TWO)
@shared_task(verbose_name=_('Periodic check the amount of assets under the node'))
@register_as_period_task(crontab=CRONTAB_AT_AM_TWO)
def check_node_assets_amount_period_task():
check_node_assets_amount_task()
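Several files in this commit swap the order of @shared_task and @register_as_period_task. The order matters because stacked decorators apply bottom-up: the decorator closest to the def runs first, and the outer one receives its result. A generic, runnable illustration (outer/inner are placeholder names, not project code):

def outer(fn):
    print('outer wraps', fn.__name__)
    return fn


def inner(fn):
    print('inner wraps', fn.__name__)
    return fn


@outer
@inner
def f():
    pass

# Prints 'inner wraps f' then 'outer wraps f'. After the reorder,
# register_as_period_task is the inner decorator, so it sees the function
# before shared_task turns it into a Celery task object.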

View File

@@ -1,10 +1,10 @@
# ~*~ coding: utf-8 ~*~
from celery import shared_task
from django.utils.translation import gettext_noop
from django.utils.translation import gettext_noop, gettext_lazy as _
from assets.const import AutomationTypes
from common.utils import get_logger
from orgs.utils import org_aware_func
from orgs.utils import tmp_to_org, current_org
from .common import quickstart_automation
logger = get_logger(__file__)
@@ -16,28 +16,31 @@ __all__ = [
]
@shared_task
@org_aware_func('assets')
def test_assets_connectivity_task(assets, task_name=None):
@shared_task(
verbose_name=_('Test assets connectivity'), queue='ansible',
activity_callback=lambda self, asset_ids, org_id, *args, **kwargs: (asset_ids, org_id)
)
def test_assets_connectivity_task(asset_ids, org_id, task_name=None):
from assets.models import PingAutomation
if task_name is None:
task_name = gettext_noop("Test assets connectivity ")
task_name = gettext_noop("Test assets connectivity")
task_name = PingAutomation.generate_unique_name(task_name)
task_snapshot = {'assets': [str(asset.id) for asset in assets]}
quickstart_automation(task_name, AutomationTypes.ping, task_snapshot)
task_snapshot = {'assets': asset_ids}
with tmp_to_org(org_id):
quickstart_automation(task_name, AutomationTypes.ping, task_snapshot)
def test_assets_connectivity_manual(asset_ids):
from assets.models import Asset
assets = Asset.objects.filter(id__in=asset_ids)
def test_assets_connectivity_manual(assets):
task_name = gettext_noop("Test assets connectivity ")
return test_assets_connectivity_task.delay(assets, task_name)
asset_ids = [str(i.id) for i in assets]
org_id = str(current_org.id)
return test_assets_connectivity_task.delay(asset_ids, org_id, task_name)
def test_node_assets_connectivity_manual(node_id):
from assets.models import Node
node = Node.objects.get(id=node_id)
def test_node_assets_connectivity_manual(node):
task_name = gettext_noop("Test if the assets under the node are connectable ")
assets = node.get_all_assets()
return test_assets_connectivity_task.delay(assets, task_name)
asset_ids = node.get_all_asset_ids()
asset_ids = [str(i) for i in asset_ids]
org_id = str(current_org.id)
return test_assets_connectivity_task.delay(asset_ids, org_id, task_name)
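A recurring change in these task files: tasks now receive plain id lists plus an explicit org id instead of model instances, because the request-bound current_org does not exist inside the Celery worker; the task re-enters the organization with tmp_to_org. A runnable sketch of the hand-off, using a hypothetical stand-in for the project's tmp_to_org helper:

from contextlib import contextmanager

# Hypothetical stand-in for orgs.utils.tmp_to_org, just to make the
# pattern concrete: save, switch, and restore the "current org".
_state = {'org_id': None}


@contextmanager
def tmp_to_org(org_id):
    prev, _state['org_id'] = _state['org_id'], org_id
    try:
        yield
    finally:
        _state['org_id'] = prev


def worker_side(asset_ids, org_id, task_name=None):
    # The worker restores the org from a plain argument that the caller
    # captured at publish time with str(current_org.id).
    with tmp_to_org(org_id):
        assert _state['org_id'] == org_id  # org-scoped queries would run here


worker_side(['asset-1'], 'org-1')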

View File

@@ -1,10 +1,10 @@
# ~*~ coding: utf-8 ~*~
from celery import shared_task
from django.utils.translation import gettext_noop
from django.utils.translation import gettext_noop, ugettext_lazy as _
from assets.const import AutomationTypes
from common.utils import get_logger
from orgs.utils import org_aware_func
from orgs.utils import org_aware_func, tmp_to_org, current_org
from .common import quickstart_automation
logger = get_logger(__file__)
@@ -15,20 +15,24 @@ __all__ = [
]
@shared_task
@shared_task(
verbose_name=_('Test gateways connectivity'), queue='ansible',
activity_callback=lambda self, asset_ids, org_id, *args, **kwargs: (asset_ids, org_id)
)
@org_aware_func('assets')
def test_gateways_connectivity_task(assets, local_port, task_name=None):
def test_gateways_connectivity_task(asset_ids, org_id, local_port, task_name=None):
from assets.models import PingAutomation
if task_name is None:
task_name = gettext_noop("Test gateways connectivity ")
task_name = gettext_noop("Test gateways connectivity")
task_name = PingAutomation.generate_unique_name(task_name)
task_snapshot = {'assets': [str(asset.id) for asset in assets], 'local_port': local_port}
quickstart_automation(task_name, AutomationTypes.ping_gateway, task_snapshot)
task_snapshot = {'assets': asset_ids, 'local_port': local_port}
with tmp_to_org(org_id):
quickstart_automation(task_name, AutomationTypes.ping_gateway, task_snapshot)
def test_gateways_connectivity_manual(gateway_ids, local_port):
from assets.models import Asset
gateways = Asset.objects.filter(id__in=gateway_ids)
task_name = gettext_noop("Test gateways connectivity ")
return test_gateways_connectivity_task.delay(gateways, local_port, task_name)
gateways = Asset.objects.filter(id__in=gateway_ids).values_list('id', flat=True)
task_name = gettext_noop("Test gateways connectivity")
return test_gateways_connectivity_task.delay(gateways, str(current_org.id), local_port, task_name)

View File

@@ -3,7 +3,7 @@
from importlib import import_module
from django.conf import settings
from django.db.models import F, Value, CharField
from django.db.models import F, Value, CharField, Q
from rest_framework import generics
from rest_framework.mixins import ListModelMixin, CreateModelMixin, RetrieveModelMixin
from rest_framework.permissions import IsAuthenticated
@@ -14,6 +14,7 @@ from common.plugins.es import QuerySet as ESQuerySet
from ops.models.job import JobAuditLog
from orgs.mixins.api import OrgGenericViewSet, OrgBulkModelViewSet
from orgs.utils import current_org, tmp_to_root_org
from users.models import User
from .backends import TYPE_ENGINE_MAPPING
from .const import ActivityChoices
from .models import FTPLog, UserLoginLog, OperateLog, PasswordChangeLog, ActivityLog
@@ -92,8 +93,12 @@ class ResourceActivityAPIView(generics.ListAPIView):
}
@staticmethod
def get_operate_log_qs(fields, limit=30, **filters):
queryset = OperateLog.objects.filter(**filters).annotate(
def get_operate_log_qs(fields, limit=30, resource_id=None):
q = Q(resource_id=resource_id)
user = User.objects.filter(id=resource_id).first()
if user:
q |= Q(user=str(user))
queryset = OperateLog.objects.filter(q).annotate(
r_type=Value(ActivityChoices.operate_log, CharField()),
r_detail_id=F('id'), r_detail=Value(None, CharField()),
r_user=F('user'), r_action=F('action'),

View File

@@ -10,5 +10,7 @@ class AuditsConfig(AppConfig):
def ready(self):
from . import signal_handlers
from . import tasks
if settings.SYSLOG_ENABLE:
post_save.connect(signal_handlers.on_audits_log_create)

View File

@@ -107,7 +107,9 @@ class ActivityLog(OrgModelMixin):
datetime = models.DateTimeField(
auto_now=True, verbose_name=_('Datetime'), db_index=True
)
# Description of the log entry
detail = models.TextField(default='', blank=True, verbose_name=_('Detail'))
# Detail ID, used together with type (an instance ID or a Celery task ID)
detail_id = models.CharField(
max_length=36, default=None, null=True, verbose_name=_('Detail ID')
)
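With the new detail_id column, every activity row written for one Celery task publish carries that task's id, so the rows can be correlated afterwards. A hypothetical lookup following the type/detail_id semantics described in the comment above:

from audits.const import ActivityChoices
from audits.models import ActivityLog


def activities_for_task(task_id: str):
    # For type == task, detail_id holds the Celery task id.
    return ActivityLog.objects.filter(type=ActivityChoices.task, detail_id=task_id)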

View File

@@ -107,6 +107,7 @@ class SessionAuditSerializer(serializers.ModelSerializer):
class ActivityUnionLogSerializer(serializers.Serializer):
id = serializers.CharField()
timestamp = serializers.SerializerMethodField()
detail_url = serializers.SerializerMethodField()
content = serializers.SerializerMethodField()
@@ -120,7 +121,7 @@ class ActivityUnionLogSerializer(serializers.Serializer):
def get_content(obj):
if not obj['r_detail']:
action = obj['r_action'].replace('_', ' ').capitalize()
ctn = _('User {} {} this resource').format(obj['r_user'], _(action))
ctn = _('User %s %s this resource') % (obj['r_user'], _(action))
else:
ctn = i18n_trans(obj['r_detail'])
return ctn

View File

@@ -8,18 +8,20 @@ from accounts.const import AutomationTypes
from accounts.models import AccountBackupAutomation
from assets.models import Asset, Node
from audits.models import ActivityLog
from common.utils import get_object_or_none, i18n_fmt
from common.utils import get_object_or_none, i18n_fmt, get_logger
from jumpserver.utils import current_request
from ops.celery import app
from orgs.utils import tmp_to_root_org
from orgs.models import Organization
from orgs.utils import tmp_to_root_org, current_org
from terminal.models import Session
from users.models import User
from ..const import ActivityChoices
from ..models import UserLoginLog
logger = get_logger(__name__)
class ActivityLogHandler(object):
class TaskActivityHandler(object):
@staticmethod
def _func_accounts_execute_automation(*args, **kwargs):
@@ -80,12 +82,6 @@ class ActivityLogHandler(object):
asset_ids = node.get_all_assets().values_list('id', flat=True)
return '', asset_ids
def get_celery_task_info(self, task_name, *args, **kwargs):
task_display, resource_ids = self.get_info_by_task_name(
task_name, *args, **kwargs
)
return task_display, resource_ids
@staticmethod
def get_task_display(task_name, **kwargs):
task = app.tasks.get(task_name)
@@ -107,6 +103,8 @@ class ActivityLogHandler(object):
task_display = '%s-%s' % (task_display, task_type)
return task_display, resource_ids
class ActivityLogHandler:
@staticmethod
def session_for_activity(obj):
detail = i18n_fmt(
@@ -119,75 +117,84 @@ class ActivityLogHandler(object):
def login_log_for_activity(obj):
login_status = gettext_noop('Success') if obj.status else gettext_noop('Failed')
detail = i18n_fmt(gettext_noop('User %s login system %s'), obj.username, login_status)
user_id = User.objects.filter(username=obj.username).values('id').first()
username = obj.username
user_id = User.objects.filter(username=username) \
.values_list('id', flat=True).first()
resource_list = []
if user_id:
resource_list = [user_id['id']]
resource_list = [user_id]
return resource_list, detail, ActivityChoices.login_log, Organization.SYSTEM_ID
activity_handler = ActivityLogHandler()
@staticmethod
def task_log_for_celery(headers, body):
task_id, task_name = headers.get('id'), headers.get('task')
task = app.tasks.get(task_name)
if not task:
raise ValueError('Task not found: {}'.format(task_name))
activity_callback = getattr(task, 'activity_callback', None)
if not callable(activity_callback):
return [], '', ''
args, kwargs = body[:2]
data = activity_callback(*args, **kwargs)
if data is None:
return [], '', ''
resource_ids, org_id, user = data + ('',) * (3 - len(data))
if not user:
user = str(current_request.user) if current_request else 'System'
if org_id is None:
org_id = current_org.org_id
task_display = getattr(task, 'verbose_name', _('Unknown'))
detail = i18n_fmt(
gettext_noop('User %s perform a task for this resource: %s'),
user, task_display
)
return resource_ids, detail, org_id
@signals.before_task_publish.connect
def before_task_publish_for_activity_log(headers=None, **kwargs):
task_id, task_name = headers.get('id'), headers.get('task')
args, kwargs = kwargs['body'][:2]
task_display, resource_ids = activity_handler.get_celery_task_info(
task_name, args, **kwargs
)
if not current_request:
user = 'System'
else:
user = str(current_request.user)
detail = i18n_fmt(
gettext_noop('User %s perform a task (%s) for this resource'),
user, task_display
)
def create_activities(resource_ids, detail, detail_id, action, org_id):
if not resource_ids:
return
activities = [
ActivityLog(resource_id=resource_id, type=ActivityChoices.task, detail=detail)
ActivityLog(
resource_id=getattr(resource_id, 'pk', resource_id),
type=action, detail=detail, detail_id=detail_id, org_id=org_id
)
for resource_id in resource_ids
]
ActivityLog.objects.bulk_create(activities)
activity_info = {
'activity_ids': [a.id for a in activities]
}
kwargs['activity_info'] = activity_info
return activities
@signals.task_prerun.connect
def on_celery_task_pre_run_for_activity_log(task_id='', **kwargs):
activity_info = kwargs['kwargs'].pop('activity_info', None)
if activity_info is None:
return
@signals.after_task_publish.connect
def after_task_publish_for_activity_log(headers=None, body=None, **kwargs):
""" Tip: https://docs.celeryq.dev/en/stable/internals/protocol.html#message-protocol-task-v2 """
try:
task_id = headers.get('id')
resource_ids, detail, org_id = ActivityLogHandler.task_log_for_celery(headers, body)
except Exception as e:
logger.error(f'Get celery task info error: {e}', exc_info=True)
else:
logger.debug(f'Create activity log for celery task: {task_id}')
create_activities(resource_ids, detail, task_id, action=ActivityChoices.task, org_id=org_id)
activities = []
for activity_id in activity_info['activity_ids']:
activities.append(
ActivityLog(id=activity_id, detail_id=task_id)
)
ActivityLog.objects.bulk_update(activities, ('detail_id',))
model_activity_handler_map = {
Session: ActivityLogHandler.session_for_activity,
UserLoginLog: ActivityLogHandler.login_log_for_activity,
}
def on_session_or_login_log_created(sender, instance=None, created=False, **kwargs):
handler_mapping = {
'Session': activity_handler.session_for_activity,
'UserLoginLog': activity_handler.login_log_for_activity
}
model_name = sender._meta.object_name
if not created or model_name not in handler_mapping:
if not created:
return
resource_ids, detail, act_type, org_id = handler_mapping[model_name](instance)
activities = [
ActivityLog(
resource_id=i, type=act_type, detail=detail,
detail_id=instance.id, org_id=org_id
)
for i in resource_ids
]
ActivityLog.objects.bulk_create(activities)
func = model_activity_handler_map.get(sender)
if not func:
logger.error('Activity log handler not found: {}'.format(sender))
resource_ids, detail, act_type, org_id = func(instance)
return create_activities(resource_ids, detail, instance.id, act_type, org_id)
for sd in [Session, UserLoginLog]:

View File

@@ -49,6 +49,8 @@ def clean_ftp_log_period():
FTPLog.objects.filter(date_start__lt=expired_day).delete()
@shared_task(verbose_name=_('Clean audits log'))
@register_as_period_task(interval=3600 * 24)
def clean_celery_tasks_period():
logger.debug("Start clean celery task history")
expire_days = get_log_keep_day('TASK_LOG_KEEP_DAYS')

View File

@@ -9,5 +9,7 @@ class AuthenticationConfig(AppConfig):
def ready(self):
from . import signal_handlers
from . import notifications
from . import tasks
super().ready()

View File

@@ -8,7 +8,7 @@ from django.utils import timezone
from django.utils.translation import gettext_lazy as _
@register_as_period_task(interval=3600 * 24)
@shared_task(verbose_name=_('Clean expired session'))
@register_as_period_task(interval=3600 * 24)
def clean_django_sessions():
Session.objects.filter(expire_date__lt=timezone.now()).delete()

View File

@@ -9,6 +9,7 @@ class CommonConfig(AppConfig):
def ready(self):
from . import signal_handlers
from . import tasks
from .signals import django_ready
excludes = ['migrate', 'compilemessages', 'makemigrations']
for i in excludes:

View File

@@ -10,7 +10,14 @@ from .utils import get_logger
logger = get_logger(__file__)
@shared_task(verbose_name=_("Send email"))
def task_activity_callback(self, subject, message, from_email, recipient_list, **kwargs):
from users.models import User
email_list = recipient_list
resource_ids = list(User.objects.filter(email__in=email_list).values_list('id', flat=True))
return resource_ids
@shared_task(verbose_name=_("Send email"), activity_callback=task_activity_callback)
def send_mail_async(*args, **kwargs):
""" Using celery to send email async
@@ -19,7 +26,7 @@ def send_mail_async(*args, **kwargs):
Example:
send_mail_sync.delay(subject, message, from_mail, recipient_list, fail_silently=False, html_message=None)
Also you can ignore the from_mail, unlike django send_mail, from_email is not a require args:
Also, you can ignore from_mail; unlike Django's send_mail, from_email is not a required argument:
Example:
send_mail_sync.delay(subject, message, recipient_list, fail_silently=False, html_message=None)
@@ -37,7 +44,7 @@ def send_mail_async(*args, **kwargs):
logger.error("Sending mail error: {}".format(e))
@shared_task(verbose_name=_("Send email attachment"))
@shared_task(verbose_name=_("Send email attachment"), activity_callback=task_activity_callback)
def send_mail_attachment_async(subject, message, recipient_list, attachment_list=None):
if attachment_list is None:
attachment_list = []

View File

@@ -92,7 +92,6 @@ INSTALLED_APPS = [
'acls.apps.AclsConfig',
'notifications.apps.NotificationsConfig',
'rbac.apps.RBACConfig',
'common.apps.CommonConfig',
'jms_oidc_rp',
'rest_framework',
'rest_framework_swagger',
@@ -111,6 +110,7 @@ INSTALLED_APPS = [
'django.contrib.messages',
'django.contrib.staticfiles',
'django.forms',
'common.apps.CommonConfig',  # keep this after the built-in apps, for django ready
'simple_history',  # this must stay last, do not change the order
]

View File

@@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:6bbad43c6fd75da933f86aa609c54641cfbf45099564b07e5090d007287af984
size 135540
oid sha256:6717c42fb411e6469fc755487eea88e926088a5a73dcf00a0d52e1b9986ae147
size 136360

File diff suppressed because it is too large

View File

@@ -1,3 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:910ef049b67560a279f8e0e702d74c1343f36c53d95ef84a32782c08c92a184a
size 111866
oid sha256:8109162896c38e80e2a22895cd42e48b174e7b44897d7ea34c4c19f8eb865e81
size 112043

File diff suppressed because it is too large

View File

@@ -10,6 +10,7 @@ from django.utils import translation, timezone
from django.utils.translation import gettext as _
from common.db.utils import close_old_connections, get_logger
from common.signals import django_ready
from .celery import app
from .models import CeleryTaskExecution, CeleryTask, Job
@@ -45,6 +46,17 @@ def sync_registered_tasks(*args, **kwargs):
pass
@receiver(django_ready)
def check_registered_tasks(*args, **kwargs):
attrs = ['verbose_name', 'activity_callback']
for name, task in app.tasks.items():
if name.startswith('celery.'):
continue
for attr in attrs:
if not hasattr(task, attr):
print('>>> Task {} has no attribute {}'.format(name, attr))
@signals.before_task_publish.connect
def before_task_publish(headers=None, **kwargs):
task_id = headers.get('id')

View File

@@ -19,7 +19,19 @@ from .notifications import ServerPerformanceCheckUtil
logger = get_logger(__file__)
@shared_task(soft_time_limit=60, queue="ansible", verbose_name=_("Run ansible task"))
def job_task_activity_callback(self, job_id, trigger):
job = get_object_or_none(Job, id=job_id)
if not job:
return
resource_ids = [job.id]
org_id = job.org_id
return resource_ids, org_id
@shared_task(
soft_time_limit=60, queue="ansible", verbose_name=_("Run ansible task"),
activity_callback=job_task_activity_callback
)
def run_ops_job(job_id):
job = get_object_or_none(Job, id=job_id)
with tmp_to_org(job.org):
@@ -36,7 +48,19 @@ def run_ops_job(job_id):
logger.error("Start adhoc execution error: {}".format(e))
@shared_task(soft_time_limit=60, queue="ansible", verbose_name=_("Run ansible task execution"))
def job_execution_task_activity_callback(self, execution_id, trigger):
execution = get_object_or_none(JobExecution, id=execution_id)
if not execution:
return
resource_ids = [execution.id]
org_id = execution.org_id
return resource_ids, org_id
@shared_task(
soft_time_limit=60, queue="ansible", verbose_name=_("Run ansible task execution"),
activity_callback=job_execution_task_activity_callback
)
def run_ops_job_execution(execution_id, **kwargs):
execution = get_object_or_none(JobExecution, id=execution_id)
try:

View File

@@ -7,4 +7,5 @@ class OrgsConfig(AppConfig):
verbose_name = _('App organizations')
def ready(self):
from . import tasks
from . import signal_handlers

View File

@@ -87,7 +87,7 @@ class OrgResourceStatisticsRefreshUtil:
if not cache_field_name:
return
org = getattr(instance, 'org', None)
cls.refresh_org_fields((org, cache_field_name))
cls.refresh_org_fields(((org, cache_field_name),))
@receiver(post_save)

View File

@@ -12,3 +12,4 @@ class PermsConfig(AppConfig):
super().ready()
from . import signal_handlers
from . import notifications
from . import tasks

View File

@@ -24,8 +24,8 @@ from django.utils.translation import gettext_lazy as _
logger = get_logger(__file__)
@register_as_period_task(interval=settings.PERM_EXPIRED_CHECK_PERIODIC)
@shared_task(verbose_name=_('Check asset permission expired'))
@register_as_period_task(interval=settings.PERM_EXPIRED_CHECK_PERIODIC)
@atomic()
@tmp_to_root_org()
def check_asset_permission_expired():
@@ -36,8 +36,8 @@ def check_asset_permission_expired():
UserPermTreeExpireUtil().expire_perm_tree_for_perms(perm_ids)
@register_as_period_task(crontab=CRONTAB_AT_AM_TEN)
@shared_task(verbose_name=_('Send asset permission expired notification'))
@register_as_period_task(crontab=CRONTAB_AT_AM_TEN)
@atomic()
@tmp_to_root_org()
def check_asset_permission_will_expired():

View File

@@ -7,4 +7,5 @@ class SettingsConfig(AppConfig):
verbose_name = _('Settings')
def ready(self):
from . import tasks
from . import signal_handlers

View File

@@ -11,4 +11,5 @@ class TerminalConfig(AppConfig):
def ready(self):
from . import signal_handlers
from . import notifications
from . import tasks
return super().ready()

View File

@@ -181,6 +181,7 @@ class ConnectMethodUtil:
'match': 'm2m'
},
TerminalType.magnus: {
'web_methods': [],
'listen': [],
'support': [
Protocol.mysql, Protocol.postgresql,
@@ -190,6 +191,7 @@ class ConnectMethodUtil:
'match': 'map'
},
TerminalType.razor: {
'web_methods': [],
'listen': [Protocol.rdp],
'support': [Protocol.rdp],
'match': 'map'

View File

@@ -80,14 +80,14 @@ def upload_session_replay_to_external_storage(session_id):
return
@shared_task(verbose_name=_('Run applet host deployment'))
@shared_task(verbose_name=_('Run applet host deployment'), activity_callback=lambda did: ([did], ))
def run_applet_host_deployment(did):
with tmp_to_builtin_org(system=1):
deployment = AppletHostDeployment.objects.get(id=did)
deployment.start()
@shared_task(verbose_name=_('Install applet'))
@shared_task(verbose_name=_('Install applet'), activity_callback=lambda did, applet_id: ([did],))
def run_applet_host_deployment_install_applet(did, applet_id):
with tmp_to_builtin_org(system=1):
deployment = AppletHostDeployment.objects.get(id=did)

View File

@@ -11,4 +11,5 @@ class UsersConfig(AppConfig):
def ready(self):
from . import signal_handlers
from . import notifications
from . import tasks
super().ready()