jumpserver/apps/accounts/api/account/application.py

85 lines
3.3 KiB
Python
Raw Normal View History

import os
2025-02-11 10:14:38 +00:00
from django.utils.translation import gettext_lazy as _, get_language
from django.conf import settings
from rest_framework.decorators import action
from rest_framework.response import Response
from accounts import serializers
2024-12-04 10:32:49 +00:00
from accounts.models import IntegrationApplication
from audits.models import IntegrationApplicationLog
from authentication.permissions import UserConfirmation, ConfirmType
from common.exceptions import JMSException
from common.permissions import IsValidUser
from common.utils import get_request_ip
from orgs.mixins.api import OrgBulkModelViewSet
from rbac.permissions import RBACPermission
2024-12-04 10:32:49 +00:00
class IntegrationApplicationViewSet(OrgBulkModelViewSet):
    """API endpoints for ``IntegrationApplication`` objects.

    On top of what ``OrgBulkModelViewSet`` provides, this viewset adds three
    custom actions:

    * ``sdks`` -- return the bundled README and demo code for an SDK language.
    * ``secret`` -- return the secret of one application (MFA confirmation
      is required).
    * ``account-secret`` -- return the secret of an account looked up through
      the requesting user, and record the access in
      ``IntegrationApplicationLog``.
    """
    model = IntegrationApplication
    search_fields = ('name', 'comment')
    serializer_classes = {
        'default': serializers.IntegrationApplicationSerializer,
        'get_account_secret': serializers.IntegrationAccountSecretSerializer,
    }
    # Action name -> permission codename (presumably consumed by
    # RBACPermission; verify against the rbac app).
    rbac_perms = {
        'get_once_secret': 'accounts.change_integrationapplication',
        'get_account_secret': 'accounts.view_integrationapplication',
    }

    def read_file(self, path):
        """Return the UTF-8 text content of ``path``.

        An empty string is returned when ``path`` does not exist or is not a
        regular file path (e.g. it is a directory), so callers never have to
        handle a missing demo/readme file themselves.
        """
        # Attempt the open directly instead of checking os.path.exists()
        # first: this avoids the check-then-open race and also covers the
        # case where the path exists but is a directory.
        try:
            with open(path, 'r', encoding='utf-8') as file:
                return file.read()
        except (FileNotFoundError, IsADirectoryError, NotADirectoryError):
            return ''

    @action(
        ['GET'], detail=False, url_path='sdks',
        permission_classes=[IsValidUser]
    )
    def get_sdks_info(self, request, *args, **kwargs):
        """Return the README and demo source for the requested SDK language.

        Query parameters:
            language: one of the keys of ``code_suffix_mapper`` below;
                defaults to ``python``.

        Responds with ``{'readme': ..., 'code': ...}``; either value is an
        empty string when the corresponding file is missing. An unsupported
        ``language`` yields HTTP 400.
        """
        # Supported SDK languages -> file suffix of the demo source file.
        code_suffix_mapper = {
            'python': 'py',
            'java': 'java',
            'go': 'go',
            'node': 'js',
            'curl': 'sh',
        }
        sdk_language = request.query_params.get('language', 'python')

        # Validate before touching the filesystem: the value is
        # user-controlled and becomes a path component below. Previously an
        # unknown language raised KeyError and surfaced as a server error.
        code_suffix = code_suffix_mapper.get(sdk_language)
        if code_suffix is None:
            return Response(
                {'error': 'Unsupported SDK language: %s' % sdk_language},
                status=400
            )

        sdk_path = os.path.join(settings.APPS_DIR, 'accounts', 'demos', sdk_language)
        readme_path = os.path.join(sdk_path, f'readme.{get_language()}.md')
        demo_path = os.path.join(sdk_path, f'demo.{code_suffix}')

        readme_content = self.read_file(readme_path)
        demo_content = self.read_file(demo_path)
        return Response(data={'readme': readme_content, 'code': demo_content})

    @action(
        ['GET'], detail=True, url_path='secret',
        permission_classes=[RBACPermission, UserConfirmation.require(ConfirmType.MFA)]
    )
    def get_once_secret(self, request, *args, **kwargs):
        """Return the id and secret of the addressed application.

        Access requires ``RBACPermission`` plus an MFA user confirmation.
        The secret itself comes from ``instance.get_secret()``.
        """
        instance = self.get_object()
        secret = instance.get_secret()
        return Response(data={'id': instance.id, 'secret': secret})

    @action(['GET'], detail=False, url_path='account-secret',
            permission_classes=[RBACPermission])
    def get_account_secret(self, request, *args, **kwargs):
        """Return the secret of the account selected by the query parameters.

        The query parameters are validated with the serializer configured
        for this action; invalid input yields HTTP 400 with the serializer
        errors. ``request.user`` is treated as the calling service and must
        provide ``get_account(**params)``.

        Raises:
            JMSException: when ``get_account`` returns no account.
        """
        serializer = self.get_serializer(data=request.query_params)
        if not serializer.is_valid():
            return Response({'error': serializer.errors}, status=400)

        service = request.user
        account = service.get_account(**serializer.data)
        if not account:
            msg = _('Account not found')
            raise JMSException(code='Not found', detail='%s' % msg)

        asset = account.asset
        # Record every successful secret retrieval before returning it.
        IntegrationApplicationLog.objects.create(
            remote_addr=get_request_ip(request), service=service.name, service_id=service.id,
            account=f'{account.name}({account.username})', asset=f'{asset.name}({asset.address})',
        )
        return Response(data={'id': request.user.id, 'secret': account.secret})