perf: 优化逻辑,抽离callback_base类

pull/10409/head
jiangweidong 2023-04-28 16:24:33 +08:00 committed by Jiangjie.Bai
parent 7a97496f70
commit 3367f65b02
5 changed files with 161 additions and 201 deletions

View File

@ -0,0 +1,105 @@
from functools import lru_cache
from rest_framework.request import Request
from django.utils.translation import ugettext_lazy as _
from django.conf import settings
from django.db.utils import IntegrityError
from django.views import View
from authentication import errors
from authentication.mixins import AuthMixin
from users.models import User
from common.utils.django import reverse, get_object_or_none
from common.utils import get_logger
from .mixins import FlashMessageMixin
logger = get_logger(__file__)
class BaseLoginCallbackView(AuthMixin, FlashMessageMixin, View):
def __init__(self):
super().__init__()
self.client_type = None
self.client_auth_params = {}
self.user_type = None
self.auth_backend = None
self.create_user_if_not_exist_setting = ''
# 提示信息
self.msg_client_err = _('Error')
self.msg_user_not_bound_err = _('Error')
self.msg_user_need_bound_warning = _('Error')
self.msg_not_found_user_from_client_err = _('Error')
def verify_state(self):
raise NotImplementedError
def get_verify_state_failed_response(self, redirect_uri):
raise NotImplementedError
@property
@lru_cache(maxsize=1)
def client(self):
if self.client_type is None or not self.client_auth_params:
raise NotImplementedError
client_init = {k: getattr(settings, v) for k, v in self.client_auth_params.items()}
return self.client_type(**client_init)
def create_user_if_not_exist(self, user_id, **kwargs):
user = None
if not getattr(settings, self.create_user_if_not_exist_setting):
title = self.msg_client_err
msg = self.msg_user_need_bound_warning
return user, (title, msg)
user_attr = self.client.get_user_detail(user_id, **kwargs)
try:
user, create = User.objects.get_or_create(
username=user_attr['username'], defaults=user_attr
)
setattr(user, f'{self.user_type}_id', user_id)
if create:
setattr(user, 'source', self.user_type)
user.save()
except IntegrityError as err:
logger.error(f'{self.msg_client_err}: create user error: {err}')
if user is None:
title = self.msg_client_err
msg = _('If you have any question, please contact the administrator')
return user, (title, msg)
return user, None
def get(self, request: Request):
code = request.GET.get('code')
redirect_url = request.GET.get('redirect_url')
login_url = reverse('authentication:login')
if not self.verify_state():
return self.get_verify_state_failed_response(redirect_url)
user_id, other_info = self.client.get_user_id_by_code(code)
if not user_id:
# 正常流程不会出这个错误hack 行为
err = self.msg_not_found_user_from_client_err
response = self.get_failed_response(login_url, title=err, msg=err)
return response
user = get_object_or_none(User, **{f'{self.user_type}_id': user_id})
if user is None:
user, err = self.create_user_if_not_exist(user_id, other_info=other_info)
if err is not None:
response = self.get_failed_response(login_url, title=err[0], msg=err[1])
return response
try:
self.check_oauth2_auth(user, getattr(settings, self.auth_backend))
except errors.AuthFailedError as e:
self.set_login_failed_mark()
msg = e.msg
response = self.get_failed_response(login_url, title=msg, msg=msg)
return response
return self.redirect_to_guard_view()

View File

