jumpserver/apps/accounts/api/account/application.py

75 lines
2.9 KiB
Python
Raw Normal View History

import os
from django.utils.translation import gettext_lazy as _
from django.conf import settings
from django.utils import translation
from rest_framework.decorators import action
from rest_framework.response import Response
from accounts import serializers
2024-12-04 10:32:49 +00:00
from accounts.models import IntegrationApplication
from audits.models import IntegrationApplicationLog
from authentication.permissions import UserConfirmation, ConfirmType
from common.exceptions import JMSException
from common.permissions import IsValidUser
from common.utils import get_request_ip
from orgs.mixins.api import OrgBulkModelViewSet
from rbac.permissions import RBACPermission
2024-12-04 10:32:49 +00:00
class IntegrationApplicationViewSet(OrgBulkModelViewSet):
    """API endpoints for integration applications.

    On top of the behavior inherited from ``OrgBulkModelViewSet`` this
    viewset adds three extra actions:

    * ``sdks`` - return the SDK readme for a given language.
    * ``secret`` - return the secret of a single application (MFA-confirmed).
    * ``account-secret`` - return the secret of an account looked up through
      the requesting service, and record the access in the audit log.
    """

    model = IntegrationApplication
    search_fields = ('name', 'comment')
    serializer_classes = {
        'default': serializers.IntegrationApplicationSerializer,
        'get_account_secret': serializers.IntegrationAccountSecretSerializer
    }
    rbac_perms = {
        'get_once_secret': 'accounts.change_integrationapplication',
        # NOTE(review): unlike the entry above, this permission has no
        # 'accounts.' app label - confirm that is what the permission
        # checker for this action expects.
        'get_account_secret': 'view_integrationapplication',
    }

    @action(
        ['GET'], detail=False, url_path='sdks',
        permission_classes=[IsValidUser]
    )
    def get_sdks_info(self, request, *args, **kwargs):
        """Return the SDK readme for the requested language.

        Query parameters:
            language: name of the SDK sub-directory under
                ``accounts/demos`` (defaults to ``python``).

        The readme file name is derived from the active translation
        language (``readme.<language-code>.md``).

        Returns:
            Response with ``{'readme': <file content>}``; the content is an
            empty string when no matching readme exists or when the
            requested path falls outside the demos directory.
        """
        sdk_language = request.query_params.get('language', 'python')
        filename = f'readme.{translation.get_language()}.md'

        demos_dir = os.path.abspath(
            os.path.join(settings.APPS_DIR, 'accounts', 'demos')
        )
        # abspath() collapses any '..' segments, so the prefix check below
        # sees the path that would really be opened.
        readme_path = os.path.abspath(
            os.path.join(demos_dir, sdk_language, filename)
        )

        readme = ''
        # 'language' is user-controlled: refuse anything that resolves
        # outside the demos directory (e.g. '../../x' or an absolute path).
        inside_demos = readme_path.startswith(demos_dir + os.sep)
        if inside_demos and os.path.isfile(readme_path):
            # Explicit encoding so decoding does not depend on the
            # server's locale.
            with open(readme_path, 'r', encoding='utf-8') as f:
                readme = f.read()
        return Response(data={'readme': readme})

    @action(
        ['GET'], detail=True, url_path='secret',
        permission_classes=[RBACPermission, UserConfirmation.require(ConfirmType.MFA)]
    )
    def get_once_secret(self, request, *args, **kwargs):
        """Return the secret of the addressed integration application.

        Returns:
            Response with ``{'id': <application id>, 'secret': <secret>}``
            where the secret comes from ``instance.get_secret()``.
        """
        instance = self.get_object()
        secret = instance.get_secret()
        return Response(data={'id': instance.id, 'secret': secret})

    @action(['GET'], detail=False, url_path='account-secret')
    def get_account_secret(self, request, *args, **kwargs):
        """Return the secret of an account resolved via the requesting service.

        The query parameters are validated with the serializer configured
        for this action and then passed as keyword arguments to
        ``request.user.get_account``. A successful lookup is recorded as an
        ``IntegrationApplicationLog`` entry before the secret is returned.

        Returns:
            Response with ``{'id': <request.user id>, 'secret': <secret>}``,
            or a 400 response with ``{'error': <serializer errors>}`` when
            the query parameters are invalid.

        Raises:
            JMSException: when no matching account is found.
        """
        serializer = self.get_serializer(data=request.query_params)
        if not serializer.is_valid():
            return Response({'error': serializer.errors}, status=400)

        # NOTE(review): request.user is presumably the authenticated
        # integration application here (it must provide get_account/name) -
        # confirm against the authentication backend.
        service = request.user
        account = service.get_account(**serializer.data)
        if not account:
            msg = _('Account not found')
            # '%s' % msg forces the lazy translation into a plain string.
            raise JMSException(code='Not found', detail='%s' % msg)

        asset = account.asset
        # Audit trail: record who fetched which account secret, and from where.
        IntegrationApplicationLog.objects.create(
            remote_addr=get_request_ip(request), service=service.name, service_id=service.id,
            account=f'{account.name}({account.username})', asset=f'{asset.name}({asset.address})',
        )
        return Response(data={'id': request.user.id, 'secret': account.secret})