|
|
|
@ -11,7 +11,6 @@ from django.utils.functional import LazyObject
|
|
|
|
|
from django.utils.translation import gettext_lazy as _ |
|
|
|
|
from rest_framework.request import Request |
|
|
|
|
|
|
|
|
|
from acls.const import ActionChoices |
|
|
|
|
from acls.models import LoginACL |
|
|
|
|
from acls.notifications import UserLoginReminderMsg |
|
|
|
|
from audits.models import UserLoginLog |
|
|
|
@ -85,6 +84,9 @@ def generate_data(username, request, login_type=None):
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def create_user_session(request, user_id, instance: UserLoginLog): |
|
|
|
|
# TODO 目前只记录 web 登录的 session |
|
|
|
|
if instance.type != LoginTypeChoices.web: |
|
|
|
|
return |
|
|
|
|
session_key = request.session.session_key or '-' |
|
|
|
|
session_store_cls = import_module(settings.SESSION_ENGINE).SessionStore |
|
|
|
|
session_store = session_store_cls(session_key=session_key) |
|
|
|
@ -102,10 +104,21 @@ def create_user_session(request, user_id, instance: UserLoginLog):
|
|
|
|
|
'date_expired': instance.datetime + timedelta(seconds=ttl), |
|
|
|
|
} |
|
|
|
|
user_session = UserSession.objects.create(**online_session_data) |
|
|
|
|
request.session['user_session_id'] = user_session.id |
|
|
|
|
request.session['user_session_id'] = str(user_session.id) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def send_login_info_to_reviewers(instance: UserLoginLog | str, auth_acl_id): |
|
|
|
|
if isinstance(instance, str): |
|
|
|
|
instance = UserLoginLog.objects.filter(id=instance).first() |
|
|
|
|
|
|
|
|
|
if not instance: |
|
|
|
|
return |
|
|
|
|
|
|
|
|
|
acl = LoginACL.objects.filter(id=auth_acl_id).first() |
|
|
|
|
if not acl or not acl.reviewers.exists(): |
|
|
|
|
return |
|
|
|
|
|
|
|
|
|
def send_login_info_to_reviewers(instance: UserLoginLog, reviewers): |
|
|
|
|
reviewers = acl.reviewers.all() |
|
|
|
|
for reviewer in reviewers: |
|
|
|
|
UserLoginReminderMsg(reviewer, instance).publish_async() |
|
|
|
|
|
|
|
|
@ -119,22 +132,15 @@ def on_user_auth_success(sender, user, request, login_type=None, **kwargs):
|
|
|
|
|
data.update({'mfa': int(user.mfa_enabled), 'status': True}) |
|
|
|
|
instance = write_login_log(**data) |
|
|
|
|
|
|
|
|
|
# TODO 目前只记录 web 登录的 session |
|
|
|
|
if instance.type != LoginTypeChoices.web: |
|
|
|
|
return |
|
|
|
|
create_user_session(request, user.id, instance) |
|
|
|
|
|
|
|
|
|
request.session['user_log_id'] = str(instance.id) |
|
|
|
|
request.session['can_send_notifications'] = True |
|
|
|
|
auth_notice_required = request.session.get('auth_notice_required') |
|
|
|
|
if not auth_notice_required: |
|
|
|
|
return |
|
|
|
|
|
|
|
|
|
auth_acl_id = request.session.get('auth_acl_id') |
|
|
|
|
acl = LoginACL.objects.filter(id=auth_acl_id, action=ActionChoices.notice).first() |
|
|
|
|
if not acl or not acl.reviewers.exists(): |
|
|
|
|
return |
|
|
|
|
|
|
|
|
|
reviewers = acl.reviewers.all() |
|
|
|
|
send_login_info_to_reviewers(instance, reviewers) |
|
|
|
|
send_login_info_to_reviewers(instance, auth_acl_id) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@receiver(post_auth_failed) |
|
|
|
|