mirror of https://github.com/jumpserver/jumpserver
188 lines
7.9 KiB
Python
188 lines
7.9 KiB
Python
# -*- coding: utf-8 -*-
|
|
#
|
|
|
|
from smtplib import SMTPSenderRefused
|
|
from rest_framework import generics
|
|
from rest_framework.views import Response, APIView
|
|
from rest_framework.permissions import AllowAny
|
|
from django.conf import settings
|
|
from django.core.mail import send_mail, get_connection
|
|
from django.utils.translation import ugettext_lazy as _
|
|
from django.templatetags.static import static
|
|
|
|
from jumpserver.utils import has_valid_xpack_license
|
|
from common.permissions import IsSuperUser
|
|
from common.utils import get_logger
|
|
from .. import serializers
|
|
from ..models import Setting
|
|
|
|
logger = get_logger(__file__)
|
|
|
|
|
|
class MailTestingAPI(APIView):
|
|
permission_classes = (IsSuperUser,)
|
|
serializer_class = serializers.MailTestSerializer
|
|
success_message = _("Test mail sent to {}, please check")
|
|
|
|
def post(self, request):
|
|
serializer = self.serializer_class(data=request.data)
|
|
serializer.is_valid(raise_exception=True)
|
|
|
|
email_host = serializer.validated_data['EMAIL_HOST']
|
|
email_port = serializer.validated_data['EMAIL_PORT']
|
|
email_host_user = serializer.validated_data["EMAIL_HOST_USER"]
|
|
email_host_password = serializer.validated_data['EMAIL_HOST_PASSWORD']
|
|
email_from = serializer.validated_data["EMAIL_FROM"]
|
|
email_recipient = serializer.validated_data["EMAIL_RECIPIENT"]
|
|
email_use_ssl = serializer.validated_data['EMAIL_USE_SSL']
|
|
email_use_tls = serializer.validated_data['EMAIL_USE_TLS']
|
|
|
|
# 设置 settings 的值,会导致动态配置在当前进程失效
|
|
# for k, v in serializer.validated_data.items():
|
|
# if k.startswith('EMAIL'):
|
|
# setattr(settings, k, v)
|
|
try:
|
|
subject = "Test"
|
|
message = "Test smtp setting"
|
|
email_from = email_from or email_host_user
|
|
email_recipient = email_recipient or email_from
|
|
connection = get_connection(
|
|
host=email_host, port=email_port,
|
|
username=email_host_user, password=email_host_password,
|
|
use_tls=email_use_tls, use_ssl=email_use_ssl,
|
|
)
|
|
send_mail(
|
|
subject, message, email_from, [email_recipient],
|
|
connection=connection
|
|
)
|
|
except SMTPSenderRefused as e:
|
|
error = e.smtp_error
|
|
if isinstance(error, bytes):
|
|
for coding in ('gbk', 'utf8'):
|
|
try:
|
|
error = error.decode(coding)
|
|
except UnicodeDecodeError:
|
|
continue
|
|
else:
|
|
break
|
|
return Response({"error": str(error)}, status=400)
|
|
except Exception as e:
|
|
logger.error(e)
|
|
return Response({"error": str(e)}, status=400)
|
|
return Response({"msg": self.success_message.format(email_recipient)})
|
|
|
|
|
|
class PublicSettingApi(generics.RetrieveAPIView):
|
|
permission_classes = (AllowAny,)
|
|
serializer_class = serializers.PublicSettingSerializer
|
|
|
|
@staticmethod
|
|
def get_logo_urls():
|
|
logo_urls = {
|
|
'logo_logout': static('img/logo.png'),
|
|
'logo_index': static('img/logo_text.png'),
|
|
'login_image': static('img/login_image.jpg'),
|
|
'favicon': static('img/facio.ico')
|
|
}
|
|
if not settings.XPACK_ENABLED:
|
|
return logo_urls
|
|
from xpack.plugins.interface.models import Interface
|
|
obj = Interface.interface()
|
|
if not obj:
|
|
return logo_urls
|
|
for attr in ['logo_logout', 'logo_index', 'login_image', 'favicon']:
|
|
if getattr(obj, attr, '') and getattr(obj, attr).url:
|
|
logo_urls.update({attr: getattr(obj, attr).url})
|
|
return logo_urls
|
|
|
|
@staticmethod
|
|
def get_login_title():
|
|
default_title = _('Welcome to the JumpServer open source Bastion Host')
|
|
if not settings.XPACK_ENABLED:
|
|
return default_title
|
|
from xpack.plugins.interface.models import Interface
|
|
return Interface.get_login_title()
|
|
|
|
def get_object(self):
|
|
instance = {
|
|
"data": {
|
|
"WINDOWS_SKIP_ALL_MANUAL_PASSWORD": settings.WINDOWS_SKIP_ALL_MANUAL_PASSWORD,
|
|
"SECURITY_MAX_IDLE_TIME": settings.SECURITY_MAX_IDLE_TIME,
|
|
"XPACK_ENABLED": settings.XPACK_ENABLED,
|
|
"LOGIN_CONFIRM_ENABLE": settings.LOGIN_CONFIRM_ENABLE,
|
|
"SECURITY_VIEW_AUTH_NEED_MFA": settings.SECURITY_VIEW_AUTH_NEED_MFA,
|
|
"SECURITY_MFA_VERIFY_TTL": settings.SECURITY_MFA_VERIFY_TTL,
|
|
"OLD_PASSWORD_HISTORY_LIMIT_COUNT": settings.OLD_PASSWORD_HISTORY_LIMIT_COUNT,
|
|
"SECURITY_COMMAND_EXECUTION": settings.SECURITY_COMMAND_EXECUTION,
|
|
"SECURITY_PASSWORD_EXPIRATION_TIME": settings.SECURITY_PASSWORD_EXPIRATION_TIME,
|
|
"SECURITY_LUNA_REMEMBER_AUTH": settings.SECURITY_LUNA_REMEMBER_AUTH,
|
|
"XPACK_LICENSE_IS_VALID": has_valid_xpack_license(),
|
|
"LOGIN_TITLE": self.get_login_title(),
|
|
"LOGO_URLS": self.get_logo_urls(),
|
|
"TICKETS_ENABLED": settings.TICKETS_ENABLED,
|
|
"PASSWORD_RULE": {
|
|
'SECURITY_PASSWORD_MIN_LENGTH': settings.SECURITY_PASSWORD_MIN_LENGTH,
|
|
'SECURITY_PASSWORD_UPPER_CASE': settings.SECURITY_PASSWORD_UPPER_CASE,
|
|
'SECURITY_PASSWORD_LOWER_CASE': settings.SECURITY_PASSWORD_LOWER_CASE,
|
|
'SECURITY_PASSWORD_NUMBER': settings.SECURITY_PASSWORD_NUMBER,
|
|
'SECURITY_PASSWORD_SPECIAL_CHAR': settings.SECURITY_PASSWORD_SPECIAL_CHAR,
|
|
},
|
|
"AUTH_WECOM": settings.AUTH_WECOM,
|
|
"AUTH_DINGTALK": settings.AUTH_DINGTALK,
|
|
}
|
|
}
|
|
return instance
|
|
|
|
|
|
class SettingsApi(generics.RetrieveUpdateAPIView):
|
|
permission_classes = (IsSuperUser,)
|
|
serializer_class_mapper = {
|
|
'all': serializers.SettingsSerializer,
|
|
'basic': serializers.BasicSettingSerializer,
|
|
'terminal': serializers.TerminalSettingSerializer,
|
|
'security': serializers.SecuritySettingSerializer,
|
|
'ldap': serializers.LDAPSettingSerializer,
|
|
'email': serializers.EmailSettingSerializer,
|
|
'email_content': serializers.EmailContentSettingSerializer,
|
|
'wecom': serializers.WeComSettingSerializer,
|
|
'dingtalk': serializers.DingTalkSettingSerializer,
|
|
}
|
|
|
|
def get_serializer_class(self):
|
|
category = self.request.query_params.get('category', serializers.BasicSettingSerializer)
|
|
return self.serializer_class_mapper.get(category, serializers.BasicSettingSerializer)
|
|
|
|
def get_fields(self):
|
|
serializer = self.get_serializer_class()()
|
|
fields = serializer.get_fields()
|
|
return fields
|
|
|
|
def get_object(self):
|
|
items = self.get_fields().keys()
|
|
return {item: getattr(settings, item) for item in items}
|
|
|
|
def parse_serializer_data(self, serializer):
|
|
data = []
|
|
fields = self.get_fields()
|
|
encrypted_items = [name for name, field in fields.items() if field.write_only]
|
|
category = self.request.query_params.get('category', '')
|
|
for name, value in serializer.validated_data.items():
|
|
encrypted = name in encrypted_items
|
|
if encrypted and value in ['', None]:
|
|
continue
|
|
data.append({
|
|
'name': name, 'value': value,
|
|
'encrypted': encrypted, 'category': category
|
|
})
|
|
return data
|
|
|
|
def perform_update(self, serializer):
|
|
settings_items = self.parse_serializer_data(serializer)
|
|
serializer_data = getattr(serializer, 'data', {})
|
|
for item in settings_items:
|
|
changed, setting = Setting.update_or_create(**item)
|
|
if not changed:
|
|
continue
|
|
serializer_data[setting.name] = setting.cleaned_value
|
|
setattr(serializer, '_data', serializer_data)
|