mirror of https://github.com/jumpserver/jumpserver
156 lines
5.7 KiB
Python
156 lines
5.7 KiB
Python
# -*- coding: utf-8 -*-
|
|
#
|
|
from django.db.models import Max, Q, Subquery, OuterRef
|
|
from rest_framework import status, mixins
|
|
from rest_framework.decorators import action
|
|
from rest_framework.response import Response
|
|
|
|
from accounts import serializers
|
|
from accounts.const import AutomationTypes, ChangeSecretRecordStatusChoice
|
|
from accounts.filters import ChangeSecretRecordFilterSet
|
|
from accounts.models import ChangeSecretAutomation, ChangeSecretRecord
|
|
from accounts.tasks import execute_automation_record_task
|
|
from authentication.permissions import UserConfirmation, ConfirmType
|
|
from orgs.mixins.api import OrgBulkModelViewSet, OrgGenericViewSet
|
|
from rbac.permissions import RBACPermission
|
|
from .base import (
|
|
AutomationAssetsListApi, AutomationRemoveAssetApi, AutomationAddAssetApi,
|
|
AutomationNodeAddRemoveApi, AutomationExecutionViewSet
|
|
)
|
|
|
|
__all__ = [
|
|
'ChangeSecretAutomationViewSet', 'ChangeSecretRecordViewSet',
|
|
'ChangSecretExecutionViewSet', 'ChangSecretAssetsListApi',
|
|
'ChangSecretRemoveAssetApi', 'ChangSecretAddAssetApi',
|
|
'ChangSecretNodeAddRemoveApi'
|
|
]
|
|
|
|
|
|
class ChangeSecretAutomationViewSet(OrgBulkModelViewSet):
|
|
model = ChangeSecretAutomation
|
|
filterset_fields = ('name', 'secret_type', 'secret_strategy')
|
|
search_fields = filterset_fields
|
|
serializer_class = serializers.ChangeSecretAutomationSerializer
|
|
|
|
|
|
class ChangeSecretRecordViewSet(mixins.ListModelMixin, OrgGenericViewSet):
|
|
filterset_class = ChangeSecretRecordFilterSet
|
|
search_fields = ('asset__address', 'account_username')
|
|
ordering_fields = ('date_finished',)
|
|
tp = AutomationTypes.change_secret
|
|
serializer_classes = {
|
|
'default': serializers.ChangeSecretRecordSerializer,
|
|
'secret': serializers.ChangeSecretRecordViewSecretSerializer,
|
|
}
|
|
rbac_perms = {
|
|
'execute': 'accounts.add_changesecretexecution',
|
|
'secret': 'accounts.view_changesecretrecord',
|
|
'dashboard': 'accounts.view_changesecretrecord',
|
|
'ignore_fail': 'accounts.view_changesecretrecord',
|
|
}
|
|
|
|
def get_permissions(self):
|
|
if self.action == 'secret':
|
|
self.permission_classes = [
|
|
RBACPermission,
|
|
UserConfirmation.require(ConfirmType.MFA)
|
|
]
|
|
return super().get_permissions()
|
|
|
|
def filter_queryset(self, queryset):
|
|
queryset = super().filter_queryset(queryset)
|
|
|
|
if self.action == 'dashboard':
|
|
return self.get_dashboard_queryset(queryset)
|
|
return queryset
|
|
|
|
@staticmethod
|
|
def get_dashboard_queryset(queryset):
|
|
recent_dates = queryset.values('account').annotate(
|
|
max_date_finished=Max('date_finished')
|
|
)
|
|
|
|
recent_success_accounts = queryset.filter(
|
|
account=OuterRef('account'),
|
|
date_finished=Subquery(
|
|
recent_dates.filter(account=OuterRef('account')).values('max_date_finished')[:1]
|
|
)
|
|
).filter(Q(status=ChangeSecretRecordStatusChoice.success) | Q(ignore_fail=True))
|
|
|
|
failed_records = queryset.filter(
|
|
~Q(account__in=Subquery(recent_success_accounts.values('account'))),
|
|
status=ChangeSecretRecordStatusChoice.failed
|
|
)
|
|
return failed_records
|
|
|
|
def get_queryset(self):
|
|
qs = ChangeSecretRecord.get_valid_records()
|
|
return qs.filter(
|
|
execution__automation__type=self.tp
|
|
)
|
|
|
|
@action(methods=['post'], detail=False, url_path='execute')
|
|
def execute(self, request, *args, **kwargs):
|
|
record_ids = request.data.get('record_ids')
|
|
records = self.get_queryset().filter(id__in=record_ids)
|
|
execution_count = records.values_list('execution_id', flat=True).distinct().count()
|
|
if execution_count != 1:
|
|
return Response(
|
|
{'detail': 'Only one execution is allowed to execute'},
|
|
status=status.HTTP_400_BAD_REQUEST
|
|
)
|
|
task = execute_automation_record_task.delay(record_ids, self.tp)
|
|
return Response({'task': task.id}, status=status.HTTP_200_OK)
|
|
|
|
@action(methods=['get'], detail=True, url_path='secret')
|
|
def secret(self, request, *args, **kwargs):
|
|
instance = self.get_object()
|
|
serializer = self.get_serializer(instance)
|
|
return Response(serializer.data)
|
|
|
|
@action(methods=['get'], detail=False, url_path='dashboard')
|
|
def dashboard(self, request, *args, **kwargs):
|
|
return super().list(request, *args, **kwargs)
|
|
|
|
@action(methods=['patch'], detail=True, url_path='ignore-fail')
|
|
def ignore_fail(self, request, *args, **kwargs):
|
|
instance = self.get_object()
|
|
instance.ignore_fail = True
|
|
instance.save(update_fields=['ignore_fail'])
|
|
return Response(status=status.HTTP_200_OK)
|
|
|
|
|
|
class ChangSecretExecutionViewSet(AutomationExecutionViewSet):
|
|
rbac_perms = (
|
|
("list", "accounts.view_changesecretexecution"),
|
|
("retrieve", "accounts.view_changesecretexecution"),
|
|
("create", "accounts.add_changesecretexecution"),
|
|
("report", "accounts.view_changesecretexecution"),
|
|
)
|
|
|
|
tp = AutomationTypes.change_secret
|
|
|
|
def get_queryset(self):
|
|
queryset = super().get_queryset()
|
|
queryset = queryset.filter(automation__type=self.tp)
|
|
return queryset
|
|
|
|
|
|
class ChangSecretAssetsListApi(AutomationAssetsListApi):
|
|
model = ChangeSecretAutomation
|
|
|
|
|
|
class ChangSecretRemoveAssetApi(AutomationRemoveAssetApi):
|
|
model = ChangeSecretAutomation
|
|
serializer_class = serializers.ChangeSecretUpdateAssetSerializer
|
|
|
|
|
|
class ChangSecretAddAssetApi(AutomationAddAssetApi):
|
|
model = ChangeSecretAutomation
|
|
serializer_class = serializers.ChangeSecretUpdateAssetSerializer
|
|
|
|
|
|
class ChangSecretNodeAddRemoveApi(AutomationNodeAddRemoveApi):
|
|
model = ChangeSecretAutomation
|
|
serializer_class = serializers.ChangeSecretUpdateNodeSerializer
|