jumpserver/apps/audits/signal_handlers/login_log.py

97 lines
3.4 KiB
Python

# -*- coding: utf-8 -*-
#
from django.utils.functional import LazyObject
from django.utils.translation import ugettext_lazy as _
from django.conf import settings
from django.contrib.auth import BACKEND_SESSION_KEY
from django.dispatch import receiver
from django.utils import timezone, translation
from rest_framework.request import Request
from authentication.signals import post_auth_failed, post_auth_success
from authentication.utils import check_different_city_login_if_need
from common.utils import get_request_ip, get_logger
from users.models import User
from ..utils import write_login_log
# Module-level logger, obtained from the project's get_logger helper and
# keyed by this module's import name.
logger = get_logger(__name__)
class AuthBackendLabelMapping(LazyObject):
    """Lazily constructed dict mapping an auth backend to its display label.

    The wrapped dict is only built when the object is first used (via
    ``LazyObject._setup``), so ``settings`` and ``User`` are not read when
    this module is imported.
    """

    @staticmethod
    def get_login_backends():
        """Build and return the backend -> label dict.

        Entries derived from ``User.SOURCE_BACKEND_MAPPING`` are inserted
        first; the fixed ``settings.AUTH_BACKEND_*`` entries are applied
        afterwards and therefore win if a backend appears in both.
        """
        # Every backend listed for a source gets that source's label.
        labels = {
            backend: source.label
            for source, backends in User.SOURCE_BACKEND_MAPPING.items()
            for backend in backends
        }
        # Fixed labels; applied last so they override the entries above.
        labels.update({
            settings.AUTH_BACKEND_PUBKEY: _("SSH Key"),
            settings.AUTH_BACKEND_MODEL: _("Password"),
            settings.AUTH_BACKEND_SSO: _("SSO"),
            settings.AUTH_BACKEND_AUTH_TOKEN: _("Auth Token"),
            settings.AUTH_BACKEND_WECOM: _("WeCom"),
            settings.AUTH_BACKEND_FEISHU: _("FeiShu"),
            settings.AUTH_BACKEND_DINGTALK: _("DingTalk"),
            settings.AUTH_BACKEND_TEMP_TOKEN: _("Temporary token"),
        })
        return labels

    def _setup(self):
        """Populate the wrapped object with the backend -> label dict."""
        self._wrapped = self.get_login_backends()
# Shared lazy backend -> label mapping; the underlying dict is produced by
# AuthBackendLabelMapping._setup() rather than at instantiation here.
AUTH_BACKEND_LABEL_MAPPING = AuthBackendLabelMapping()
def get_login_backend(request):
    """Return the label of the auth backend recorded in the session.

    The backend is read from the ``'auth_backend'`` session key, falling
    back to Django's ``BACKEND_SESSION_KEY`` when that is missing or empty.

    :param request: request object exposing a ``session`` mapping
    :return: the label from ``AUTH_BACKEND_LABEL_MAPPING``, or ``''`` when
        the backend is unknown (or mapped to ``None``)
    """
    session = request.session
    backend = session.get('auth_backend', '') or session.get(BACKEND_SESSION_KEY, '')
    label = AUTH_BACKEND_LABEL_MAPPING.get(backend)
    return '' if label is None else label
def generate_data(username, request, login_type=None):
    """Assemble the common fields of a login-log record.

    :param username: name of the account that attempted to log in
    :param request: the request that carried the login attempt
    :param login_type: explicit login type; when ``None`` it is taken from
        the ``HTTP_X_JMS_LOGIN_TYPE`` header (default ``'U'``) for DRF
        ``Request`` objects, and otherwise defaults to ``'W'``
    :return: dict with ``username``, ``ip``, ``type``, ``user_agent``,
        ``datetime`` and ``backend`` keys
    """
    agent = request.META.get('HTTP_USER_AGENT', '')
    ip = get_request_ip(request) or '0.0.0.0'

    # Only DRF requests are asked for the login-type header.
    if login_type is None and isinstance(request, Request):
        login_type = request.META.get('HTTP_X_JMS_LOGIN_TYPE', 'U')
    # Anything still undetermined is recorded as 'W'.
    login_type = 'W' if login_type is None else login_type

    # Resolve the (possibly lazily translated) label while English is the
    # active language, so the recorded text is the English one.
    with translation.override('en'):
        backend = str(get_login_backend(request))

    return {
        'username': username,
        'ip': ip,
        'type': login_type,
        # Keep at most 254 characters -- presumably the storage limit of
        # the log field; confirm against the model.
        'user_agent': agent[:254],
        'datetime': timezone.now(),
        'backend': backend,
    }
@receiver(post_auth_success)
def on_user_auth_success(sender, user, request, login_type=None, **kwargs):
    """Handle ``post_auth_success``: record a successful login.

    Runs the different-city login check, stores the login time in the
    session under ``'login_time'`` and writes a login-log entry with
    ``status=True`` and the user's MFA flag.

    :param sender: signal sender (unused)
    :param user: the authenticated user
    :param request: the request that performed the login
    :param login_type: optional explicit login type, forwarded to
        ``generate_data``
    """
    logger.debug('User login success: {}'.format(user.username))
    check_different_city_login_if_need(user, request)

    record = generate_data(user.username, request, login_type=login_type)
    logged_in_at = record['datetime']
    request.session['login_time'] = logged_in_at.strftime("%Y-%m-%d %H:%M:%S")

    record['mfa'] = int(user.mfa_enabled)
    record['status'] = True
    write_login_log(**record)
@receiver(post_auth_failed)
def on_user_auth_failed(sender, username, request, reason='', **kwargs):
    """Handle ``post_auth_failed``: record a failed login attempt.

    :param sender: signal sender (unused)
    :param username: the username that was submitted
    :param request: the request that carried the attempt
    :param reason: failure description; only its first 128 characters are
        stored
    """
    logger.debug('User login failed: {}'.format(username))

    record = generate_data(username, request)
    record['reason'] = reason[:128]
    record['status'] = False
    write_login_log(**record)