# jumpserver/apps/authentication/serializers/connect_token_secret.py
#
# 164 lines
# 6.1 KiB
# Python

from django.utils.translation import gettext_lazy as _
from rest_framework import serializers
from accounts.const import SecretType
from accounts.models import Account
from acls.models import CommandGroup, CommandFilterACL
from assets.models import Asset, Platform, Gateway, Domain
from assets.serializers.asset import AssetProtocolsSerializer
from assets.serializers.platform import PlatformSerializer
from common.serializers.fields import LabeledChoiceField
from common.serializers.fields import ObjectRelatedField
from orgs.mixins.serializers import OrgResourceModelSerializerMixin
from perms.serializers.permission import ActionChoicesField
from users.models import User
from ..models import ConnectionToken
# Public names exported by this module.
__all__ = [
    'ConnectionTokenSecretSerializer',
    'ConnectTokenAppletOptionSerializer',
]
class _ConnectionTokenUserSerializer(serializers.ModelSerializer):
    """Serialize a ``User`` as its id, name, username and email."""

    class Meta:
        model = User
        fields = [
            'id',
            'name',
            'username',
            'email',
        ]
class _ConnectionTokenAssetSerializer(serializers.ModelSerializer):
    """Serialize an ``Asset`` with its protocols and info fields.

    ``protocols`` is rendered through ``AssetProtocolsSerializer`` (a list,
    optional); ``info`` is declared explicitly as a dict field. The remaining
    entries in ``Meta.fields`` are resolved from the ``Asset`` model.
    """

    protocols = AssetProtocolsSerializer(
        many=True,
        required=False,
        label=_('Protocols'),
    )
    info = serializers.DictField()

    class Meta:
        model = Asset
        fields = [
            'id',
            'name',
            'address',
            'protocols',
            'category',
            'type',
            'org_id',
            'info',
            'secret_info',
            'spec_info',
        ]
class _SimpleAccountSerializer(serializers.ModelSerializer):
    """Minimal ``Account`` representation: name, username, secret type, secret."""

    secret_type = LabeledChoiceField(
        choices=SecretType.choices,
        required=False,
        label=_('Secret type'),
    )

    class Meta:
        model = Account
        fields = [
            'name',
            'username',
            'secret_type',
            'secret',
        ]
class _ConnectionTokenAccountSerializer(serializers.ModelSerializer):
    """Serialize an ``Account`` including its secret and its ``su_from`` account.

    ``su_from`` is a computed field; see :meth:`get_su_from` for the exact
    values it can take.
    """

    su_from = serializers.SerializerMethodField(label=_('Su from'))
    secret_type = LabeledChoiceField(
        choices=SecretType.choices,
        required=False,
        label=_('Secret type'),
    )

    class Meta:
        model = Account
        fields = [
            'id',
            'name',
            'username',
            'secret_type',
            'secret',
            'su_from',
            'privileged',
        ]

    @staticmethod
    def get_su_from(account):
        """Return the serialized ``su_from`` account of ``account``.

        Returns:
            ``{}`` when ``account`` has no ``asset`` attribute;
            ``None`` when ``account.su_from`` is falsy or the asset's platform
            has a falsy ``su_enabled``;
            otherwise the ``_SimpleAccountSerializer`` data of
            ``account.su_from``.
        """
        if not hasattr(account, 'asset'):
            return {}

        # The platform flag is read before su_from, as in the original order.
        platform_allows_su = account.asset.platform.su_enabled
        switch_account = account.su_from

        if switch_account and platform_allows_su:
            return _SimpleAccountSerializer(switch_account).data
        return None
class _ConnectionTokenGatewaySerializer(serializers.ModelSerializer):
    """Serialize a ``Gateway`` with its protocols and one account.

    The ``account`` field is read-only and is taken from the gateway's
    ``select_account`` attribute, rendered with ``_SimpleAccountSerializer``.
    """

    account = _SimpleAccountSerializer(
        required=False,
        source='select_account',
        read_only=True,
    )
    protocols = AssetProtocolsSerializer(
        many=True,
        required=False,
        label=_('Protocols'),
    )

    class Meta:
        model = Gateway
        fields = [
            'id',
            'name',
            'address',
            'protocols',
            'account',
        ]
class _ConnectionTokenCommandFilterACLSerializer(serializers.ModelSerializer):
    """Serialize a ``CommandFilterACL`` with its command groups and reviewers.

    Each command group is expanded to the attributes listed in ``attrs``;
    reviewers are related ``User`` objects.
    """

    command_groups = ObjectRelatedField(
        many=True,
        required=False,
        queryset=CommandGroup.objects,
        attrs=('id', 'name', 'type', 'content', 'ignore_case', 'pattern'),
        label=_('Command group'),
    )
    reviewers = ObjectRelatedField(
        many=True,
        queryset=User.objects,
        label=_("Reviewers"),
        required=False,
    )

    class Meta:
        model = CommandFilterACL
        fields = [
            'id',
            'name',
            'command_groups',
            'action',
            'reviewers',
            'priority',
            'is_active',
        ]
