from django.utils.translation import gettext_lazy as _
from django.db import models
from django.db.models import Q
from django.conf import settings
from django.core.exceptions import ValidationError
# NOTE: the next import rebinds ``ValidationError``; from here on the name
# refers to the REST framework exception, not the Django one imported above.
from rest_framework.serializers import ValidationError

from common.db.models import JMSModel
from common.utils import lazyproperty
from orgs.utils import current_org, tmp_to_root_org
from .role import Role
from ..const import Scope

__all__ = ['RoleBinding', 'SystemRoleBinding', 'OrgRoleBinding']


class RoleBindingManager(models.Manager):
    """Manager that restricts role bindings to the current organization."""

    def get_queryset(self):
        """Return the bindings visible from the current organization.

        System-scope bindings that have no organization are always part of
        the result. Outside the root organization, org-scope bindings that
        belong to the current organization are included as well.
        """
        visible = Q(scope=Scope.system, org__isnull=True)
        if not current_org.is_root():
            visible |= Q(org_id=current_org.id, scope=Scope.org)
        return super().get_queryset().filter(visible)

    def root_all(self):
        """Return every binding when the current organization is the root.

        In any other organization this is the same as ``get_queryset()``.
        """
        if current_org.is_root():
            # Bypass the filtering done by get_queryset() on purpose.
            return super().get_queryset()
        return self.get_queryset()


class RoleBinding(JMSModel):
    """Define the user-role relationship."""

    # Re-exported so callers can reach the scope choices through the model.
    Scope = Scope

    scope = models.CharField(
        max_length=128, choices=Scope.choices, default=Scope.system,
    )
    user = models.ForeignKey(
        'users.User', related_name='role_bindings', on_delete=models.CASCADE, verbose_name=_('User')
    )
    role = models.ForeignKey(
        Role, related_name='role_bindings', on_delete=models.CASCADE, verbose_name=_('Role')
    )
    org = models.ForeignKey(
        'orgs.Organization', related_name='role_bindings', blank=True, null=True,
        on_delete=models.CASCADE, verbose_name=_('Organization')
    )
    objects = RoleBindingManager()

    class Meta:
        verbose_name = _('Role binding')
        unique_together = [
            ('user', 'role', 'org'),
        ]

    def __str__(self):
        display = '{user} & {role}'.format(user=self.user, role=self.role)
        if self.org:
            display += ' | {org}'.format(org=self.org)
        return display

    # NOTE(review): ``lazyproperty`` is imported by this module but not used
    # in the visible code; confirm whether ``org_name``, ``user_display`` and
    # ``role_display`` were meant to be properties rather than plain methods.
    def org_name(self):
        """Return the name of the bound organization, or '' when there is none."""
        if self.org:
            return self.org.name
        return ''

    def save(self, *args, **kwargs):
        """Save the binding, always taking ``scope`` from the bound role."""
        self.scope = self.role.scope
        return super().save(*args, **kwargs)

    @classmethod
    def get_user_perms(cls, user):
        """Return the permissions granted by the roles bound to ``user``."""
        roles = cls.get_user_roles(user)
        return Role.get_roles_perms(roles)

    @classmethod
    def get_role_users(cls, role):
        """Return a queryset of the users bound to ``role`` in the role's scope."""
        # Imported here rather than at module level, as in the original code
        # (presumably to avoid a circular import; confirm).
        from users.models import User

        bindings = cls.objects.root_all().filter(role=role, scope=role.scope)
        user_ids = bindings.values_list('user', flat=True).distinct()
        return User.objects.filter(id__in=user_ids)

    @classmethod
    def get_user_roles(cls, user):
        """Return a queryset of the roles bound to ``user``."""
        bindings = cls.objects.filter(user=user)
        roles_id = bindings.values_list('role', flat=True).distinct()
        return Role.objects.filter(id__in=roles_id)

    def user_display(self):
        """Return the bound user's name."""
        return self.user.name

    def role_display(self):
        """Return the bound role's display name."""
        return self.role.display_name

    def is_scope_org(self):
        """Return True when this binding has organization scope."""
        return self.scope == Scope.org

    @classmethod
    def get_user_has_the_perm_orgs(cls, perm, user):
        """Return the organizations in which ``user`` holds permission ``perm``.

        The result is a queryset of organizations, or a list starting with
        the root organization when the result is non-empty and the user also
        has the ``orgs.view_rootorg`` permission.
        """
        from orgs.models import Organization

        roles = Role.get_roles_by_perm(perm)
        with tmp_to_root_org():
            bindings = list(cls.objects.root_all().filter(role__in=roles, user=user))

        system_bindings = [b for b in bindings if b.scope == Role.Scope.system.value]

        # The workbench is limited to the organizations the user has joined.
        if perm == 'rbac.view_workbench':
            all_orgs = user.orgs.all().distinct()
        else:
            all_orgs = Organization.objects.all()

        if not settings.XPACK_ENABLED:
            all_orgs = all_orgs.filter(id=Organization.DEFAULT_ID)

        # A system-level binding grants the permission in every organization.
        if system_bindings:
            orgs = all_orgs
        else:
            org_ids = [b.org.id for b in bindings if b.org]
            orgs = all_orgs.filter(id__in=org_ids)

        # Global (root) organization.
        if orgs and user.has_perm('orgs.view_rootorg'):
            orgs = [Organization.root(), *orgs]
        return orgs


