mirror of https://github.com/jumpserver/jumpserver
				
				
				
			
		
			
				
	
	
		
			161 lines
		
	
	
		
			4.6 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			161 lines
		
	
	
		
			4.6 KiB
		
	
	
	
		
			Python
		
	
	
| # -*- coding: utf-8 -*-
 | ||
| #
 | ||
| from django.conf import settings
 | ||
| from django.utils.translation import gettext_lazy as _
 | ||
| 
 | ||
| from users.utils import LoginBlockUtil, MFABlockUtils, LoginIpBlockUtil
 | ||
| from . import const
 | ||
| from ..signals import post_auth_failed
 | ||
| 
 | ||
| 
 | ||
class AuthFailedNeedLogMixin:
    """Mixin that sends ``post_auth_failed`` each time the exception is built.

    The signal is emitted only after the rest of the cooperative ``__init__``
    chain has finished, so it carries whatever ``username``, ``request`` and
    ``msg`` are present on the instance at that moment.
    """
    # Fallback values used when no cooperating class sets these attributes.
    msg = ''
    error = ''
    request = None
    username = ''

    def __init__(self, *args, **kwargs):
        # Let the other classes in the MRO populate the instance first.
        super().__init__(*args, **kwargs)
        signal_kwargs = {
            'sender': self.__class__,
            'username': self.username,
            'request': self.request,
            'reason': self.msg,
        }
        post_auth_failed.send(**signal_kwargs)
 | ||
| 
 | ||
| 
 | ||
class AuthFailedNeedBlockMixin:
    """Mixin that bumps the login failure counter when the exception is built.

    After the rest of the ``__init__`` chain has run, it calls
    ``LoginBlockUtil(username, ip).incr_failed_count()`` with the values found
    on the instance.
    """
    # Fallback values used when no cooperating class sets these attributes.
    ip = ''
    username = ''

    def __init__(self, *args, **kwargs):
        # Run the remaining initializers first so username/ip are populated.
        super().__init__(*args, **kwargs)
        block_util = LoginBlockUtil(self.username, self.ip)
        block_util.incr_failed_count()
 | ||
| 
 | ||
| 
 | ||
class AuthFailedError(Exception):
    """Base exception for authentication failures.

    Every keyword argument passed to the constructor is stored as an
    attribute on the instance, shadowing the class-level defaults below.
    """
    # Class-level defaults; instances override them through keyword arguments.
    error = ''
    msg = ''
    username = ''
    ip = ''
    request = None

    def __init__(self, **kwargs):
        for attr_name in kwargs:
            setattr(self, attr_name, kwargs[attr_name])

    def as_data(self):
        """Return the error code and message as a plain dict."""
        return dict(error=self.error, msg=self.msg)

    def __str__(self):
        # msg may not be a plain str (e.g. a lazy translation), so coerce it.
        return str(self.msg)
 | ||
| 
 | ||
| 
 | ||
class BlockGlobalIpLoginError(AuthFailedError):
    """Authentication failure tied to the global per-IP login limit.

    Building the exception calls ``LoginIpBlockUtil(ip).set_block_if_need()``
    before the attributes are stored on the instance.
    """
    error = 'block_global_ip_login'

    def __init__(self, username, ip, **kwargs):
        """Fill in a default message, update the IP block state and store attrs.

        :param username: stored on the instance as ``username``
        :param ip: client IP; passed to ``LoginIpBlockUtil`` and stored as ``ip``
        :param kwargs: extra attributes forwarded to the parent initializer
        """
        # Only provide the default text when nothing (e.g. a subclass) has
        # already set a message on the instance or class.
        if not self.msg:
            limit_time = settings.SECURITY_LOGIN_IP_LIMIT_TIME
            self.msg = const.block_ip_login_msg.format(limit_time)
        ip_block_util = LoginIpBlockUtil(ip)
        ip_block_util.set_block_if_need()
        super().__init__(username=username, ip=ip, **kwargs)
 | ||
| 
 | ||
| 
 | ||
class CredentialError(
    AuthFailedNeedLogMixin, AuthFailedNeedBlockMixin,
    BlockGlobalIpLoginError, AuthFailedError
):
    """Credential check failure that is both logged and counted.

    The message is chosen from the attempts left according to
    ``LoginBlockUtil.get_remainder_times()`` and from the ``error`` code.
    """

    def __init__(self, error, username, ip, request):
        """Choose the user-facing message, then run the mixin/base initializers.

        :param error: failure reason code, used to look up the message
        :param username: account name the attempt was made for
        :param ip: client IP of the attempt
        :param request: request object handed on to the failure signal
        """
        attempts_left = LoginBlockUtil(username, ip).get_remainder_times()
        block_time = settings.SECURITY_LOGIN_LIMIT_TIME

        if attempts_left < 1:
            # No attempts left: report that the user is blocked.
            self.msg = const.block_user_login_msg.format(block_time)
        else:
            fallback_msg = const.invalid_login_msg.format(
                times_try=attempts_left, block_time=block_time
            )
            # A wrong password always gets the "attempts left" text; any other
            # reason uses its own text when one is registered.
            self.msg = (
                fallback_msg
                if error == const.reason_password_failed
                else const.reason_choices.get(error, fallback_msg)
            )
        # Set msg before calling super() so the reason recorded in the log
        # is accurate.
        super().__init__(error=error, username=username, ip=ip, request=request)
 | ||
