import base64 import hashlib import uuid from datetime import datetime, timedelta from captcha.views import CaptchaStore, captcha_image from django.contrib import auth from django.contrib.auth import login from django.shortcuts import redirect from django.utils.translation import gettext_lazy as _ from drf_yasg import openapi from drf_yasg.utils import swagger_auto_schema from rest_framework import serializers from rest_framework.views import APIView from rest_framework_simplejwt.serializers import TokenObtainPairSerializer from rest_framework_simplejwt.views import TokenObtainPairView from django.conf import settings from application import dispatch from dvadmin.system.models import Users from dvadmin.utils.json_response import ErrorResponse, DetailResponse from dvadmin.utils.request_util import save_login_log from dvadmin.utils.serializers import CustomModelSerializer from dvadmin.utils.validator import CustomValidationError class CaptchaView(APIView): authentication_classes = [] permission_classes = [] @swagger_auto_schema( responses={"200": openapi.Response("获取成功")}, security=[], operation_id="captcha-get", operation_description="验证码获取", ) def get(self, request): data = {} if dispatch.get_system_config_values("base.captcha_state"): hashkey = CaptchaStore.generate_key() id = CaptchaStore.objects.filter(hashkey=hashkey).first().id imgage = captcha_image(request, hashkey) # 将图片转换为base64 image_base = base64.b64encode(imgage.content) data = { "key": id, "image_base": "data:image/png;base64," + image_base.decode("utf-8"), } return DetailResponse(data=data) class LoginSerializer(TokenObtainPairSerializer): """ 登录的序列化器: 重写djangorestframework-simplejwt的序列化器 """ captcha = serializers.CharField( max_length=6, required=False, allow_null=True, allow_blank=True ) class Meta: model = Users fields = "__all__" read_only_fields = ["id"] class LoginView(TokenObtainPairView): """ 登录接口 """ serializer_class = LoginSerializer permission_classes = [] def post(self, request, *args, **kwargs): # username可能携带的不止是用户名,可能还是用户的其它唯一标识 手机号 邮箱 username = request.data.get('username',None) if username is None: return ErrorResponse(msg="账号不能为空") password = request.data.get('password',None) if password is None: return ErrorResponse(msg="密码不能为空") if dispatch.get_system_config_values("base.captcha_state"): captcha = request.data.get('captcha', None) captchaKey = request.data.get('captchaKey', None) if captchaKey is None: return ErrorResponse(msg="验证码不能为空") if captcha is None: raise CustomValidationError("验证码不能为空") self.image_code = CaptchaStore.objects.filter( id=captchaKey ).first() five_minute_ago = datetime.now() - timedelta(hours=0, minutes=5, seconds=0) if self.image_code and five_minute_ago > self.image_code.expiration: self.image_code and self.image_code.delete() raise CustomValidationError("验证码过期") else: if self.image_code and ( self.image_code.response == captcha or self.image_code.challenge == captcha ): self.image_code and self.image_code.delete() else: self.image_code and self.image_code.delete() raise CustomValidationError("图片验证码错误") try: # 手动通过 user 签发 jwt-token user = Users.objects.get(username=username) except: return ErrorResponse(msg='该账号未注册') # 获得用户后,校验密码并签发token if not user.check_password(password): return ErrorResponse(msg='密码错误') result = { "name":user.name, "userId":user.id, "avatar":user.avatar, } dept = getattr(user, 'dept', None) if dept: result['dept_info'] = { 'dept_id': dept.id, 'dept_name': dept.name, 'dept_key': dept.key } role = getattr(user, 'role', None) if role: result['role_info'] = role.values('id', 'name', 'key') refresh = LoginSerializer.get_token(user) if settings.STRICT_LOGIN: login_flag = uuid.uuid4().__str__() refresh['login_flag'] = login_flag user.login_flag = login_flag try: user.save() except Exception as e: return ErrorResponse("登录失败!系统发生致命错误!") result["refresh"] = str(refresh) result["access"] = str(refresh.access_token) # 记录登录日志 request.user = user save_login_log(request=request) return DetailResponse(data=result,msg="获取成功") class LoginTokenSerializer(TokenObtainPairSerializer): """ 登录的序列化器: """ class Meta: model = Users fields = "__all__" read_only_fields = ["id"] default_error_messages = {"no_active_account": _("账号/密码不正确")} def validate(self, attrs): if not getattr(settings, "LOGIN_NO_CAPTCHA_AUTH", False): return {"code": 4000, "msg": "该接口暂未开通!", "data": None} data = super().validate(attrs) data["name"] = self.user.name data["userId"] = self.user.id return {"code": 2000, "msg": "请求成功", "data": data} class LoginTokenView(TokenObtainPairView): """ 登录获取token接口 """ serializer_class = LoginTokenSerializer permission_classes = [] class LogoutView(APIView): def post(self, request): if settings.STRICT_LOGIN: user = request.user try: user.login_flag = "logout" user.save() except Exception as e: pass return DetailResponse(msg="注销成功") class ApiLoginSerializer(CustomModelSerializer): """接口文档登录-序列化器""" username = serializers.CharField() password = serializers.CharField() class Meta: model = Users fields = ["username", "password"] class ApiLogin(APIView): """接口文档的登录接口""" serializer_class = ApiLoginSerializer authentication_classes = [] permission_classes = [] def post(self, request): username = request.data.get("username") password = request.data.get("password") user_obj = auth.authenticate( request, username=username, password=hashlib.md5(password.encode(encoding="UTF-8")).hexdigest(), ) if user_obj: login(request, user_obj) return redirect("/") else: return ErrorResponse(msg="账号/密码错误")