mirror of https://github.com/jumpserver/jumpserver
perf: optimize the connection of operation logs to ES to prevent ES downtime from causing the core component to become unhealthy. (#14283)
* perf: optimize the connection of operation logs to ES to prevent ES downtime from causing the core component to become unhealthy. * perf: sync publish message --------- Co-authored-by: jiangweidong <1053570670@qq.com>pull/14342/head
parent
63824d3491
commit
1ed1c3a536
|
@ -28,7 +28,7 @@ from orgs.utils import current_org, tmp_to_root_org
|
|||
from rbac.permissions import RBACPermission
|
||||
from terminal.models import default_storage
|
||||
from users.models import User
|
||||
from .backends import TYPE_ENGINE_MAPPING
|
||||
from .backends import get_operate_log_storage
|
||||
from .const import ActivityChoices
|
||||
from .filters import UserSessionFilterSet, OperateLogFilterSet
|
||||
from .models import (
|
||||
|
@ -224,13 +224,11 @@ class OperateLogViewSet(OrgReadonlyModelViewSet):
|
|||
if self.is_action_detail:
|
||||
with tmp_to_root_org():
|
||||
qs |= OperateLog.objects.filter(org_id=Organization.SYSTEM_ID)
|
||||
es_config = settings.OPERATE_LOG_ELASTICSEARCH_CONFIG
|
||||
if es_config:
|
||||
engine_mod = import_module(TYPE_ENGINE_MAPPING['es'])
|
||||
store = engine_mod.OperateLogStore(es_config)
|
||||
if store.ping(timeout=2):
|
||||
qs = ESQuerySet(store)
|
||||
qs.model = OperateLog
|
||||
|
||||
storage = get_operate_log_storage()
|
||||
if storage.get_type() == 'es':
|
||||
qs = ESQuerySet(storage)
|
||||
qs.model = OperateLog
|
||||
return qs
|
||||
|
||||
|
||||
|
|
|
@ -1,18 +1,62 @@
|
|||
from importlib import import_module
|
||||
|
||||
from django.conf import settings
|
||||
from django.core.cache import cache
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
|
||||
from common.utils import get_logger
|
||||
from .base import BaseOperateStorage
|
||||
from .es import OperateLogStore as ESOperateLogStore
|
||||
from .db import OperateLogStore as DBOperateLogStore
|
||||
|
||||
|
||||
TYPE_ENGINE_MAPPING = {
|
||||
'db': 'audits.backends.db',
|
||||
'es': 'audits.backends.es',
|
||||
logger = get_logger(__file__)
|
||||
|
||||
_global_op_log_storage: None | ESOperateLogStore | DBOperateLogStore = None
|
||||
op_log_type_mapping = {
|
||||
'server': DBOperateLogStore, 'es': ESOperateLogStore
|
||||
}
|
||||
|
||||
|
||||
def get_operate_log_storage(default=False):
|
||||
engine_mod = import_module(TYPE_ENGINE_MAPPING['db'])
|
||||
es_config = settings.OPERATE_LOG_ELASTICSEARCH_CONFIG
|
||||
if not default and es_config:
|
||||
engine_mod = import_module(TYPE_ENGINE_MAPPING['es'])
|
||||
storage = engine_mod.OperateLogStore(es_config)
|
||||
return storage
|
||||
def _send_es_unavailable_alarm_msg():
|
||||
from terminal.notifications import StorageConnectivityMessage
|
||||
from terminal.const import CommandStorageType
|
||||
|
||||
key = 'OPERATE_LOG_ES_UNAVAILABLE_KEY'
|
||||
if cache.get(key):
|
||||
return
|
||||
|
||||
cache.set(key, 1, 60)
|
||||
errors = [{
|
||||
'msg': _("Connect failed"), 'name': f"{_('Operate log')}",
|
||||
'type': CommandStorageType.es.label
|
||||
}]
|
||||
StorageConnectivityMessage(errors).publish_async()
|
||||
|
||||
|
||||
def refresh_log_storage():
|
||||
global _global_op_log_storage
|
||||
_global_op_log_storage = None
|
||||
|
||||
if settings.OPERATE_LOG_ELASTICSEARCH_CONFIG.get('HOSTS'):
|
||||
try:
|
||||
config = settings.OPERATE_LOG_ELASTICSEARCH_CONFIG
|
||||
log_storage = op_log_type_mapping['es'](config)
|
||||
_global_op_log_storage = log_storage
|
||||
except Exception as e:
|
||||
_send_es_unavailable_alarm_msg()
|
||||
logger.warning('Invalid logs storage type: es, error: %s' % str(e))
|
||||
|
||||
if not _global_op_log_storage:
|
||||
_global_op_log_storage = op_log_type_mapping['server']()
|
||||
|
||||
|
||||
def get_operate_log_storage():
|
||||
if _global_op_log_storage is None:
|
||||
refresh_log_storage()
|
||||
|
||||
log_storage = _global_op_log_storage
|
||||
if not log_storage.ping(timeout=3):
|
||||
if log_storage.get_type() == 'es':
|
||||
_send_es_unavailable_alarm_msg()
|
||||
logger.warning('Switch default operate log storage.')
|
||||
log_storage = op_log_type_mapping['server']()
|
||||
return log_storage
|
||||
|
|
|
@ -0,0 +1,15 @@
|
|||
from perms.const import ActionChoices
|
||||
|
||||
|
||||
class BaseOperateStorage(object):
|
||||
@staticmethod
|
||||
def get_type():
|
||||
return 'base'
|
||||
|
||||
@staticmethod
|
||||
def _get_special_handler(resource_type):
|
||||
# 根据资源类型,处理特殊字段
|
||||
resource_map = {
|
||||
'Asset permission': lambda k, v: ActionChoices.display(int(v)) if k == 'Actions' else v
|
||||
}
|
||||
return resource_map.get(resource_type, lambda k, v: v)
|
|
@ -2,14 +2,14 @@
|
|||
from django.utils.translation import gettext_lazy as _
|
||||
|
||||
from audits.models import OperateLog
|
||||
from perms.const import ActionChoices
|
||||
from .base import BaseOperateStorage
|
||||
|
||||
|
||||
class OperateLogStore(object):
|
||||
class OperateLogStore(BaseOperateStorage):
|
||||
# 用不可见字符分割前后数据,节省存储-> diff: {'key': 'before\0after'}
|
||||
SEP = '\0'
|
||||
|
||||
def __init__(self, config):
|
||||
def __init__(self, *args, **kwargs):
|
||||
self.model = OperateLog
|
||||
self.max_length = 2048
|
||||
self.max_length_tip_msg = _(
|
||||
|
@ -17,9 +17,13 @@ class OperateLogStore(object):
|
|||
)
|
||||
|
||||
@staticmethod
|
||||
def ping(timeout=None):
|
||||
def ping(*args, **kwargs):
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def get_type():
|
||||
return 'db'
|
||||
|
||||
@classmethod
|
||||
def convert_before_after_to_diff(cls, before, after):
|
||||
if not isinstance(before, dict):
|
||||
|
@ -46,20 +50,9 @@ class OperateLogStore(object):
|
|||
before[k], after[k] = before_value, after_value
|
||||
return before, after
|
||||
|
||||
@staticmethod
|
||||
def _get_special_handler(resource_type):
|
||||
# 根据资源类型,处理特殊字段
|
||||
resource_map = {
|
||||
'Asset permission': lambda k, v: ActionChoices.display(int(v)) if k == 'Actions' else v
|
||||
}
|
||||
return resource_map.get(resource_type, lambda k, v: _(v))
|
||||
|
||||
@classmethod
|
||||
def convert_diff_friendly(cls, op_log):
|
||||
diff_list = list()
|
||||
# 标记翻译字符串
|
||||
labels = _("labels")
|
||||
operate_log_id = _("operate_log_id")
|
||||
handler = cls._get_special_handler(op_log.resource_type)
|
||||
for k, v in op_log.diff.items():
|
||||
before, after = v.split(cls.SEP, 1)
|
||||
|
|
|
@ -2,16 +2,17 @@
|
|||
#
|
||||
import uuid
|
||||
|
||||
from django.utils.translation import gettext_lazy as _
|
||||
|
||||
from common.utils.timezone import local_now_display
|
||||
from common.utils import get_logger
|
||||
from common.utils.encode import Singleton
|
||||
from common.plugins.es import ES
|
||||
|
||||
from .base import BaseOperateStorage
|
||||
|
||||
logger = get_logger(__file__)
|
||||
|
||||
|
||||
class OperateLogStore(ES, metaclass=Singleton):
|
||||
class OperateLogStore(BaseOperateStorage, ES):
|
||||
def __init__(self, config):
|
||||
properties = {
|
||||
"id": {
|
||||
|
@ -48,7 +49,26 @@ class OperateLogStore(ES, metaclass=Singleton):
|
|||
self.pre_use_check()
|
||||
|
||||
@staticmethod
|
||||
def make_data(data):
|
||||
def get_type():
|
||||
return 'es'
|
||||
|
||||
@classmethod
|
||||
def convert_diff_friendly(cls, op_log):
|
||||
diff_list = []
|
||||
handler = cls._get_special_handler(op_log.get('resource_type'))
|
||||
before = op_log.get('before') or {}
|
||||
after = op_log.get('after') or {}
|
||||
keys = set(before.keys()) | set(after.keys())
|
||||
for key in keys:
|
||||
before_v, after_v = before.get(key), after.get(key)
|
||||
diff_list.append({
|
||||
'field': _(key),
|
||||
'before': handler(key, before_v) if before_v else _('empty'),
|
||||
'after': handler(key, after_v) if after_v else _('empty'),
|
||||
})
|
||||
return diff_list
|
||||
|
||||
def make_data(self, data):
|
||||
op_id = data.get('id', str(uuid.uuid4()))
|
||||
datetime_param = data.get('datetime', local_now_display())
|
||||
data = {
|
||||
|
|
|
@ -7,7 +7,6 @@ from django.utils.translation import gettext_lazy as _
|
|||
|
||||
from common.local import encrypted_field_set
|
||||
from common.utils import get_request_ip, get_logger
|
||||
from common.utils.encode import Singleton
|
||||
from common.utils.timezone import as_current_tz
|
||||
from jumpserver.utils import current_request
|
||||
from orgs.models import Organization
|
||||
|
@ -21,17 +20,9 @@ from .backends import get_operate_log_storage
|
|||
logger = get_logger(__name__)
|
||||
|
||||
|
||||
class OperatorLogHandler(metaclass=Singleton):
|
||||
class OperatorLogHandler(object):
|
||||
CACHE_KEY = 'OPERATOR_LOG_CACHE_KEY'
|
||||
|
||||
def __init__(self):
|
||||
self.log_client = self.get_storage_client()
|
||||
|
||||
@staticmethod
|
||||
def get_storage_client():
|
||||
client = get_operate_log_storage()
|
||||
return client
|
||||
|
||||
@staticmethod
|
||||
def _consistent_type_to_str(value1, value2):
|
||||
if isinstance(value1, datetime):
|
||||
|
@ -164,13 +155,8 @@ class OperatorLogHandler(metaclass=Singleton):
|
|||
'remote_addr': remote_addr, 'before': before, 'after': after,
|
||||
}
|
||||
with transaction.atomic():
|
||||
if self.log_client.ping(timeout=1):
|
||||
client = self.log_client
|
||||
else:
|
||||
logger.info('Switch default operate log storage save.')
|
||||
client = get_operate_log_storage(default=True)
|
||||
|
||||
try:
|
||||
client = get_operate_log_storage()
|
||||
client.save(**data)
|
||||
except Exception as e:
|
||||
error_msg = 'An error occurred saving OperateLog.' \
|
||||
|
|
|
@ -3,7 +3,7 @@
|
|||
from django.utils.translation import gettext_lazy as _
|
||||
from rest_framework import serializers
|
||||
|
||||
from audits.backends.db import OperateLogStore
|
||||
from audits.backends import get_operate_log_storage
|
||||
from common.serializers.fields import LabeledChoiceField, ObjectRelatedField
|
||||
from common.utils import reverse, i18n_trans
|
||||
from common.utils.timezone import as_current_tz
|
||||
|
@ -77,7 +77,7 @@ class OperateLogActionDetailSerializer(serializers.ModelSerializer):
|
|||
fields = ('diff',)
|
||||
|
||||
def to_representation(self, instance):
|
||||
return {'diff': OperateLogStore.convert_diff_friendly(instance)}
|
||||
return {'diff': get_operate_log_storage().convert_diff_friendly(instance)}
|
||||
|
||||
|
||||
class OperateLogSerializer(BulkOrgResourceModelSerializer):
|
||||
|
|
|
@ -189,7 +189,7 @@ def on_django_start_set_operate_log_monitor_models(sender, **kwargs):
|
|||
'ApplyCommandTicket', 'ApplyLoginAssetTicket',
|
||||
'FavoriteAsset', 'ChangeSecretRecord'
|
||||
}
|
||||
include_models = {'UserSession'}
|
||||
include_models = set()
|
||||
for i, app in enumerate(apps.get_models(), 1):
|
||||
app_name = app._meta.app_label
|
||||
model_name = app._meta.object_name
|
||||
|
|
|
@ -14,7 +14,8 @@ class SiteMessageUtil:
|
|||
def send_msg(cls, subject, message, user_ids=(), group_ids=(),
|
||||
sender=None, is_broadcast=False):
|
||||
if not any((user_ids, group_ids, is_broadcast)):
|
||||
raise ValueError('No recipient is specified')
|
||||
logger.warning(f'No recipient is specified, message subject: {subject}')
|
||||
return
|
||||
|
||||
with transaction.atomic():
|
||||
site_msg = SiteMessageModel.objects.create(
|
||||
|
|
Loading…
Reference in New Issue