| 
 | ||
| 
 | ||
class MFAFailedError(AuthFailedNeedLogMixin, AuthFailedError):
    """MFA verification failure; counted via ``MFABlockUtils`` and logged."""
    error = const.reason_mfa_failed
    msg: str

    def __init__(self, username, request, ip, mfa_type, error):
        """Count the failed MFA attempt and build the matching message.

        :param username: account name the attempt was made for
        :param request: request object handed on to the failure signal
        :param ip: client IP, used only for the MFA failure counter
        :param mfa_type: accepted for the caller's convenience; not used here
        :param error: detail text interpolated into the message
        """
        # The return value is treated as the number of attempts left
        # (presumably that is what incr_failed_count returns - confirm).
        attempts_left = MFABlockUtils(username, ip).incr_failed_count()
        block_time = settings.SECURITY_LOGIN_LIMIT_TIME

        if not attempts_left:
            self.msg = const.block_mfa_msg.format(block_time)
        else:
            self.msg = const.mfa_error_msg.format(
                error=error, times_try=attempts_left, block_time=block_time
            )
        # NOTE(review): ip is not forwarded, so the instance keeps the default
        # ip attribute - confirm whether that is intentional.
        super().__init__(username=username, request=request)
 | ||
| 
 | ||
| 
 | ||
class BlockMFAError(AuthFailedNeedLogMixin, AuthFailedError):
    """Logged failure that carries the "MFA blocked" message."""
    error = 'block_mfa'

    def __init__(self, username, request, ip):
        """Set the blocked-MFA message and store username, request and ip."""
        limit_time = settings.SECURITY_LOGIN_LIMIT_TIME
        self.msg = const.block_mfa_msg.format(limit_time)
        super().__init__(username=username, request=request, ip=ip)
 | ||
| 
 | ||
| 
 | ||
class BlockLoginError(AuthFailedNeedBlockMixin, AuthFailedError):
    """Failure carrying the "user login blocked" message.

    Because of the block mixin, building this exception also increments the
    login failure counter for the given username and ip.
    """
    error = 'block_login'

    def __init__(self, username, ip):
        """Set the blocked-login message and store username and ip."""
        limit_time = settings.SECURITY_LOGIN_LIMIT_TIME
        self.msg = const.block_user_login_msg.format(limit_time)
        super().__init__(username=username, ip=ip)
 | ||
| 
 | ||
| 
 | ||
class SessionEmptyError(AuthFailedError):
    """Authentication failure with the fixed ``session_empty`` error code."""
    error = 'session_empty'
    msg = const.session_empty_msg
 | ||
| 
 | ||
| 
 | ||
class ACLError(AuthFailedNeedLogMixin, AuthFailedError):
    """Logged failure raised with a caller-supplied ACL message."""
    error = 'acl_error'
    msg = const.reason_acl_not_allow

    def __init__(self, msg, **kwargs):
        """Store ``msg`` before the initializer chain runs.

        :param msg: message to report; replaces the class-level default
        :param kwargs: extra attributes forwarded to the parent initializers
        """
        # msg must be in place before super().__init__ because the log mixin
        # reads it when sending the failure signal.
        self.msg = msg
        super().__init__(**kwargs)

    def as_data(self):
        """Return the payload with ``const.reason_acl_not_allow`` as the code.

        Note that the reported ``error`` is the constant, not ``self.error``.
        """
        return dict(error=const.reason_acl_not_allow, msg=self.msg)
 | ||
| 
 | ||
| 
 | ||
class LoginACLIPAndTimePeriodNotAllowed(ACLError):
    """ACL failure with a fixed "IP and time period not allowed" message."""

    def __init__(self, username, request, **kwargs):
        """Record username/request, then initialize with the fixed message.

        :param username: account name, stored before the initializer chain runs
        :param request: request object, stored before the initializer chain runs
        :param kwargs: extra attributes forwarded to ``ACLError``
        """
        # Assigned up front so they are already set when the parent chain
        # (including the failure signal) runs.
        self.username, self.request = username, request
        reason = _("Current IP and Time period is not allowed")
        super().__init__(reason, **kwargs)
 | ||
| 
 | ||
| 
 | ||
class MFACodeRequiredError(AuthFailedError):
    """Authentication failure with the ``mfa_code_required`` error code."""
    msg = _("Please enter MFA code")
    error = 'mfa_code_required'
 | ||
| 
 | ||
| 
 | ||
class SMSCodeRequiredError(AuthFailedError):
    """Authentication failure with the ``sms_code_required`` error code."""
    msg = _("Please enter SMS code")
    error = 'sms_code_required'
 | ||
| 
 | ||
| 
 | ||
class UserPhoneNotSet(AuthFailedError):
    """Authentication failure with the ``phone_not_set`` error code."""
    msg = _('Phone not set')
    error = 'phone_not_set'
 |