mirror of https://github.com/jumpserver/jumpserver
110 lines
3.2 KiB
Python
110 lines
3.2 KiB
Python
import importlib
|
|
import inspect
|
|
|
|
from abc import ABC, abstractmethod
|
|
|
|
from django.forms.models import model_to_dict
|
|
|
|
from .entries import BaseEntry
|
|
from ...const import VaultTypeChoices
|
|
|
|
|
|
class BaseVault(ABC):
|
|
def __init__(self, *args, **kwargs):
|
|
self.enabled = kwargs.get('VAULT_ENABLED')
|
|
self._entry_classes = {}
|
|
self._load_entries()
|
|
|
|
def _load_entries_import_module(self, module_name):
|
|
module = importlib.import_module(module_name)
|
|
for name, obj in inspect.getmembers(module, inspect.isclass):
|
|
self._entry_classes.setdefault(name, obj)
|
|
|
|
def _load_entries(self):
|
|
if self.type == VaultTypeChoices.local:
|
|
return
|
|
|
|
module_name = f'accounts.backends.{self.type}.entries'
|
|
if importlib.util.find_spec(module_name): # noqa
|
|
self._load_entries_import_module(module_name)
|
|
base_module = 'accounts.backends.base.entries'
|
|
self._load_entries_import_module(base_module)
|
|
|
|
@property
|
|
@abstractmethod
|
|
def type(self):
|
|
raise NotImplementedError
|
|
|
|
def get(self, instance):
|
|
""" 返回 secret 值 """
|
|
return self._get(self.build_entry(instance))
|
|
|
|
def create(self, instance):
|
|
if not instance.secret_has_save_to_vault:
|
|
entry = self.build_entry(instance)
|
|
self._create(entry)
|
|
self._clean_db_secret(instance)
|
|
self.save_metadata(entry)
|
|
|
|
def update(self, instance):
|
|
entry = self.build_entry(instance)
|
|
if not instance.secret_has_save_to_vault:
|
|
self._update(entry)
|
|
self._clean_db_secret(instance)
|
|
self.save_metadata(entry)
|
|
|
|
if instance.is_sync_metadata:
|
|
self.save_metadata(entry)
|
|
|
|
def delete(self, instance):
|
|
entry = self.build_entry(instance)
|
|
self._delete(entry)
|
|
|
|
def save_metadata(self, entry):
|
|
metadata = model_to_dict(entry.instance, fields=[
|
|
'name', 'username', 'secret_type',
|
|
'connectivity', 'su_from', 'privileged'
|
|
])
|
|
metadata = {k: str(v)[:500] for k, v in metadata.items() if v}
|
|
return self._save_metadata(entry, metadata)
|
|
|
|
def build_entry(self, instance):
|
|
if self.type == VaultTypeChoices.local:
|
|
return BaseEntry(instance)
|
|
|
|
entry_class_name = f'{instance.__class__.__name__}Entry'
|
|
entry_class = self._entry_classes.get(entry_class_name)
|
|
if not entry_class:
|
|
raise Exception(f'Entry class {entry_class_name} is not found')
|
|
return entry_class(instance)
|
|
|
|
def _clean_db_secret(self, instance):
|
|
instance.is_sync_metadata = False
|
|
instance.mark_secret_save_to_vault()
|
|
|
|
# -------- abstractmethod -------- #
|
|
|
|
@abstractmethod
|
|
def _get(self, instance):
|
|
raise NotImplementedError
|
|
|
|
@abstractmethod
|
|
def _create(self, entry):
|
|
raise NotImplementedError
|
|
|
|
@abstractmethod
|
|
def _update(self, entry):
|
|
raise NotImplementedError
|
|
|
|
@abstractmethod
|
|
def _delete(self, entry):
|
|
raise NotImplementedError
|
|
|
|
@abstractmethod
|
|
def _save_metadata(self, instance, metadata):
|
|
raise NotImplementedError
|
|
|
|
@abstractmethod
|
|
def is_active(self, *args, **kwargs) -> (bool, str):
|
|
raise NotImplementedError
|