fix: 无效的 ES 会卡住

pull/5780/head
xinwen 2021-03-17 15:26:47 +08:00 committed by Jiangjie.Bai
parent 9b85aafa52
commit f1f5017be3
5 changed files with 47 additions and 10 deletions

View File

@ -11,7 +11,7 @@ from rest_framework.response import Response
from rest_framework.decorators import action
from django.template import loader
from terminal.models import CommandStorage
from terminal.models import CommandStorage, Command
from terminal.filters import CommandFilter
from orgs.utils import current_org
from common.permissions import IsOrgAdminOrAppUser, IsOrgAuditor, IsAppUser
@ -19,6 +19,7 @@ from common.const.http import GET
from common.utils import get_logger
from terminal.utils import send_command_alert_mail
from terminal.serializers import InsecureCommandAlertSerializer
from terminal.exceptions import StorageInvalid
from ..backends import (
get_command_storage, get_multi_command_storage,
SessionCommandSerializer,
@ -116,9 +117,12 @@ class CommandViewSet(viewsets.ModelViewSet):
storages = CommandStorage.objects.all()
for storage in storages:
if not storage.is_valid():
continue
qs = storage.get_command_queryset()
commands = self.filter_queryset(qs)
merged_commands.extend(commands)
merged_commands.extend(commands[:]) # ES 默认只取 10 条数据
merged_commands.sort(key=lambda command: command.timestamp, reverse=True)
page = self.paginate_queryset(merged_commands)
@ -126,7 +130,7 @@ class CommandViewSet(viewsets.ModelViewSet):
serializer = self.get_serializer(page, many=True)
return self.get_paginated_response(serializer.data)
serializer = self.get_serializer(queryset, many=True)
serializer = self.get_serializer(merged_commands, many=True)
return Response(serializer.data)
def list(self, request, *args, **kwargs):
@ -141,7 +145,10 @@ class CommandViewSet(viewsets.ModelViewSet):
def get_queryset(self):
command_storage_id = self.request.query_params.get('command_storage_id')
storage = CommandStorage.objects.get(id=command_storage_id)
qs = storage.get_command_queryset()
if not storage.is_valid():
raise StorageInvalid
else:
qs = storage.get_command_queryset()
return qs
def create(self, request, *args, **kwargs):

View File

@ -46,7 +46,13 @@ class CommandStorageViewSet(BaseStorageViewSetMixin, viewsets.ModelViewSet):
def tree(self, request: Request):
storage_qs = self.get_queryset().exclude(name='null')
storages_with_count = []
invalid_storages = []
for storage in storage_qs:
if not storage.is_valid():
invalid_storages.append(storage)
continue
command_qs = storage.get_command_queryset()
filterset = CommandFilter(
data=request.query_params, queryset=command_qs,
@ -70,6 +76,7 @@ class CommandStorageViewSet(BaseStorageViewSetMixin, viewsets.ModelViewSet):
'open': True,
}
invalid = _('Invalid')
nodes = [
{
'id': storage.id,
@ -78,7 +85,18 @@ class CommandStorageViewSet(BaseStorageViewSetMixin, viewsets.ModelViewSet):
'pId': 'root',
'isParent': False,
'open': False,
'valid': True,
} for storage, command_count in storages_with_count
] + [
{
'id': storage.id,
'name': f'{storage.name}({storage.type}) *{invalid}',
'title': f'{storage.name}({storage.type})',
'pId': 'root',
'isParent': False,
'open': False,
'valid': False,
} for storage in invalid_storages
]
nodes.append(root)
return Response(data=nodes)

View File

@ -25,7 +25,7 @@ class CommandStore():
kwargs = config.get("OTHER", {})
self.index = config.get("INDEX") or 'jumpserver'
self.doc_type = config.get("DOC_TYPE") or 'command_store'
self.es = Elasticsearch(hosts=hosts, **kwargs)
self.es = Elasticsearch(hosts=hosts, max_retries=0, **kwargs)
@staticmethod
def make_data(command):
@ -81,9 +81,9 @@ class CommandStore():
"""返回所有数据"""
raise NotImplementedError("Not support")
def ping(self):
def ping(self, timeout=None):
try:
return self.es.ping()
return self.es.ping(request_timeout=timeout)
except Exception:
return False
@ -256,7 +256,7 @@ class QuerySet(DJQuerySet):
clone = self.__clone()
from_ = item.start or 0
if item.stop is None:
size = 10
size = self.max_result_window - from_
else:
size = item.stop - from_

View File

@ -6,3 +6,9 @@ from common.exceptions import JMSException
class BulkCreateNotSupport(JMSException):
default_code = 'bulk_create_not_support'
default_detail = _('Bulk create not support')
class StorageInvalid(JMSException):
default_code = 'storage_invalid'
default_detail = _('Storage is invalid')

View File

@ -52,8 +52,14 @@ class CommandStorage(CommonModelMixin):
def is_valid(self):
if self.type_null_or_server:
return True
storage = jms_storage.get_log_storage(self.config)
return storage.ping()
if self.type not in TYPE_ENGINE_MAPPING:
logger.error(f'Command storage `{self.type}` not support')
return False
engine_mod = import_module(TYPE_ENGINE_MAPPING[self.type])
store = engine_mod.CommandStore(self.config)
return store.ping(timeout=3)
def is_use(self):
return Terminal.objects.filter(command_storage=self.name, is_deleted=False).exists()