perf: 处理 acl filter 逻辑放到 queryset 中

pull/9136/head
Bai 2 years ago
parent d524e9885d
commit 2aa1d664a6

@ -30,15 +30,14 @@ class LoginAssetCheckAPI(CreateAPIView):
return serializer
def check_confirm(self):
queries = {
'user': self.serializer.user,
'asset': self.serializer.asset,
'account_username': self.serializer.account_username,
'action': LoginAssetACL.ActionChoices.login_confirm
}
with tmp_to_org(self.serializer.asset.org):
acl = LoginAssetACL.filter(**queries).valid().first()
acl = LoginAssetACL.objects\
.filter(action=LoginAssetACL.ActionChoices.login_confirm)\
.filter_user(self.serializer.user)\
.filter_asset(self.serializer.asset)\
.filter_account(self.serializer.account_username)\
.valid()\
.first()
if acl:
need_confirm = True
response_data = self._get_response_data_of_need_confirm(acl)

@ -6,6 +6,32 @@ from .base import BaseACL, BaseACLQuerySet
from common.utils.ip import contains_ip
class ACLQuerySet(BaseACLQuerySet):
def filter_user(self, user):
return self.filter(
Q(users__username_group__contains=user.username) |
Q(users__username_group__contains='*')
)
def filter_asset(self, asset):
queryset = self.filter(
Q(assets__name_group__contains=asset.name) |
Q(assets__name_group__contains='*')
)
ids = [
q.id for q in queryset
if contains_ip(asset.address, q.assets.get('address_group', []))
]
queryset = LoginAssetACL.objects.filter(id__in=ids)
return queryset
def filter_account(self, account_username):
return self.filter(
Q(accounts__username_group__contains=account_username) |
Q(accounts__username_group__contains='*')
)
class ACLManager(OrgManager):
def valid(self):
@ -32,7 +58,7 @@ class LoginAssetACL(BaseACL, OrgModelMixin):
verbose_name=_("Reviewers")
)
objects = ACLManager.from_queryset(BaseACLQuerySet)()
objects = ACLManager.from_queryset(ACLQuerySet)()
class Meta:
unique_together = ('name', 'org_id')
@ -42,42 +68,6 @@ class LoginAssetACL(BaseACL, OrgModelMixin):
def __str__(self):
return self.name
@classmethod
def filter(cls, user, asset, account_username, action):
queryset = cls.objects.filter(action=action)
queryset = cls.filter_user(user, queryset)
queryset = cls.filter_asset(asset, queryset)
queryset = cls.filter_account(account_username, queryset)
return queryset
@classmethod
def filter_user(cls, user, queryset):
queryset = queryset.filter(
Q(users__username_group__contains=user.username) |
Q(users__username_group__contains='*')
)
return queryset
@classmethod
def filter_asset(cls, asset, queryset):
queryset = queryset.filter(
Q(assets__name_group__contains=asset.name) |
Q(assets__name_group__contains='*')
)
ids = [
q.id for q in queryset if contains_ip(asset.address, q.assets.get('address_group', []))
]
queryset = cls.objects.filter(id__in=ids)
return queryset
@classmethod
def filter_account(cls, account_username, queryset):
queryset = queryset.filter(
Q(accounts__username_group__contains=account_username) |
Q(accounts__username_group__contains='*')
)
return queryset
@classmethod
def create_login_asset_confirm_ticket(cls, user, asset, account_username, assignees, org_id):
from tickets.const import TicketType

@ -66,7 +66,7 @@ def contains_ip(ip, ip_group):
if in_ip_segment(ip, _ip):
return True
else:
# is domain name
# address / host
if ip == _ip:
return True

Loading…
Cancel
Save