django-vue-admin/backend/dvadmin/system/views/datav.py

267 lines
11 KiB
Python

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2023/4/14 15:49
# @Author : harry
import datetime
import json
import re
import time
from django.db.models import Count, Sum, Q
from django.db.models.functions import TruncMonth, TruncDay
from django.utils import timezone
from rest_framework.decorators import action
from rest_framework.permissions import IsAuthenticated
from rest_framework.viewsets import GenericViewSet
from conf.env import DATABASE_USER, DATABASE_NAME
from dvadmin.system.models import Users, LoginLog, FileList
from dvadmin.system.views.login_log import LoginLogSerializer
from dvadmin.utils.json_response import DetailResponse
from django.db import connection
from django.utils.timezone import now
from django.db.models import Count
from django.db.models.functions import TruncDate
from dvadmin.utils.string_util import format_bytes
def jx_timestamp():
cur_time = datetime.datetime.now()
a = datetime.datetime.strftime(cur_time, '%Y-%m-%d %H:%M:%S')
timeStamp = int(time.mktime(time.strptime(a, "%Y-%m-%d %H:%M:%S")))
timeArray = time.localtime(timeStamp)
otherStyleTime = time.strftime("%Y-%m-%d %H:%M:%S", timeArray)
return otherStyleTime
class DataVViewSet(GenericViewSet):
queryset = LoginLog.objects.all()
serializer_class = LoginLogSerializer
extra_filter_backends = []
ordering_fields = ['create_datetime']
@action(methods=["GET"], detail=False, permission_classes=[IsAuthenticated])
def users_login_total(self, request):
"""
用户登录总数数据
:param request:
:return:
"""
login_total = LoginLog.objects.all().count()
return DetailResponse(data={"login_total": login_total}, msg="获取成功")
@action(methods=["GET"], detail=False, permission_classes=[IsAuthenticated])
def users_total(self, request):
"""
用户总数
:param request:
:return:
"""
users_total = Users.objects.all().count()
return DetailResponse(data={"users_total": users_total, }, msg="获取成功")
@action(methods=["GET"], detail=False, permission_classes=[IsAuthenticated])
def attachment_total(self, request):
"""
附件统计数据
:param request:
:return:
"""
count = FileList.objects.all().count()
if count != 0:
data = FileList.objects.aggregate(sum_size=Sum('size'))
else:
data = {"sum_size": 0}
return DetailResponse(data={"count": count, "occupy_space": format_bytes(data.get('sum_size') or 0)},
msg="获取成功")
@action(methods=["GET"], detail=False, permission_classes=[IsAuthenticated])
def database_total(self, request):
"""
数据库统计数据
:param request:
:return:
"""
count = len(connection.introspection.table_names())
database_type = connection.settings_dict['ENGINE']
sql = None
if 'mysql' in database_type:
sql = "SELECT SUM(data_length + index_length) AS size FROM information_schema.TABLES WHERE table_schema = DATABASE()"
elif 'postgres' in database_type or 'psqlextra' in database_type:
sql = """SELECT SUM(pg_total_relation_size(quote_ident(schemaname) || '.' || quote_ident(tablename))) AS size FROM pg_tables WHERE schemaname = current_schema();"""
elif 'oracle' in database_type:
sql = "SELECT SUM(bytes) AS size FROM user_segments"
elif 'microsoft' in database_type:
sql = "SELECT SUM(size) * 8 AS size FROM sys.database_files"
else:
space = 0
if sql:
with connection.cursor() as cursor:
try:
cursor.execute(sql)
result = cursor.fetchone()
space = result[0]
except Exception as e:
print(e)
space = '无权限'
return DetailResponse(data={"count": count, "space": format_bytes(space or 0)}, msg="获取成功")
@action(methods=["GET"], detail=False, permission_classes=[IsAuthenticated])
def registered_user(self, request):
"""
用户注册趋势
:param request:
:return:
"""
today = datetime.datetime.today()
seven_days_ago = today - datetime.timedelta(days=30)
users = Users.objects.filter(date_joined__gte=seven_days_ago).annotate(day=TruncDay('date_joined')).values(
'day').annotate(count=Count('id'))
result = []
for i in range(30):
date = (today - datetime.timedelta(days=i)).strftime('%Y-%m-%d')
count = 0
for user in users:
if user['day'] == date:
count = user['count']
break
result.append({'day': date, 'count': count})
# users_last_month = Users.objects.filter(date_joined__gte=last_month).annotate(day=TruncDate('date_joined')).values('day').annotate(count=Count('id'))
return DetailResponse(data={"registered_user_list": result}, msg="获取成功")
@action(methods=["GET"], detail=False, permission_classes=[IsAuthenticated])
def registered_user(self, request):
"""
用户注册趋势
:param request:
:return:
"""
day = 30
today = datetime.datetime.today()
seven_days_ago = today - datetime.timedelta(days=day)
users = Users.objects.filter(create_datetime__gte=seven_days_ago).annotate(
day=TruncDay('create_datetime')).values(
'day').annotate(count=Count('id')).order_by('-day')
result = []
data_dict = {ele.get('day').strftime('%Y-%m-%d'): ele.get('count') for ele in users}
for i in range(day):
date = (today - datetime.timedelta(days=i)).strftime('%Y-%m-%d')
result.append({'day': date, 'count': data_dict[date] if date in data_dict else 0})
result = sorted(result, key=lambda x: x['day'])
return DetailResponse(data={"registered_user_list": result}, msg="获取成功")
@action(methods=["GET"], detail=False, permission_classes=[IsAuthenticated])
def login_user(self, request):
"""
用户登录趋势
:param request:
:return:
"""
day = 30
today = datetime.datetime.today()
seven_days_ago = today - datetime.timedelta(days=day)
users = LoginLog.objects.filter(create_datetime__gte=seven_days_ago).annotate(
day=TruncDay('create_datetime')).values(
'day').annotate(count=Count('id')).order_by('-day')
result = []
data_dict = {ele.get('day').strftime('%Y-%m-%d'): ele.get('count') for ele in users}
for i in range(day):
date = (today - datetime.timedelta(days=i)).strftime('%Y-%m-%d')
result.append({'day': date, 'count': data_dict[date] if date in data_dict else 0})
result = sorted(result, key=lambda x: x['day'])
return DetailResponse(data={"login_user": result}, msg="获取成功")
@action(methods=["GET"], detail=False, permission_classes=[IsAuthenticated])
def users_active(self, request):
"""
用户新增活跃数据统计
:param request:
:return:
"""
today = datetime.date.today()
seven_days_ago = today - datetime.timedelta(days=6)
thirty_days_ago = today - datetime.timedelta(days=29)
today_users = Users.objects.filter(date_joined__date=today).count()
today_logins = Users.objects.filter(last_login__date=today).count()
three_days_users = Users.objects.filter(date_joined__gte=seven_days_ago).count()
seven_days_users = Users.objects.filter(date_joined__gte=thirty_days_ago).count()
seven_days_active = Users.objects.filter(last_login__gte=seven_days_ago).values('last_login').annotate(
count=Count('id', distinct=True)).count()
monthly_active = Users.objects.filter(last_login__gte=thirty_days_ago).values('last_login').annotate(
count=Count('id', distinct=True)).count()
data = {
'today_users': today_users,
'today_logins': today_logins,
'three_days': three_days_users,
'seven_days': seven_days_users,
'seven_days_active': seven_days_active,
'monthly_active': monthly_active
}
return DetailResponse(data=data, msg="获取成功")
@action(methods=["GET"], detail=False, permission_classes=[IsAuthenticated])
def login_region(self, request):
"""
登录用户区域分布
:param request:
:return:
"""
CHINA_PROVINCES = [
{'name': '北京', 'code': '110000'},
{'name': '天津', 'code': '120000'},
{'name': '河北', 'code': '130000'},
{'name': '山西', 'code': '140000'},
{'name': '内蒙古', 'code': '150000'},
{'name': '辽宁', 'code': '210000'},
{'name': '吉林', 'code': '220000'},
{'name': '黑龙江', 'code': '230000'},
{'name': '上海', 'code': '310000'},
{'name': '江苏', 'code': '320000'},
{'name': '浙江', 'code': '330000'},
{'name': '安徽', 'code': '340000'},
{'name': '福建', 'code': '350000'},
{'name': '江西', 'code': '360000'},
{'name': '山东', 'code': '370000'},
{'name': '河南', 'code': '410000'},
{'name': '湖北', 'code': '420000'},
{'name': '湖南', 'code': '430000'},
{'name': '广东', 'code': '440000'},
{'name': '广西', 'code': '450000'},
{'name': '海南', 'code': '460000'},
{'name': '重庆', 'code': '500000'},
{'name': '四川', 'code': '510000'},
{'name': '贵州', 'code': '520000'},
{'name': '云南', 'code': '530000'},
{'name': '西藏', 'code': '540000'},
{'name': '陕西', 'code': '610000'},
{'name': '甘肃', 'code': '620000'},
{'name': '青海', 'code': '630000'},
{'name': '宁夏', 'code': '640000'},
{'name': '新疆', 'code': '650000'},
{'name': '台湾', 'code': '710000'},
{'name': '香港', 'code': '810000'},
{'name': '澳门', 'code': '820000'},
{'name': '未知区域', 'code': '000000'},
]
provinces = [x['name'] for x in CHINA_PROVINCES]
day = 30
today = datetime.datetime.today()
seven_days_ago = today - datetime.timedelta(days=day)
province_data = LoginLog.objects.filter(create_datetime__gte=seven_days_ago).values('province').annotate(
count=Count('id')).order_by('-count')
province_dict = {p: 0 for p in provinces}
for ele in province_data:
if ele.get('province') in province_dict:
province_dict[ele.get('province')] += ele.get('count')
else:
province_dict['未知区域'] += ele.get('count')
data = [{'region': key, 'count': val} for key, val in province_dict.items()]
data = sorted(data, key=lambda x: x['count'], reverse=True)
return DetailResponse(data=data, msg="获取成功")