class OrgRoleBindingManager(RoleBindingManager):
    """Manager exposing only the org-scope bindings of the current organization."""

    def get_queryset(self):
        """Return the current organization's org-scope bindings.

        The result is empty when the current organization is the root.
        """
        # Start from models.Manager's queryset, skipping the filtering that
        # RoleBindingManager.get_queryset() applies.
        queryset = super(RoleBindingManager, self).get_queryset()
        if current_org.is_root():
            return queryset.none()
        return queryset.filter(org_id=current_org.id, scope=Scope.org)


class OrgRoleBinding(RoleBinding):
    """Proxy of ``RoleBinding`` for organization-scope bindings."""

    objects = OrgRoleBindingManager()

    class Meta:
        proxy = True
        verbose_name = _('Organization role binding')

    def save(self, *args, **kwargs):
        """Save the binding under the current organization with org scope."""
        self.org_id = current_org.id
        self.scope = Scope.org
        return super().save(*args, **kwargs)

    def delete(self, **kwargs):
        """Delete the binding unless it is the user's last one.

        Raises:
            ValidationError: if the manager finds no other binding with the
                same user and scope.
        """
        has_other_role = self.__class__.objects \
            .filter(user=self.user, scope=self.scope) \
            .exclude(id=self.id) \
            .exists()
        if not has_other_role:
            error = _('User last role in org, can not be delete, '
                      'you can remove user from org instead')
            raise ValidationError({'error': error})
        return super().delete(**kwargs)


class SystemRoleBindingManager(RoleBindingManager):
    """Manager for system-scope role bindings."""

    def get_queryset(self):
        """Return the system-scope bindings, regardless of current organization."""
        # Start from models.Manager's queryset, skipping the filtering that
        # RoleBindingManager.get_queryset() applies.
        # NOTE(review): the original statement ended in a dangling line
        # continuation; the scope filter below mirrors OrgRoleBindingManager
        # and should be confirmed against the upstream file.
        queryset = super(RoleBindingManager, self).get_queryset() \
            .filter(scope=Scope.system)
        return queryset


class SystemRoleBinding(RoleBinding):
    """Proxy of ``RoleBinding`` for system-scope bindings."""

    objects = SystemRoleBindingManager()

    class Meta:
        proxy = True
        verbose_name = _('System role binding')

    def save(self, *args, **kwargs):
        """Save the binding with system scope."""
        self.scope = Scope.system
        return super().save(*args, **kwargs)

    def clean(self):
        """Reject a binding that duplicates another one.

        Raises:
            ValidationError: if a different binding with the same role, user
                and scope already exists.
        """
        kwargs = dict(role=self.role, user=self.user, scope=self.scope)
        # Exclude this instance so that re-validating an already stored
        # binding does not report it as a duplicate of itself.
        duplicates = self.__class__.objects.filter(**kwargs).exclude(pk=self.pk)
        if duplicates.exists():
            msg = "Duplicate for key 'role_user' of system role binding, {}_{}".format(
                self.role.id, self.user.id
            )
            raise ValidationError(msg)