class _ConnectionTokenPlatformSerializer(PlatformSerializer):
    """``PlatformSerializer`` variant that omits the ``automation`` field."""

    class Meta(PlatformSerializer.Meta):
        model = Platform

    def get_field_names(self, declared_fields, info):
        """Return the parent's field names with ``'automation'`` filtered out."""
        inherited = super().get_field_names(declared_fields, info)
        return [
            field_name
            for field_name in inherited
            if field_name != 'automation'
        ]
class _ConnectionTokenConnectMethodSerializer(serializers.Serializer):
    """Plain serializer describing a connect method and its platform.

    Fields are declared in output order; ``platform`` is nested through
    ``_ConnectionTokenPlatformSerializer`` and ``options`` is free-form JSON.
    """

    name = serializers.CharField(
        label=_('Name'),
    )
    protocol = serializers.CharField(
        label=_('Protocol'),
    )
    os = serializers.CharField(
        label=_('OS'),
    )
    is_builtin = serializers.BooleanField(
        label=_('Is builtin'),
    )
    is_active = serializers.BooleanField(
        label=_('Is active'),
    )
    platform = _ConnectionTokenPlatformSerializer(
        label=_('Platform'),
    )
    action = ActionChoicesField(
        label=_('Action'),
    )
    options = serializers.JSONField(
        label=_('Options'),
    )
class _ConnectTokenConnectMethodSerializer(serializers.Serializer):
    """Plain serializer with four string fields: label, value, type, component."""

    label = serializers.CharField(
        label=_('Label'),
    )
    value = serializers.CharField(
        label=_('Value'),
    )
    type = serializers.CharField(
        label=_('Type'),
    )
    component = serializers.CharField(
        label=_('Component'),
    )
class ConnectionTokenSecretSerializer(OrgResourceModelSerializerMixin):
    """Serialize a ``ConnectionToken`` with its nested related objects.

    Nested user, asset, account, gateway, platform, command-filter ACLs,
    connect method and connect options are read-only. ``account`` and
    ``connect_method`` are sourced from the token's ``account_object`` and
    ``connect_method_object`` attributes. ``expire_now`` is write-only and
    defaults to ``True``; ``value`` is marked read-only through
    ``Meta.extra_kwargs``.
    """

    # Nested, read-only representations of related objects.
    user = _ConnectionTokenUserSerializer(read_only=True)
    asset = _ConnectionTokenAssetSerializer(read_only=True)
    account = _ConnectionTokenAccountSerializer(
        read_only=True,
        source='account_object',
    )
    gateway = _ConnectionTokenGatewaySerializer(read_only=True)
    platform = _ConnectionTokenPlatformSerializer(read_only=True)
    domain = ObjectRelatedField(
        queryset=Domain.objects,
        required=False,
        label=_('Domain'),
    )
    command_filter_acls = _ConnectionTokenCommandFilterACLSerializer(
        read_only=True,
        many=True,
    )
    # Input-only flag; never included in the serialized output.
    expire_now = serializers.BooleanField(
        label=_('Expired now'),
        write_only=True,
        default=True,
    )
    connect_method = _ConnectTokenConnectMethodSerializer(
        read_only=True,
        source='connect_method_object',
    )
    connect_options = serializers.JSONField(read_only=True)
    actions = ActionChoicesField()
    expire_at = serializers.IntegerField()

    class Meta:
        model = ConnectionToken
        fields = [
            'id',
            'value',
            'user',
            'asset',
            'account',
            'platform',
            'command_filter_acls',
            'protocol',
            'domain',
            'gateway',
            'actions',
            'expire_at',
            'from_ticket',
            'expire_now',
            'connect_method',
            'connect_options',
        ]
        extra_kwargs = {'value': {'read_only': True}}
class ConnectTokenAppletOptionSerializer(serializers.Serializer):
    """Plain serializer for an applet option attached to a connection token.

    Everything except ``id`` is read-only: the applet reference, the host
    asset, the account, the gateway and the JSON ``remote_app_option``.
    """

    id = serializers.CharField(
        label=_('ID'),
    )
    applet = ObjectRelatedField(
        read_only=True,
    )
    host = _ConnectionTokenAssetSerializer(
        read_only=True,
    )
    account = _ConnectionTokenAccountSerializer(
        read_only=True,
    )
    gateway = _ConnectionTokenGatewaySerializer(
        read_only=True,
    )
    remote_app_option = serializers.JSONField(
        read_only=True,
    )