mirror of https://github.com/jumpserver/jumpserver
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
310 lines
11 KiB
310 lines
11 KiB
from urllib.parse import urlencode |
|
|
|
from django.conf import settings |
|
from django.db.utils import IntegrityError |
|
from django.http.request import HttpRequest |
|
from django.http.response import HttpResponseRedirect |
|
from django.utils.translation import ugettext_lazy as _ |
|
from django.views import View |
|
from rest_framework.exceptions import APIException |
|
from rest_framework.permissions import AllowAny, IsAuthenticated |
|
|
|
from authentication import errors |
|
from authentication.const import ConfirmType |
|
from authentication.mixins import AuthMixin |
|
from authentication.notifications import OAuthBindMessage |
|
from common.mixins.views import PermissionsMixin, UserConfirmRequiredExceptionMixin |
|
from common.permissions import UserConfirmation |
|
from common.sdk.im.dingtalk import URL, DingTalk |
|
from common.utils import FlashMessageUtil, get_logger |
|
from common.utils.common import get_request_ip |
|
from common.utils.django import get_object_or_none, reverse |
|
from common.utils.random import random_string |
|
from users.models import User |
|
from users.views import UserVerifyPasswordView |
|
|
|
from .mixins import METAMixin |
|
|
|
logger = get_logger(__file__) |
|
|
|
|
|
DINGTALK_STATE_SESSION_KEY = '_dingtalk_state' |
|
|
|
|
|
class DingTalkBaseMixin(UserConfirmRequiredExceptionMixin, PermissionsMixin, View): |
|
def dispatch(self, request, *args, **kwargs): |
|
try: |
|
return super().dispatch(request, *args, **kwargs) |
|
except APIException as e: |
|
try: |
|
msg = e.detail['errmsg'] |
|
except Exception: |
|
msg = _('DingTalk Error, Please contact your system administrator') |
|
return self.get_failed_response( |
|
'/', |
|
_('DingTalk Error'), |
|
msg |
|
) |
|
|
|
def verify_state(self): |
|
state = self.request.GET.get('state') |
|
session_state = self.request.session.get(DINGTALK_STATE_SESSION_KEY) |
|
if state != session_state: |
|
return False |
|
return True |
|
|
|
def get_verify_state_failed_response(self, redirect_uri): |
|
msg = _("The system configuration is incorrect. Please contact your administrator") |
|
return self.get_failed_response(redirect_uri, msg, msg) |
|
|
|
@staticmethod |
|
def get_success_response(redirect_url, title, msg): |
|
message_data = { |
|
'title': title, |
|
'message': msg, |
|
'interval': 5, |
|
'redirect_url': redirect_url, |
|
} |
|
return FlashMessageUtil.gen_and_redirect_to(message_data) |
|
|
|
@staticmethod |
|
def get_failed_response(redirect_url, title, msg): |
|
message_data = { |
|
'title': title, |
|
'error': msg, |
|
'interval': 5, |
|
'redirect_url': redirect_url, |
|
} |
|
return FlashMessageUtil.gen_and_redirect_to(message_data) |
|
|
|
def get_already_bound_response(self, redirect_url): |
|
msg = _('DingTalk is already bound') |
|
response = self.get_failed_response(redirect_url, msg, msg) |
|
return response |
|
|
|
|
|
class DingTalkQRMixin(DingTalkBaseMixin, View): |
|
|
|
def get_qr_url(self, redirect_uri): |
|
state = random_string(16) |
|
self.request.session[DINGTALK_STATE_SESSION_KEY] = state |
|
|
|
params = { |
|
'appid': settings.DINGTALK_APPKEY, |
|
'response_type': 'code', |
|
'scope': 'snsapi_login', |
|
'state': state, |
|
'redirect_uri': redirect_uri, |
|
} |
|
url = URL.QR_CONNECT + '?' + urlencode(params) |
|
return url |
|
|
|
|
|
class DingTalkOAuthMixin(DingTalkBaseMixin, View): |
|
|
|
def get_oauth_url(self, redirect_uri): |
|
if not settings.AUTH_DINGTALK: |
|
return reverse('authentication:login') |
|
state = random_string(16) |
|
self.request.session[DINGTALK_STATE_SESSION_KEY] = state |
|
|
|
params = { |
|
'appid': settings.DINGTALK_APPKEY, |
|
'response_type': 'code', |
|
'scope': 'snsapi_auth', |
|
'state': state, |
|
'redirect_uri': redirect_uri, |
|
} |
|
url = URL.OAUTH_CONNECT + '?' + urlencode(params) |
|
return url |
|
|
|
|
|
class DingTalkQRBindView(DingTalkQRMixin, View): |
|
permission_classes = (IsAuthenticated, UserConfirmation.require(ConfirmType.ReLogin)) |
|
|
|
def get(self, request: HttpRequest): |
|
user = request.user |
|
redirect_url = request.GET.get('redirect_url') |
|
|
|
redirect_uri = reverse('authentication:dingtalk-qr-bind-callback', kwargs={'user_id': user.id}, external=True) |
|
redirect_uri += '?' + urlencode({'redirect_url': redirect_url}) |
|
|
|
url = self.get_qr_url(redirect_uri) |
|
return HttpResponseRedirect(url) |
|
|
|
|
|
class DingTalkQRBindCallbackView(DingTalkQRMixin, View): |
|
permission_classes = (IsAuthenticated,) |
|
|
|
def get(self, request: HttpRequest, user_id): |
|
code = request.GET.get('code') |
|
redirect_url = request.GET.get('redirect_url') |
|
|
|
if not self.verify_state(): |
|
return self.get_verify_state_failed_response(redirect_url) |
|
|
|
user = get_object_or_none(User, id=user_id) |
|
if user is None: |
|
logger.error(f'DingTalkQR bind callback error, user_id invalid: user_id={user_id}') |
|
msg = _('Invalid user_id') |
|
response = self.get_failed_response(redirect_url, msg, msg) |
|
return response |
|
|
|
if user.dingtalk_id: |
|
response = self.get_already_bound_response(redirect_url) |
|
return response |
|
|
|
dingtalk = DingTalk( |
|
appid=settings.DINGTALK_APPKEY, |
|
appsecret=settings.DINGTALK_APPSECRET, |
|
agentid=settings.DINGTALK_AGENTID |
|
) |
|
userid = dingtalk.get_userid_by_code(code) |
|
|
|
if not userid: |
|
msg = _('DingTalk query user failed') |
|
response = self.get_failed_response(redirect_url, msg, msg) |
|
return response |
|
|
|
try: |
|
user.dingtalk_id = userid |
|
user.save() |
|
except IntegrityError as e: |
|
if e.args[0] == 1062: |
|
msg = _('The DingTalk is already bound to another user') |
|
response = self.get_failed_response(redirect_url, msg, msg) |
|
return response |
|
raise e |
|
|
|
ip = get_request_ip(request) |
|
OAuthBindMessage(user, ip, _('DingTalk'), user_id).publish_async() |
|
msg = _('Binding DingTalk successfully') |
|
response = self.get_success_response(redirect_url, msg, msg) |
|
return response |
|
|
|
|
|
class DingTalkEnableStartView(UserVerifyPasswordView): |
|
|
|
def get_success_url(self): |
|
referer = self.request.META.get('HTTP_REFERER') |
|
redirect_url = self.request.GET.get("redirect_url") |
|
|
|
success_url = reverse('authentication:dingtalk-qr-bind') |
|
|
|
success_url += '?' + urlencode({ |
|
'redirect_url': redirect_url or referer |
|
}) |
|
|
|
return success_url |
|
|
|
|
|
class DingTalkQRLoginView(DingTalkQRMixin, METAMixin, View): |
|
permission_classes = (AllowAny,) |
|
|
|
def get(self, request: HttpRequest): |
|
redirect_url = request.GET.get('redirect_url') or reverse('index') |
|
next_url = self.get_next_url_from_meta() or reverse('index') |
|
|
|
redirect_uri = reverse('authentication:dingtalk-qr-login-callback', external=True) |
|
redirect_uri += '?' + urlencode({ |
|
'redirect_url': redirect_url, |
|
'next': next_url, |
|
}) |
|
|
|
url = self.get_qr_url(redirect_uri) |
|
return HttpResponseRedirect(url) |
|
|
|
|
|
class DingTalkQRLoginCallbackView(AuthMixin, DingTalkQRMixin, View): |
|
permission_classes = (AllowAny,) |
|
|
|
def get(self, request: HttpRequest): |
|
code = request.GET.get('code') |
|
redirect_url = request.GET.get('redirect_url') |
|
login_url = reverse('authentication:login') |
|
|
|
if not self.verify_state(): |
|
return self.get_verify_state_failed_response(redirect_url) |
|
|
|
dingtalk = DingTalk( |
|
appid=settings.DINGTALK_APPKEY, |
|
appsecret=settings.DINGTALK_APPSECRET, |
|
agentid=settings.DINGTALK_AGENTID |
|
) |
|
userid = dingtalk.get_userid_by_code(code) |
|
if not userid: |
|
# 正常流程不会出这个错误,hack 行为 |
|
msg = _('Failed to get user from DingTalk') |
|
response = self.get_failed_response(login_url, title=msg, msg=msg) |
|
return response |
|
|
|
user = get_object_or_none(User, dingtalk_id=userid) |
|
if user is None: |
|
title = _('DingTalk is not bound') |
|
msg = _('Please login with a password and then bind the DingTalk') |
|
response = self.get_failed_response(login_url, title=title, msg=msg) |
|
return response |
|
|
|
try: |
|
self.check_oauth2_auth(user, settings.AUTH_BACKEND_DINGTALK) |
|
except errors.AuthFailedError as e: |
|
self.set_login_failed_mark() |
|
msg = e.msg |
|
response = self.get_failed_response(login_url, title=msg, msg=msg) |
|
return response |
|
|
|
return self.redirect_to_guard_view() |
|
|
|
|
|
class DingTalkOAuthLoginView(DingTalkOAuthMixin, View): |
|
permission_classes = (AllowAny,) |
|
|
|
def get(self, request: HttpRequest): |
|
redirect_url = request.GET.get('redirect_url') |
|
|
|
redirect_uri = reverse('authentication:dingtalk-oauth-login-callback', external=True) |
|
redirect_uri += '?' + urlencode({'redirect_url': redirect_url}) |
|
|
|
url = self.get_oauth_url(redirect_uri) |
|
return HttpResponseRedirect(url) |
|
|
|
|
|
class DingTalkOAuthLoginCallbackView(AuthMixin, DingTalkOAuthMixin, View): |
|
permission_classes = (AllowAny,) |
|
|
|
def get(self, request: HttpRequest): |
|
code = request.GET.get('code') |
|
redirect_url = request.GET.get('redirect_url') |
|
login_url = reverse('authentication:login') |
|
|
|
if not self.verify_state(): |
|
return self.get_verify_state_failed_response(redirect_url) |
|
|
|
dingtalk = DingTalk( |
|
appid=settings.DINGTALK_APPKEY, |
|
appsecret=settings.DINGTALK_APPSECRET, |
|
agentid=settings.DINGTALK_AGENTID |
|
) |
|
userid = dingtalk.get_userid_by_code(code) |
|
if not userid: |
|
# 正常流程不会出这个错误,hack 行为 |
|
msg = _('Failed to get user from DingTalk') |
|
response = self.get_failed_response(login_url, title=msg, msg=msg) |
|
return response |
|
|
|
user = get_object_or_none(User, dingtalk_id=userid) |
|
if user is None: |
|
title = _('DingTalk is not bound') |
|
msg = _('Please login with a password and then bind the DingTalk') |
|
response = self.get_failed_response(login_url, title=title, msg=msg) |
|
return response |
|
|
|
try: |
|
self.check_oauth2_auth(user, settings.AUTH_BACKEND_DINGTALK) |
|
except errors.AuthFailedError as e: |
|
self.set_login_failed_mark() |
|
msg = e.msg |
|
response = self.get_failed_response(login_url, title=msg, msg=msg) |
|
return response |
|
|
|
return self.redirect_to_guard_view()
|
|
|