mirror of https://github.com/jumpserver/jumpserver
				
				
				
			
		
			
				
	
	
		
			194 lines
		
	
	
		
			8.3 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			194 lines
		
	
	
		
			8.3 KiB
		
	
	
	
		
			Python
		
	
	
# -*- coding: utf-8 -*-
 | 
						|
#
 | 
						|
 | 
						|
from smtplib import SMTPSenderRefused
 | 
						|
from rest_framework import generics
 | 
						|
from rest_framework.views import Response, APIView
 | 
						|
from rest_framework.permissions import AllowAny
 | 
						|
from django.conf import settings
 | 
						|
from django.core.mail import send_mail, get_connection
 | 
						|
from django.utils.translation import ugettext_lazy as _
 | 
						|
from django.templatetags.static import static
 | 
						|
 | 
						|
from jumpserver.utils import has_valid_xpack_license
 | 
						|
from common.permissions import IsSuperUser
 | 
						|
from common.utils import get_logger
 | 
						|
from .. import serializers
 | 
						|
from ..models import Setting
 | 
						|
 | 
						|
logger = get_logger(__file__)
 | 
						|
 | 
						|
 | 
						|
class MailTestingAPI(APIView):
 | 
						|
    permission_classes = (IsSuperUser,)
 | 
						|
    serializer_class = serializers.MailTestSerializer
 | 
						|
    success_message = _("Test mail sent to {}, please check")
 | 
						|
 | 
						|
    def post(self, request):
 | 
						|
        serializer = self.serializer_class(data=request.data)
 | 
						|
        serializer.is_valid(raise_exception=True)
 | 
						|
 | 
						|
        email_host = serializer.validated_data['EMAIL_HOST']
 | 
						|
        email_port = serializer.validated_data['EMAIL_PORT']
 | 
						|
        email_host_user = serializer.validated_data["EMAIL_HOST_USER"]
 | 
						|
        email_host_password = serializer.validated_data['EMAIL_HOST_PASSWORD']
 | 
						|
        email_from = serializer.validated_data["EMAIL_FROM"]
 | 
						|
        email_recipient = serializer.validated_data["EMAIL_RECIPIENT"]
 | 
						|
        email_use_ssl = serializer.validated_data['EMAIL_USE_SSL']
 | 
						|
        email_use_tls = serializer.validated_data['EMAIL_USE_TLS']
 | 
						|
 | 
						|
        # 设置 settings 的值,会导致动态配置在当前进程失效
 | 
						|
        # for k, v in serializer.validated_data.items():
 | 
						|
        #     if k.startswith('EMAIL'):
 | 
						|
        #         setattr(settings, k, v)
 | 
						|
        try:
 | 
						|
            subject = "Test"
 | 
						|
            message = "Test smtp setting"
 | 
						|
            email_from = email_from or email_host_user
 | 
						|
            email_recipient = email_recipient or email_from
 | 
						|
            connection = get_connection(
 | 
						|
                host=email_host, port=email_port,
 | 
						|
                username=email_host_user, password=email_host_password,
 | 
						|
                use_tls=email_use_tls, use_ssl=email_use_ssl,
 | 
						|
            )
 | 
						|
            send_mail(
 | 
						|
                subject, message, email_from, [email_recipient],
 | 
						|
                connection=connection
 | 
						|
            )
 | 
						|
        except SMTPSenderRefused as e:
 | 
						|
            error = e.smtp_error
 | 
						|
            if isinstance(error, bytes):
 | 
						|
                for coding in ('gbk', 'utf8'):
 | 
						|
                    try:
 | 
						|
                        error = error.decode(coding)
 | 
						|
                    except UnicodeDecodeError:
 | 
						|
                        continue
 | 
						|
                    else:
 | 
						|
                        break
 | 
						|
            return Response({"error": str(error)}, status=400)
 | 
						|
        except Exception as e:
 | 
						|
            logger.error(e)
 | 
						|
            return Response({"error": str(e)}, status=400)
 | 
						|
        return Response({"msg": self.success_message.format(email_recipient)})
 | 
						|
 | 
						|
 | 
						|
