perf: update gathered account sync

pull/15029/head
ibuler 2025-03-13 17:54:05 +08:00 committed by 老广
parent 7c4931b6af
commit 996bee3afd
7 changed files with 88 additions and 48 deletions

View File

@ -3,7 +3,6 @@
from django.db.models import Q, Count from django.db.models import Q, Count
from django.http import HttpResponse from django.http import HttpResponse
from django.shortcuts import get_object_or_404 from django.shortcuts import get_object_or_404
from django.utils import timezone
from rest_framework.decorators import action from rest_framework.decorators import action
from rest_framework.exceptions import MethodNotAllowed from rest_framework.exceptions import MethodNotAllowed
from rest_framework.response import Response from rest_framework.response import Response
@ -65,13 +64,14 @@ class CheckAccountExecutionViewSet(AutomationExecutionViewSet):
return Response(status=400, data={"asset_id": "This field is required."}) return Response(status=400, data={"asset_id": "This field is required."})
get_object_or_404(Asset, pk=asset_id) get_object_or_404(Asset, pk=asset_id)
name = "Check asset risk: {}".format(asset_id)
execution = AutomationExecution() execution = AutomationExecution()
execution.snapshot = { execution.snapshot = {
"assets": [asset_id], "assets": [asset_id],
"nodes": [], "nodes": [],
"type": AutomationTypes.check_account, "type": AutomationTypes.check_account,
"engines": ["check_account_secret"], "engines": "__all__",
"name": "Check asset risk: {} {}".format(asset_id, timezone.now()), "name": name,
} }
execution.save() execution.save()
execution.start() execution.start()

View File

@ -92,12 +92,13 @@ class GatheredAccountViewSet(OrgBulkModelViewSet):
def status(self, request, *args, **kwargs): def status(self, request, *args, **kwargs):
ids = request.data.get('ids', []) ids = request.data.get('ids', [])
new_status = request.data.get("status") new_status = request.data.get("status")
updated_instances = GatheredAccount.objects.filter(id__in=ids) updated_instances = GatheredAccount.objects.filter(id__in=ids).select_related('asset')
updated_instances.update(status=new_status)
if new_status == "confirmed": if new_status == "confirmed":
GatheredAccount.sync_accounts(updated_instances) GatheredAccount.sync_accounts(updated_instances)
updated_instances.update(present=True) updated_instances.update(present=True)
updated_instances.update(status=new_status)
return Response(status=status.HTTP_200_OK) return Response(status=status.HTTP_200_OK)
def perform_destroy(self, instance): def perform_destroy(self, instance):

View File

@ -241,7 +241,11 @@ class CheckAccountManager(BaseManager):
self.commit_risks(_assets) self.commit_risks(_assets)
def do_run(self, *args, **kwargs): def do_run(self, *args, **kwargs):
for engine in self.execution.snapshot.get("engines", []): engines = self.execution.snapshot.get("engines", [])
if engines == '__all__':
engines = ['check_account_secret', 'check_account_repeat', 'check_account_leak']
for engine in engines:
if engine == "check_account_secret": if engine == "check_account_secret":
handler = CheckSecretHandler(self.assets) handler = CheckSecretHandler(self.assets)
elif engine == "check_account_repeat": elif engine == "check_account_repeat":

View File

