From 886875d628e38f6dedd70535ee98eafb085e814f Mon Sep 17 00:00:00 2001 From: ibuler Date: Wed, 20 Nov 2024 11:04:00 +0800 Subject: [PATCH] perf: prepaire using thread timer for bulk_create_decorator --- .../api/automations/gather_account.py | 2 +- .../automations/check_account/manager.py | 87 ++++++++++++------- .../automations/gather_account/manager.py | 2 +- apps/common/db/utils.py | 11 ++- apps/common/decorators.py | 19 +--- 5 files changed, 66 insertions(+), 55 deletions(-) diff --git a/apps/accounts/api/automations/gather_account.py b/apps/accounts/api/automations/gather_account.py index c015116ce..b0a8e24a7 100644 --- a/apps/accounts/api/automations/gather_account.py +++ b/apps/accounts/api/automations/gather_account.py @@ -86,7 +86,7 @@ class GatheredAccountViewSet(OrgBulkModelViewSet): with transaction.atomic(): execution.save() execution.start() - accounts = self.model.objects.filter(asset=asset) + accounts = self.model.objects.filter(asset=asset).prefetch_related('asset', 'asset__platform') return self.get_paginated_response_from_queryset(accounts) @action(methods=['post'], detail=False, url_path='sync-accounts') diff --git a/apps/accounts/automations/check_account/manager.py b/apps/accounts/automations/check_account/manager.py index 011196e5a..29be1fea0 100644 --- a/apps/accounts/automations/check_account/manager.py +++ b/apps/accounts/automations/check_account/manager.py @@ -5,6 +5,7 @@ from django.utils import timezone from accounts.models import Account, AccountRisk from assets.automations.base.manager import BaseManager +from common.decorators import bulk_create_decorator, bulk_update_decorator from common.utils.strings import color_fmt @@ -27,63 +28,77 @@ def is_weak_password(password): return True # 正则表达式判断字符多样性(数字、字母、特殊字符) - if (not re.search(r'[A-Za-z]', password) - or not re.search(r'[0-9]', password) - or not re.search(r'[\W_]', password)): + if ( + not re.search(r"[A-Za-z]", password) + or not re.search(r"[0-9]", password) + or not re.search(r"[\W_]", password) + ): return True return False +@bulk_create_decorator(AccountRisk) +def create_risk(account, risk): + pass + + +@bulk_update_decorator(AccountRisk, update_fields=["details"]) +def update_risk(risk): + return risk + + def check_account_secrets(accounts, assets): now = timezone.now().isoformat() risks = [] tmpl = "Check account %s: %s" summary = defaultdict(int) result = defaultdict(list) - summary['accounts'] = len(accounts) - summary['assets'] = len(assets) + summary["accounts"] = len(accounts) + summary["assets"] = len(assets) for account in accounts: result_item = { - 'asset': str(account.asset), - 'username': account.username, + "asset": str(account.asset), + "username": account.username, } if not account.secret: print(tmpl % (account, "no secret")) - summary['no_secret'] += 1 - result['no_secret'].append(result_item) + summary["no_secret"] += 1 + result["no_secret"].append(result_item) continue if is_weak_password(account.secret): print(tmpl % (account, color_fmt("weak", "red"))) - summary['weak_password'] += 1 - result['weak_password'].append(result_item) - risks.append({ - 'account': account, - 'risk': 'weak_password', - }) + summary["weak_password"] += 1 + result["weak_password"].append(result_item) + risks.append( + { + "account": account, + "risk": "weak_password", + } + ) else: - summary['ok'] += 1 - result['ok'].append(result_item) + summary["ok"] += 1 + result["ok"].append(result_item) print(tmpl % (account, color_fmt("ok", "green"))) origin_risks = AccountRisk.objects.filter(asset__in=assets) - origin_risks_dict = {f'{r.asset_id}_{r.username}_{r.risk}': r for r in origin_risks} + origin_risks_dict = {f"{r.asset_id}_{r.username}_{r.risk}": r for r in origin_risks} for d in risks: key = f'{d["account"].asset_id}_{d["account"].username}_{d["risk"]}' origin_risk = origin_risks_dict.get(key) if origin_risk: - origin_risk.details.append({'datetime': now}) - origin_risk.save(update_fields=['details']) + origin_risk.details.append({"datetime": now}) + update_risk(origin_risk) else: - AccountRisk.objects.create( - asset=d['account'].asset, - username=d['account'].username, - risk=d['risk'], - details=[{'datetime': now}], - ) + create_risk({ + "asset": d["account"].asset, + "username": d["account"].username, + "risk": d["risk"], + "details": [{"datetime": now}], + }) return summary, result @@ -98,17 +113,17 @@ class CheckAccountManager(BaseManager): def pre_run(self): self.assets = self.execution.get_all_assets() self.execution.date_start = timezone.now() - self.execution.save(update_fields=['date_start']) + self.execution.save(update_fields=["date_start"]) def do_run(self, *args, **kwargs): - for engine in self.execution.snapshot.get('engines', []): - if engine == 'check_account_secret': + for engine in self.execution.snapshot.get("engines", []): + if engine == "check_account_secret": handle = check_account_secrets else: continue for i in range(0, len(self.assets), self.batch_size): - _assets = self.assets[i:i + self.batch_size] + _assets = self.assets[i : i + self.batch_size] accounts = Account.objects.filter(asset__in=_assets) summary, result = handle(accounts, _assets) @@ -121,10 +136,16 @@ class CheckAccountManager(BaseManager): return "Check account report of %s" % self.execution.id def get_report_template(self): - return 'accounts/check_account_report.html' + return "accounts/check_account_report.html" def print_summary(self): - tmpl = "\n---\nSummary: \nok: %s, weak password: %s, no secret: %s, using time: %ss" % ( - self.summary['ok'], self.summary['weak_password'], self.summary['no_secret'], int(self.duration) + tmpl = ( + "\n---\nSummary: \nok: %s, weak password: %s, no secret: %s, using time: %ss" + % ( + self.summary["ok"], + self.summary["weak_password"], + self.summary["no_secret"], + int(self.duration), + ) ) print(tmpl) diff --git a/apps/accounts/automations/gather_account/manager.py b/apps/accounts/automations/gather_account/manager.py index a20dc4533..482deda85 100644 --- a/apps/accounts/automations/gather_account/manager.py +++ b/apps/accounts/automations/gather_account/manager.py @@ -205,7 +205,7 @@ class GatherAccountsManager(AccountBasePlaybookManager): for asset, username in accounts: self.ori_asset_usernames[asset].add(username) - ga_accounts = GatheredAccount.objects.filter(asset__in=assets) + ga_accounts = GatheredAccount.objects.filter(asset__in=assets).prefetch_related('asset') for account in ga_accounts: self.ori_gathered_usernames[account.asset].add(account.username) key = '{}_{}'.format(account.asset_id, account.username) diff --git a/apps/common/db/utils.py b/apps/common/db/utils.py index 4fa84dd7f..774cbc444 100644 --- a/apps/common/db/utils.py +++ b/apps/common/db/utils.py @@ -53,13 +53,18 @@ def close_old_connections(): @contextmanager def safe_db_connection(): + in_atomic_block = connection.in_atomic_block # 当前是否处于事务中 + autocommit = transaction.get_autocommit() # 是否启用了自动提交 + try: - close_old_connections() # 确保旧连接关闭 - if connection.connection: # 如果连接已关闭,重新连接 + if not connection.is_usable(): + connection.close() connection.connect() yield finally: - close_old_connections() # 确保最终关闭连接 + # 如果不是事务中(API 请求中可能需要提交事务),则关闭连接 + if not in_atomic_block and autocommit: + close_old_connections() @contextmanager diff --git a/apps/common/decorators.py b/apps/common/decorators.py index d46f5489f..11f5f49f8 100644 --- a/apps/common/decorators.py +++ b/apps/common/decorators.py @@ -296,13 +296,7 @@ def cached_method(ttl=20): return decorator -def bulk_create_decorator(instance_model, batch_size=50): - """ - 装饰器,用于将实例批量保存,并提供 `commit` 方法提交剩余的实例。 - - :param instance_model: Django模型类,用于调用 bulk_create 方法。 - :param batch_size: 批量保存的阈值,默认50。 - """ +def bulk_create_decorator(instance_model, batch_size=50, ignore_conflict=True): def decorator(func): cache = [] # 缓存实例的列表 @@ -322,7 +316,7 @@ def bulk_create_decorator(instance_model, batch_size=50): # 如果缓存大小达到批量保存阈值,执行保存 if len(cache) >= batch_size: print(f"Batch size reached. Saving {len(cache)} instances...") - instance_model.objects.bulk_create(cache) + instance_model.objects.bulk_create(cache, ignore_conflict=ignore_conflict) cache.clear() return instance @@ -378,16 +372,7 @@ def bulk_update_decorator(instance_model, batch_size=50, update_fields=None): nonlocal cache if cache: print(f"Committing remaining {len(cache)} instances..., {update_fields}") - # with transaction.atomic(): - # for c in cache: - # o = instance_model.objects.get(id=str(c.id)) - # print("Origin: ", o.id, o.sudoers) - # o.sudoers = c.sudoers - # o.save() - # print("New: ", c.id, c.sudoers) instance_model.objects.bulk_update(cache, update_fields) - # print("Committing remaining instances... done, ", cache[0].sudoers, cache[0].id, instance_model) - # print(instance_model.objects.get(id=str(cache[0].id)).sudoers) cache.clear() # 将 commit 方法绑定到装饰后的函数