mirror of https://github.com/jumpserver/jumpserver
				
				
				
			
		
			
				
	
	
		
			60 lines
		
	
	
		
			1.6 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			60 lines
		
	
	
		
			1.6 KiB
		
	
	
	
		
			Python
		
	
	
| from django.conf import settings
 | |
| from django.utils.module_loading import import_string
 | |
| from django.utils.translation import ugettext_lazy as _
 | |
| 
 | |
| from common.utils import get_logger
 | |
| from .base import BaseMFA
 | |
| 
 | |
| logger = get_logger(__file__)
 | |
| 
 | |
| mfa_custom_method = None
 | |
| 
 | |
| if settings.MFA_CUSTOM:
 | |
|     """ 保证自定义认证方法在服务运行时不能被更改,只在第一次调用时加载一次 """
 | |
|     try:
 | |
|         mfa_custom_method_path = 'data.mfa.main.check_code'
 | |
|         mfa_custom_method = import_string(mfa_custom_method_path)
 | |
|     except Exception as e:
 | |
|         logger.warning('Import custom auth method failed: {}, Maybe not enabled'.format(e))
 | |
| 
 | |
| custom_failed_msg = _("MFA Custom code invalid")
 | |
| 
 | |
| 
 | |
| class MFACustom(BaseMFA):
 | |
|     name = 'mfa_custom'
 | |
|     display_name = 'Custom'
 | |
|     placeholder = _("MFA custom verification code")
 | |
| 
 | |
|     def check_code(self, code):
 | |
|         assert self.is_authenticated()
 | |
|         ok = False
 | |
|         try:
 | |
|             ok = mfa_custom_method(user=self.user, code=code)
 | |
|         except Exception as exc:
 | |
|             logger.error('Custom authenticate error: {}'.format(exc))
 | |
|         msg = '' if ok else custom_failed_msg
 | |
|         return ok, msg
 | |
| 
 | |
|     def is_active(self):
 | |
|         return True
 | |
| 
 | |
|     @staticmethod
 | |
|     def global_enabled():
 | |
|         return settings.MFA_CUSTOM and callable(mfa_custom_method)
 | |
| 
 | |
|     def get_enable_url(self) -> str:
 | |
|         return ''
 | |
| 
 | |
|     def can_disable(self):
 | |
|         return False
 | |
| 
 | |
|     def disable(self):
 | |
|         return ''
 | |
| 
 | |
|     @staticmethod
 | |
|     def help_text_of_disable():
 | |
|         return _("MFA custom global enabled, cannot disable")
 | |
| 
 | |
|     def get_disable_url(self) -> str:
 | |
|         return ''
 |