[Update] 调整`MFA`绑定策略 V2

pull/4133/head
xinwen 2020-06-22 17:04:07 +08:00
parent 005573b53b
commit 3e993fd044
5 changed files with 53 additions and 44 deletions

View File

@ -10,3 +10,7 @@ class HttpResponseTemporaryRedirect(HttpResponse):
def __init__(self, redirect_to): def __init__(self, redirect_to):
HttpResponse.__init__(self) HttpResponse.__init__(self)
self['Location'] = iri_to_uri(redirect_to) self['Location'] = iri_to_uri(redirect_to)
def get_remote_addr(request):
return request.META.get("HTTP_X_FORWARDED_HOST") or request.META.get("REMOTE_ADDR")

View File

@ -88,4 +88,4 @@ WINDOWS_SKIP_ALL_MANUAL_PASSWORD = CONFIG.WINDOWS_SKIP_ALL_MANUAL_PASSWORD
CHANGE_AUTH_PLAN_SECURE_MODE_ENABLED = CONFIG.CHANGE_AUTH_PLAN_SECURE_MODE_ENABLED CHANGE_AUTH_PLAN_SECURE_MODE_ENABLED = CONFIG.CHANGE_AUTH_PLAN_SECURE_MODE_ENABLED
AUTH_EXPIRED_SECONDS = 60 * 5

View File

@ -5,8 +5,8 @@ import re
import pyotp import pyotp
import base64 import base64
import logging import logging
import time
from django.http import Http404
from django.conf import settings from django.conf import settings
from django.utils.translation import ugettext as _ from django.utils.translation import ugettext as _
from django.core.cache import cache from django.core.cache import cache
@ -333,3 +333,15 @@ def get_source_choices():
if settings.AUTH_CAS: if settings.AUTH_CAS:
choices.append((User.SOURCE_CAS, choices_all[User.SOURCE_CAS])) choices.append((User.SOURCE_CAS, choices_all[User.SOURCE_CAS]))
return choices return choices
def is_auth_time_valid(session, key):
return True if session.get(key, 0) > time.time() else False
def is_auth_password_time_valid(session):
return is_auth_time_valid(session, 'auth_password_expired_at')
def is_auth_otp_time_valid(session):
return is_auth_time_valid(session, 'auth_opt_expired_at')

View File

