jumpserver/apps/acls/serializers/base.py

123 lines
3.9 KiB
Python
Raw Normal View History

2022-12-02 03:12:14 +00:00
from django.utils.translation import ugettext_lazy as _
from rest_framework import serializers
from acls.models.base import ActionChoices, BaseACL
from common.serializers.fields import JSONManyToManyField, LabeledChoiceField
2023-04-21 10:12:39 +00:00
from jumpserver.utils import has_valid_xpack_license
2022-12-02 03:12:14 +00:00
from orgs.models import Organization
# Shared help text for the wildcard-capable match lists declared below.
common_help_text = _("With * indicating a match all. ")
class ACLUsersSerializer(serializers.Serializer):
    """Serializer holding a list of username patterns for an ACL.

    ``username_group`` defaults to ``["*"]``; each entry is a string of at
    most 128 characters.
    """

    username_group = serializers.ListField(
        label=_("Username"),
        help_text=common_help_text,
        child=serializers.CharField(max_length=128),
        default=["*"],
    )
2023-05-06 11:52:03 +00:00
class ACLAssetsSerializer(serializers.Serializer):
    """Serializer holding asset name and address pattern lists for an ACL.

    Both lists default to ``["*"]``. Name entries are limited to 128
    characters and address entries to 1024 characters.
    """

    # Help text for ``address_group``; lists example address formats.
    address_group_help_text = _(
        "With * indicating a match all. Such as: "
        "192.168.10.1, 192.168.1.0/24, 10.1.1.1-10.1.1.20, "
        "2001:db8:2de::e13, 2001:db8:1a:1110::/64"
        " (Domain name support)"
    )

    name_group = serializers.ListField(
        label=_("Name"),
        help_text=common_help_text,
        child=serializers.CharField(max_length=128),
        default=["*"],
    )
    address_group = serializers.ListField(
        label=_("IP/Host"),
        help_text=address_group_help_text,
        child=serializers.CharField(max_length=1024),
        default=["*"],
    )
class ACLAccountsSerializer(serializers.Serializer):
    """Serializer holding a list of account username patterns for an ACL.

    ``username_group`` defaults to ``["*"]``; each entry is a string of at
    most 128 characters.
    """

    username_group = serializers.ListField(
        label=_("Username"),
        help_text=common_help_text,
        child=serializers.CharField(max_length=128),
        default=["*"],
    )
2023-04-21 10:12:39 +00:00
class ActionAclSerializer(serializers.Serializer):
    """Serializer base exposing the ACL ``action`` choice field.

    After the normal serializer initialisation, the ``review`` action is
    removed from the field's choices when ``has_valid_xpack_license()``
    returns a falsy value.
    """

    action = LabeledChoiceField(
        choices=ActionChoices.choices, default=ActionChoices.reject, label=_("Action")
    )

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.set_action_choices()

    def set_action_choices(self):
        """Remove the ``review`` choice from ``action`` when unlicensed.

        Does nothing when the serializer has no ``action`` field or when
        a valid X-Pack license is present.
        """
        action = self.fields.get("action")
        if not action:
            return
        if has_valid_xpack_license():
            return
        # Assign through the public ``choices`` property instead of writing
        # ``_choices`` directly: DRF's ChoiceField setter also rebuilds the
        # lookup used to validate incoming values, so a removed choice is
        # rejected on input rather than only hidden from the choice list.
        # NOTE(review): assumes LabeledChoiceField keeps ChoiceField's
        # ``choices`` property - confirm against common.serializers.fields.
        action.choices = [
            (value, label)
            for value, label in action.choices.items()
            if value != ActionChoices.review
        ]
class BaserACLSerializer(ActionAclSerializer, serializers.Serializer):
    """Common serializer configuration and reviewer validation for ACLs."""

    class Meta:
        model = BaseACL
        fields_mini = ["id", "name"]
        fields_small = fields_mini + [
            "is_active", "priority", "action",
            "date_created", "date_updated",
            "comment", "created_by", "org_id",
        ]
        fields_m2m = ["reviewers", ]
        fields = fields_small + fields_m2m
        extra_kwargs = {
            "priority": {"default": 50},
            "is_active": {"default": True},
        }

    def validate_reviewers(self, reviewers):
        """Restrict reviewers to members of the current organization.

        The check only applies when the effective action is ``review``.
        The action is read from the submitted data and falls back to the
        existing instance's action when it was not submitted.

        :param reviewers: reviewers submitted for the ACL.
        :return: ``reviewers`` unchanged when the action is not ``review``;
            otherwise the list of reviewers that are organization members.
        :raises serializers.ValidationError: if the organization cannot be
            found, or none of the reviewers belongs to it.
        """
        action = self.initial_data.get('action')
        if not action and self.instance:
            action = self.instance.action
        if action != ActionChoices.review:
            return reviewers

        org_id = self.fields["org_id"].default()
        org = Organization.get_instance(org_id)
        if not org:
            # Translate the template first and interpolate afterwards, so
            # the msgid looked up in the catalog is the constant template
            # rather than text that already contains the organization id.
            error = _("The organization `{}` does not exist").format(org_id)
            raise serializers.ValidationError(error)

        users = org.get_members()
        valid_reviewers = list(set(reviewers) & set(users))
        if not valid_reviewers:
            error = _(
                "None of the reviewers belong to Organization `{}`"
            ).format(org.name)
            raise serializers.ValidationError(error)
        return valid_reviewers
class BaserUserACLSerializer(BaserACLSerializer):
    """ACL serializer that adds a ``users`` field to the base field set."""

    users = JSONManyToManyField(label=_("User"))

    class Meta(BaserACLSerializer.Meta):
        # Base ACL fields followed by the user field.
        fields = [*BaserACLSerializer.Meta.fields, "users"]
class BaseUserAssetAccountACLSerializer(BaserUserACLSerializer):
    """User ACL serializer that adds ``assets`` and ``accounts`` fields."""

    assets = JSONManyToManyField(label=_("Asset"))
    accounts = serializers.ListField(label=_("Account"))

    class Meta(BaserUserACLSerializer.Meta):
        # User ACL fields followed by the asset and account fields.
        fields = [*BaserUserACLSerializer.Meta.fields, "assets", "accounts"]