perf(users): 优化用户认证来源

pull/5766/head
ibuler 2021-03-16 16:57:56 +08:00 committed by Jiangjie.Bai
parent 1216f15e45
commit ea325f6e52
2 changed files with 75 additions and 13 deletions

View File

@ -1,13 +1,17 @@
# -*- coding: utf-8 -*-
#
import inspect
from urllib.parse import urlencode
from functools import partial
import time
from django.conf import settings
from django.contrib.auth import authenticate
from django.contrib import auth
from django.contrib.auth import (
BACKEND_SESSION_KEY, _get_backends,
PermissionDenied, user_login_failed, _clean_credentials
)
from django.shortcuts import reverse
from django.contrib.auth import BACKEND_SESSION_KEY
from common.utils import get_object_or_none, get_request_ip, get_logger, bulk_get
from users.models import User
@ -22,6 +26,59 @@ from .const import RSA_PRIVATE_KEY
logger = get_logger(__name__)
def check_backend_can_auth(username, backend_path, allowed_auth_backends):
if allowed_auth_backends is not None and backend_path not in allowed_auth_backends:
logger.debug('Skip user auth backend: {}, {} not in'.format(
username, backend_path, ','.join(allowed_auth_backends)
)
)
return False
return True
def authenticate(request=None, **credentials):
"""
If the given credentials are valid, return a User object.
"""
username = credentials.get('username')
allowed_auth_backends = User.get_user_allowed_auth_backends(username)
for backend, backend_path in _get_backends(return_tuples=True):
# 预先检查,不浪费认证时间
if not check_backend_can_auth(username, backend_path, allowed_auth_backends):
continue
backend_signature = inspect.signature(backend.authenticate)
try:
backend_signature.bind(request, **credentials)
except TypeError:
# This backend doesn't accept these credentials as arguments. Try the next one.
continue
try:
user = backend.authenticate(request, **credentials)
except PermissionDenied:
# This backend says to stop in our tracks - this user should not be allowed in at all.
break
if user is None:
continue
# 如果是 None, 证明没有检查过, 需要再次检查
if allowed_auth_backends is None:
# 有些 authentication 参数中不带 username, 之后还要再检查
allowed_auth_backends = user.get_allowed_auth_backends()
if not check_backend_can_auth(user.username, backend_path, allowed_auth_backends):
continue
# Annotate the user object with the path of the backend.
user.backend = backend_path
return user
# The credentials supplied are invalid to all backends, fire signal
user_login_failed.send(sender=__name__, credentials=_clean_credentials(credentials), request=request)
auth.authenticate = authenticate
class AuthMixin:
request = None
partial_credential_error = None
@ -121,13 +178,6 @@ class AuthMixin:
self.raise_credential_error(errors.reason_user_inactive)
return user
def _check_auth_source_is_valid(self, user, auth_backend):
# 限制只能从认证来源登录
if settings.ONLY_ALLOW_AUTH_FROM_SOURCE:
auth_backends_allowed = user.SOURCE_BACKEND_MAPPING.get(user.source)
if auth_backend not in auth_backends_allowed:
self.raise_credential_error(error=errors.reason_backend_not_match)
def _check_login_acl(self, user, ip):
# ACL 限制用户登录
from acls.models import LoginACL
@ -144,9 +194,6 @@ class AuthMixin:
user = self._check_auth_user_is_valid(username, password, public_key)
# 校验login-acl规则
self._check_login_acl(user, ip)
# 限制只能从认证来源登录
auth_backend = getattr(user, 'backend', 'django.contrib.auth.backends.ModelBackend')
self._check_auth_source_is_valid(user, auth_backend)
self._check_password_require_reset_or_not(user)
self._check_passwd_is_too_simple(user, password)
@ -154,7 +201,7 @@ class AuthMixin:
request.session['auth_password'] = 1
request.session['user_id'] = str(user.id)
request.session['auto_login'] = auto_login
request.session['auth_backend'] = auth_backend
request.session['auth_backend'] = getattr(user, 'backend', settings.AUTH_BACKEND_MODEL)
return user
@classmethod

View File

@ -679,6 +679,21 @@ class User(AuthMixin, TokenMixin, RoleMixin, MFAMixin, AbstractUser):
return
return super(User, self).delete()
@classmethod
def get_user_allowed_auth_backends(cls, username):
if not settings.ONLY_ALLOW_AUTH_FROM_SOURCE or not username:
# return settings.AUTHENTICATION_BACKENDS
return None
user = cls.objects.filter(username=username).first()
if not user:
return None
return user.get_allowed_auth_backends()
def get_allowed_auth_backends(self):
if not settings.ONLY_ALLOW_AUTH_FROM_SOURCE:
return None
return self.SOURCE_BACKEND_MAPPING.get(self.source, [])
class Meta:
ordering = ['username']
verbose_name = _("User")