@ -7,17 +7,17 @@ from django.views.generic.base import TemplateView
from django.views.generic.edit import FormView from django.views.generic.edit import FormView
from django.contrib.auth import logout as auth_logout from django.contrib.auth import logout as auth_logout
from django.conf import settings from django.conf import settings
from django.shortcuts import redirect from django.http.response import HttpResponseForbidden
from authentication.mixins import AuthMixin from authentication.mixins import AuthMixin
from users.models import User from users.models import User
from common.utils import get_logger from common.utils import get_logger, get_object_or_none
from common.utils import get_object_or_none
from common.permissions import IsValidUser from common.permissions import IsValidUser
from ... import forms from ... import forms
from .password import UserVerifyPasswordView from .password import UserVerifyPasswordView
from ...utils import ( from ...utils import (
generate_otp_uri, check_otp_code, get_user_or_pre_auth_user, generate_otp_uri, check_otp_code, get_user_or_pre_auth_user,
is_auth_password_time_valid, is_auth_otp_time_valid
) )
__all__ = [ __all__ = [
@ -57,52 +57,43 @@ class UserOtpEnableBindView(AuthMixin, TemplateView, FormView):
success_url = reverse_lazy('users:user-otp-settings-success') success_url = reverse_lazy('users:user-otp-settings-success')
def get(self, request, *args, **kwargs): def get(self, request, *args, **kwargs):
return self._check_can_bind() or super().get(request, *args, **kwargs) if self._check_can_bind():
return super().get(request, *args, **kwargs)
return HttpResponseForbidden()
def post(self, request, *args, **kwargs): def post(self, request, *args, **kwargs):
return self._check_can_bind() or super().post(request, *args, **kwargs) if self._check_can_bind():
return super().post(request, *args, **kwargs)
return HttpResponseForbidden()
def _check_can_bind(self): def _check_authenticated_user_can_bind(self):
""" user = self.request.user
:return: session = self.request.session
- `None` 表示验证成功
- `Response` 验证失败调用函数需直接返回该 `Response`
"""
request = self.request if not user.mfa_enabled:
request_user = request.user return is_auth_password_time_valid(session)
if not user.otp_secret_key:
return is_auth_password_time_valid(session)
return is_auth_otp_time_valid(session)
def _check_unauthenticated_user_can_bind(self):
session_user = None session_user = None
if not self.request.session.is_empty(): if not self.request.session.is_empty():
user_id = self.request.session.get('user_id') user_id = self.request.session.get('user_id')
session_user = get_object_or_none(User, pk=user_id) session_user = get_object_or_none(User, pk=user_id)
auth_password = request.session.get('auth_password') if session_user:
if request_user.is_authenticated: if all((is_auth_password_time_valid(self.request.session), session_user.mfa_enabled, not session_user.otp_secret_key)):
# 用户已登录,在 `mfa_enabled` 启用,而且 `otp_secret_key` 不为空的情况,跳转到 return True
# otp 认证界面 return False
if request_user.mfa_enabled and request_user.otp_secret_key:
logger.warn(f'OPT_BIND-> authenticated ' def _check_can_bind(self):
f'request_user.username={request_user.username}, ' if self.request.user.is_authenticated:
f'request_user.mfa_enabled={request_user.mfa_enabled}, ' return self._check_authenticated_user_can_bind()
f'request_user.otp_secret_key={request_user.otp_secret_key}')
return redirect(reverse('authentication:user-otp-update'))
return None
elif session_user:
# 未登录,但是验证过了密码,如果是 `reset` 流程,需要 `mfa_enabled` 启用,`otp_secret_key` 为空
if not all((auth_password, session_user.mfa_enabled, not session_user.otp_secret_key)):
logger.warn(f'OPT_BIND-> auth_password '
f'session_user.username={session_user.username}, '
f'auth_password={auth_password}, '
f'session_user.mfa_enabled={session_user.mfa_enabled}, '
f'session_user.otp_secret_key={session_user.otp_secret_key}')
return redirect(reverse('authentication:login'))
return None
else: else:
# 未登录,没有验证过密码,直接跳转到登录界面 return self._check_unauthenticated_user_can_bind()
logger.warn(f'OPT_BIND-> anonymous '
f'REMOTE_ADDR={request.META.get("HTTP_X_FORWARDED_HOST") or request.META.get("REMOTE_ADDR")}')
return redirect(reverse('authentication:login'))
def form_valid(self, form): def form_valid(self, form):
otp_code = form.cleaned_data.get('otp_code') otp_code = form.cleaned_data.get('otp_code')
@ -169,8 +160,7 @@ class UserOtpUpdateView(FormView):
valid = user.check_mfa(otp_code) valid = user.check_mfa(otp_code)
if valid: if valid:
user.otp_secret_key = '' self.request.session['auth_opt_expired_at'] = time.time() + settings.AUTH_EXPIRED_SECONDS
user.save()
return super().form_valid(form) return super().form_valid(form)
else: else:
error = _('MFA code invalid, or ntp sync server time') error = _('MFA code invalid, or ntp sync server time')

View File

@ -1,8 +1,10 @@
# ~*~ coding: utf-8 ~*~ # ~*~ coding: utf-8 ~*~
import time
from django.conf import settings
from django.contrib.auth import authenticate from django.contrib.auth import authenticate
from django.shortcuts import redirect from django.shortcuts import redirect
from django.urls import reverse_lazy, reverse from django.urls import reverse_lazy
from django.utils.translation import ugettext as _ from django.utils.translation import ugettext as _
from django.views.generic.edit import UpdateView, FormView from django.views.generic.edit import UpdateView, FormView
from django.contrib.auth import logout as auth_logout from django.contrib.auth import logout as auth_logout
@ -76,6 +78,7 @@ class UserVerifyPasswordView(FormView):
user.save() user.save()
self.request.session['user_id'] = str(user.id) self.request.session['user_id'] = str(user.id)
self.request.session['auth_password'] = 1 self.request.session['auth_password'] = 1
self.request.session['auth_password_expired_at'] = time.time() + settings.AUTH_EXPIRED_SECONDS
return redirect(self.get_success_url()) return redirect(self.get_success_url())
def get_success_url(self): def get_success_url(self):