mirror of https://github.com/jumpserver/jumpserver
				
				
				
			
		
			
				
	
	
		
			114 lines
		
	
	
		
			4.2 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			114 lines
		
	
	
		
			4.2 KiB
		
	
	
	
		
			Python
		
	
	
from django.conf import settings
 | 
						|
from django.db.models import Q
 | 
						|
from django.utils import timezone
 | 
						|
from django.utils.translation import gettext as _
 | 
						|
from django_filters import rest_framework as filters
 | 
						|
 | 
						|
from common.drf.filters import BaseFilterSet
 | 
						|
from common.utils import is_uuid
 | 
						|
from rbac.models import Role, OrgRoleBinding, SystemRoleBinding
 | 
						|
from users.models.user import User
 | 
						|
 | 
						|
 | 
						|
class UserFilter(BaseFilterSet):
 | 
						|
    system_roles = filters.CharFilter(method='filter_system_roles')
 | 
						|
    org_roles = filters.CharFilter(method='filter_org_roles')
 | 
						|
    groups = filters.CharFilter(field_name="groups__name", lookup_expr='exact')
 | 
						|
    group_id = filters.CharFilter(field_name="groups__id", lookup_expr='exact')
 | 
						|
    exclude_group_id = filters.CharFilter(
 | 
						|
        field_name="groups__id", lookup_expr='exact', exclude=True
 | 
						|
    )
 | 
						|
    is_expired = filters.BooleanFilter(method='filter_is_expired')
 | 
						|
    is_valid = filters.BooleanFilter(method='filter_is_valid')
 | 
						|
    is_password_expired = filters.BooleanFilter(method='filter_long_time')
 | 
						|
    is_long_time_no_login = filters.BooleanFilter(method='filter_long_time')
 | 
						|
    is_login_blocked = filters.BooleanFilter(method='filter_is_blocked')
 | 
						|
 | 
						|
    class Meta:
 | 
						|
        model = User
 | 
						|
        fields = (
 | 
						|
            'id', 'username', 'email', 'name',
 | 
						|
            'groups', 'group_id', 'exclude_group_id',
 | 
						|
            'source', 'org_roles', 'system_roles',
 | 
						|
            'is_active', 'is_first_login', 'mfa_level'
 | 
						|
        )
 | 
						|
 | 
						|
    def filter_is_blocked(self, queryset, name, value):
 | 
						|
        from users.utils import LoginBlockUtil
 | 
						|
        usernames = LoginBlockUtil.get_blocked_usernames()
 | 
						|
        if value:
 | 
						|
            queryset = queryset.filter(username__in=usernames)
 | 
						|
        else:
 | 
						|
            queryset = queryset.exclude(username__in=usernames)
 | 
						|
        return queryset
 | 
						|
 | 
						|
    def filter_long_time(self, queryset, name, value):
 | 
						|
        now = timezone.now()
 | 
						|
        if name == 'is_password_expired':
 | 
						|
            interval = settings.SECURITY_PASSWORD_EXPIRATION_TIME
 | 
						|
        else:
 | 
						|
            interval = 30
 | 
						|
        date_expired = now - timezone.timedelta(days=int(interval))
 | 
						|
 | 
						|
        if name == 'is_password_expired':
 | 
						|
            key = 'date_password_last_updated'
 | 
						|
        elif name == 'is_long_time_no_login':
 | 
						|
            key = 'last_login'
 | 
						|
        else:
 | 
						|
            raise ValueError('Invalid filter name')
 | 
						|
 | 
						|
        if value:
 | 
						|
            kwargs = {f'{key}__lt': date_expired}
 | 
						|
        else:
 | 
						|
            kwargs = {f'{key}__gt': date_expired}
 | 
						|
        q = Q(**kwargs) | Q(**{f'{key}__isnull': True})
 | 
						|
        return queryset.filter(q)
 | 
						|
 | 
						|
    def filter_is_valid(self, queryset, name, value):
 | 
						|
        if value:
 | 
						|
            queryset = self.filter_is_expired(queryset, name, False).filter(is_active=True)
 | 
						|
        else:
 | 
						|
            q = Q(date_expired__lt=timezone.now()) | Q(is_active=False)
 | 
						|
            queryset = queryset.filter(q)
 | 
						|
        return queryset
 | 
						|
 | 
						|
    @staticmethod
 | 
						|
    def filter_is_expired(queryset, name, value):
 | 
						|
        now = timezone.now()
 | 
						|
        if value:
 | 
						|
            queryset = queryset.filter(date_expired__lt=now)
 | 
						|
        else:
 | 
						|
            queryset = queryset.filter(date_expired__gte=now)
 | 
						|
        return queryset
 | 
						|
 | 
						|
    @staticmethod
 | 
						|
    def _get_role(value):
 | 
						|
        from rbac.builtin import BuiltinRole
 | 
						|
        roles = BuiltinRole.get_roles()
 | 
						|
        for role in roles.values():
 | 
						|
            if _(role.name) == value:
 | 
						|
                return role
 | 
						|
 | 
						|
        if is_uuid(value):
 | 
						|
            return Role.objects.filter(id=value).first()
 | 
						|
        else:
 | 
						|
            return Role.objects.filter(name=value).first()
 | 
						|
 | 
						|
    def _filter_roles(self, queryset, value, scope):
 | 
						|
        role = self._get_role(value)
 | 
						|
        if not role:
 | 
						|
            return queryset.none()
 | 
						|
 | 
						|
        rb_model = SystemRoleBinding if scope == Role.Scope.system.value else OrgRoleBinding
 | 
						|
        user_ids = rb_model.objects.filter(role_id=role.id).values_list('user_id', flat=True)
 | 
						|
        queryset = queryset.filter(id__in=user_ids).distinct()
 | 
						|
        return queryset
 | 
						|
 | 
						|
    def filter_system_roles(self, queryset, name, value):
 | 
						|
        queryset = self._filter_roles(queryset=queryset, value=value, scope=Role.Scope.system.value)
 | 
						|
        return queryset
 | 
						|
 | 
						|
    def filter_org_roles(self, queryset, name, value):
 | 
						|
        queryset = self._filter_roles(queryset=queryset, value=value, scope=Role.Scope.org.value)
 | 
						|
        return queryset
 |