jumpserver/apps/authentication/api/mfa.py

118 lines
3.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

# -*- coding: utf-8 -*-
#
import time
from django.utils.translation import ugettext as _
from django.conf import settings
from django.shortcuts import get_object_or_404
from rest_framework.permissions import AllowAny
from rest_framework.generics import CreateAPIView
from rest_framework.serializers import ValidationError
from rest_framework.response import Response
from common.permissions import IsValidUser, NeedMFAVerify
from common.utils import get_logger
from common.exceptions import UnexpectError
from users.models.user import User
from ..serializers import OtpVerifySerializer
from .. import serializers
from .. import errors
from ..mfa.otp import MFAOtp
from ..mixins import AuthMixin
logger = get_logger(__name__)
__all__ = [
'MFAChallengeVerifyApi', 'UserOtpVerifyApi',
'MFASendCodeApi'
]
# MFASelectAPi 原来的名字
class MFASendCodeApi(AuthMixin, CreateAPIView):
"""
选择 MFA 后对应操作 apikoko 目前在用
"""
permission_classes = (AllowAny,)
serializer_class = serializers.MFASelectTypeSerializer
username = ''
ip = ''
def get_user_from_db(self, username):
"""避免暴力测试用户名"""
ip = self.get_request_ip()
self.check_mfa_is_block(username, ip)
try:
user = get_object_or_404(User, username=username)
return user
except Exception as e:
self.incr_mfa_failed_time(username, ip)
raise e
def perform_create(self, serializer):
username = serializer.validated_data.get('username', '')
mfa_type = serializer.validated_data['type']
if not username:
user = self.get_user_from_session()
else:
user = self.get_user_from_db(username)
mfa_backend = user.get_active_mfa_backend_by_type(mfa_type)
if not mfa_backend or not mfa_backend.challenge_required:
error = _('Current user not support mfa type: {}').format(mfa_type)
raise ValidationError({'error': error})
try:
mfa_backend.send_challenge()
except Exception as e:
raise UnexpectError(str(e))
class MFAChallengeVerifyApi(AuthMixin, CreateAPIView):
permission_classes = (AllowAny,)
serializer_class = serializers.MFAChallengeSerializer
def perform_create(self, serializer):
user = self.get_user_from_session()
code = serializer.validated_data.get('code')
mfa_type = serializer.validated_data.get('type', '')
self._do_check_user_mfa(code, mfa_type, user)
def create(self, request, *args, **kwargs):
try:
super().create(request, *args, **kwargs)
return Response({'msg': 'ok'})
except errors.AuthFailedError as e:
data = {"error": e.error, "msg": e.msg}
raise ValidationError(data)
except errors.NeedMoreInfoError as e:
return Response(e.as_data(), status=200)
class UserOtpVerifyApi(CreateAPIView):
permission_classes = (IsValidUser,)
serializer_class = OtpVerifySerializer
def get(self, request, *args, **kwargs):
return Response({'code': 'valid', 'msg': 'verified'})
def create(self, request, *args, **kwargs):
serializer = self.get_serializer(data=request.data)
serializer.is_valid(raise_exception=True)
code = serializer.validated_data["code"]
otp = MFAOtp(request.user)
ok, error = otp.check_code(code)
if ok:
request.session["MFA_VERIFY_TIME"] = int(time.time())
return Response({"ok": "1"})
else:
return Response({"error": _("Code is invalid, {}").format(error)}, status=400)
def get_permissions(self):
if self.request.method.lower() == 'get' \
and settings.SECURITY_VIEW_AUTH_NEED_MFA:
self.permission_classes = [NeedMFAVerify]
return super().get_permissions()