feat: Check for leaked duplicate passwords. (#14711)

* feat: Check for leaked duplicate passwords.

* perf: Use SQLite instead of txt as leak password database

---------

Co-authored-by: jiangweidong <1053570670@qq.com>
Co-authored-by: 老广 <ibuler@qq.com>
pull/14780/head
fit2bot 2025-01-07 16:47:47 +08:00 committed by GitHub
parent ca10fc5f20
commit 22b4ea2965
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 148 additions and 9 deletions

2
.gitattributes vendored
View File

@ -1,4 +1,4 @@
*.mmdb filter=lfs diff=lfs merge=lfs -text
*.mo filter=lfs diff=lfs merge=lfs -text
*.ipdb filter=lfs diff=lfs merge=lfs -text
leak_passwords.db filter=lfs diff=lfs merge=lfs -text

View File

@ -0,0 +1,3 @@
version https://git-lfs.github.com/spec/v1
oid sha256:8526225e13163cc92535a0ed3ef1c12cfde471173394504aea2b0583fe68efb5
size 98037760

View File

@ -1,6 +1,8 @@
import os
import re
from collections import defaultdict
import sqlite3
from django.conf import settings
from django.utils import timezone
from accounts.models import Account, AccountRisk, RiskChoice
@ -104,13 +106,136 @@ def check_account_secrets(accounts, assets):
class CheckAccountManager(BaseManager):
batch_size = 100
tmpl = 'Checked the status of account %s: %s'
def __init__(self, execution):
super().__init__(execution)
self.accounts = []
self.assets = []
self.global_origin_risks_dict = dict()
self.db_conn = None
self.db_cursor = None
def init_leak_password_db(self):
default_path = os.path.join(
settings.APPS_DIR, 'accounts', 'automations', 'check_account'
)
create_table = '''
CREATE TABLE IF NOT EXISTS accounts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
asset_id TEXT, asset_name TEXT, name TEXT,
username TEXT, password TEXT
)
'''
if (settings.LEAK_PASSWORD_DB_PATH
and os.path.exists(settings.LEAK_PASSWORD_DB_PATH)):
db_path = settings.LEAK_PASSWORD_DB_PATH
else:
db_path = os.path.join(default_path, 'leak_passwords.db')
self.db_conn = sqlite3.connect(db_path)
self.db_cursor = self.db_conn.cursor()
self.db_cursor.execute(create_table)
def drop_account_table(self):
sql = 'DROP TABLE IF EXISTS accounts'
self.db_cursor.execute(sql)
self.db_conn.commit()
def close_db(self):
try:
self.db_cursor.close()
self.db_conn.close()
except Exception: # noqa
pass
@staticmethod
def create_or_update_risk(risks, origin_risks_dict):
now = timezone.now().isoformat()
for d in risks:
key = f'{d["account"].asset_id}_{d["account"].username}_{d["risk"]}'
origin_risk = origin_risks_dict.get(key)
if origin_risk:
origin_risk.details.append({"datetime": now, 'type': 'refind'})
update_risk(origin_risk)
else:
create_risk({
"asset": d["account"].asset,
"username": d["account"].username,
"risk": d["risk"],
"details": [{"datetime": now, 'type': 'init'}],
})
def is_leak_password(self, password):
sql = 'SELECT 1 FROM leak_passwords WHERE password = ? LIMIT 1'
self.db_cursor.execute(sql, (password,))
return self.db_cursor.fetchone() is not None
def check_account_secrets(self, accounts, assets):
risks = []
for account in accounts:
if not account.secret:
print(self.tmpl % (account, "no secret"))
self.risk_record('no_secret', account)
continue
if is_weak_password(account.secret):
key = RiskChoice.weak_password
print(self.tmpl % (account, color_fmt(key.value, "red")))
risks.append(self.risk_record(key, account))
elif self.is_leak_password(account.secret):
key = RiskChoice.leaked_password
print(self.tmpl % (account, color_fmt(key.value, "red")))
risks.append(self.risk_record(key, account))
else:
sql = ("INSERT INTO accounts (name, username, password, asset_id, asset_name) "
"VALUES (?, ?, ?, ?, ?)")
self.db_cursor.execute(
sql, [
account.name, account.username, account.secret,
str(account.asset_id), account.asset.name
]
)
self.db_conn.commit()
origin_risks = AccountRisk.objects.filter(asset__in=assets)
origin_risks_dict = {f"{r.asset_id}_{r.username}_{r.risk}": r for r in origin_risks}
self.global_origin_risks_dict.update(origin_risks_dict)
self.create_or_update_risk(risks, origin_risks_dict)
def risk_record(self, key, account):
self.summary[key] += 1
self.result[key].append({
'asset': str(account.asset), 'username': account.username,
})
return {'account': account, 'risk': key}
def check_repeat_secrets(self):
risks = []
sql = '''
SELECT name, username, password, asset_id, asset_name,
CASE WHEN password IN (
SELECT password FROM accounts GROUP BY password HAVING COUNT(*) > 1
) THEN 1 ELSE 0 END AS is_repeated FROM accounts
'''
self.db_cursor.execute(sql)
for results in self.db_cursor.fetchall():
name, username, *_, asset_id, asset_name, is_repeat = results
account = Account(asset_id=asset_id, username=username, name=name)
account_display = f'{name}({asset_name})'
if is_repeat:
key = RiskChoice.repeated_password
print(self.tmpl % (account_display, color_fmt(key.value, "red")))
risks.append(self.risk_record(key, account))
else:
key = 'ok'
print(self.tmpl % (account_display, color_fmt("ok", "green")))
self.risk_record(key, account)
self.create_or_update_risk(risks, self.global_origin_risks_dict)
def pre_run(self):
self.init_leak_password_db()
self.assets = self.execution.get_all_assets()
self.execution.date_start = timezone.now()
self.execution.save(update_fields=["date_start"])
@ -118,19 +243,22 @@ class CheckAccountManager(BaseManager):
def do_run(self, *args, **kwargs):
for engine in self.execution.snapshot.get("engines", []):
if engine == "check_account_secret":
handle = check_account_secrets
batch_handle = self.check_account_secrets
global_handle = self.check_repeat_secrets
else:
continue
for i in range(0, len(self.assets), self.batch_size):
_assets = self.assets[i: i + self.batch_size]
accounts = Account.objects.filter(asset__in=_assets)
summary, result = handle(accounts, _assets)
batch_handle(accounts, _assets)
for k, v in summary.items():
self.summary[k] = self.summary.get(k, 0) + v
for k, v in result.items():
self.result[k].extend(v)
global_handle()
def post_run(self):
super().post_run()
self.drop_account_table()
self.close_db()
def get_report_subject(self):
return "Check account report of %s" % self.execution.id
@ -140,10 +268,14 @@ class CheckAccountManager(BaseManager):
def print_summary(self):
tmpl = (
"\n---\nSummary: \nok: %s, weak password: %s, no secret: %s, using time: %ss"
"\n---\nSummary: \nok: %s, weak password: %s, leaked password: %s, "
"repeated password: %s, no secret: %s, using time: %ss"
% (
self.summary["ok"],
self.summary[RiskChoice.weak_password],
self.summary[RiskChoice.leaked_password],
self.summary[RiskChoice.repeated_password],
self.summary["no_secret"],
int(self.duration),
)

View File

@ -50,6 +50,8 @@ class RiskChoice(TextChoices):
long_time_password = 'long_time_password', _('Long time no change') # 好久没改密码的账号, 改密码
weak_password = 'weak_password', _('Weak password') # 弱密码, 改密
leaked_password = 'leaked_password', _('Leaked password') # 可能泄露的密码, 改密
repeated_password = 'repeated_password', _('Repeated password') # 重复度高的密码, 改密
password_error = 'password_error', _('Password error') # 密码错误, 修改账号
no_admin_account = 'no_admin_account', _('No admin account') # 无管理员账号, 设置账号
others = 'others', _('Others') # 其他风险, 确认

View File

@ -195,3 +195,5 @@ DJANGO_REDIS_SCAN_ITERSIZE = 1000
# GM DEVICE
PIICO_DEVICE_ENABLE = CONFIG.PIICO_DEVICE_ENABLE
PIICO_DRIVER_PATH = CONFIG.PIICO_DRIVER_PATH
LEAK_PASSWORD_DB_PATH = CONFIG.LEAK_PASSWORD_DB_PATH