perf: vault 同步速度问题 (#11277)

Co-authored-by: feng <1304903146@qq.com>
pull/11278/head
fit2bot 2023-08-14 22:32:53 +08:00 committed by GitHub
parent a6e49b730b
commit 1f8428ac1c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 50 additions and 32 deletions

View File

@ -43,7 +43,7 @@ class BaseVault(ABC):
'name', 'username', 'secret_type',
'connectivity', 'su_from', 'privileged'
])
metadata = {field: str(value) for field, value in metadata.items()}
metadata = {k: str(v)[:500] for k, v in metadata.items() if v}
return self._save_metadata(instance, metadata)
# -------- abstractmethod -------- #

View File

@ -1,9 +1,12 @@
from common.db.utils import get_logger
from .entries import build_entry
from .service import VaultKVClient
from ..base import BaseVault
__all__ = ['Vault']
logger = get_logger(__name__)
class Vault(BaseVault):
def __init__(self, *args, **kwargs):
@ -43,5 +46,8 @@ class Vault(BaseVault):
instance.mark_secret_save_to_vault()
def _save_metadata(self, instance, metadata):
entry = build_entry(instance)
self.client.update_metadata(path=entry.full_path, metadata=metadata)
try:
entry = build_entry(instance)
self.client.update_metadata(path=entry.full_path, metadata=metadata)
except Exception as e:
logger.error(f'save metadata error: {e}')

View File

@ -1,4 +1,5 @@
import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
from celery import shared_task
from django.utils.translation import gettext_lazy as _
@ -12,6 +13,22 @@ from ..const import VaultTypeChoices
logger = get_logger(__name__)
def sync_instance(instance):
instance_desc = f'[{instance._meta.verbose_name}-{instance.id}-{instance}]'
if instance.secret_has_save_to_vault:
msg = f'\033[32m- 跳过同步: {instance_desc}, 原因: [已同步]'
return "skipped", msg
try:
vault_client.create(instance)
except Exception as e:
msg = f'\033[31m- 同步失败: {instance_desc}, 原因: [{e}]'
return "failed", msg
else:
msg = f'\033[32m- 同步成功: {instance_desc}'
return "succeeded", msg
@shared_task(verbose_name=_('Sync secret to vault'))
def sync_secret_to_vault():
if vault_client.is_type(VaultTypeChoices.local):
@ -19,39 +36,34 @@ def sync_secret_to_vault():
print('\033[35m>>> 当前 Vault 类型为本地数据库, 不需要同步')
return
failed, skipped, succeeded = 0, 0, 0
to_sync_models = [Account, AccountTemplate, Account.history.model]
print('\033[33m>>> 开始同步密钥数据到 Vault ({})'.format(datetime.datetime.now()))
with tmp_to_root_org():
to_sync_models = [Account, AccountTemplate, Account.history.model]
instances = []
for model in to_sync_models:
print(f'\033[33m>>> 开始同步: {model.__module__}')
succeeded = []
failed = []
skipped = []
instances = model.objects.all()
verbose_name = model._meta.original_attrs['verbose_name']
for instance in instances:
instance_desc = f'[{verbose_name}-{instance.id}-{instance}]'
if instance.secret_has_save_to_vault:
print(f'\033[32m- 跳过同步: {instance_desc}, 原因: [已同步]')
skipped.append(instance)
continue
try:
vault_client.create(instance)
except Exception as e:
failed.append(instance)
print(f'\033[31m- 同步失败: {instance_desc}, 原因: [{e}]')
else:
succeeded.append(instance)
print(f'\033[32m- 同步成功: {instance_desc}')
instances += list(model.objects.all())
total = len(succeeded) + len(failed) + len(skipped)
print(
f'\033[33m>>> 同步完成: {model.__module__}, '
f'共计: {total}, '
f'成功: {len(succeeded)}, '
f'失败: {len(failed)}, '
f'跳过: {len(skipped)}'
)
with ThreadPoolExecutor(max_workers=10) as executor:
tasks = [executor.submit(sync_instance, instance) for instance in instances]
for future in as_completed(tasks):
status, msg = future.result()
print(msg)
if status == "succeeded":
succeeded += 1
elif status == "failed":
failed += 1
elif status == "skipped":
skipped += 1
total = succeeded + failed + skipped
print(
f'\033[33m>>> 同步完成: {model.__module__}, '
f'共计: {total}, '
f'成功: {succeeded}, '
f'失败: {failed}, '
f'跳过: {skipped}'
)
print('\033[33m>>> 全部同步完成 ({})'.format(datetime.datetime.now()))
print('\033[0m')