feat: 添加限制用户只能从source登录的功能 (#5592)

* stash it

* feat: 添加限制用户只能从source登录的功能

* fix: 修复小错误

Co-authored-by: ibuler <ibuler@qq.com>
pull/5652/head
fit2bot 2021-02-26 17:33:11 +08:00 committed by GitHub
parent b483f78d52
commit a7ab7da61c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 146 additions and 319 deletions

View File

@ -1,10 +1,11 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# #
from django.db.models.signals import post_save, post_delete from django.db.models.signals import post_save, post_delete
from django.dispatch import receiver from django.dispatch import receiver
from django.conf import settings
from django.db import transaction from django.db import transaction
from django.utils import timezone from django.utils import timezone
from django.utils.functional import LazyObject
from django.contrib.auth import BACKEND_SESSION_KEY from django.contrib.auth import BACKEND_SESSION_KEY
from django.utils.translation import ugettext_lazy as _ from django.utils.translation import ugettext_lazy as _
from rest_framework.renderers import JSONRenderer from rest_framework.renderers import JSONRenderer
@ -34,17 +35,22 @@ MODELS_NEED_RECORD = (
) )
LOGIN_BACKEND = { class AuthBackendLabelMapping(LazyObject):
'PublicKeyAuthBackend': _('SSH Key'), @staticmethod
'RadiusBackend': User.Source.radius.label, def get_login_backends():
'RadiusRealmBackend': User.Source.radius.label, backend_label_mapping = {}
'LDAPAuthorizationBackend': User.Source.ldap.label, for source, backends in User.SOURCE_BACKEND_MAPPING.items():
'ModelBackend': _('Password'), for backend in backends:
'SSOAuthentication': _('SSO'), backend_label_mapping[backend] = source.label
'CASBackend': User.Source.cas.label, backend_label_mapping[settings.AUTH_BACKEND_PUBKEY] = _('SSH Key')
'OIDCAuthCodeBackend': User.Source.openid.label, backend_label_mapping[settings.AUTH_BACKEND_MODEL] = _('Password')
'OIDCAuthPasswordBackend': User.Source.openid.label, return backend_label_mapping
}
def _setup(self):
self._wrapped = self.get_login_backends()
AUTH_BACKEND_LABEL_MAPPING = AuthBackendLabelMapping()
def create_operate_log(action, sender, resource): def create_operate_log(action, sender, resource):
@ -70,6 +76,7 @@ def create_operate_log(action, sender, resource):
@receiver(post_save) @receiver(post_save)
def on_object_created_or_update(sender, instance=None, created=False, update_fields=None, **kwargs): def on_object_created_or_update(sender, instance=None, created=False, update_fields=None, **kwargs):
# last_login 改变是最后登录日期, 每次登录都会改变
if instance._meta.object_name == 'User' and \ if instance._meta.object_name == 'User' and \
update_fields and 'last_login' in update_fields: update_fields and 'last_login' in update_fields:
return return
@ -125,14 +132,13 @@ def on_audits_log_create(sender, instance=None, **kwargs):
def get_login_backend(request): def get_login_backend(request):
backend = request.session.get('auth_backend', '') or request.session.get(BACKEND_SESSION_KEY, '') backend = request.session.get('auth_backend', '') or \
request.session.get(BACKEND_SESSION_KEY, '')
backend = backend.rsplit('.', maxsplit=1)[-1] backend_label = AUTH_BACKEND_LABEL_MAPPING.get(backend, None)
if backend in LOGIN_BACKEND: if backend_label is None:
return LOGIN_BACKEND[backend] backend_label = ''
else: return backend_label
logger.warn(f'LOGIN_BACKEND_NOT_FOUND: {backend}')
return ''
def generate_data(username, request): def generate_data(username, request):

View File

@ -2,7 +2,7 @@
# #
import traceback import traceback
from django.contrib.auth import get_user_model, authenticate from django.contrib.auth import get_user_model
from radiusauth.backends import RADIUSBackend, RADIUSRealmBackend from radiusauth.backends import RADIUSBackend, RADIUSRealmBackend
from django.conf import settings from django.conf import settings

View File