@ -23,14 +23,15 @@ from common.utils.random import random_string
from users.models import User
from users.views import UserVerifyPasswordView
from .mixins import METAMixin, QRLoginCallbackMixin
from .base import BaseLoginCallbackView
from .mixins import METAMixin, FlashMessageMixin
logger = get_logger(__file__)
DINGTALK_STATE_SESSION_KEY = '_dingtalk_state'
class DingTalkBaseMixin(UserConfirmRequiredExceptionMixin, PermissionsMixin, View):
class DingTalkBaseMixin(UserConfirmRequiredExceptionMixin, PermissionsMixin, FlashMessageMixin, View):
def dispatch(self, request, *args, **kwargs):
try:
return super().dispatch(request, *args, **kwargs)
@ -56,26 +57,6 @@ class DingTalkBaseMixin(UserConfirmRequiredExceptionMixin, PermissionsMixin, Vie
msg = _("The system configuration is incorrect. Please contact your administrator")
return self.get_failed_response(redirect_uri, msg, msg)
@staticmethod
def get_success_response(redirect_url, title, msg):
message_data = {
'title': title,
'message': msg,
'interval': 5,
'redirect_url': redirect_url,
}
return FlashMessageUtil.gen_and_redirect_to(message_data)
@staticmethod
def get_failed_response(redirect_url, title, msg):
message_data = {
'title': title,
'error': msg,
'interval': 5,
'redirect_url': redirect_url,
}
return FlashMessageUtil.gen_and_redirect_to(message_data)
def get_already_bound_response(self, redirect_url):
msg = _('DingTalk is already bound')
response = self.get_failed_response(redirect_url, msg, msg)
@ -214,20 +195,21 @@ class DingTalkQRLoginView(DingTalkQRMixin, METAMixin, View):
return HttpResponseRedirect(url)
class DingTalkQRLoginCallbackView(QRLoginCallbackMixin, AuthMixin, DingTalkQRMixin, View):
class DingTalkQRLoginCallbackView(DingTalkQRMixin, BaseLoginCallbackView):
permission_classes = (AllowAny,)
CLIENT_INFO = (
DingTalk, {'appid': 'DINGTALK_APPKEY', 'appsecret': 'DINGTALK_APPSECRET', 'agentid': 'DINGTALK_AGENTID'}
)
USER_TYPE = 'dingtalk'
AUTH_BACKEND = 'AUTH_BACKEND_DINGTALK'
CREATE_USER_IF_NOT_EXIST = 'DINGTALK_CREATE_USER_IF_NOT_EXIST'
def __init__(self):
super(DingTalkQRLoginCallbackView, self).__init__()
self.client_type = DingTalk
self.client_auth_params = {'appid': 'DINGTALK_APPKEY', 'appsecret': 'DINGTALK_APPSECRET', 'agentid': 'DINGTALK_AGENTID'}
self.user_type = 'dingtalk'
self.auth_backend = 'AUTH_BACKEND_DINGTALK'
self.create_user_if_not_exist_setting = 'DINGTALK_CREATE_USER_IF_NOT_EXIST'
MSG_CLIENT_ERR = _('DingTalk Error')
MSG_USER_NOT_BOUND_ERR = _('DingTalk is not bound')
MSG_USER_NEED_BOUND_WARNING = _('Please login with a password and then bind the DingTalk')
MSG_NOT_FOUND_USER_FROM_CLIENT_ERR = _('Failed to get user from DingTalk')
self.msg_client_err = _('DingTalk Error')
self.msg_user_not_bound_err = _('DingTalk is not bound')
self.msg_user_need_bound_warning = _('Please login with a password and then bind the DingTalk')
self.msg_not_found_user_from_client_err = _('Failed to get user from DingTalk')
class DingTalkOAuthLoginView(DingTalkOAuthMixin, View):

View File

@ -10,25 +10,25 @@ from rest_framework.exceptions import APIException
from rest_framework.permissions import AllowAny, IsAuthenticated
from authentication.const import ConfirmType
from authentication.mixins import AuthMixin
from authentication.notifications import OAuthBindMessage
from common.views.mixins import PermissionsMixin, UserConfirmRequiredExceptionMixin
from common.permissions import UserConfirmation
from common.sdk.im.feishu import URL, FeiShu
from common.utils import FlashMessageUtil, get_logger
from common.utils import get_logger
from common.utils.common import get_request_ip
from common.utils.django import reverse
from common.utils.random import random_string
from users.views import UserVerifyPasswordView
from .mixins import QRLoginCallbackMixin
from .base import BaseLoginCallbackView
from .mixins import FlashMessageMixin
logger = get_logger(__file__)
FEISHU_STATE_SESSION_KEY = '_feishu_state'
class FeiShuQRMixin(UserConfirmRequiredExceptionMixin, PermissionsMixin, View):
class FeiShuQRMixin(UserConfirmRequiredExceptionMixin, PermissionsMixin, FlashMessageMixin, View):
def dispatch(self, request, *args, **kwargs):
try:
return super().dispatch(request, *args, **kwargs)
@ -63,26 +63,6 @@ class FeiShuQRMixin(UserConfirmRequiredExceptionMixin, PermissionsMixin, View):
url = URL().authen + '?' + urlencode(params)
return url
@staticmethod
def get_success_response(redirect_url, title, msg):
message_data = {
'title': title,
'message': msg,
'interval': 5,
'redirect_url': redirect_url,
}
return FlashMessageUtil.gen_and_redirect_to(message_data)
@staticmethod
def get_failed_response(redirect_url, title, msg):
message_data = {
'title': title,
'error': msg,
'interval': 5,
'redirect_url': redirect_url,
}
return FlashMessageUtil.gen_and_redirect_to(message_data)
def get_already_bound_response(self, redirect_url):
msg = _('FeiShu is already bound')
response = self.get_failed_response(redirect_url, msg, msg)
@ -93,7 +73,6 @@ class FeiShuQRBindView(FeiShuQRMixin, View):
permission_classes = (IsAuthenticated, UserConfirmation.require(ConfirmType.ReLogin))
def get(self, request: HttpRequest):
user = request.user
redirect_url = request.GET.get('redirect_url')
redirect_uri = reverse('authentication:feishu-qr-bind-callback', external=True)
@ -176,17 +155,18 @@ class FeiShuQRLoginView(FeiShuQRMixin, View):
return HttpResponseRedirect(url)
class FeiShuQRLoginCallbackView(QRLoginCallbackMixin, AuthMixin, FeiShuQRMixin, View):
class FeiShuQRLoginCallbackView(FeiShuQRMixin, BaseLoginCallbackView):
permission_classes = (AllowAny,)
CLIENT_INFO = (
FeiShu, {'app_id': 'FEISHU_APP_ID', 'app_secret': 'FEISHU_APP_SECRET'}
)
USER_TYPE = 'feishu'
AUTH_BACKEND = 'AUTH_BACKEND_FEISHU'
CREATE_USER_IF_NOT_EXIST = 'FEISHU_CREATE_USER_IF_NOT_EXIST'
def __init__(self):
super(FeiShuQRLoginCallbackView, self).__init__()
self.client_type = FeiShu
self.client_auth_params = {'app_id': 'FEISHU_APP_ID', 'app_secret': 'FEISHU_APP_SECRET'}
self.user_type = 'feishu'
self.auth_backend = 'AUTH_BACKEND_FEISHU'
self.create_user_if_not_exist_setting = 'FEISHU_CREATE_USER_IF_NOT_EXIST'
MSG_CLIENT_ERR = _('FeiShu Error')
MSG_USER_NOT_BOUND_ERR = _('FeiShu is not bound')
MSG_USER_NEED_BOUND_WARNING = _('Please login with a password and then bind the FeiShu')
MSG_NOT_FOUND_USER_FROM_CLIENT_ERR = _('Failed to get user from FeiShu')
self.msg_client_err = _('FeiShu Error')
self.msg_user_not_bound_err = _('FeiShu is not bound')
self.msg_user_need_bound_warning = _('Please login with a password and then bind the FeiShu')
self.msg_not_found_user_from_client_err = _('Failed to get user from FeiShu')

View File

@ -1,19 +1,6 @@
# -*- coding: utf-8 -*-
#
from functools import lru_cache
from rest_framework.request import Request
from django.utils.translation import ugettext_lazy as _
from django.conf import settings
from django.db.utils import IntegrityError
from authentication import errors
from users.models import User
from common.utils.django import reverse, get_object_or_none
from common.utils import get_logger
logger = get_logger(__file__)
from common.utils import FlashMessageUtil
class METAMixin:
@ -27,90 +14,14 @@ class METAMixin:
return next_url
class Client:
get_user_id_by_code: callable
get_user_detail: callable
class FlashMessageMixin:
@staticmethod
def get_response(redirect_url, title, msg, m_type='message'):
message_data = {'title': title, 'interval': 5, 'redirect_url': redirect_url, m_type: msg}
return FlashMessageUtil.gen_and_redirect_to(message_data)
def get_success_response(self, redirect_url, title, msg):
self.get_response(redirect_url, title, msg)
class QRLoginCallbackMixin:
verify_state: callable
get_verify_state_failed_response: callable
get_failed_response: callable
check_oauth2_auth: callable
set_login_failed_mark: callable
redirect_to_guard_view: callable
# 属性
_client: Client
CLIENT_INFO: tuple
USER_TYPE: str
AUTH_BACKEND: str
CREATE_USER_IF_NOT_EXIST: str
# 提示信息
MSG_CLIENT_ERR: str
MSG_USER_NOT_BOUND_ERR: str
MSG_USER_NEED_BOUND_WARNING: str
MSG_NOT_FOUND_USER_FROM_CLIENT_ERR: str
@property
@lru_cache(maxsize=1)
def client(self):
client_type, client_init = self.CLIENT_INFO
client_init = {k: getattr(settings, v) for k, v in client_init.items()}
return client_type(**client_init)
def create_user_if_not_exist(self, user_id, **kwargs):
user = None
if not getattr(settings, self.CREATE_USER_IF_NOT_EXIST):
title = self.MSG_CLIENT_ERR
msg = self.MSG_USER_NEED_BOUND_WARNING
return user, (title, msg)
user_attr = self.client.get_user_detail(user_id, **kwargs)
try:
user, create = User.objects.get_or_create(
username=user_attr['username'], defaults=user_attr
)
setattr(user, f'{self.USER_TYPE}_id', user_id)
if create:
setattr(user, 'source', self.USER_TYPE)
user.save()
except IntegrityError as err:
logger.error(f'{self.MSG_CLIENT_ERR}: create user error: {err}')
if user is None:
title = self.MSG_CLIENT_ERR
msg = _('If you have any question, please contact the administrator')
return user, (title, msg)
return user, None
def get(self, request: Request):
code = request.GET.get('code')
redirect_url = request.GET.get('redirect_url')
login_url = reverse('authentication:login')
if not self.verify_state():
return self.get_verify_state_failed_response(redirect_url)
user_id, other_info = self.client.get_user_id_by_code(code)
if not user_id:
# 正常流程不会出这个错误hack 行为
err = self.MSG_NOT_FOUND_USER_FROM_CLIENT_ERR
response = self.get_failed_response(login_url, title=err, msg=err)
return response
user = get_object_or_none(User, **{f'{self.USER_TYPE}_id': user_id})
if user is None:
user, err = self.create_user_if_not_exist(user_id, other_info=other_info)
if err is not None:
response = self.get_failed_response(login_url, title=err[0], msg=err[1])
return response
try:
self.check_oauth2_auth(user, getattr(settings, self.AUTH_BACKEND))
except errors.AuthFailedError as e:
self.set_login_failed_mark()
msg = e.msg
response = self.get_failed_response(login_url, title=msg, msg=msg)
return response
return self.redirect_to_guard_view()
def get_failed_response(self, redirect_url, title, msg):
self.get_response(redirect_url, title, msg, 'error')

View File

@ -23,14 +23,15 @@ from authentication.mixins import AuthMixin
from authentication.const import ConfirmType
from authentication.notifications import OAuthBindMessage
from .mixins import METAMixin, QRLoginCallbackMixin
from .base import BaseLoginCallbackView
from .mixins import METAMixin, FlashMessageMixin
logger = get_logger(__file__)
WECOM_STATE_SESSION_KEY = '_wecom_state'
class WeComBaseMixin(UserConfirmRequiredExceptionMixin, PermissionsMixin, View):
class WeComBaseMixin(UserConfirmRequiredExceptionMixin, PermissionsMixin, FlashMessageMixin, View):
def dispatch(self, request, *args, **kwargs):
try:
return super().dispatch(request, *args, **kwargs)
@ -56,26 +57,6 @@ class WeComBaseMixin(UserConfirmRequiredExceptionMixin, PermissionsMixin, View):
msg = _("The system configuration is incorrect. Please contact your administrator")
return self.get_failed_response(redirect_uri, msg, msg)
@staticmethod
def get_success_response(redirect_url, title, msg):
message_data = {
'title': title,
'message': msg,
'interval': 5,
'redirect_url': redirect_url,
}
return FlashMessageUtil.gen_and_redirect_to(message_data)
@staticmethod
def get_failed_response(redirect_url, title, msg):
message_data = {
'title': title,
'error': msg,
'interval': 5,
'redirect_url': redirect_url,
}
return FlashMessageUtil.gen_and_redirect_to(message_data)
def get_already_bound_response(self, redirect_url):
msg = _('WeCom is already bound')
response = self.get_failed_response(redirect_url, msg, msg)
@ -209,20 +190,21 @@ class WeComQRLoginView(WeComQRMixin, METAMixin, View):
return HttpResponseRedirect(url)
class WeComQRLoginCallbackView(QRLoginCallbackMixin, AuthMixin, WeComQRMixin, View):
class WeComQRLoginCallbackView(WeComQRMixin, BaseLoginCallbackView):
permission_classes = (AllowAny,)
CLIENT_INFO = (
WeCom, {'corpid': 'WECOM_CORPID', 'corpsecret': 'WECOM_SECRET', 'agentid': 'WECOM_AGENTID'}
)
USER_TYPE = 'wecom'
AUTH_BACKEND = 'AUTH_BACKEND_WECOM'
CREATE_USER_IF_NOT_EXIST = 'WECOM_CREATE_USER_IF_NOT_EXIST'
def __init__(self):
super(WeComQRLoginCallbackView, self).__init__()
self.client_type = WeCom
self.client_auth_params = {'corpid': 'WECOM_CORPID', 'corpsecret': 'WECOM_SECRET', 'agentid': 'WECOM_AGENTID'}
self.user_type = 'wecom'
self.auth_backend = 'AUTH_BACKEND_WECOM'
self.create_user_if_not_exist_setting = 'WECOM_CREATE_USER_IF_NOT_EXIST'
MSG_CLIENT_ERR = _('WeCom Error')
MSG_USER_NOT_BOUND_ERR = _('WeCom is not bound')
MSG_USER_NEED_BOUND_WARNING = _('Please login with a password and then bind the WeCom')
MSG_NOT_FOUND_USER_FROM_CLIENT_ERR = _('Failed to get user from WeCom')
self.msg_client_err = _('WeCom Error')
self.msg_user_not_bound_err = _('WeCom is not bound')
self.msg_user_need_bound_warning = _('Please login with a password and then bind the WeCom')
self.msg_not_found_user_from_client_err = _('Failed to get user from WeCom')
class WeComOAuthLoginView(WeComOAuthMixin, View):