@ -26,13 +26,15 @@ class AccountHistoricalRecords(HistoricalRecords):
if not self.included_fields: if not self.included_fields:
return super().post_save(instance, created, using=using, **kwargs) return super().post_save(instance, created, using=using, **kwargs)
check_fields = set(self.included_fields) - {'version'} # self.updated_version = 0
if created:
return super().post_save(instance, created, using=using, **kwargs)
history_account = instance.history.first() history_account = instance.history.first()
if history_account is None: if history_account is None:
self.updated_version = 0
return super().post_save(instance, created, using=using, **kwargs) return super().post_save(instance, created, using=using, **kwargs)
check_fields = set(self.included_fields) - {'version'}
history_attrs = {field: getattr(history_account, field) for field in check_fields} history_attrs = {field: getattr(history_account, field) for field in check_fields}
attrs = {field: getattr(instance, field) for field in check_fields} attrs = {field: getattr(instance, field) for field in check_fields}
@ -87,8 +89,10 @@ class Account(AbsConnectivity, LabeledMixin, BaseAccount, JSONFilterMixin):
on_delete=models.SET_NULL, verbose_name=_("Su from") on_delete=models.SET_NULL, verbose_name=_("Su from")
) )
version = models.IntegerField(default=0, verbose_name=_('Version')) version = models.IntegerField(default=0, verbose_name=_('Version'))
history = AccountHistoricalRecords(included_fields=['id', '_secret', 'secret_type', 'version'], history = AccountHistoricalRecords(
verbose_name=_("historical Account")) included_fields=['id', '_secret', 'secret_type', 'version'],
verbose_name=_("historical Account")
)
secret_reset = models.BooleanField(default=True, verbose_name=_('Secret reset')) secret_reset = models.BooleanField(default=True, verbose_name=_('Secret reset'))
source = models.CharField(max_length=30, default=Source.LOCAL, verbose_name=_('Source')) source = models.CharField(max_length=30, default=Source.LOCAL, verbose_name=_('Source'))
source_id = models.CharField(max_length=128, null=True, blank=True, verbose_name=_('Source ID')) source_id = models.CharField(max_length=128, null=True, blank=True, verbose_name=_('Source ID'))

View File

@ -1,5 +1,6 @@
from collections import defaultdict
from django.db import models from django.db import models
from django.db.models import Q
from django.utils.translation import gettext_lazy as _ from django.utils.translation import gettext_lazy as _
from accounts.const import AutomationTypes, Source from accounts.const import AutomationTypes, Source
@ -30,49 +31,73 @@ class GatheredAccount(JMSOrgBaseModel):
return self.asset.address return self.asset.address
@classmethod @classmethod
def update_exists_accounts(cls, gathered_account, accounts): def update_exists_accounts(cls, ga_accounts_set): # gathered_account, accounts):
if not gathered_account.date_last_login: to_updates = []
return
for account in accounts: for gathered_account, accounts in ga_accounts_set:
# 这里是否可以考虑,标记成未从堡垒机登录风险 if not gathered_account.date_last_login:
if is_date_more_than(gathered_account.date_last_login, account.date_last_login, '5m'): return
for account in accounts:
# 这里是否可以考虑,标记成未从堡垒机登录风险
if not is_date_more_than(gathered_account.date_last_login, account.date_last_login, '5m'):
continue
account.date_last_login = gathered_account.date_last_login account.date_last_login = gathered_account.date_last_login
account.login_by = '{}({})'.format('unknown', gathered_account.address_last_login) account.login_by = '{}({})'.format('unknown', gathered_account.address_last_login)
account.save(update_fields=['date_last_login', 'login_by']) to_updates.append(account)
Account.objects.bulk_update(to_updates, fields=['date_last_login', 'login_by'])
@classmethod @classmethod
def create_accounts(cls, gathered_account): def bulk_create_accounts(cls, gathered_accounts):
account_objs = [] account_objs = []
asset_id = gathered_account.asset_id
username = gathered_account.username
account = Account(
asset_id=asset_id, username=username,
name=username, source=Source.DISCOVERY,
date_last_login=gathered_account.date_last_login,
)
account_objs.append(account)
Account.objects.bulk_create(account_objs)
gathered_account.status = ConfirmOrIgnore.confirmed
gathered_account.save(update_fields=['status'])
@classmethod
def sync_accounts(cls, gathered_accounts, auto_create=True):
"""
更新为已存在的账号或者创建新的账号, 原来的 sync 重构了如果存在则自动更新一些信息
"""
for gathered_account in gathered_accounts: for gathered_account in gathered_accounts:
asset_id = gathered_account.asset_id asset_id = gathered_account.asset_id
username = gathered_account.username username = gathered_account.username
accounts = Account.objects.filter( account = Account(
Q(asset_id=asset_id, username=username) | asset_id=asset_id, username=username,
Q(asset_id=asset_id, name=username) name=username, source=Source.DISCOVERY,
date_last_login=gathered_account.date_last_login,
) )
account_objs.append(account)
Account.objects.bulk_create(account_objs, ignore_conflicts=True)
if accounts.exists(): ga_ids = [ga.id for ga in gathered_accounts]
cls.update_exists_accounts(gathered_account, accounts) GatheredAccount.objects.filter(id__in=ga_ids).update(status=ConfirmOrIgnore.confirmed)
elif auto_create:
cls.create_accounts(gathered_account) @classmethod
def sync_accounts(cls, gathered_accounts):
"""
更新为已存在的账号或者创建新的账号, 原来的 sync 重构了如果存在则自动更新一些信息
"""
assets = [gathered_account.asset_id for gathered_account in gathered_accounts]
usernames = [gathered_account.username for gathered_account in gathered_accounts]
origin_accounts = Account.objects.filter(
asset__in=assets, username__in=usernames
).select_related('asset')
origin_mapper = defaultdict(list)
for origin_account in origin_accounts:
asset_id = origin_account.asset_id
username = origin_account.username
origin_mapper[(asset_id, username)].append(origin_account)
to_update = []
to_create = []
for gathered_account in gathered_accounts:
asset_id = gathered_account.asset_id
username = gathered_account.username
accounts = origin_mapper.get((asset_id, username))
if accounts:
to_update.append((gathered_account, accounts))
else:
to_create.append(gathered_account)
cls.bulk_create_accounts(to_create)
cls.update_exists_accounts(to_update)
class Meta: class Meta:
verbose_name = _("Gather asset accounts") verbose_name = _("Gather asset accounts")
@ -82,7 +107,7 @@ class GatheredAccount(JMSOrgBaseModel):
ordering = ['asset'] ordering = ['asset']
def __str__(self): def __str__(self):
return '{}: {}'.format(self.asset, self.username) return '{}: {}'.format(self.asset_id, self.username)
class GatherAccountsAutomation(AccountBaseAutomation): class GatherAccountsAutomation(AccountBaseAutomation):

