mirror of https://github.com/jumpserver/jumpserver
115 lines
4.0 KiB
Python
115 lines
4.0 KiB
Python
# -*- coding: utf-8 -*-
|
|
#
|
|
from collections import defaultdict
|
|
|
|
from django.db.models import Count
|
|
from django.http.response import JsonResponse
|
|
from django.utils import timezone
|
|
from rest_framework.views import APIView
|
|
|
|
from audits.const import LoginStatusChoices
|
|
from audits.models import UserLoginLog
|
|
from common.permissions import IsValidLicense
|
|
from common.utils import lazyproperty
|
|
from common.utils.timezone import local_zero_hour, local_now
|
|
from rbac.permissions import RBACPermission
|
|
|
|
__all__ = ['UserReportApi']
|
|
|
|
|
|
class UserReportApi(APIView):
|
|
http_method_names = ['get']
|
|
# TODO: Define the required RBAC permissions for this API
|
|
rbac_perms = {
|
|
}
|
|
permission_classes = [RBACPermission, IsValidLicense]
|
|
|
|
@lazyproperty
|
|
def days(self):
|
|
count = self.request.query_params.get('days', 1)
|
|
return int(count)
|
|
|
|
@property
|
|
def days_to_datetime(self):
|
|
if self.days == 1:
|
|
return local_zero_hour()
|
|
return local_now() - timezone.timedelta(days=self.days)
|
|
|
|
@lazyproperty
|
|
def date_range_list(self):
|
|
return [
|
|
(local_now() - timezone.timedelta(days=i)).date()
|
|
for i in range(self.days - 1, -1, -1)
|
|
]
|
|
|
|
def filter_by_date_range(self, queryset, field_name):
|
|
date_range_bounds = self.days_to_datetime.date(), (local_now() + timezone.timedelta(days=1)).date()
|
|
return queryset.filter(**{f'{field_name}__range': date_range_bounds})
|
|
|
|
def get_user_login_metrics(self):
|
|
filtered_queryset = self.filter_by_date_range(self.user_login_log_queryset, 'datetime')
|
|
|
|
data = defaultdict(set)
|
|
for t, username in filtered_queryset.values_list('datetime', 'username'):
|
|
date_str = str(t.date())
|
|
data[date_str].add(username)
|
|
|
|
metrics = [len(v) for __, v in data]
|
|
return metrics
|
|
|
|
def get_user_login_method_metrics(self):
|
|
filtered_queryset = self.filter_by_date_range(self.user_login_log_queryset, 'datetime')
|
|
|
|
backends = set()
|
|
data = defaultdict(lambda: defaultdict(set))
|
|
for t, username, backend in filtered_queryset.values_list('datetime', 'username', 'backend'):
|
|
backends.add(backend)
|
|
date_str = str(t.date())
|
|
data[date_str][backend].add(username)
|
|
|
|
metrics = defaultdict(list)
|
|
for backend in backends:
|
|
for date_str, usernames in data.items():
|
|
metrics[backend].append(len(usernames.get(backend, set())))
|
|
return metrics
|
|
|
|
def get_user_login_region_distribution(self):
|
|
filtered_queryset = self.filter_by_date_range(self.user_login_log_queryset, 'datetime')
|
|
|
|
data = filtered_queryset.values('city').annotate(
|
|
user_count=Count('username', distinct=True)
|
|
).order_by('-user_count')
|
|
return list(data)
|
|
|
|
@lazyproperty
|
|
def user_login_log_queryset(self):
|
|
queryset = UserLoginLog.objects.filter(status=LoginStatusChoices.success)
|
|
return UserLoginLog.filter_login_queryset_by_org(queryset)
|
|
|
|
def get(self, request, *args, **kwargs):
|
|
query_params = self.request.query_params
|
|
data = {}
|
|
|
|
_all = query_params.get('all')
|
|
|
|
if _all or query_params.get('user_login_log_metrics'):
|
|
metrics = self.get_user_login_metrics()
|
|
data.update({
|
|
'dates_metrics_date': [date.strftime('%m-%d') for date in self.date_range_list] or ['0'],
|
|
'dates_metrics_total': metrics,
|
|
})
|
|
|
|
if _all or query_params.get('user_login_method_metrics'):
|
|
metrics = self.get_user_login_method_metrics()
|
|
data.update({
|
|
'dates_metrics_date': [date.strftime('%m-%d') for date in self.date_range_list] or ['0'],
|
|
'dates_metrics_total': metrics,
|
|
})
|
|
|
|
if _all or query_params.get('user_login_region_distribution'):
|
|
data.update({
|
|
'user_login_region_distribution': self.get_user_login_region_distribution(),
|
|
})
|
|
|
|
return JsonResponse(data, status=200)
|