jumpserver/apps/reports/api/users/user.py

115 lines
4.0 KiB
Python

# -*- coding: utf-8 -*-
#
from collections import defaultdict
from django.db.models import Count
from django.http.response import JsonResponse
from django.utils import timezone
from rest_framework.views import APIView
from audits.const import LoginStatusChoices
from audits.models import UserLoginLog
from common.permissions import IsValidLicense
from common.utils import lazyproperty
from common.utils.timezone import local_zero_hour, local_now
from rbac.permissions import RBACPermission
__all__ = ['UserReportApi']
class UserReportApi(APIView):
http_method_names = ['get']
# TODO: Define the required RBAC permissions for this API
rbac_perms = {
}
permission_classes = [RBACPermission, IsValidLicense]
@lazyproperty
def days(self):
count = self.request.query_params.get('days', 1)
return int(count)
@property
def days_to_datetime(self):
if self.days == 1:
return local_zero_hour()
return local_now() - timezone.timedelta(days=self.days)
@lazyproperty
def date_range_list(self):
return [
(local_now() - timezone.timedelta(days=i)).date()
for i in range(self.days - 1, -1, -1)
]
def filter_by_date_range(self, queryset, field_name):
date_range_bounds = self.days_to_datetime.date(), (local_now() + timezone.timedelta(days=1)).date()
return queryset.filter(**{f'{field_name}__range': date_range_bounds})
def get_user_login_metrics(self):
filtered_queryset = self.filter_by_date_range(self.user_login_log_queryset, 'datetime')
data = defaultdict(set)
for t, username in filtered_queryset.values_list('datetime', 'username'):
date_str = str(t.date())
data[date_str].add(username)
metrics = [len(v) for __, v in data]
return metrics
def get_user_login_method_metrics(self):
filtered_queryset = self.filter_by_date_range(self.user_login_log_queryset, 'datetime')
backends = set()
data = defaultdict(lambda: defaultdict(set))
for t, username, backend in filtered_queryset.values_list('datetime', 'username', 'backend'):
backends.add(backend)
date_str = str(t.date())
data[date_str][backend].add(username)
metrics = defaultdict(list)
for backend in backends:
for date_str, usernames in data.items():
metrics[backend].append(len(usernames.get(backend, set())))
return metrics
def get_user_login_region_distribution(self):
filtered_queryset = self.filter_by_date_range(self.user_login_log_queryset, 'datetime')
data = filtered_queryset.values('city').annotate(
user_count=Count('username', distinct=True)
).order_by('-user_count')
return list(data)
@lazyproperty
def user_login_log_queryset(self):
queryset = UserLoginLog.objects.filter(status=LoginStatusChoices.success)
return UserLoginLog.filter_login_queryset_by_org(queryset)
def get(self, request, *args, **kwargs):
query_params = self.request.query_params
data = {}
_all = query_params.get('all')
if _all or query_params.get('user_login_log_metrics'):
metrics = self.get_user_login_metrics()
data.update({
'dates_metrics_date': [date.strftime('%m-%d') for date in self.date_range_list] or ['0'],
'dates_metrics_total': metrics,
})
if _all or query_params.get('user_login_method_metrics'):
metrics = self.get_user_login_method_metrics()
data.update({
'dates_metrics_date': [date.strftime('%m-%d') for date in self.date_range_list] or ['0'],
'dates_metrics_total': metrics,
})
if _all or query_params.get('user_login_region_distribution'):
data.update({
'user_login_region_distribution': self.get_user_login_region_distribution(),
})
return JsonResponse(data, status=200)