|
|
from urllib.parse import urlencode
|
|
|
|
|
|
from django.conf import settings
|
|
|
from django.http.request import HttpRequest
|
|
|
from django.http.response import HttpResponseRedirect
|
|
|
from django.utils.translation import gettext_lazy as _
|
|
|
from django.views import View
|
|
|
from rest_framework.exceptions import APIException
|
|
|
from rest_framework.permissions import IsAuthenticated, AllowAny
|
|
|
|
|
|
from authentication import errors
|
|
|
from authentication.const import ConfirmType
|
|
|
from authentication.mixins import AuthMixin
|
|
|
from authentication.permissions import UserConfirmation
|
|
|
from common.sdk.im.wecom import URL
|
|
|
from common.sdk.im.wecom import WeCom
|
|
|
from common.utils import get_logger
|
|
|
from common.utils.common import get_request_ip
|
|
|
from common.utils.django import reverse, get_object_or_none, safe_next_url
|
|
|
from common.utils.random import random_string
|
|
|
from common.views.mixins import UserConfirmRequiredExceptionMixin, PermissionsMixin
|
|
|
from users.models import User
|
|
|
from users.views import UserVerifyPasswordView
|
|
|
from .base import BaseLoginCallbackView, BaseBindCallbackView
|
|
|
from .mixins import METAMixin, FlashMessageMixin
|
|
|
|
|
|
logger = get_logger(__file__)
|
|
|
|
|
|
WECOM_STATE_SESSION_KEY = '_wecom_state'
|
|
|
|
|
|
|
|
|
class WeComBaseMixin(UserConfirmRequiredExceptionMixin, PermissionsMixin, FlashMessageMixin, View):
|
|
|
def dispatch(self, request, *args, **kwargs):
|
|
|
try:
|
|
|
return super().dispatch(request, *args, **kwargs)
|
|
|
except APIException as e:
|
|
|
try:
|
|
|
msg = e.detail['errmsg']
|
|
|
except Exception:
|
|
|
msg = _('WeCom Error, Please contact your system administrator')
|
|
|
return self.get_failed_response(
|
|
|
'/',
|
|
|
_('WeCom Error'),
|
|
|
msg
|
|
|
)
|
|
|
|
|
|
def verify_state(self):
|
|
|
return self.verify_state_with_session_key(WECOM_STATE_SESSION_KEY)
|
|
|
|
|
|
def get_already_bound_response(self, redirect_url):
|
|
|
msg = _('WeCom is already bound')
|
|
|
response = self.get_failed_response(redirect_url, msg, msg)
|
|
|
return response
|
|
|
|
|
|
|
|
|
class WeComQRMixin(WeComBaseMixin, View):
|
|
|
|
|
|
def get_qr_url(self, redirect_uri):
|
|
|
state = random_string(16)
|
|
|
self.request.session[WECOM_STATE_SESSION_KEY] = state
|
|
|
|
|
|
params = {
|
|
|
'appid': settings.WECOM_CORPID,
|
|
|
'agentid': settings.WECOM_AGENTID,
|
|
|
'state': state,
|
|
|
'redirect_uri': redirect_uri,
|
|
|
}
|
|
|
url = URL.QR_CONNECT + '?' + urlencode(params)
|
|
|
return url
|
|
|
|
|
|
|
|
|
class WeComOAuthMixin(WeComBaseMixin, View):
|
|
|
|
|
|
def get_oauth_url(self, redirect_uri):
|
|
|
if not settings.AUTH_WECOM:
|
|
|
return reverse('authentication:login')
|
|
|
state = random_string(16)
|
|
|
self.request.session[WECOM_STATE_SESSION_KEY] = state
|
|
|
|
|
|
params = {
|
|
|
'appid': settings.WECOM_CORPID,
|
|
|
'agentid': settings.WECOM_AGENTID,
|
|
|
'state': state,
|
|
|
'redirect_uri': redirect_uri,
|
|
|
'response_type': 'code',
|
|
|
'scope': 'snsapi_base',
|
|
|
}
|
|
|
url = URL.OAUTH_CONNECT + '?' + urlencode(params) + '#wechat_redirect'
|
|
|
return url
|
|
|
|
|
|
|
|
|
class WeComQRBindView(WeComQRMixin, View):
|
|
|
permission_classes = (IsAuthenticated, UserConfirmation.require(ConfirmType.RELOGIN))
|
|
|
|
|
|
def get(self, request: HttpRequest):
|
|
|
redirect_url = request.GET.get('redirect_url')
|
|
|
redirect_uri = reverse('authentication:wecom-qr-bind-callback', external=True)
|
|
|
redirect_uri += '?' + urlencode({'redirect_url': redirect_url})
|
|
|
|
|
|
url = self.get_qr_url(redirect_uri)
|
|
|
return HttpResponseRedirect(url)
|
|
|
|
|
|
|
|
|
class WeComQRBindCallbackView(WeComQRMixin, BaseBindCallbackView):
|
|
|
permission_classes = (IsAuthenticated,)
|
|
|
|
|
|
client_type_path = 'common.sdk.im.wecom.WeCom'
|
|
|
client_auth_params = {'corpid': 'WECOM_CORPID', 'corpsecret': 'WECOM_SECRET', 'agentid': 'WECOM_AGENTID'}
|
|
|
auth_type = 'wecom'
|
|
|
auth_type_label = _('Wecom')
|
|
|
|
|
|
|
|
|
class WeComEnableStartView(UserVerifyPasswordView):
|
|
|
def get_success_url(self):
|
|
|
referer = self.request.META.get('HTTP_REFERER')
|
|
|
redirect_url = self.request.GET.get("redirect_url")
|
|
|
|
|
|
success_url = reverse('authentication:wecom-qr-bind')
|
|
|
success_url += '?' + urlencode({
|
|
|
'redirect_url': redirect_url or referer
|
|
|
})
|
|
|
return success_url
|
|
|
|
|
|
|
|
|
class WeComQRLoginView(WeComQRMixin, METAMixin, View):
|
|
|
permission_classes = (AllowAny,)
|
|
|
|
|
|
def get(self, request: HttpRequest):
|
|
|
redirect_url = request.GET.get('redirect_url') or reverse('index')
|
|
|
next_url = self.get_next_url_from_meta() or reverse('index')
|
|
|
next_url = safe_next_url(next_url, request=request)
|
|
|
redirect_uri = reverse('authentication:wecom-qr-login-callback', external=True)
|
|
|
redirect_uri += '?' + urlencode({
|
|
|
'redirect_url': redirect_url,
|
|
|
'next': next_url,
|
|
|
})
|
|
|
|
|
|
url = self.get_qr_url(redirect_uri)
|
|
|
return HttpResponseRedirect(url)
|
|
|
|
|
|
|
|
|
class WeComQRLoginCallbackView(WeComQRMixin, BaseLoginCallbackView):
|
|
|
permission_classes = (AllowAny,)
|
|
|
|
|
|
client_type_path = 'common.sdk.im.wecom.WeCom'
|
|
|
client_auth_params = {'corpid': 'WECOM_CORPID', 'corpsecret': 'WECOM_SECRET', 'agentid': 'WECOM_AGENTID'}
|
|
|
user_type = 'wecom'
|
|
|
auth_backend = 'AUTH_BACKEND_WECOM'
|
|
|
|
|
|
msg_client_err = _('WeCom Error')
|
|
|
msg_user_not_bound_err = _('WeCom is not bound')
|
|
|
msg_not_found_user_from_client_err = _('Failed to get user from WeCom')
|
|
|
|
|
|
|
|
|
class WeComOAuthLoginView(WeComOAuthMixin, View):
|
|
|
permission_classes = (AllowAny,)
|
|
|
|
|
|
def get(self, request: HttpRequest):
|
|
|
redirect_url = request.GET.get('redirect_url')
|
|
|
|
|
|
redirect_uri = reverse('authentication:wecom-oauth-login-callback', external=True)
|
|
|
redirect_uri += '?' + urlencode({'redirect_url': redirect_url})
|
|
|
|
|
|
url = self.get_oauth_url(redirect_uri)
|
|
|
return HttpResponseRedirect(url)
|
|
|
|
|
|
|
|
|
class WeComOAuthLoginCallbackView(AuthMixin, WeComOAuthMixin, View):
|
|
|
permission_classes = (AllowAny,)
|
|
|
|
|
|
def get(self, request: HttpRequest):
|
|
|
code = request.GET.get('code')
|
|
|
redirect_url = request.GET.get('redirect_url')
|
|
|
login_url = reverse('authentication:login')
|
|
|
|
|
|
if not self.verify_state():
|
|
|
return self.get_verify_state_failed_response(redirect_url)
|
|
|
|
|
|
wecom = WeCom(
|
|
|
corpid=settings.WECOM_CORPID,
|
|
|
corpsecret=settings.WECOM_SECRET,
|
|
|
agentid=settings.WECOM_AGENTID
|
|
|
)
|
|
|
wecom_userid, __ = wecom.get_user_id_by_code(code)
|
|
|
if not wecom_userid:
|
|
|
# 正常流程不会出这个错误,hack 行为
|
|
|
msg = _('Failed to get user from WeCom')
|
|
|
response = self.get_failed_response(login_url, title=msg, msg=msg)
|
|
|
return response
|
|
|
|
|
|
user = get_object_or_none(User, wecom_id=wecom_userid)
|
|
|
if user is None:
|
|
|
title = _('WeCom is not bound')
|
|
|
msg = _('Please login with a password and then bind the WeCom')
|
|
|
response = self.get_failed_response(login_url, title=title, msg=msg)
|
|
|
return response
|
|
|
|
|
|
try:
|
|
|
self.check_oauth2_auth(user, settings.AUTH_BACKEND_WECOM)
|
|
|
except errors.AuthFailedError as e:
|
|
|
self.set_login_failed_mark()
|
|
|
msg = e.msg
|
|
|
response = self.get_failed_response(login_url, title=msg, msg=msg)
|
|
|
return response
|
|
|
return self.redirect_to_guard_view()
|