diff --git a/apps/acls/api/login_asset_check.py b/apps/acls/api/login_asset_check.py index f272dd1d4..3c157a1cc 100644 --- a/apps/acls/api/login_asset_check.py +++ b/apps/acls/api/login_asset_check.py @@ -1,6 +1,7 @@ from rest_framework.generics import CreateAPIView from rest_framework.response import Response +from common.db.fields import JSONManyToManyField from common.utils import reverse, lazyproperty from orgs.utils import tmp_to_org from .. import serializers @@ -30,14 +31,20 @@ class LoginAssetCheckAPI(CreateAPIView): return serializer def check_review(self): + user = self.serializer.user + asset = self.serializer.asset + + # 用户满足的 acls + queryset = LoginAssetACL.objects.all() + q = JSONManyToManyField.get_filter_q(LoginAssetACL, 'users', user) + queryset = queryset.filter(q) + q = JSONManyToManyField.get_filter_q(LoginAssetACL, 'assets', asset) + queryset = queryset.filter(q) + account_username = self.serializer.validated_data.get('account_username') + queryset = queryset.filter(accounts__contains=account_username) + with tmp_to_org(self.serializer.asset.org): - kwargs = { - 'user': self.serializer.user, - 'asset': self.serializer.asset, - 'account_username': self.serializer.validated_data.get('account_username'), - 'action': LoginAssetACL.ActionChoices.review - } - acl = LoginAssetACL.objects.filter(**kwargs).valid().first() + acl = queryset.order_by('priority').valid().first() if acl: need_review = True diff --git a/apps/acls/migrations/0012_auto_20230426_1111.py b/apps/acls/migrations/0012_auto_20230426_1111.py index 23984ac30..277905fcd 100644 --- a/apps/acls/migrations/0012_auto_20230426_1111.py +++ b/apps/acls/migrations/0012_auto_20230426_1111.py @@ -21,7 +21,7 @@ def migrate_base_acl_users_assets_accounts(apps, *args): asset_attrs.append({"name": "name", "value": asset_names, "match": "in"}) asset_address = (obj.assets or {}).get('address_group', []) if asset_address: - asset_attrs.append({"name": "address", "value": asset_address, "match": "ip_in", "rel": "or"}) + asset_attrs.append({"name": "address", "value": asset_address, "match": "ip_in"}) obj.new_assets = {"type": "attrs", "attrs": asset_attrs} account_usernames = (obj.accounts or {}).get('username_group', []) diff --git a/apps/common/db/fields.py b/apps/common/db/fields.py index 027567e4e..70c14c4f2 100644 --- a/apps/common/db/fields.py +++ b/apps/common/db/fields.py @@ -3,6 +3,7 @@ import ipaddress import json +import re from django.apps import apps from django.core.exceptions import ValidationError @@ -344,10 +345,6 @@ class RelatedManager: if name is None or val is None: continue - if val == '*': - filters = Q() - break - if match == 'ip_in': q = self.get_ip_in_q(name, val) elif match in ("exact", "contains", "startswith", "endswith", "regex"): @@ -362,7 +359,10 @@ class RelatedManager: else: q = Q() else: - q = Q(**{name: val}) + if val == '*': + q = Q() + else: + q = Q(**{name: val}) if rel == 'or': filters |= q @@ -415,6 +415,59 @@ class JSONManyToManyDescriptor: value = value.value manager.set(value) + def test_is(self): + print("Self.field is", self.field) + print("Self.field to", self.field.to) + print("Self.field model", self.field.model) + print("Self.field column", self.field.column) + print("Self.field to", self.field.__dict__) + + @staticmethod + def attr_to_regex(attr): + """将属性规则转换为正则表达式""" + name, value, match = attr['name'], attr['value'], attr['match'] + if match == 'contains': + return r'.*{}.*'.format(escape_regex(value)) + elif match == 'startswith': + return r'^{}.*'.format(escape_regex(value)) + elif match == 'endswith': + return r'.*{}$'.format(escape_regex(value)) + elif match == 'regex': + return value + elif match == 'not': + return r'^(?!^{}$)'.format(escape_regex(value)) + elif match == 'in': + values = '|'.join(map(escape_regex, value)) + return r'^(?:{})$'.format(values) + else: + return r'^{}$'.format(escape_regex(value)) + + def is_match(self, attr_dict, attr_rules): + for rule in attr_rules: + value = attr_dict.get(rule['name'], '') + regex = self.attr_to_regex(rule) + if not re.match(regex, value): + return False + return True + + def get_filter_q(self, instance): + model_cls = self.field.model + field_name = self.field.column + q = Q(users__type='all') | Q(users__type='ids', users__ids__contains=[str(instance.id)]) + queryset_id_attrs = model_cls.objects \ + .filter(**{'{}__type'.format(field_name): 'attrs'}) \ + .values_list('id', '{}__attrs'.format(field_name)) + instance_attr = {k: v for k, v in instance.__dict__.items() if not k.startswith('_')} + ids = [str(_id) for _id, attr_rules in queryset_id_attrs if self.is_match(instance_attr, attr_rules)] + if ids: + q |= Q(id__in=ids) + return q + + +def escape_regex(s): + """转义字符串中的正则表达式特殊字符""" + return re.sub('[.*+?^${}()|[\\]]', r'\\\g<0>', s) + class JSONManyToManyField(models.JSONField): def __init__(self, to, *args, **kwargs): @@ -455,18 +508,15 @@ class JSONManyToManyField(models.JSONField): if 'name' not in attr or 'value' not in attr: raise ValueError(_("Invalid attrs, should be has name and value")) - def get_db_prep_value(self, value, connection, prepared=False): - return self.get_prep_value(value) - def get_prep_value(self, value): if value is None: return None if isinstance(value, RelatedManager): value = value.value - self.check_value(value) return json.dumps(value) def validate(self, value, model_instance): super().validate(value, model_instance) if not isinstance(value, dict): raise ValidationError("Invalid JSON data for JSONManyToManyField.") + self.check_value(value)