@ -18,6 +18,7 @@ reason_user_not_exist = 'user_not_exist'
reason_password_expired = 'password_expired' reason_password_expired = 'password_expired'
reason_user_invalid = 'user_invalid' reason_user_invalid = 'user_invalid'
reason_user_inactive = 'user_inactive' reason_user_inactive = 'user_inactive'
reason_backend_not_match = 'backend_not_match'
reason_choices = { reason_choices = {
reason_password_failed: _('Username/password check failed'), reason_password_failed: _('Username/password check failed'),
@ -27,7 +28,8 @@ reason_choices = {
reason_user_not_exist: _("Username does not exist"), reason_user_not_exist: _("Username does not exist"),
reason_password_expired: _("Password expired"), reason_password_expired: _("Password expired"),
reason_user_invalid: _('Disabled or expired'), reason_user_invalid: _('Disabled or expired'),
reason_user_inactive: _("This account is inactive.") reason_user_inactive: _("This account is inactive."),
reason_backend_not_match: _("Auth backend not match")
} }
old_reason_choices = { old_reason_choices = {
'0': '-', '0': '-',

View File

@ -9,7 +9,7 @@ from django.contrib.auth import authenticate
from django.shortcuts import reverse from django.shortcuts import reverse
from django.contrib.auth import BACKEND_SESSION_KEY from django.contrib.auth import BACKEND_SESSION_KEY
from common.utils import get_object_or_none, get_request_ip, get_logger from common.utils import get_object_or_none, get_request_ip, get_logger, bulk_get
from users.models import User from users.models import User
from users.utils import ( from users.utils import (
is_block_login, clean_failed_count is_block_login, clean_failed_count
@ -24,6 +24,7 @@ logger = get_logger(__name__)
class AuthMixin: class AuthMixin:
request = None request = None
partial_credential_error = None
def get_user_from_session(self): def get_user_from_session(self):
if self.request.session.is_empty(): if self.request.session.is_empty():
@ -75,49 +76,75 @@ class AuthMixin:
return rsa_decrypt(raw_passwd, rsa_private_key) return rsa_decrypt(raw_passwd, rsa_private_key)
except Exception as e: except Exception as e:
logger.error(e, exc_info=True) logger.error(e, exc_info=True)
logger.error(f'Decrypt password faild: password[{raw_passwd}] rsa_private_key[{rsa_private_key}]') logger.error(f'Decrypt password failed: password[{raw_passwd}] '
f'rsa_private_key[{rsa_private_key}]')
return None return None
return raw_passwd return raw_passwd
def check_user_auth(self, decrypt_passwd=False): def raise_credential_error(self, error):
self.check_is_block() raise self.partial_credential_error(error=errors.reason_password_decrypt_failed)
def get_auth_data(self, decrypt_passwd=False):
request = self.request request = self.request
if hasattr(request, 'data'): if hasattr(request, 'data'):
data = request.data data = request.data
else: else:
data = request.POST data = request.POST
username = data.get('username', '')
password = data.get('password', '')
challenge = data.get('challenge', '')
public_key = data.get('public_key', '')
ip = self.get_request_ip()
CredentialError = partial(errors.CredentialError, username=username, ip=ip, request=request) items = ['username', 'password', 'challenge', 'public_key']
username, password, challenge, public_key = bulk_get(data, *items, default='')
password = password + challenge.strip()
ip = self.get_request_ip()
self.partial_credential_error = partial(errors.CredentialError, username=username, ip=ip, request=request)
if decrypt_passwd: if decrypt_passwd:
password = self.decrypt_passwd(password) password = self.decrypt_passwd(password)
if not password: if not password:
raise CredentialError(error=errors.reason_password_decrypt_failed) self.raise_credential_error(errors.reason_password_decrypt_failed)
return username, password, public_key, ip
user = authenticate(request, def _check_only_allow_exists_user_auth(self, username):
username=username, # 仅允许预先存在的用户认证
password=password + challenge.strip(), if settings.ONLY_ALLOW_EXIST_USER_AUTH:
public_key=public_key) exist = User.objects.filter(username=username).exists()
if not exist:
logger.error(f"Only allow exist user auth, login failed: {username}")
self.raise_credential_error(errors.reason_user_not_exist)
def _check_auth_user_is_valid(self, username, password, public_key):
user = authenticate(self.request, username=username, password=password, public_key=public_key)
if not user: if not user:
raise CredentialError(error=errors.reason_password_failed) self.raise_credential_error(errors.reason_password_failed)
elif user.is_expired: elif user.is_expired:
raise CredentialError(error=errors.reason_user_inactive) self.raise_credential_error(errors.reason_user_inactive)
elif not user.is_active: elif not user.is_active:
raise CredentialError(error=errors.reason_user_inactive) self.raise_credential_error(errors.reason_user_inactive)
return user
def _check_auth_source_is_valid(self, user, auth_backend):
# 限制只能从认证来源登录
if settings.ONLY_ALLOW_AUTH_FROM_SOURCE:
auth_backends_allowed = user.SOURCE_BACKEND_MAPPING.get(user.source)
if auth_backend not in auth_backends_allowed:
self.raise_credential_error(error=errors.reason_backend_not_match)
def check_user_auth(self, decrypt_passwd=False):
self.check_is_block()
request = self.request
username, password, public_key, ip = self.get_auth_data(decrypt_passwd=decrypt_passwd)
self._check_only_allow_exists_user_auth(username)
user = self._check_auth_user_is_valid(username, password, public_key)
# 限制只能从认证来源登录
auth_backend = getattr(user, 'backend', 'django.contrib.auth.backends.ModelBackend')
self._check_auth_source_is_valid(user, auth_backend)
self._check_password_require_reset_or_not(user) self._check_password_require_reset_or_not(user)
self._check_passwd_is_too_simple(user, password) self._check_passwd_is_too_simple(user, password)
clean_failed_count(username, ip) clean_failed_count(username, ip)
request.session['auth_password'] = 1 request.session['auth_password'] = 1
request.session['user_id'] = str(user.id) request.session['user_id'] = str(user.id)
auth_backend = getattr(user, 'backend', 'django.contrib.auth.backends.ModelBackend')
request.session['auth_backend'] = auth_backend request.session['auth_backend'] = auth_backend
return user return user

View File

@ -24,7 +24,7 @@ def on_user_auth_login_success(sender, user, request, **kwargs):
@receiver(openid_user_login_success) @receiver(openid_user_login_success)
def on_oidc_user_login_success(sender, request, user, **kwargs): def on_oidc_user_login_success(sender, request, user, create=False, **kwargs):
request.session[BACKEND_SESSION_KEY] = 'OIDCAuthCodeBackend' request.session[BACKEND_SESSION_KEY] = 'OIDCAuthCodeBackend'
post_auth_success.send(sender, user=user, request=request) post_auth_success.send(sender, user=user, request=request)

View File

@ -45,9 +45,10 @@ class UserLoginView(mixins.AuthMixin, FormView):
def get(self, request, *args, **kwargs): def get(self, request, *args, **kwargs):
if request.user.is_staff: if request.user.is_staff:
return redirect(redirect_user_first_login_or_index( first_login_url = redirect_user_first_login_or_index(
request, self.redirect_field_name) request, self.redirect_field_name
) )
return redirect(first_login_url)
request.session.set_test_cookie() request.session.set_test_cookie()
return super().get(request, *args, **kwargs) return super().get(request, *args, **kwargs)

View File

@ -272,5 +272,12 @@ class Time:
last = timestamp last = timestamp
def bulk_get(d, *keys, default=None):
values = []
for key in keys:
values.append(d.get(key, default))
return values
def isinstance_method(attr): def isinstance_method(attr):
return isinstance(attr, type(Time().time)) return isinstance(attr, type(Time().time))

View File

@ -282,6 +282,8 @@ class Config(dict):
'REFERER_CHECK_ENABLED': False, 'REFERER_CHECK_ENABLED': False,
'SERVER_REPLAY_STORAGE': {}, 'SERVER_REPLAY_STORAGE': {},
'CONNECTION_TOKEN_ENABLED': False, 'CONNECTION_TOKEN_ENABLED': False,
'ONLY_ALLOW_EXIST_USER_AUTH': False,
'ONLY_ALLOW_AUTH_FROM_SOURCE': True,
'DISK_CHECK_ENABLED': True, 'DISK_CHECK_ENABLED': True,
} }

View File

@ -2,6 +2,7 @@
# #
import os import os
import ldap import ldap
from django.utils.translation import ugettext_lazy as _
from ..const import CONFIG, PROJECT_DIR from ..const import CONFIG, PROJECT_DIR
@ -94,7 +95,7 @@ CAS_IGNORE_REFERER = True
CAS_LOGOUT_COMPLETELY = CONFIG.CAS_LOGOUT_COMPLETELY CAS_LOGOUT_COMPLETELY = CONFIG.CAS_LOGOUT_COMPLETELY
CAS_VERSION = CONFIG.CAS_VERSION CAS_VERSION = CONFIG.CAS_VERSION
CAS_ROOT_PROXIED_AS = CONFIG.CAS_ROOT_PROXIED_AS CAS_ROOT_PROXIED_AS = CONFIG.CAS_ROOT_PROXIED_AS
CAS_CHECK_NEXT = lambda: lambda _next_page: True CAS_CHECK_NEXT = lambda _next_page: True
# SSO Auth # SSO Auth
AUTH_SSO = CONFIG.AUTH_SSO AUTH_SSO = CONFIG.AUTH_SSO
@ -105,17 +106,29 @@ TOKEN_EXPIRATION = CONFIG.TOKEN_EXPIRATION
LOGIN_CONFIRM_ENABLE = CONFIG.LOGIN_CONFIRM_ENABLE LOGIN_CONFIRM_ENABLE = CONFIG.LOGIN_CONFIRM_ENABLE
OTP_IN_RADIUS = CONFIG.OTP_IN_RADIUS OTP_IN_RADIUS = CONFIG.OTP_IN_RADIUS
AUTHENTICATION_BACKENDS = [
'authentication.backends.pubkey.PublicKeyAuthBackend', AUTH_BACKEND_MODEL = 'django.contrib.auth.backends.ModelBackend'
'django.contrib.auth.backends.ModelBackend', AUTH_BACKEND_PUBKEY = 'authentication.backends.pubkey.PublicKeyAuthBackend'
] AUTH_BACKEND_LDAP = 'authentication.backends.ldap.LDAPAuthorizationBackend'
AUTH_BACKEND_OIDC_PASSWORD = 'jms_oidc_rp.backends.OIDCAuthPasswordBackend'
AUTH_BACKEND_OIDC_CODE = 'jms_oidc_rp.backends.OIDCAuthCodeBackend'
AUTH_BACKEND_RADIUS = 'authentication.backends.radius.RadiusBackend'
AUTH_BACKEND_CAS = 'authentication.backends.cas.CASBackend'
AUTH_BACKEND_SSO = 'authentication.backends.api.SSOAuthentication'
AUTHENTICATION_BACKENDS = [AUTH_BACKEND_MODEL, AUTH_BACKEND_PUBKEY]
if AUTH_CAS: if AUTH_CAS:
AUTHENTICATION_BACKENDS.insert(0, 'authentication.backends.cas.CASBackend') AUTHENTICATION_BACKENDS.insert(0, AUTH_BACKEND_CAS)
if AUTH_OPENID: if AUTH_OPENID:
AUTHENTICATION_BACKENDS.insert(0, 'jms_oidc_rp.backends.OIDCAuthPasswordBackend') AUTHENTICATION_BACKENDS.insert(0, AUTH_BACKEND_OIDC_PASSWORD)
AUTHENTICATION_BACKENDS.insert(0, 'jms_oidc_rp.backends.OIDCAuthCodeBackend') AUTHENTICATION_BACKENDS.insert(0, AUTH_BACKEND_OIDC_CODE)
if AUTH_RADIUS: if AUTH_RADIUS:
AUTHENTICATION_BACKENDS.insert(0, 'authentication.backends.radius.RadiusBackend') AUTHENTICATION_BACKENDS.insert(0, AUTH_BACKEND_RADIUS)
if AUTH_SSO: if AUTH_SSO:
AUTHENTICATION_BACKENDS.insert(0, 'authentication.backends.api.SSOAuthentication') AUTHENTICATION_BACKENDS.insert(0, AUTH_BACKEND_SSO)
ONLY_ALLOW_EXIST_USER_AUTH = CONFIG.ONLY_ALLOW_EXIST_USER_AUTH
ONLY_ALLOW_AUTH_FROM_SOURCE = CONFIG.ONLY_ALLOW_AUTH_FROM_SOURCE

View File

@ -90,7 +90,7 @@ class Setting(models.Model):
setting = cls.objects.filter(name='AUTH_LDAP').first() setting = cls.objects.filter(name='AUTH_LDAP').first()
if not setting: if not setting:
return return
ldap_backend = 'authentication.backends.ldap.LDAPAuthorizationBackend' ldap_backend = settings.AUTH_BACKEND_LABEL_MAPPING['ldap']
backends = settings.AUTHENTICATION_BACKENDS backends = settings.AUTHENTICATION_BACKENDS
has = ldap_backend in backends has = ldap_backend in backends
if setting.cleaned_value and not has: if setting.cleaned_value and not has:

View File

@ -1,5 +1,3 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
# #
from .user import *
from .group import *
from .profile import * from .profile import *

View File

@ -1,44 +0,0 @@
# -*- coding: utf-8 -*-
#
from django import forms
from django.utils.translation import gettext_lazy as _
from orgs.mixins.forms import OrgModelForm
from ..models import User, UserGroup
__all__ = ['UserGroupForm']
class UserGroupForm(OrgModelForm):
users = forms.ModelMultipleChoiceField(
queryset=User.objects.none(),
label=_("User"),
widget=forms.SelectMultiple(
attrs={
'class': 'users-select2',
'data-placeholder': _('Select users')
}
),
required=False,
)
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.set_fields_queryset()
def set_fields_queryset(self):
users_field = self.fields.get('users')
if self.instance:
users_field.initial = self.instance.users.all()
users_field.queryset = self.instance.users.all()
else:
users_field.queryset = User.objects.none()
def save(self, commit=True):
raise Exception("Save by restful api")
class Meta:
model = UserGroup
fields = [
'name', 'users', 'comment',
]

View File

@ -12,9 +12,21 @@ __all__ = [
'UserProfileForm', 'UserMFAForm', 'UserFirstLoginFinishForm', 'UserProfileForm', 'UserMFAForm', 'UserFirstLoginFinishForm',
'UserPasswordForm', 'UserPublicKeyForm', 'FileForm', 'UserPasswordForm', 'UserPublicKeyForm', 'FileForm',
'UserTokenResetPasswordForm', 'UserForgotPasswordForm', 'UserTokenResetPasswordForm', 'UserForgotPasswordForm',
'UserCheckPasswordForm', 'UserCheckOtpCodeForm'
] ]
class UserCheckPasswordForm(forms.Form):
password = forms.CharField(
label=_('Password'), widget=forms.PasswordInput,
max_length=128, strip=False
)
class UserCheckOtpCodeForm(forms.Form):
otp_code = forms.CharField(label=_('MFA code'), max_length=6)
class UserProfileForm(forms.ModelForm): class UserProfileForm(forms.ModelForm):
username = forms.CharField(disabled=True, label=_("Username")) username = forms.CharField(disabled=True, label=_("Username"))
name = forms.CharField(disabled=True, label=_("Name")) name = forms.CharField(disabled=True, label=_("Name"))

View File

@ -1,199 +0,0 @@
from django import forms
from django.utils.translation import gettext_lazy as _
from common.utils import validate_ssh_public_key
from orgs.mixins.forms import OrgModelForm
from ..models import User
from ..utils import (
check_password_rules, get_current_org_members, get_source_choices
)
__all__ = [
'UserCreateForm', 'UserUpdateForm', 'UserBulkUpdateForm',
'UserCheckOtpCodeForm', 'UserCheckPasswordForm'
]
class UserCreateUpdateFormMixin(OrgModelForm):
role_choices = ((i, n) for i, n in User.ROLE.choices if i != User.ROLE.APP)
password = forms.CharField(
label=_('Password'), widget=forms.PasswordInput,
max_length=128, strip=False, required=False,
)
role = forms.ChoiceField(
choices=role_choices, required=True,
initial=User.ROLE.USER, label=_("Role")
)
source = forms.ChoiceField(
choices=get_source_choices, required=True,
initial=User.Source.local.value, label=_("Source")
)
public_key = forms.CharField(
label=_('ssh public key'), max_length=5000, required=False,
widget=forms.Textarea(attrs={'placeholder': _('ssh-rsa AAAA...')}),
help_text=_('Paste user id_rsa.pub here.')
)
class Meta:
model = User
fields = [
'username', 'name', 'email', 'groups', 'wechat',
'source', 'phone', 'role', 'date_expired',
'comment', 'mfa_level'
]
widgets = {
'mfa_level': forms.RadioSelect(),
'groups': forms.SelectMultiple(
attrs={
'class': 'select2',
'data-placeholder': _('Join user groups')
}
)
}
def __init__(self, *args, **kwargs):
self.request = kwargs.pop("request", None)
super(UserCreateUpdateFormMixin, self).__init__(*args, **kwargs)
roles = []
# Super admin user
if self.request.user.is_superuser:
roles.append((User.ROLE.ADMIN, User.ROLE.ADMIN.label))
roles.append((User.ROLE.USER, User.ROLE.USER.label))
roles.append((User.ROLE.AUDITOR, User.ROLE.AUDITOR.label))
# Org admin user
else:
user = kwargs.get('instance')
# Update
if user:
role = kwargs.get('instance').role
roles.append((role, User.ROLE[role]))
# Create
else:
roles.append((User.ROLE.USER, User.ROLE.USER.label))
field = self.fields['role']
field.choices = set(roles)
def clean_public_key(self):
public_key = self.cleaned_data['public_key']
if not public_key:
return public_key
if self.instance.public_key and public_key == self.instance.public_key:
msg = _('Public key should not be the same as your old one.')
raise forms.ValidationError(msg)
if not validate_ssh_public_key(public_key):
raise forms.ValidationError(_('Not a valid ssh public key'))
return public_key
def clean_password(self):
password_strategy = self.data.get('password_strategy')
# 创建-不设置密码
if password_strategy == '0':
return
password = self.data.get('password')
# 更新-密码为空
if password_strategy is None and not password:
return
if not check_password_rules(password):
msg = _('* Your password does not meet the requirements')
raise forms.ValidationError(msg)
return password
def save(self, commit=True):
password = self.cleaned_data.get('password')
mfa_level = self.cleaned_data.get('mfa_level')
public_key = self.cleaned_data.get('public_key')
user = super().save(commit=commit)
if password:
user.reset_password(password)
if mfa_level:
user.mfa_level = mfa_level
user.save()
if public_key:
user.public_key = public_key
user.save()
return user
class UserCreateForm(UserCreateUpdateFormMixin):
EMAIL_SET_PASSWORD = _('Reset link will be generated and sent to the user')
CUSTOM_PASSWORD = _('Set password')
PASSWORD_STRATEGY_CHOICES = (
(0, EMAIL_SET_PASSWORD),
(1, CUSTOM_PASSWORD)
)
password_strategy = forms.ChoiceField(
choices=PASSWORD_STRATEGY_CHOICES, required=True, initial=0,
widget=forms.RadioSelect(), label=_('Password strategy')
)
class UserUpdateForm(UserCreateUpdateFormMixin):
pass
class UserBulkUpdateForm(OrgModelForm):
users = forms.ModelMultipleChoiceField(
required=True,
label=_('Select users'),
queryset=User.objects.none(),
widget=forms.SelectMultiple(
attrs={
'class': 'users-select2',
'data-placeholder': _('Select users')
}
)
)
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.set_fields_queryset()
def set_fields_queryset(self):
users_field = self.fields['users']
users_field.queryset = get_current_org_members()
class Meta:
model = User
fields = ['users', 'groups', 'date_expired']
widgets = {
"groups": forms.SelectMultiple(
attrs={
'class': 'select2',
'data-placeholder': _('User group')
}
)
}
def save(self, commit=True):
changed_fields = []
for field in self._meta.fields:
if self.data.get(field) is not None:
changed_fields.append(field)
cleaned_data = {k: v for k, v in self.cleaned_data.items()
if k in changed_fields}
users = cleaned_data.pop('users', '')
groups = cleaned_data.pop('groups', [])
users = User.objects.filter(id__in=[user.id for user in users])
users.update(**cleaned_data)
if groups:
for user in users:
user.groups.set(groups)
return users
class UserCheckPasswordForm(forms.Form):
password = forms.CharField(
label=_('Password'), widget=forms.PasswordInput,
max_length=128, strip=False
)
class UserCheckOtpCodeForm(forms.Form):
otp_code = forms.CharField(label=_('MFA code'), max_length=6)

View File

@ -514,6 +514,14 @@ class User(AuthMixin, TokenMixin, RoleMixin, MFAMixin, AbstractUser):
radius = 'radius', 'Radius' radius = 'radius', 'Radius'
cas = 'cas', 'CAS' cas = 'cas', 'CAS'
SOURCE_BACKEND_MAPPING = {
Source.local: [settings.AUTH_BACKEND_MODEL, settings.AUTH_BACKEND_PUBKEY],
Source.ldap: [settings.AUTH_BACKEND_LDAP],
Source.openid: [settings.AUTH_BACKEND_OIDC_PASSWORD, settings.AUTH_BACKEND_OIDC_CODE],
Source.radius: [settings.AUTH_BACKEND_RADIUS],
Source.cas: [settings.AUTH_BACKEND_CAS],
}
id = models.UUIDField(default=uuid.uuid4, primary_key=True) id = models.UUIDField(default=uuid.uuid4, primary_key=True)
username = models.CharField( username = models.CharField(
max_length=128, unique=True, verbose_name=_('Username') max_length=128, unique=True, verbose_name=_('Username')
@ -562,7 +570,8 @@ class User(AuthMixin, TokenMixin, RoleMixin, MFAMixin, AbstractUser):
max_length=30, default='', blank=True, verbose_name=_('Created by') max_length=30, default='', blank=True, verbose_name=_('Created by')
) )
source = models.CharField( source = models.CharField(
max_length=30, default=Source.local.value, choices=Source.choices, max_length=30, default=Source.local,
choices=Source.choices,
verbose_name=_('Source') verbose_name=_('Source')
) )
date_password_last_updated = models.DateTimeField( date_password_last_updated = models.DateTimeField(
@ -570,8 +579,6 @@ class User(AuthMixin, TokenMixin, RoleMixin, MFAMixin, AbstractUser):
verbose_name=_('Date password last updated') verbose_name=_('Date password last updated')
) )
user_cache_key_prefix = '_User_{}'
def __str__(self): def __str__(self):
return '{0.name}({0.username})'.format(self) return '{0.name}({0.username})'.format(self)

View File

@ -4,6 +4,7 @@
from django.dispatch import receiver from django.dispatch import receiver
from django_auth_ldap.backend import populate_user from django_auth_ldap.backend import populate_user
from django.conf import settings from django.conf import settings
from django.core.exceptions import PermissionDenied
from django_cas_ng.signals import cas_user_authenticated from django_cas_ng.signals import cas_user_authenticated
from jms_oidc_rp.signals import openid_create_or_update_user from jms_oidc_rp.signals import openid_create_or_update_user
@ -27,6 +28,9 @@ def on_user_create(sender, user=None, **kwargs):
@receiver(cas_user_authenticated) @receiver(cas_user_authenticated)
def on_cas_user_authenticated(sender, user, created, **kwargs): def on_cas_user_authenticated(sender, user, created, **kwargs):
if created and settings.ONLY_ALLOW_EXIST_USER_AUTH:
user.delete()
raise PermissionDenied(f'Not allow non-exist user auth: {user.username}')
if created: if created:
user.source = user.Source.cas.value user.source = user.Source.cas.value
user.save() user.save()
@ -43,6 +47,10 @@ def on_ldap_create_user(sender, user, ldap_user, **kwargs):
@receiver(openid_create_or_update_user) @receiver(openid_create_or_update_user)
def on_openid_create_or_update_user(sender, request, user, created, name, username, email, **kwargs): def on_openid_create_or_update_user(sender, request, user, created, name, username, email, **kwargs):
if created and settings.ONLY_ALLOW_EXIST_USER_AUTH:
user.delete()
raise PermissionDenied(f'Not allow non-exist user auth: {username}')
if created: if created:
logger.debug( logger.debug(
"Receive OpenID user created signal: {}, " "Receive OpenID user created signal: {}, "

View File

@ -382,22 +382,6 @@ def get_current_org_members(exclude=()):
return current_org.get_members(exclude=exclude) return current_org.get_members(exclude=exclude)
def get_source_choices():
from .models import User
choices = [
(User.Source.local.value, User.Source.local.label),
]
if settings.AUTH_LDAP:
choices.append((User.Source.ldap.value, User.Source.ldap.label))
if settings.AUTH_OPENID:
choices.append((User.Source.openid.value, User.Source.openid.label))
if settings.AUTH_RADIUS:
choices.append((User.Source.radius.value, User.Source.radius.label))
if settings.AUTH_CAS:
choices.append((User.Source.cas.value, User.Source.cas.label))
return choices
def is_auth_time_valid(session, key): def is_auth_time_valid(session, key):
return True if session.get(key, 0) > time.time() else False return True if session.get(key, 0) > time.time() else False

View File

@ -51,6 +51,9 @@ class UserOtpEnableInstallAppView(TemplateView):
return super().get_context_data(**kwargs) return super().get_context_data(**kwargs)
class UserOtpEnableBindView(AuthMixin, TemplateView, FormView): class UserOtpEnableBindView(AuthMixin, TemplateView, FormView):
template_name = 'users/user_otp_enable_bind.html' template_name = 'users/user_otp_enable_bind.html'
form_class = forms.UserCheckOtpCodeForm form_class = forms.UserCheckOtpCodeForm