class PublicSettingApi(generics.RetrieveAPIView):
 | 
						|
    permission_classes = (AllowAny,)
 | 
						|
    serializer_class = serializers.PublicSettingSerializer
 | 
						|
 | 
						|
    @staticmethod
 | 
						|
    def get_logo_urls():
 | 
						|
        logo_urls = {
 | 
						|
            'logo_logout': static('img/logo.png'),
 | 
						|
            'logo_index': static('img/logo_text.png'),
 | 
						|
            'login_image': static('img/login_image.jpg'),
 | 
						|
            'favicon': static('img/facio.ico')
 | 
						|
        }
 | 
						|
        if not settings.XPACK_ENABLED:
 | 
						|
            return logo_urls
 | 
						|
        from xpack.plugins.interface.models import Interface
 | 
						|
        obj = Interface.interface()
 | 
						|
        if not obj:
 | 
						|
            return logo_urls
 | 
						|
        for attr in ['logo_logout', 'logo_index', 'login_image', 'favicon']:
 | 
						|
            if getattr(obj, attr, '') and getattr(obj, attr).url:
 | 
						|
                logo_urls.update({attr: getattr(obj, attr).url})
 | 
						|
        return logo_urls
 | 
						|
 | 
						|
    @staticmethod
 | 
						|
    def get_login_title():
 | 
						|
        default_title = _('Welcome to the JumpServer open source Bastion Host')
 | 
						|
        if not settings.XPACK_ENABLED:
 | 
						|
            return default_title
 | 
						|
        from xpack.plugins.interface.models import Interface
 | 
						|
        return Interface.get_login_title()
 | 
						|
 | 
						|
    def get_object(self):
 | 
						|
        instance = {
 | 
						|
            "data": {
 | 
						|
                "WINDOWS_SKIP_ALL_MANUAL_PASSWORD": settings.WINDOWS_SKIP_ALL_MANUAL_PASSWORD,
 | 
						|
                "SECURITY_MAX_IDLE_TIME": settings.SECURITY_MAX_IDLE_TIME,
 | 
						|
                "XPACK_ENABLED": settings.XPACK_ENABLED,
 | 
						|
                "LOGIN_CONFIRM_ENABLE": settings.LOGIN_CONFIRM_ENABLE,
 | 
						|
                "SECURITY_VIEW_AUTH_NEED_MFA": settings.SECURITY_VIEW_AUTH_NEED_MFA,
 | 
						|
                "SECURITY_MFA_VERIFY_TTL": settings.SECURITY_MFA_VERIFY_TTL,
 | 
						|
                "OLD_PASSWORD_HISTORY_LIMIT_COUNT": settings.OLD_PASSWORD_HISTORY_LIMIT_COUNT,
 | 
						|
                "SECURITY_COMMAND_EXECUTION": settings.SECURITY_COMMAND_EXECUTION,
 | 
						|
                "SECURITY_PASSWORD_EXPIRATION_TIME": settings.SECURITY_PASSWORD_EXPIRATION_TIME,
 | 
						|
                "SECURITY_LUNA_REMEMBER_AUTH": settings.SECURITY_LUNA_REMEMBER_AUTH,
 | 
						|
                "XPACK_LICENSE_IS_VALID": has_valid_xpack_license(),
 | 
						|
                "LOGIN_TITLE": self.get_login_title(),
 | 
						|
                "LOGO_URLS": self.get_logo_urls(),
 | 
						|
                "TICKETS_ENABLED": settings.TICKETS_ENABLED,
 | 
						|
                "PASSWORD_RULE": {
 | 
						|
                    'SECURITY_PASSWORD_MIN_LENGTH': settings.SECURITY_PASSWORD_MIN_LENGTH,
 | 
						|
                    'SECURITY_ADMIN_USER_PASSWORD_MIN_LENGTH': settings.SECURITY_ADMIN_USER_PASSWORD_MIN_LENGTH,
 | 
						|
                    'SECURITY_PASSWORD_UPPER_CASE': settings.SECURITY_PASSWORD_UPPER_CASE,
 | 
						|
                    'SECURITY_PASSWORD_LOWER_CASE': settings.SECURITY_PASSWORD_LOWER_CASE,
 | 
						|
                    'SECURITY_PASSWORD_NUMBER': settings.SECURITY_PASSWORD_NUMBER,
 | 
						|
                    'SECURITY_PASSWORD_SPECIAL_CHAR': settings.SECURITY_PASSWORD_SPECIAL_CHAR,
 | 
						|
                },
 | 
						|
                "AUTH_WECOM": settings.AUTH_WECOM,
 | 
						|
                "AUTH_DINGTALK": settings.AUTH_DINGTALK,
 | 
						|
                "AUTH_FEISHU": settings.AUTH_FEISHU,
 | 
						|
                'SECURITY_WATERMARK_ENABLED': settings.SECURITY_WATERMARK_ENABLED,
 | 
						|
                'SECURITY_SESSION_SHARE': settings.SECURITY_SESSION_SHARE
 | 
						|
            }
 | 
						|
        }
 | 
						|
        return instance
 | 
						|
 | 
						|
 | 
						|