View File

@ -6,6 +6,7 @@ from django.db.models.signals import post_save
from django.utils.translation import gettext_lazy as _, gettext_noop from django.utils.translation import gettext_lazy as _, gettext_noop
from audits.models import ActivityLog from audits.models import ActivityLog
from common.decorators import bulk_create_decorator
from common.utils import i18n_fmt, get_logger from common.utils import i18n_fmt, get_logger
from jumpserver.utils import current_request from jumpserver.utils import current_request
from ops.celery import app from ops.celery import app
@ -67,21 +68,26 @@ class ActivityLogHandler:
return resource_ids, detail, org_id return resource_ids, detail, org_id
@bulk_create_decorator(ActivityLog)
def create_activity(data):
return ActivityLog(**data)
def create_activities(resource_ids, detail, detail_id, action, org_id): def create_activities(resource_ids, detail, detail_id, action, org_id):
if not resource_ids: if not resource_ids:
return return
if not org_id: if not org_id:
org_id = Organization.ROOT_ID org_id = Organization.ROOT_ID
activities = [ activities = [
ActivityLog( dict(
resource_id=getattr(resource_id, 'pk', resource_id), resource_id=getattr(resource_id, 'pk', resource_id),
type=action, detail=detail, detail_id=detail_id, org_id=org_id type=action, detail=detail, detail_id=detail_id, org_id=org_id
) )
for resource_id in resource_ids for resource_id in resource_ids
] ]
with tmp_to_org(org_id): with tmp_to_org(org_id):
ActivityLog.objects.bulk_create(activities) for activity in activities:
return activities create_activity(activity)
@signals.after_task_publish.connect @signals.after_task_publish.connect

View File

@ -1264,7 +1264,7 @@
"SupportedProtocolHelpText": "Set supported protocols for the asset, you can modify the custom configurations, such as sftp directory, rdp ad domain, etc., by clicking on the set button", "SupportedProtocolHelpText": "Set supported protocols for the asset, you can modify the custom configurations, such as sftp directory, rdp ad domain, etc., by clicking on the set button",
"Sync": "Sync", "Sync": "Sync",
"SyncAction": "Synchronized action", "SyncAction": "Synchronized action",
"SyncDelete": "Sync deletion", "SyncDelete": "Sync delete",
"SyncDeleteSelected": "Sync deletion selected", "SyncDeleteSelected": "Sync deletion selected",
"SyncErrorMsg": "Sync failed", "SyncErrorMsg": "Sync failed",
"SyncInstanceTaskCreate": "Create sync task", "SyncInstanceTaskCreate": "Create sync task",