jumpserver/apps/assets/serializers/base.py

73 lines
2.5 KiB
Python
Raw Normal View History

# -*- coding: utf-8 -*-
#
2021-12-30 02:47:46 +00:00
from io import StringIO
2022-01-27 07:06:15 +00:00
from django.utils.translation import ugettext_lazy as _
from rest_framework import serializers
2021-12-30 02:47:46 +00:00
from common.utils import ssh_pubkey_gen, ssh_private_key_gen, validate_ssh_private_key
from common.drf.fields import EncryptedField
from .utils import validate_password_for_ansible
2022-09-06 11:57:03 +00:00
class AuthValidateMixin(serializers.Serializer):
    """Serializer mixin that validates and normalises credential fields.

    Declares the ``password``, ``private_key`` and ``passphrase`` fields,
    checks a submitted private key against its passphrase, derives a
    ``public_key`` entry from the private key, and strips empty credential
    values (plus the passphrase) from the validated data before
    ``create`` / ``update`` delegate to the parent class.
    """

    password = EncryptedField(
        label=_('Password'), required=False, allow_blank=True, allow_null=True,
        max_length=1024, validators=[validate_password_for_ansible]
    )
    private_key = EncryptedField(
        label=_('SSH private key'), required=False, allow_blank=True,
        allow_null=True, max_length=16384
    )
    # Write-only input used while validating the private key; it is removed
    # from the validated data by ``clean_auth_fields`` before saving.
    passphrase = serializers.CharField(
        allow_blank=True, allow_null=True, required=False, max_length=512,
        write_only=True, label=_('Key password')
    )

    def validate_private_key(self, private_key):
        """Validate the submitted private key and return it re-serialised.

        :param private_key: the submitted private key text (may be empty).
        :return: ``None`` when no key was submitted, otherwise the key text
            produced by loading the key and writing it back out.
        :raises serializers.ValidationError: if the key cannot be validated
            with the submitted passphrase.
        """
        if not private_key:
            return None

        # A field-level validator only receives this field's value, so the
        # passphrase has to be read from the raw input. ``initial_data`` is
        # only set when ``data=`` was passed to this serializer instance, so
        # fall back to "no passphrase" instead of raising AttributeError.
        raw_data = getattr(self, 'initial_data', None) or {}
        # A blank passphrase is treated as "no passphrase".
        passphrase = raw_data.get('passphrase') or None

        if not validate_ssh_private_key(private_key, password=passphrase):
            raise serializers.ValidationError(_("private key invalid or passphrase error"))

        key_obj = ssh_private_key_gen(private_key, password=passphrase)
        # NOTE(review): the key is written back without passing the
        # passphrase, so the returned text is presumably unencrypted --
        # confirm against the key object's ``write_private_key`` semantics.
        with StringIO() as buffer:
            key_obj.write_private_key(buffer)
            return buffer.getvalue()

    @staticmethod
    def clean_auth_fields(validated_data):
        """Remove empty credential values and the passphrase, in place.

        :param validated_data: dict of validated data; it is mutated.
        """
        for field in ('password', 'private_key', 'public_key'):
            if not validated_data.get(field):
                validated_data.pop(field, None)
        # The passphrase is always dropped, whether or not it has a value.
        validated_data.pop('passphrase', None)

    @staticmethod
    def _validate_gen_key(attrs):
        """Add a ``public_key`` entry derived from ``attrs['private_key']``.

        :param attrs: dict of validated attributes; it is mutated when a
            private key is present.
        :return: the same ``attrs`` dict.
        """
        private_key = attrs.get('private_key')
        if not private_key:
            return attrs

        # Treat a blank passphrase as "no passphrase", consistent with
        # ``validate_private_key``.
        password = attrs.get('passphrase') or None
        username = attrs.get('username')
        attrs['public_key'] = ssh_pubkey_gen(
            private_key, password=password, username=username
        )
        return attrs

    def validate(self, attrs):
        """Derive the public key, then run the parent class's validation."""
        attrs = self._validate_gen_key(attrs)
        return super().validate(attrs)

    def create(self, validated_data):
        """Strip empty credential fields, then delegate to the parent ``create``."""
        self.clean_auth_fields(validated_data)
        return super().create(validated_data)

    def update(self, instance, validated_data):
        """Strip empty credential fields, then delegate to the parent ``update``."""
        self.clean_auth_fields(validated_data)
        return super().update(instance, validated_data)