perf: prepaire using thread timer for bulk_create_decorator

pull/14534/head
ibuler 2024-11-20 11:04:00 +08:00
parent 11975842f6
commit 886875d628
5 changed files with 66 additions and 55 deletions

View File

@ -86,7 +86,7 @@ class GatheredAccountViewSet(OrgBulkModelViewSet):
with transaction.atomic():
execution.save()
execution.start()
accounts = self.model.objects.filter(asset=asset)
accounts = self.model.objects.filter(asset=asset).prefetch_related('asset', 'asset__platform')
return self.get_paginated_response_from_queryset(accounts)
@action(methods=['post'], detail=False, url_path='sync-accounts')

View File

@ -5,6 +5,7 @@ from django.utils import timezone
from accounts.models import Account, AccountRisk
from assets.automations.base.manager import BaseManager
from common.decorators import bulk_create_decorator, bulk_update_decorator
from common.utils.strings import color_fmt
@ -27,63 +28,77 @@ def is_weak_password(password):
return True
# 正则表达式判断字符多样性(数字、字母、特殊字符)
if (not re.search(r'[A-Za-z]', password)
or not re.search(r'[0-9]', password)
or not re.search(r'[\W_]', password)):
if (
not re.search(r"[A-Za-z]", password)
or not re.search(r"[0-9]", password)
or not re.search(r"[\W_]", password)
):
return True
return False
@bulk_create_decorator(AccountRisk)
def create_risk(account, risk):
pass
@bulk_update_decorator(AccountRisk, update_fields=["details"])
def update_risk(risk):
return risk
def check_account_secrets(accounts, assets):
now = timezone.now().isoformat()
risks = []
tmpl = "Check account %s: %s"
summary = defaultdict(int)
result = defaultdict(list)
summary['accounts'] = len(accounts)
summary['assets'] = len(assets)
summary["accounts"] = len(accounts)
summary["assets"] = len(assets)
for account in accounts:
result_item = {
'asset': str(account.asset),
'username': account.username,
"asset": str(account.asset),
"username": account.username,
}
if not account.secret:
print(tmpl % (account, "no secret"))
summary['no_secret'] += 1
result['no_secret'].append(result_item)
summary["no_secret"] += 1
result["no_secret"].append(result_item)
continue
if is_weak_password(account.secret):
print(tmpl % (account, color_fmt("weak", "red")))
summary['weak_password'] += 1
result['weak_password'].append(result_item)
risks.append({
'account': account,
'risk': 'weak_password',
})
summary["weak_password"] += 1
result["weak_password"].append(result_item)
risks.append(
{
"account": account,
"risk": "weak_password",
}
)
else:
summary['ok'] += 1
result['ok'].append(result_item)
summary["ok"] += 1
result["ok"].append(result_item)
print(tmpl % (account, color_fmt("ok", "green")))
origin_risks = AccountRisk.objects.filter(asset__in=assets)
origin_risks_dict = {f'{r.asset_id}_{r.username}_{r.risk}': r for r in origin_risks}
origin_risks_dict = {f"{r.asset_id}_{r.username}_{r.risk}": r for r in origin_risks}
for d in risks:
key = f'{d["account"].asset_id}_{d["account"].username}_{d["risk"]}'
origin_risk = origin_risks_dict.get(key)
if origin_risk:
origin_risk.details.append({'datetime': now})
origin_risk.save(update_fields=['details'])
origin_risk.details.append({"datetime": now})
update_risk(origin_risk)
else:
AccountRisk.objects.create(
asset=d['account'].asset,
username=d['account'].username,
risk=d['risk'],
details=[{'datetime': now}],
)
create_risk({
"asset": d["account"].asset,
"username": d["account"].username,
"risk": d["risk"],
"details": [{"datetime": now}],
})
return summary, result
@ -98,17 +113,17 @@ class CheckAccountManager(BaseManager):
def pre_run(self):
self.assets = self.execution.get_all_assets()
self.execution.date_start = timezone.now()
self.execution.save(update_fields=['date_start'])
self.execution.save(update_fields=["date_start"])
def do_run(self, *args, **kwargs):
for engine in self.execution.snapshot.get('engines', []):
if engine == 'check_account_secret':
for engine in self.execution.snapshot.get("engines", []):
if engine == "check_account_secret":
handle = check_account_secrets
else:
continue
for i in range(0, len(self.assets), self.batch_size):
_assets = self.assets[i:i + self.batch_size]
_assets = self.assets[i : i + self.batch_size]
accounts = Account.objects.filter(asset__in=_assets)
summary, result = handle(accounts, _assets)
@ -121,10 +136,16 @@ class CheckAccountManager(BaseManager):
return "Check account report of %s" % self.execution.id
def get_report_template(self):
return 'accounts/check_account_report.html'
return "accounts/check_account_report.html"
def print_summary(self):
tmpl = "\n---\nSummary: \nok: %s, weak password: %s, no secret: %s, using time: %ss" % (
self.summary['ok'], self.summary['weak_password'], self.summary['no_secret'], int(self.duration)
tmpl = (
"\n---\nSummary: \nok: %s, weak password: %s, no secret: %s, using time: %ss"
% (
self.summary["ok"],
self.summary["weak_password"],
self.summary["no_secret"],
int(self.duration),
)
)
print(tmpl)

View File

@ -205,7 +205,7 @@ class GatherAccountsManager(AccountBasePlaybookManager):
for asset, username in accounts:
self.ori_asset_usernames[asset].add(username)
ga_accounts = GatheredAccount.objects.filter(asset__in=assets)
ga_accounts = GatheredAccount.objects.filter(asset__in=assets).prefetch_related('asset')
for account in ga_accounts:
self.ori_gathered_usernames[account.asset].add(account.username)
key = '{}_{}'.format(account.asset_id, account.username)

View File

@ -53,13 +53,18 @@ def close_old_connections():
@contextmanager
def safe_db_connection():
in_atomic_block = connection.in_atomic_block # 当前是否处于事务中
autocommit = transaction.get_autocommit() # 是否启用了自动提交
try:
close_old_connections() # 确保旧连接关闭
if connection.connection: # 如果连接已关闭,重新连接
if not connection.is_usable():
connection.close()
connection.connect()
yield
finally:
close_old_connections() # 确保最终关闭连接
# 如果不是事务中API 请求中可能需要提交事务),则关闭连接
if not in_atomic_block and autocommit:
close_old_connections()
@contextmanager

View File

@ -296,13 +296,7 @@ def cached_method(ttl=20):
return decorator
def bulk_create_decorator(instance_model, batch_size=50):
"""
装饰器用于将实例批量保存并提供 `commit` 方法提交剩余的实例
:param instance_model: Django模型类用于调用 bulk_create 方法
:param batch_size: 批量保存的阈值默认50
"""
def bulk_create_decorator(instance_model, batch_size=50, ignore_conflict=True):
def decorator(func):
cache = [] # 缓存实例的列表
@ -322,7 +316,7 @@ def bulk_create_decorator(instance_model, batch_size=50):
# 如果缓存大小达到批量保存阈值,执行保存
if len(cache) >= batch_size:
print(f"Batch size reached. Saving {len(cache)} instances...")
instance_model.objects.bulk_create(cache)
instance_model.objects.bulk_create(cache, ignore_conflict=ignore_conflict)
cache.clear()
return instance
@ -378,16 +372,7 @@ def bulk_update_decorator(instance_model, batch_size=50, update_fields=None):
nonlocal cache
if cache:
print(f"Committing remaining {len(cache)} instances..., {update_fields}")
# with transaction.atomic():
# for c in cache:
# o = instance_model.objects.get(id=str(c.id))
# print("Origin: ", o.id, o.sudoers)
# o.sudoers = c.sudoers
# o.save()
# print("New: ", c.id, c.sudoers)
instance_model.objects.bulk_update(cache, update_fields)
# print("Committing remaining instances... done, ", cache[0].sudoers, cache[0].id, instance_model)
# print(instance_model.objects.get(id=str(cache[0].id)).sudoers)
cache.clear()
# 将 commit 方法绑定到装饰后的函数