You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
jumpserver/apps/common/utils/django.py

107 lines
2.8 KiB

# -*- coding: utf-8 -*-
#
import re
from django.conf import settings
from django.db import models
from django.db.models.signals import post_save, pre_save
from django.shortcuts import reverse as dj_reverse
from django.utils import timezone
from django.utils.http import url_has_allowed_host_and_scheme
UUID_PATTERN = re.compile(r'[0-9a-zA-Z\-]{36}')
def reverse(
view_name, urlconf=None, args=None, kwargs=None,
current_app=None, external=False, api_to_ui=False,
is_console=False, is_audit=False, is_workbench=False
):
url = dj_reverse(view_name, urlconf=urlconf, args=args,
kwargs=kwargs, current_app=current_app)
if external:
site_url = settings.SITE_URL
url = site_url.strip('/') + url
if api_to_ui:
replace_str = 'ui/#'
if is_console:
replace_str += '/console'
elif is_audit:
replace_str += '/audit'
elif is_workbench:
replace_str += '/workbench'
url = url.replace('api/v1', replace_str).rstrip('/')
return url
def get_object_or_none(model, **kwargs):
try:
obj = model.objects.get(**kwargs)
except model.DoesNotExist:
return None
return obj
def date_expired_default():
try:
years = int(settings.DEFAULT_EXPIRED_YEARS)
except TypeError:
years = 70
return timezone.now() + timezone.timedelta(days=365 * years)
def union_queryset(*args, base_queryset=None):
if len(args) == 1:
return args[0]
elif len(args) == 0:
raise ValueError("args is empty")
args = [q.order_by() for q in args]
sub_query = args[0].union(*args[1:])
queryset_id = list(sub_query.values_list('id', flat=True))
if not base_queryset:
base_queryset = args[0].model.objects
queryset = base_queryset.filter(id__in=queryset_id)
return queryset
def get_log_keep_day(s, defaults=200):
try:
days = int(getattr(settings, s))
except ValueError:
days = defaults
return days
def bulk_create_with_signal(cls: models.Model, items, **kwargs):
for i in items:
pre_save.send(sender=cls, instance=i)
result = cls.objects.bulk_create(items, **kwargs)
for i in items:
post_save.send(sender=cls, instance=i, created=True)
return result
def get_request_os(request):
"""获取请求的操作系统"""
agent = request.META.get('HTTP_USER_AGENT', '').lower()
if 'windows' in agent:
return 'windows'
elif 'mac' in agent:
return 'mac'
elif 'linux' in agent:
return 'linux'
else:
return 'unknown'
def safe_next_url(next_url, request=None):
safe_hosts = [*settings.ALLOWED_HOSTS]
if request:
safe_hosts.append(request.get_host())
if not next_url or not url_has_allowed_host_and_scheme(next_url, safe_hosts):
next_url = '/'
return next_url