class SettingsApi(generics.RetrieveUpdateAPIView):
 | 
						|
    permission_classes = (IsSuperUser,)
 | 
						|
    serializer_class_mapper = {
 | 
						|
        'all': serializers.SettingsSerializer,
 | 
						|
        'basic': serializers.BasicSettingSerializer,
 | 
						|
        'terminal': serializers.TerminalSettingSerializer,
 | 
						|
        'security': serializers.SecuritySettingSerializer,
 | 
						|
        'ldap': serializers.LDAPSettingSerializer,
 | 
						|
        'email': serializers.EmailSettingSerializer,
 | 
						|
        'email_content': serializers.EmailContentSettingSerializer,
 | 
						|
        'wecom': serializers.WeComSettingSerializer,
 | 
						|
        'dingtalk': serializers.DingTalkSettingSerializer,
 | 
						|
        'feishu': serializers.FeiShuSettingSerializer,
 | 
						|
    }
 | 
						|
 | 
						|
    def get_serializer_class(self):
 | 
						|
        category = self.request.query_params.get('category', serializers.BasicSettingSerializer)
 | 
						|
        return self.serializer_class_mapper.get(category, serializers.BasicSettingSerializer)
 | 
						|
 | 
						|
    def get_fields(self):
 | 
						|
        serializer = self.get_serializer_class()()
 | 
						|
        fields = serializer.get_fields()
 | 
						|
        return fields
 | 
						|
 | 
						|
    def get_object(self):
 | 
						|
        items = self.get_fields().keys()
 | 
						|
        obj = {item: getattr(settings, item) for item in items}
 | 
						|
        return obj
 | 
						|
 | 
						|
    def parse_serializer_data(self, serializer):
 | 
						|
        data = []
 | 
						|
        fields = self.get_fields()
 | 
						|
        encrypted_items = [name for name, field in fields.items() if field.write_only]
 | 
						|
        category = self.request.query_params.get('category', '')
 | 
						|
        for name, value in serializer.validated_data.items():
 | 
						|
            encrypted = name in encrypted_items
 | 
						|
            if encrypted and value in ['', None]:
 | 
						|
                continue
 | 
						|
            data.append({
 | 
						|
                'name': name, 'value': value,
 | 
						|
                'encrypted': encrypted, 'category': category
 | 
						|
            })
 | 
						|
        return data
 | 
						|
 | 
						|
    def perform_update(self, serializer):
 | 
						|
        settings_items = self.parse_serializer_data(serializer)
 | 
						|
        serializer_data = getattr(serializer, 'data', {})
 | 
						|
        for item in settings_items:
 | 
						|
            changed, setting = Setting.update_or_create(**item)
 | 
						|
            if not changed:
 | 
						|
                continue
 | 
						|
            serializer_data[setting.name] = setting.cleaned_value
 | 
						|
        setattr(serializer, '_data', serializer_data)
 |