perf: 第三方用户认证错误信息提示(尤其是第三方认证跳转的情况) (#10446)

Co-authored-by: feng <1304903146@qq.com>
pull/10447/head
fit2bot 2023-05-12 17:22:18 +08:00 committed by GitHub
parent afb49f4040
commit 6afcf7bf42
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 49 additions and 16 deletions

View File

@ -1,17 +1,16 @@
import base64 import base64
import time
from django.conf import settings
from django.contrib.auth import logout as auth_logout
from django.http import HttpResponse
from django.shortcuts import redirect, reverse, render from django.shortcuts import redirect, reverse, render
from django.utils.deprecation import MiddlewareMixin from django.utils.deprecation import MiddlewareMixin
from django.http import HttpResponse
from django.conf import settings
from django.utils.translation import ugettext as _ from django.utils.translation import ugettext as _
from django.contrib.auth import logout as auth_logout
from apps.authentication import mixins from apps.authentication import mixins
from authentication.signals import post_auth_failed
from common.utils import gen_key_pair from common.utils import gen_key_pair
from common.utils import get_request_ip from common.utils import get_request_ip
from .signals import post_auth_failed
class MFAMiddleware: class MFAMiddleware:
@ -76,12 +75,18 @@ class ThirdPartyLoginMiddleware(mixins.AuthMixin):
ip = get_request_ip(request) ip = get_request_ip(request)
try: try:
self.request = request self.request = request
self._check_third_party_login_acl()
self._check_login_acl(request.user, ip) self._check_login_acl(request.user, ip)
except Exception as e: except Exception as e:
post_auth_failed.send( if getattr(request, 'user_need_delete', False):
sender=self.__class__, username=request.user.username, request.user.delete()
request=self.request, reason=e.msg else:
) error_message = getattr(e, 'msg', None)
error_message = error_message or str(e)
post_auth_failed.send(
sender=self.__class__, username=request.user.username,
request=self.request, reason=error_message
)
auth_logout(request) auth_logout(request)
context = { context = {
'title': _('Authentication failed'), 'title': _('Authentication failed'),

View File

@ -54,6 +54,7 @@ def authenticate(request=None, **credentials):
""" """
username = credentials.get('username') username = credentials.get('username')
temp_user = None
for backend, backend_path in _get_backends(return_tuples=True): for backend, backend_path in _get_backends(return_tuples=True):
# 检查用户名是否允许认证 (预先检查,不浪费认证时间) # 检查用户名是否允许认证 (预先检查,不浪费认证时间)
logger.info('Try using auth backend: {}'.format(str(backend))) logger.info('Try using auth backend: {}'.format(str(backend)))
@ -77,11 +78,19 @@ def authenticate(request=None, **credentials):
# 检查用户是否允许认证 # 检查用户是否允许认证
if not backend.user_allow_authenticate(user): if not backend.user_allow_authenticate(user):
temp_user = user
temp_user.backend = backend_path
continue continue
# Annotate the user object with the path of the backend. # Annotate the user object with the path of the backend.
user.backend = backend_path user.backend = backend_path
return user return user
else:
if temp_user is not None:
source_display = temp_user.source_display
request.error_message = '''The administrator has enabled 'Only allow login from user source'.
The current user source is {}. Please contact the administrator.'''.format(source_display)
return temp_user
# The credentials supplied are invalid to all backends, fire signal # The credentials supplied are invalid to all backends, fire signal
user_login_failed.send(sender=__name__, credentials=_clean_credentials(credentials), request=request) user_login_failed.send(sender=__name__, credentials=_clean_credentials(credentials), request=request)
@ -345,6 +354,13 @@ class AuthACLMixin:
self.request.session['auth_acl_id'] = str(acl.id) self.request.session['auth_acl_id'] = str(acl.id)
return return
def _check_third_party_login_acl(self):
request = self.request
error_message = getattr(request, 'error_message', None)
if not error_message:
return
raise ValueError(error_message)
def check_user_login_confirm_if_need(self, user): def check_user_login_confirm_if_need(self, user):
if not self.request.session.get("auth_confirm_required"): if not self.request.session.get("auth_confirm_required"):
return return

View File

@ -1,9 +1,9 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# #
from django.conf import settings from django.conf import settings
from django.core.exceptions import PermissionDenied
from django.db.models.signals import post_save from django.db.models.signals import post_save
from django.dispatch import receiver from django.dispatch import receiver
from django.utils.translation import ugettext_lazy as _
from django_auth_ldap.backend import populate_user from django_auth_ldap.backend import populate_user
from django_cas_ng.signals import cas_user_authenticated from django_cas_ng.signals import cas_user_authenticated
@ -12,16 +12,29 @@ from authentication.backends.oidc.signals import openid_create_or_update_user
from authentication.backends.saml2.signals import saml2_create_or_update_user from authentication.backends.saml2.signals import saml2_create_or_update_user
from common.decorators import on_transaction_commit from common.decorators import on_transaction_commit
from common.utils import get_logger from common.utils import get_logger
from jumpserver.utils import get_current_request
from .models import User, UserPasswordHistory from .models import User, UserPasswordHistory
from .signals import post_user_create from .signals import post_user_create
logger = get_logger(__file__) logger = get_logger(__file__)
def user_authenticated_handle(user, created, source, attrs=None, **kwargs): def third_party_login_acl(created):
if created and settings.ONLY_ALLOW_EXIST_USER_AUTH: if created and settings.ONLY_ALLOW_EXIST_USER_AUTH:
user.delete() request = get_current_request()
raise PermissionDenied(f'Not allow non-exist user auth: {user.username}') request.user_need_delete = True
request.error_message = _(
'''The administrator has enabled "Only allow existing users to log in",
and the current user is not in the user list. Please contact the administrator.'''
)
return False
return True
def user_authenticated_handle(user, created, source, attrs=None, **kwargs):
if not third_party_login_acl(created):
return
if created: if created:
user.source = source user.source = source
user.save() user.save()
@ -122,9 +135,8 @@ def on_ldap_create_user(sender, user, ldap_user, **kwargs):
@receiver(openid_create_or_update_user) @receiver(openid_create_or_update_user)
def on_openid_create_or_update_user(sender, request, user, created, name, username, email, **kwargs): def on_openid_create_or_update_user(sender, request, user, created, name, username, email, **kwargs):
if created and settings.ONLY_ALLOW_EXIST_USER_AUTH: if not third_party_login_acl(created):
user.delete() return
raise PermissionDenied(f'Not allow non-exist user auth: {username}')
if created: if created:
logger.debug( logger.debug(