193 lines
6.4 KiB
Python
193 lines
6.4 KiB
Python
from urllib.parse import urlencode
|
|
|
|
from django.conf import settings
|
|
from django.db.utils import IntegrityError
|
|
from django.http.request import HttpRequest
|
|
from django.http.response import HttpResponseRedirect
|
|
from django.utils.translation import ugettext_lazy as _
|
|
from django.views import View
|
|
from rest_framework.exceptions import APIException
|
|
from rest_framework.permissions import AllowAny, IsAuthenticated
|
|
|
|
from authentication.const import ConfirmType
|
|
from authentication.mixins import AuthMixin
|
|
from authentication.notifications import OAuthBindMessage
|
|
from common.views.mixins import PermissionsMixin, UserConfirmRequiredExceptionMixin
|
|
from common.permissions import UserConfirmation
|
|
from common.sdk.im.feishu import URL, FeiShu
|
|
from common.utils import FlashMessageUtil, get_logger
|
|
from common.utils.common import get_request_ip
|
|
from common.utils.django import reverse
|
|
from common.utils.random import random_string
|
|
from users.views import UserVerifyPasswordView
|
|
|
|
from .mixins import QRLoginCallbackMixin
|
|
|
|
logger = get_logger(__file__)
|
|
|
|
FEISHU_STATE_SESSION_KEY = '_feishu_state'
|
|
|
|
|
|
class FeiShuQRMixin(UserConfirmRequiredExceptionMixin, PermissionsMixin, View):
|
|
def dispatch(self, request, *args, **kwargs):
|
|
try:
|
|
return super().dispatch(request, *args, **kwargs)
|
|
except APIException as e:
|
|
msg = str(e.detail)
|
|
return self.get_failed_response(
|
|
'/',
|
|
_('FeiShu Error'),
|
|
msg
|
|
)
|
|
|
|
def verify_state(self):
|
|
state = self.request.GET.get('state')
|
|
session_state = self.request.session.get(FEISHU_STATE_SESSION_KEY)
|
|
if state != session_state:
|
|
return False
|
|
return True
|
|
|
|
def get_verify_state_failed_response(self, redirect_uri):
|
|
msg = _("The system configuration is incorrect. Please contact your administrator")
|
|
return self.get_failed_response(redirect_uri, msg, msg)
|
|
|
|
def get_qr_url(self, redirect_uri):
|
|
state = random_string(16)
|
|
self.request.session[FEISHU_STATE_SESSION_KEY] = state
|
|
|
|
params = {
|
|
'app_id': settings.FEISHU_APP_ID,
|
|
'state': state,
|
|
'redirect_uri': redirect_uri,
|
|
}
|
|
url = URL().authen + '?' + urlencode(params)
|
|
return url
|
|
|
|
@staticmethod
|
|
def get_success_response(redirect_url, title, msg):
|
|
message_data = {
|
|
'title': title,
|
|
'message': msg,
|
|
'interval': 5,
|
|
'redirect_url': redirect_url,
|
|
}
|
|
return FlashMessageUtil.gen_and_redirect_to(message_data)
|
|
|
|
@staticmethod
|
|
def get_failed_response(redirect_url, title, msg):
|
|
message_data = {
|
|
'title': title,
|
|
'error': msg,
|
|
'interval': 5,
|
|
'redirect_url': redirect_url,
|
|
}
|
|
return FlashMessageUtil.gen_and_redirect_to(message_data)
|
|
|
|
def get_already_bound_response(self, redirect_url):
|
|
msg = _('FeiShu is already bound')
|
|
response = self.get_failed_response(redirect_url, msg, msg)
|
|
return response
|
|
|
|
|
|
class FeiShuQRBindView(FeiShuQRMixin, View):
|
|
permission_classes = (IsAuthenticated, UserConfirmation.require(ConfirmType.ReLogin))
|
|
|
|
def get(self, request: HttpRequest):
|
|
user = request.user
|
|
redirect_url = request.GET.get('redirect_url')
|
|
|
|
redirect_uri = reverse('authentication:feishu-qr-bind-callback', external=True)
|
|
redirect_uri += '?' + urlencode({'redirect_url': redirect_url})
|
|
|
|
url = self.get_qr_url(redirect_uri)
|
|
return HttpResponseRedirect(url)
|
|
|
|
|
|
class FeiShuQRBindCallbackView(FeiShuQRMixin, View):
|
|
permission_classes = (IsAuthenticated,)
|
|
|
|
def get(self, request: HttpRequest):
|
|
code = request.GET.get('code')
|
|
redirect_url = request.GET.get('redirect_url')
|
|
|
|
if not self.verify_state():
|
|
return self.get_verify_state_failed_response(redirect_url)
|
|
|
|
user = request.user
|
|
|
|
if user.feishu_id:
|
|
response = self.get_already_bound_response(redirect_url)
|
|
return response
|
|
|
|
feishu = FeiShu(
|
|
app_id=settings.FEISHU_APP_ID,
|
|
app_secret=settings.FEISHU_APP_SECRET
|
|
)
|
|
user_id, __ = feishu.get_user_id_by_code(code)
|
|
|
|
if not user_id:
|
|
msg = _('FeiShu query user failed')
|
|
response = self.get_failed_response(redirect_url, msg, msg)
|
|
return response
|
|
|
|
try:
|
|
user.feishu_id = user_id
|
|
user.save()
|
|
except IntegrityError as e:
|
|
if e.args[0] == 1062:
|
|
msg = _('The FeiShu is already bound to another user')
|
|
response = self.get_failed_response(redirect_url, msg, msg)
|
|
return response
|
|
raise e
|
|
|
|
ip = get_request_ip(request)
|
|
OAuthBindMessage(user, ip, _('FeiShu'), user_id).publish_async()
|
|
msg = _('Binding FeiShu successfully')
|
|
response = self.get_success_response(redirect_url, msg, msg)
|
|
return response
|
|
|
|
|
|
class FeiShuEnableStartView(UserVerifyPasswordView):
|
|
|
|
def get_success_url(self):
|
|
referer = self.request.META.get('HTTP_REFERER')
|
|
redirect_url = self.request.GET.get("redirect_url")
|
|
|
|
success_url = reverse('authentication:feishu-qr-bind')
|
|
|
|
success_url += '?' + urlencode({
|
|
'redirect_url': redirect_url or referer
|
|
})
|
|
|
|
return success_url
|
|
|
|
|
|
class FeiShuQRLoginView(FeiShuQRMixin, View):
|
|
permission_classes = (AllowAny,)
|
|
|
|
def get(self, request: HttpRequest):
|
|
redirect_url = request.GET.get('redirect_url') or reverse('index')
|
|
redirect_uri = reverse('authentication:feishu-qr-login-callback', external=True)
|
|
redirect_uri += '?' + urlencode({
|
|
'redirect_url': redirect_url,
|
|
})
|
|
|
|
url = self.get_qr_url(redirect_uri)
|
|
return HttpResponseRedirect(url)
|
|
|
|
|
|
class FeiShuQRLoginCallbackView(QRLoginCallbackMixin, AuthMixin, FeiShuQRMixin, View):
|
|
permission_classes = (AllowAny,)
|
|
|
|
CLIENT_INFO = (
|
|
FeiShu, {'app_id': 'FEISHU_APP_ID', 'app_secret': 'FEISHU_APP_SECRET'}
|
|
)
|
|
USER_TYPE = 'feishu'
|
|
AUTH_BACKEND = 'AUTH_BACKEND_FEISHU'
|
|
CREATE_USER_IF_NOT_EXIST = 'FEISHU_CREATE_USER_IF_NOT_EXIST'
|
|
|
|
MSG_CLIENT_ERR = _('FeiShu Error')
|
|
MSG_USER_NOT_BOUND_ERR = _('FeiShu is not bound')
|
|
MSG_USER_NEED_BOUND_WARNING = _('Please login with a password and then bind the FeiShu')
|
|
MSG_NOT_FOUND_USER_FROM_CLIENT_ERR = _('Failed to get user from FeiShu')
|