From 3cbce63c5445a8d50397b63c6771b17b8fbbb9d5 Mon Sep 17 00:00:00 2001 From: fit2bot <68588906+fit2bot@users.noreply.github.com> Date: Tue, 28 Jun 2022 17:39:13 +0800 Subject: [PATCH] =?UTF-8?q?perf:=20=E6=8B=86=E5=88=86=E7=99=BB=E5=BD=95=20?= =?UTF-8?q?View=20(#8502)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * perf: 拆分登录 View * perf: 修改 code Co-authored-by: ibuler --- .gitignore | 1 + apps/authentication/errors.py | 367 ------------------------- apps/authentication/errors/__init__.py | 4 + apps/authentication/errors/const.py | 67 +++++ apps/authentication/errors/failed.py | 167 +++++++++++ apps/authentication/errors/mfa.py | 38 +++ apps/authentication/errors/redirect.py | 109 ++++++++ apps/authentication/mixins.py | 4 +- apps/authentication/views/login.py | 203 +++++++------- 9 files changed, 490 insertions(+), 470 deletions(-) delete mode 100644 apps/authentication/errors.py create mode 100644 apps/authentication/errors/__init__.py create mode 100644 apps/authentication/errors/const.py create mode 100644 apps/authentication/errors/failed.py create mode 100644 apps/authentication/errors/mfa.py create mode 100644 apps/authentication/errors/redirect.py diff --git a/.gitignore b/.gitignore index 491009b9d..d4f2e57bf 100644 --- a/.gitignore +++ b/.gitignore @@ -34,6 +34,7 @@ celerybeat-schedule.db data/static docs/_build/ xpack +xpack.bak logs/* ### Vagrant ### .vagrant/ diff --git a/apps/authentication/errors.py b/apps/authentication/errors.py deleted file mode 100644 index 9a75b4691..000000000 --- a/apps/authentication/errors.py +++ /dev/null @@ -1,367 +0,0 @@ -# -*- coding: utf-8 -*- -# -from django.utils.translation import ugettext_lazy as _ -from django.urls import reverse -from django.conf import settings -from rest_framework import status - -from common.exceptions import JMSException -from .signals import post_auth_failed -from users.utils import LoginBlockUtil, MFABlockUtils, LoginIpBlockUtil - -reason_password_failed = 'password_failed' -reason_password_decrypt_failed = 'password_decrypt_failed' -reason_mfa_failed = 'mfa_failed' -reason_mfa_unset = 'mfa_unset' -reason_user_not_exist = 'user_not_exist' -reason_password_expired = 'password_expired' -reason_user_invalid = 'user_invalid' -reason_user_inactive = 'user_inactive' -reason_user_expired = 'user_expired' -reason_backend_not_match = 'backend_not_match' -reason_acl_not_allow = 'acl_not_allow' -only_local_users_are_allowed = 'only_local_users_are_allowed' - -reason_choices = { - reason_password_failed: _('Username/password check failed'), - reason_password_decrypt_failed: _('Password decrypt failed'), - reason_mfa_failed: _('MFA failed'), - reason_mfa_unset: _('MFA unset'), - reason_user_not_exist: _("Username does not exist"), - reason_password_expired: _("Password expired"), - reason_user_invalid: _('Disabled or expired'), - reason_user_inactive: _("This account is inactive."), - reason_user_expired: _("This account is expired"), - reason_backend_not_match: _("Auth backend not match"), - reason_acl_not_allow: _("ACL is not allowed"), - only_local_users_are_allowed: _("Only local users are allowed") -} -old_reason_choices = { - '0': '-', - '1': reason_choices[reason_password_failed], - '2': reason_choices[reason_mfa_failed], - '3': reason_choices[reason_user_not_exist], - '4': reason_choices[reason_password_expired], -} - -session_empty_msg = _("No session found, check your cookie") -invalid_login_msg = _( - "The username or password you entered is incorrect, " - "please enter it again. " - "You can also try {times_try} times " - "(The account will be temporarily locked for {block_time} minutes)" -) -block_user_login_msg = _( - "The account has been locked " - "(please contact admin to unlock it or try again after {} minutes)" -) -block_ip_login_msg = _( - "The ip has been locked " - "(please contact admin to unlock it or try again after {} minutes)" -) -block_mfa_msg = _( - "The account has been locked " - "(please contact admin to unlock it or try again after {} minutes)" -) -mfa_error_msg = _( - "{error}, " - "You can also try {times_try} times " - "(The account will be temporarily locked for {block_time} minutes)" -) -mfa_required_msg = _("MFA required") -mfa_unset_msg = _("MFA not set, please set it first") -login_confirm_required_msg = _("Login confirm required") -login_confirm_wait_msg = _("Wait login confirm ticket for accept") -login_confirm_error_msg = _("Login confirm ticket was {}") - - -class AuthFailedNeedLogMixin: - username = '' - request = None - error = '' - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - post_auth_failed.send( - sender=self.__class__, username=self.username, - request=self.request, reason=self.error - ) - - -class AuthFailedNeedBlockMixin: - username = '' - ip = '' - - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - LoginBlockUtil(self.username, self.ip).incr_failed_count() - - -class AuthFailedError(Exception): - username = '' - msg = '' - error = '' - request = None - ip = '' - - def __init__(self, **kwargs): - for k, v in kwargs.items(): - setattr(self, k, v) - - def as_data(self): - return { - 'error': self.error, - 'msg': self.msg, - } - - def __str__(self): - return str(self.msg) - - -class BlockGlobalIpLoginError(AuthFailedError): - error = 'block_global_ip_login' - - def __init__(self, username, ip, **kwargs): - self.msg = block_ip_login_msg.format(settings.SECURITY_LOGIN_IP_LIMIT_TIME) - LoginIpBlockUtil(ip).set_block_if_need() - super().__init__(username=username, ip=ip, **kwargs) - - -class CredentialError( - AuthFailedNeedLogMixin, AuthFailedNeedBlockMixin, BlockGlobalIpLoginError, AuthFailedError -): - def __init__(self, error, username, ip, request): - super().__init__(error=error, username=username, ip=ip, request=request) - util = LoginBlockUtil(username, ip) - times_remainder = util.get_remainder_times() - block_time = settings.SECURITY_LOGIN_LIMIT_TIME - - if times_remainder < 1: - self.msg = block_user_login_msg.format(settings.SECURITY_LOGIN_LIMIT_TIME) - return - - default_msg = invalid_login_msg.format( - times_try=times_remainder, block_time=block_time - ) - if error == reason_password_failed: - self.msg = default_msg - else: - self.msg = reason_choices.get(error, default_msg) - - -class MFAFailedError(AuthFailedNeedLogMixin, AuthFailedError): - error = reason_mfa_failed - msg: str - - def __init__(self, username, request, ip, mfa_type, error): - super().__init__(username=username, request=request) - - util = MFABlockUtils(username, ip) - times_remainder = util.incr_failed_count() - block_time = settings.SECURITY_LOGIN_LIMIT_TIME - - if times_remainder: - self.msg = mfa_error_msg.format( - error=error, times_try=times_remainder, block_time=block_time - ) - else: - self.msg = block_mfa_msg.format(settings.SECURITY_LOGIN_LIMIT_TIME) - - -class BlockMFAError(AuthFailedNeedLogMixin, AuthFailedError): - error = 'block_mfa' - - def __init__(self, username, request, ip): - self.msg = block_mfa_msg.format(settings.SECURITY_LOGIN_LIMIT_TIME) - super().__init__(username=username, request=request, ip=ip) - - -class MFAUnsetError(Exception): - error = reason_mfa_unset - msg = mfa_unset_msg - - def __init__(self, user, request, url): - self.url = url - - -class BlockLoginError(AuthFailedNeedBlockMixin, AuthFailedError): - error = 'block_login' - - def __init__(self, username, ip): - self.msg = block_user_login_msg.format(settings.SECURITY_LOGIN_LIMIT_TIME) - super().__init__(username=username, ip=ip) - - -class SessionEmptyError(AuthFailedError): - msg = session_empty_msg - error = 'session_empty' - - -class NeedMoreInfoError(Exception): - error = '' - msg = '' - - def __init__(self, error='', msg=''): - if error: - self.error = error - if msg: - self.msg = msg - - def as_data(self): - return { - 'error': self.error, - 'msg': self.msg, - } - - -class MFARequiredError(NeedMoreInfoError): - msg = mfa_required_msg - error = 'mfa_required' - - def __init__(self, error='', msg='', mfa_types=()): - super().__init__(error=error, msg=msg) - self.choices = mfa_types - - def as_data(self): - return { - 'error': self.error, - 'msg': self.msg, - 'data': { - 'choices': self.choices, - 'url': reverse('api-auth:mfa-challenge') - } - } - - -class ACLError(AuthFailedNeedLogMixin, AuthFailedError): - msg = reason_acl_not_allow - error = 'acl_error' - - def __init__(self, msg, **kwargs): - self.msg = msg - super().__init__(**kwargs) - - def as_data(self): - return { - "error": reason_acl_not_allow, - "msg": self.msg - } - - -class LoginIPNotAllowed(ACLError): - def __init__(self, username, request, **kwargs): - self.username = username - self.request = request - super().__init__(_("IP is not allowed"), **kwargs) - - -class TimePeriodNotAllowed(ACLError): - def __init__(self, username, request, **kwargs): - self.username = username - self.request = request - super().__init__(_("Time Period is not allowed"), **kwargs) - - -class LoginConfirmBaseError(NeedMoreInfoError): - def __init__(self, ticket_id, **kwargs): - self.ticket_id = ticket_id - super().__init__(**kwargs) - - def as_data(self): - return { - "error": self.error, - "msg": self.msg, - "data": { - "ticket_id": self.ticket_id - } - } - - -class LoginConfirmWaitError(LoginConfirmBaseError): - msg = login_confirm_wait_msg - error = 'login_confirm_wait' - - -class LoginConfirmOtherError(LoginConfirmBaseError): - error = 'login_confirm_error' - - def __init__(self, ticket_id, status): - msg = login_confirm_error_msg.format(status) - super().__init__(ticket_id=ticket_id, msg=msg) - - -class SSOAuthClosed(JMSException): - default_code = 'sso_auth_closed' - default_detail = _('SSO auth closed') - - -class PasswordTooSimple(JMSException): - default_code = 'passwd_too_simple' - default_detail = _('Your password is too simple, please change it for security') - - def __init__(self, url, *args, **kwargs): - super().__init__(*args, **kwargs) - self.url = url - - -class PasswordNeedUpdate(JMSException): - default_code = 'passwd_need_update' - default_detail = _('You should to change your password before login') - - def __init__(self, url, *args, **kwargs): - super().__init__(*args, **kwargs) - self.url = url - - -class PasswordRequireResetError(JMSException): - default_code = 'passwd_has_expired' - default_detail = _('Your password has expired, please reset before logging in') - - def __init__(self, url, *args, **kwargs): - super().__init__(*args, **kwargs) - self.url = url - - -class WeComCodeInvalid(JMSException): - default_code = 'wecom_code_invalid' - default_detail = 'Code invalid, can not get user info' - - -class WeComBindAlready(JMSException): - default_code = 'wecom_bind_already' - default_detail = 'WeCom already binded' - - -class WeComNotBound(JMSException): - default_code = 'wecom_not_bound' - default_detail = 'WeCom is not bound' - - -class DingTalkNotBound(JMSException): - default_code = 'dingtalk_not_bound' - default_detail = 'DingTalk is not bound' - - -class FeiShuNotBound(JMSException): - default_code = 'feishu_not_bound' - default_detail = 'FeiShu is not bound' - - -class PasswordInvalid(JMSException): - default_code = 'passwd_invalid' - default_detail = _('Your password is invalid') - - -class MFACodeRequiredError(AuthFailedError): - error = 'mfa_code_required' - msg = _("Please enter MFA code") - - -class SMSCodeRequiredError(AuthFailedError): - error = 'sms_code_required' - msg = _("Please enter SMS code") - - -class UserPhoneNotSet(AuthFailedError): - error = 'phone_not_set' - msg = _('Phone not set') diff --git a/apps/authentication/errors/__init__.py b/apps/authentication/errors/__init__.py new file mode 100644 index 000000000..1ab7bc8ae --- /dev/null +++ b/apps/authentication/errors/__init__.py @@ -0,0 +1,4 @@ +from .const import * +from .mfa import * +from .failed import * +from .redirect import * diff --git a/apps/authentication/errors/const.py b/apps/authentication/errors/const.py new file mode 100644 index 000000000..530bcf150 --- /dev/null +++ b/apps/authentication/errors/const.py @@ -0,0 +1,67 @@ +from django.utils.translation import gettext_lazy as _ + + +reason_password_failed = 'password_failed' +reason_password_decrypt_failed = 'password_decrypt_failed' +reason_mfa_failed = 'mfa_failed' +reason_mfa_unset = 'mfa_unset' +reason_user_not_exist = 'user_not_exist' +reason_password_expired = 'password_expired' +reason_user_invalid = 'user_invalid' +reason_user_inactive = 'user_inactive' +reason_user_expired = 'user_expired' +reason_backend_not_match = 'backend_not_match' +reason_acl_not_allow = 'acl_not_allow' +only_local_users_are_allowed = 'only_local_users_are_allowed' + +reason_choices = { + reason_password_failed: _('Username/password check failed'), + reason_password_decrypt_failed: _('Password decrypt failed'), + reason_mfa_failed: _('MFA failed'), + reason_mfa_unset: _('MFA unset'), + reason_user_not_exist: _("Username does not exist"), + reason_password_expired: _("Password expired"), + reason_user_invalid: _('Disabled or expired'), + reason_user_inactive: _("This account is inactive."), + reason_user_expired: _("This account is expired"), + reason_backend_not_match: _("Auth backend not match"), + reason_acl_not_allow: _("ACL is not allowed"), + only_local_users_are_allowed: _("Only local users are allowed") +} +old_reason_choices = { + '0': '-', + '1': reason_choices[reason_password_failed], + '2': reason_choices[reason_mfa_failed], + '3': reason_choices[reason_user_not_exist], + '4': reason_choices[reason_password_expired], +} + +session_empty_msg = _("No session found, check your cookie") +invalid_login_msg = _( + "The username or password you entered is incorrect, " + "please enter it again. " + "You can also try {times_try} times " + "(The account will be temporarily locked for {block_time} minutes)" +) +block_user_login_msg = _( + "The account has been locked " + "(please contact admin to unlock it or try again after {} minutes)" +) +block_ip_login_msg = _( + "The ip has been locked " + "(please contact admin to unlock it or try again after {} minutes)" +) +block_mfa_msg = _( + "The account has been locked " + "(please contact admin to unlock it or try again after {} minutes)" +) +mfa_error_msg = _( + "{error}, " + "You can also try {times_try} times " + "(The account will be temporarily locked for {block_time} minutes)" +) +mfa_required_msg = _("MFA required") +mfa_unset_msg = _("MFA not set, please set it first") +login_confirm_required_msg = _("Login confirm required") +login_confirm_wait_msg = _("Wait login confirm ticket for accept") +login_confirm_error_msg = _("Login confirm ticket was {}") diff --git a/apps/authentication/errors/failed.py b/apps/authentication/errors/failed.py new file mode 100644 index 000000000..118fd6d6e --- /dev/null +++ b/apps/authentication/errors/failed.py @@ -0,0 +1,167 @@ +# -*- coding: utf-8 -*- +# +from django.utils.translation import ugettext_lazy as _ +from django.conf import settings + +from users.utils import LoginBlockUtil, MFABlockUtils, LoginIpBlockUtil +from ..signals import post_auth_failed +from . import const + + +class AuthFailedNeedLogMixin: + username = '' + request = None + error = '' + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + post_auth_failed.send( + sender=self.__class__, username=self.username, + request=self.request, reason=self.error + ) + + +class AuthFailedNeedBlockMixin: + username = '' + ip = '' + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + LoginBlockUtil(self.username, self.ip).incr_failed_count() + + +class AuthFailedError(Exception): + username = '' + msg = '' + error = '' + request = None + ip = '' + + def __init__(self, **kwargs): + for k, v in kwargs.items(): + setattr(self, k, v) + + def as_data(self): + return { + 'error': self.error, + 'msg': self.msg, + } + + def __str__(self): + return str(self.msg) + + +class BlockGlobalIpLoginError(AuthFailedError): + error = 'block_global_ip_login' + + def __init__(self, username, ip, **kwargs): + self.msg = const.block_ip_login_msg.format(settings.SECURITY_LOGIN_IP_LIMIT_TIME) + LoginIpBlockUtil(ip).set_block_if_need() + super().__init__(username=username, ip=ip, **kwargs) + + +class CredentialError( + AuthFailedNeedLogMixin, AuthFailedNeedBlockMixin, + BlockGlobalIpLoginError, AuthFailedError +): + def __init__(self, error, username, ip, request): + super().__init__(error=error, username=username, ip=ip, request=request) + util = LoginBlockUtil(username, ip) + times_remainder = util.get_remainder_times() + block_time = settings.SECURITY_LOGIN_LIMIT_TIME + + if times_remainder < 1: + self.msg = const.block_user_login_msg.format(settings.SECURITY_LOGIN_LIMIT_TIME) + return + + default_msg = const.invalid_login_msg.format( + times_try=times_remainder, block_time=block_time + ) + if error == const.reason_password_failed: + self.msg = default_msg + else: + self.msg = const.reason_choices.get(error, default_msg) + + +class MFAFailedError(AuthFailedNeedLogMixin, AuthFailedError): + error = const.reason_mfa_failed + msg: str + + def __init__(self, username, request, ip, mfa_type, error): + super().__init__(username=username, request=request) + + util = MFABlockUtils(username, ip) + times_remainder = util.incr_failed_count() + block_time = settings.SECURITY_LOGIN_LIMIT_TIME + + if times_remainder: + self.msg = const.mfa_error_msg.format( + error=error, times_try=times_remainder, block_time=block_time + ) + else: + self.msg = const.block_mfa_msg.format(settings.SECURITY_LOGIN_LIMIT_TIME) + + +class BlockMFAError(AuthFailedNeedLogMixin, AuthFailedError): + error = 'block_mfa' + + def __init__(self, username, request, ip): + self.msg = const.block_mfa_msg.format(settings.SECURITY_LOGIN_LIMIT_TIME) + super().__init__(username=username, request=request, ip=ip) + + +class BlockLoginError(AuthFailedNeedBlockMixin, AuthFailedError): + error = 'block_login' + + def __init__(self, username, ip): + self.msg = const.block_user_login_msg.format(settings.SECURITY_LOGIN_LIMIT_TIME) + super().__init__(username=username, ip=ip) + + +class SessionEmptyError(AuthFailedError): + msg = const.session_empty_msg + error = 'session_empty' + + +class ACLError(AuthFailedNeedLogMixin, AuthFailedError): + msg = const.reason_acl_not_allow + error = 'acl_error' + + def __init__(self, msg, **kwargs): + self.msg = msg + super().__init__(**kwargs) + + def as_data(self): + return { + "error": const.reason_acl_not_allow, + "msg": self.msg + } + + +class LoginIPNotAllowed(ACLError): + def __init__(self, username, request, **kwargs): + self.username = username + self.request = request + super().__init__(_("IP is not allowed"), **kwargs) + + +class TimePeriodNotAllowed(ACLError): + def __init__(self, username, request, **kwargs): + self.username = username + self.request = request + super().__init__(_("Time Period is not allowed"), **kwargs) + + +class MFACodeRequiredError(AuthFailedError): + error = 'mfa_code_required' + msg = _("Please enter MFA code") + + +class SMSCodeRequiredError(AuthFailedError): + error = 'sms_code_required' + msg = _("Please enter SMS code") + + +class UserPhoneNotSet(AuthFailedError): + error = 'phone_not_set' + msg = _('Phone not set') diff --git a/apps/authentication/errors/mfa.py b/apps/authentication/errors/mfa.py new file mode 100644 index 000000000..4b7f5a57e --- /dev/null +++ b/apps/authentication/errors/mfa.py @@ -0,0 +1,38 @@ +from django.utils.translation import ugettext_lazy as _ + +from common.exceptions import JMSException + + +class SSOAuthClosed(JMSException): + default_code = 'sso_auth_closed' + default_detail = _('SSO auth closed') + + +class WeComCodeInvalid(JMSException): + default_code = 'wecom_code_invalid' + default_detail = 'Code invalid, can not get user info' + + +class WeComBindAlready(JMSException): + default_code = 'wecom_bind_already' + default_detail = 'WeCom already binded' + + +class WeComNotBound(JMSException): + default_code = 'wecom_not_bound' + default_detail = 'WeCom is not bound' + + +class DingTalkNotBound(JMSException): + default_code = 'dingtalk_not_bound' + default_detail = 'DingTalk is not bound' + + +class FeiShuNotBound(JMSException): + default_code = 'feishu_not_bound' + default_detail = 'FeiShu is not bound' + + +class PasswordInvalid(JMSException): + default_code = 'passwd_invalid' + default_detail = _('Your password is invalid') diff --git a/apps/authentication/errors/redirect.py b/apps/authentication/errors/redirect.py new file mode 100644 index 000000000..f41bda818 --- /dev/null +++ b/apps/authentication/errors/redirect.py @@ -0,0 +1,109 @@ +from django.utils.translation import ugettext_lazy as _ +from django.urls import reverse + +from common.exceptions import JMSException +from . import const + + +class NeedMoreInfoError(Exception): + error = '' + msg = '' + + def __init__(self, error='', msg=''): + if error: + self.error = error + if msg: + self.msg = msg + + def as_data(self): + return { + 'error': self.error, + 'msg': self.msg, + } + + +class NeedRedirectError(JMSException): + def __init__(self, url): + self.url = url + + +class MFARequiredError(NeedMoreInfoError): + msg = const.mfa_required_msg + error = 'mfa_required' + + def __init__(self, error='', msg='', mfa_types=()): + super().__init__(error=error, msg=msg) + self.choices = mfa_types + + def as_data(self): + return { + 'error': self.error, + 'msg': self.msg, + 'data': { + 'choices': self.choices, + 'url': reverse('api-auth:mfa-challenge') + } + } + + +class LoginConfirmBaseError(NeedMoreInfoError): + def __init__(self, ticket_id, **kwargs): + self.ticket_id = ticket_id + super().__init__(**kwargs) + + def as_data(self): + return { + "error": self.error, + "msg": self.msg, + "data": { + "ticket_id": self.ticket_id + } + } + + +class LoginConfirmWaitError(LoginConfirmBaseError): + msg = const.login_confirm_wait_msg + error = 'login_confirm_wait' + + +class LoginConfirmOtherError(LoginConfirmBaseError): + error = 'login_confirm_error' + + def __init__(self, ticket_id, status): + msg = const.login_confirm_error_msg.format(status) + super().__init__(ticket_id=ticket_id, msg=msg) + + +class PasswordTooSimple(NeedRedirectError): + default_code = 'passwd_too_simple' + default_detail = _('Your password is too simple, please change it for security') + + def __init__(self, url, *args, **kwargs): + super().__init__(*args, **kwargs) + self.url = url + + +class PasswordNeedUpdate(NeedRedirectError): + default_code = 'passwd_need_update' + default_detail = _('You should to change your password before login') + + def __init__(self, url, *args, **kwargs): + super().__init__(*args, **kwargs) + self.url = url + + +class PasswordRequireResetError(NeedRedirectError): + default_code = 'passwd_has_expired' + default_detail = _('Your password has expired, please reset before logging in') + + def __init__(self, url, *args, **kwargs): + super().__init__(*args, **kwargs) + self.url = url + + +class MFAUnsetError(NeedRedirectError): + error = const.reason_mfa_unset + msg = const.mfa_unset_msg + + def __init__(self, url, user, request): + self.url = url diff --git a/apps/authentication/mixins.py b/apps/authentication/mixins.py index f1989b181..58e6ccd36 100644 --- a/apps/authentication/mixins.py +++ b/apps/authentication/mixins.py @@ -193,8 +193,8 @@ class MFAMixin: def _check_if_no_active_mfa(self, user): active_mfa_mapper = user.active_mfa_backends_mapper if not active_mfa_mapper: - url = reverse('authentication:user-otp-enable-start') - raise errors.MFAUnsetError(user, self.request, url) + set_url = reverse('authentication:user-otp-enable-start') + raise errors.MFAUnsetError(set_url, user, self.request) def _check_login_page_mfa_if_need(self, user): if not settings.SECURITY_MFA_IN_LOGIN_PAGE: diff --git a/apps/authentication/views/login.py b/apps/authentication/views/login.py index 0736a994d..09c842d88 100644 --- a/apps/authentication/views/login.py +++ b/apps/authentication/views/login.py @@ -4,10 +4,11 @@ from __future__ import unicode_literals import os import datetime +from typing import Callable from django.templatetags.static import static from django.contrib.auth import login as auth_login, logout as auth_logout -from django.http import HttpResponse +from django.http import HttpResponse, HttpRequest from django.shortcuts import reverse, redirect from django.utils.decorators import method_decorator from django.utils.translation import ugettext as _, get_language @@ -34,106 +35,9 @@ __all__ = [ ] -@method_decorator(sensitive_post_parameters(), name='dispatch') -@method_decorator(csrf_protect, name='dispatch') -@method_decorator(never_cache, name='dispatch') -class UserLoginView(mixins.AuthMixin, FormView): - redirect_field_name = 'next' - template_name = 'authentication/login.html' - - def redirect_third_party_auth_if_need(self, request): - # show jumpserver login page if request http://{JUMP-SERVER}/?admin=1 - if self.request.GET.get("admin", 0): - return None - - auth_types = [m for m in self.get_support_auth_methods() if m.get('auto_redirect')] - if not auth_types: - return None - - # 明确直接登录哪个 - login_to = settings.LOGIN_REDIRECT_TO_BACKEND.upper() - if login_to == 'DIRECT': - return None - - auth_method = next(filter(lambda x: x['name'] == login_to, auth_types), None) - if not auth_method: - auth_method = auth_types[0] - - auth_name, redirect_url = auth_method['name'], auth_method['url'] - next_url = request.GET.get('next') or '/' - query_string = request.GET.urlencode() - redirect_url = '{}?next={}&{}'.format(redirect_url, next_url, query_string) - - if settings.LOGIN_REDIRECT_MSG_ENABLED: - message_data = { - 'title': _('Redirecting'), - 'message': _("Redirecting to {} authentication").format(auth_name), - 'redirect_url': redirect_url, - 'interval': 3, - 'has_cancel': True, - 'cancel_url': reverse('authentication:login') + '?admin=1' - } - redirect_url = FlashMessageUtil.gen_message_url(message_data) - return redirect_url - - def get(self, request, *args, **kwargs): - if request.user.is_staff: - first_login_url = redirect_user_first_login_or_index( - request, self.redirect_field_name - ) - return redirect(first_login_url) - redirect_url = self.redirect_third_party_auth_if_need(request) - if redirect_url: - return redirect(redirect_url) - request.session.set_test_cookie() - return super().get(request, *args, **kwargs) - - def form_valid(self, form): - if not self.request.session.test_cookie_worked(): - return HttpResponse(_("Please enable cookies and try again.")) - # https://docs.djangoproject.com/en/3.1/topics/http/sessions/#setting-test-cookies - self.request.session.delete_test_cookie() - - try: - self.check_user_auth(form.cleaned_data) - except errors.AuthFailedError as e: - form.add_error(None, e.msg) - self.set_login_failed_mark() - form_cls = get_user_login_form_cls(captcha=True) - new_form = form_cls(data=form.data) - new_form._errors = form.errors - context = self.get_context_data(form=new_form) - self.request.session.set_test_cookie() - return self.render_to_response(context) - except ( - errors.MFAUnsetError, - errors.PasswordTooSimple, - errors.PasswordRequireResetError, - errors.PasswordNeedUpdate - ) as e: - return redirect(e.url) - except ( - errors.MFAFailedError, - errors.BlockMFAError, - errors.MFACodeRequiredError, - errors.SMSCodeRequiredError, - errors.UserPhoneNotSet, - errors.BlockGlobalIpLoginError - ) as e: - form.add_error('code', e.msg) - return super().form_invalid(form) - self.clear_rsa_key() - return self.redirect_to_guard_view() - - def get_form_class(self): - if self.check_is_need_captcha(): - return get_user_login_form_cls(captcha=True) - else: - return get_user_login_form_cls() - - def clear_rsa_key(self): - self.request.session[RSA_PRIVATE_KEY] = None - self.request.session[RSA_PUBLIC_KEY] = None +class UserLoginContextMixin: + get_user_mfa_context: Callable + request: HttpRequest @staticmethod def get_support_auth_methods(): @@ -236,6 +140,103 @@ class UserLoginView(mixins.AuthMixin, FormView): return context +@method_decorator(sensitive_post_parameters(), name='dispatch') +@method_decorator(csrf_protect, name='dispatch') +@method_decorator(never_cache, name='dispatch') +class UserLoginView(mixins.AuthMixin, UserLoginContextMixin, FormView): + redirect_field_name = 'next' + template_name = 'authentication/login.html' + + def redirect_third_party_auth_if_need(self, request): + # show jumpserver login page if request http://{JUMP-SERVER}/?admin=1 + if self.request.GET.get("admin", 0): + return None + + auth_types = [m for m in self.get_support_auth_methods() if m.get('auto_redirect')] + if not auth_types: + return None + + # 明确直接登录哪个 + login_to = settings.LOGIN_REDIRECT_TO_BACKEND.upper() + if login_to == 'DIRECT': + return None + + auth_method = next(filter(lambda x: x['name'] == login_to, auth_types), None) + if not auth_method: + auth_method = auth_types[0] + + auth_name, redirect_url = auth_method['name'], auth_method['url'] + next_url = request.GET.get('next') or '/' + query_string = request.GET.urlencode() + redirect_url = '{}?next={}&{}'.format(redirect_url, next_url, query_string) + + if settings.LOGIN_REDIRECT_MSG_ENABLED: + message_data = { + 'title': _('Redirecting'), + 'message': _("Redirecting to {} authentication").format(auth_name), + 'redirect_url': redirect_url, + 'interval': 3, + 'has_cancel': True, + 'cancel_url': reverse('authentication:login') + '?admin=1' + } + redirect_url = FlashMessageUtil.gen_message_url(message_data) + return redirect_url + + def get(self, request, *args, **kwargs): + if request.user.is_staff: + first_login_url = redirect_user_first_login_or_index( + request, self.redirect_field_name + ) + return redirect(first_login_url) + redirect_url = self.redirect_third_party_auth_if_need(request) + if redirect_url: + return redirect(redirect_url) + request.session.set_test_cookie() + return super().get(request, *args, **kwargs) + + def form_valid(self, form): + if not self.request.session.test_cookie_worked(): + return HttpResponse(_("Please enable cookies and try again.")) + # https://docs.djangoproject.com/en/3.1/topics/http/sessions/#setting-test-cookies + self.request.session.delete_test_cookie() + + try: + self.check_user_auth(form.cleaned_data) + except errors.AuthFailedError as e: + form.add_error(None, e.msg) + self.set_login_failed_mark() + form_cls = get_user_login_form_cls(captcha=True) + new_form = form_cls(data=form.data) + new_form._errors = form.errors + context = self.get_context_data(form=new_form) + self.request.session.set_test_cookie() + return self.render_to_response(context) + except errors.NeedRedirectError as e: + return redirect(e.url) + except ( + errors.MFAFailedError, + errors.BlockMFAError, + errors.MFACodeRequiredError, + errors.SMSCodeRequiredError, + errors.UserPhoneNotSet, + errors.BlockGlobalIpLoginError + ) as e: + form.add_error('code', e.msg) + return super().form_invalid(form) + self.clear_rsa_key() + return self.redirect_to_guard_view() + + def get_form_class(self): + if self.check_is_need_captcha(): + return get_user_login_form_cls(captcha=True) + else: + return get_user_login_form_cls() + + def clear_rsa_key(self): + self.request.session[RSA_PRIVATE_KEY] = None + self.request.session[RSA_PUBLIC_KEY] = None + + class UserLoginGuardView(mixins.AuthMixin, RedirectView): redirect_field_name = 'next' login_url = reverse_lazy('authentication:login')