You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 

146 lines
5.2 KiB

from rest_framework import permissions, exceptions
from common.utils import get_logger
# Module-level logger, obtained from the project's get_logger helper and
# keyed by this module's import name.
logger = get_logger(__name__)
class RBACPermission(permissions.DjangoModelPermissions):
    """Permission class that resolves a view action to RBAC permission codes.

    For every request the required permission codes are looked up by the
    view's ``action`` (falling back to the HTTP method) and then checked with
    ``request.user.has_perms``.
    """

    # Default (action, permission template) pairs. Templates are expanded with
    # the model's ``app_label`` and ``model_name`` by ``format_perms``. Both
    # view action names and raw HTTP methods are listed because
    # ``get_require_perms`` falls back to ``request.method`` when the view has
    # no ``action``.
    default_rbac_perms_tmpl = (
        ('list', '%(app_label)s.view_%(model_name)s'),
        ('retrieve', '%(app_label)s.view_%(model_name)s'),
        ('create', '%(app_label)s.add_%(model_name)s'),
        ('update', '%(app_label)s.change_%(model_name)s'),
        ('partial_update', '%(app_label)s.change_%(model_name)s'),
        ('destroy', '%(app_label)s.delete_%(model_name)s'),
        ('bulk_update', '%(app_label)s.change_%(model_name)s'),
        ('partial_bulk_update', '%(app_label)s.change_%(model_name)s'),
        ('bulk_destroy', '%(app_label)s.delete_%(model_name)s'),
        ('render_to_json', '%(app_label)s.add_%(model_name)s'),
        ('metadata', '*'),
        ('GET', '%(app_label)s.view_%(model_name)s'),
        ('OPTIONS', '*'),
        ('HEAD', '%(app_label)s.view_%(model_name)s'),
        ('POST', '%(app_label)s.add_%(model_name)s'),
        ('PUT', '%(app_label)s.change_%(model_name)s'),
        ('PATCH', '%(app_label)s.change_%(model_name)s'),
        ('DELETE', '%(app_label)s.delete_%(model_name)s'),
    )

    # Hooks a view may define to customise the mapping (see get_rbac_perms):
    #   rbac_perms = ((action, perms), ...)   # merged over the defaults
    #   def get_rbac_perms(self): ...         # replaces the mapping entirely
    # Either value is passed through dict(), so anything dict() accepts works.

    @staticmethod
    def format_perms(perms_tmpl, model_cls) -> set:
        """Expand permission template(s) for ``model_cls``.

        :param perms_tmpl: a single template string, an iterable of template
            strings, or a falsy value.
        :param model_cls: model class whose ``_meta.app_label`` and
            ``_meta.model_name`` fill the ``%(app_label)s`` /
            ``%(model_name)s`` placeholders.
        :return: set of formatted permission codes; empty when ``perms_tmpl``
            is falsy.
        """
        if not perms_tmpl:
            return set()
        if isinstance(perms_tmpl, str):
            perms_tmpl = [perms_tmpl]
        return {
            tmpl % {
                'app_label': model_cls._meta.app_label,
                'model_name': model_cls._meta.model_name
            }
            for tmpl in perms_tmpl
        }

    @classmethod
    def parse_action_model_perms(cls, action, model_cls) -> set:
        """Return the default permission codes of ``action`` for ``model_cls``.

        :return: set of permission codes; empty when ``action`` has no entry
            in ``default_rbac_perms_tmpl``.
        """
        perm_tmpl = dict(cls.default_rbac_perms_tmpl).get(action)
        return cls.format_perms(perm_tmpl, model_cls)

    def get_default_action_perms(self, model_cls) -> dict:
        """Build the default ``{action: permission codes}`` map for a model.

        :return: mapping from every action in ``default_rbac_perms_tmpl`` to
            its formatted permission set, or ``{}`` when ``model_cls`` is None.
        """
        if model_cls is None:
            return {}
        return {
            action: self.format_perms(tmpl, model_cls)
            for action, tmpl in dict(self.default_rbac_perms_tmpl).items()
        }

    def get_rbac_perms(self, view, model_cls) -> dict:
        """Return the effective ``{action: perms}`` map for ``view``.

        Resolution order:

        1. If the view defines ``get_rbac_perms()``, its result is used as-is
           (no defaults are merged in).
        2. Otherwise the view's ``rbac_perms`` attribute, if present, is the
           starting point, and the model defaults fill in every action the
           view did not list, unless the view declared a ``'*'`` entry, in
           which case the defaults are not merged.
        """
        if hasattr(view, 'get_rbac_perms'):
            return dict(view.get_rbac_perms())

        perms = {}
        if hasattr(view, 'rbac_perms'):
            perms.update(dict(view.rbac_perms))

        default_perms = self.get_default_action_perms(model_cls)
        if '*' not in perms:
            for action, default in default_perms.items():
                perms.setdefault(action, default)
        return perms

    def _get_action_perms(self, action, model_cls, view):
        """Look up the permissions required for ``action`` on ``view``.

        An exact entry for ``action`` wins; otherwise the ``'*'`` entry is
        used when the map has one.

        :raises exceptions.PermissionDenied: when neither an entry for
            ``action`` nor a ``'*'`` entry exists.
        """
        action_perms_map = self.get_rbac_perms(view, model_cls)
        if action in action_perms_map:
            return action_perms_map[action]
        if '*' in action_perms_map:
            return action_perms_map['*']

        msg = 'Action not allowed: {}, only `{}` supported'.format(
            action, ','.join(action_perms_map)
        )
        logger.error(msg)
        raise exceptions.PermissionDenied(msg)

    def get_model_cls(self, view):
        """Determine the model class that permissions are checked against.

        ``view.perm_model`` is returned when the view defines it. Otherwise the
        model is taken from ``self._queryset(view)`` (inherited; not defined in
        this class): the class of the first element when that is a non-empty
        list, else its ``.model`` attribute.

        :return: the model class, or ``None`` when the lookup raises
            ``AssertionError`` or ``AttributeError``. Any other exception
            propagates to the caller.
        """
        if hasattr(view, 'perm_model'):
            return view.perm_model

        try:
            queryset = self._queryset(view)
            if isinstance(queryset, list) and queryset:
                return queryset[0].__class__
            return queryset.model
        except (AssertionError, AttributeError):
            # No usable queryset/model on this view; callers treat None as
            # "no model defaults available".
            return None

    def get_require_perms(self, request, view):
        """Return the permissions required by ``request`` on ``view``.

        :param request: the incoming request; its ``method`` is used as the
            action when the view has no truthy ``action`` attribute.
        :param view: the view being accessed.
        :return: the permission entry selected by ``_get_action_perms``.
        """
        model_cls = self.get_model_cls(view)
        action = getattr(view, 'action', None) or request.method
        return self._get_action_perms(action, model_cls, view)

    def has_permission(self, request, view):
        """Decide whether ``request.user`` may access ``view``.

        :return: ``True`` when the view opts out through
            ``_ignore_rbac_permissions``, when the raw action is ``metadata``
            or ``OPTIONS``, or when the user holds all required permissions;
            ``False`` when there is no user, or when the user is anonymous and
            ``authenticated_users_only`` is set.
        """
        # Workaround to ensure DjangoModelPermissions are not applied
        # to the root view when using DefaultRouter.
        if getattr(view, '_ignore_rbac_permissions', False):
            return True

        if not request.user:
            return False
        if request.user.is_anonymous and self.authenticated_users_only:
            return False

        # Metadata / OPTIONS requests are always allowed, before any
        # permission lookup is attempted.
        raw_action = getattr(view, 'raw_action', request.method)
        if raw_action in ['metadata', 'OPTIONS']:
            return True

        perms = self.get_require_perms(request, view)
        if isinstance(perms, str):
            # A bare string would otherwise be iterated character by character.
            perms = [perms]
        has = request.user.has_perms(perms)
        logger.debug('Api require perms: {}, result: {}'.format(perms, has))
        return has

    def has_object_permission(self, request, view, obj):
        """Apply the same check as ``has_permission``; ``obj`` is not consulted."""
        return self.has_permission(request, view)