mirror of https://github.com/jumpserver/jumpserver
				
				
				
			
		
			
				
	
	
		
			147 lines
		
	
	
		
			5.2 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			147 lines
		
	
	
		
			5.2 KiB
		
	
	
	
		
			Python
		
	
	
from rest_framework import permissions, exceptions
 | 
						|
 | 
						|
from common.utils import get_logger
 | 
						|
 | 
						|
logger = get_logger(__name__)
 | 
						|
 | 
						|
 | 
						|
class RBACPermission(permissions.DjangoModelPermissions):
 | 
						|
    default_rbac_perms_tmpl = (
 | 
						|
        ('list', '%(app_label)s.view_%(model_name)s'),
 | 
						|
        ('retrieve', '%(app_label)s.view_%(model_name)s'),
 | 
						|
        ('create', '%(app_label)s.add_%(model_name)s'),
 | 
						|
        ('update', '%(app_label)s.change_%(model_name)s'),
 | 
						|
        ('partial_update', '%(app_label)s.change_%(model_name)s'),
 | 
						|
        ('destroy', '%(app_label)s.delete_%(model_name)s'),
 | 
						|
        ('bulk_update', '%(app_label)s.change_%(model_name)s'),
 | 
						|
        ('partial_bulk_update', '%(app_label)s.change_%(model_name)s'),
 | 
						|
        ('bulk_destroy', '%(app_label)s.delete_%(model_name)s'),
 | 
						|
        ('render_to_json', 'none'),
 | 
						|
        ('metadata', 'none'),
 | 
						|
        ('GET', '%(app_label)s.view_%(model_name)s'),
 | 
						|
        ('OPTIONS', 'none'),
 | 
						|
        ('HEAD', '%(app_label)s.view_%(model_name)s'),
 | 
						|
        ('POST', '%(app_label)s.add_%(model_name)s'),
 | 
						|
        ('PUT', '%(app_label)s.change_%(model_name)s'),
 | 
						|
        ('PATCH', '%(app_label)s.change_%(model_name)s'),
 | 
						|
        ('DELETE', '%(app_label)s.delete_%(model_name)s'),
 | 
						|
    )
 | 
						|
 | 
						|
    # rbac_perms = ((), ())
 | 
						|
    # def get_rbac_perms():
 | 
						|
    #     return {}
 | 
						|
 | 
						|
    @staticmethod
 | 
						|
    def format_perms(perms_tmpl, model_cls):
 | 
						|
        perms = set()
 | 
						|
        if not perms_tmpl:
 | 
						|
            return perms
 | 
						|
        if isinstance(perms_tmpl, str):
 | 
						|
            perms_tmpl = [perms_tmpl]
 | 
						|
        for perm_tmpl in perms_tmpl:
 | 
						|
            perm = perm_tmpl % {
 | 
						|
                'app_label': model_cls._meta.app_label,
 | 
						|
                'model_name': model_cls._meta.model_name
 | 
						|
            }
 | 
						|
            perms.add(perm)
 | 
						|
        return perms
 | 
						|
 | 
						|
    @classmethod
 | 
						|
    def parse_action_model_perms(cls, action, model_cls):
 | 
						|
        perm_tmpl = dict(cls.default_rbac_perms_tmpl).get(action)
 | 
						|
        return cls.format_perms(perm_tmpl, model_cls)
 | 
						|
 | 
						|
    def get_default_action_perms(self, model_cls):
 | 
						|
        if model_cls is None or not hasattr(model_cls, '_meta'):
 | 
						|
            return {}
 | 
						|
 | 
						|
        perms = {}
 | 
						|
        for action, tmpl in dict(self.default_rbac_perms_tmpl).items():
 | 
						|
            perms[action] = self.format_perms(tmpl, model_cls)
 | 
						|
        return perms
 | 
						|
 | 
						|
    def get_rbac_perms(self, view, model_cls) -> dict:
 | 
						|
        if hasattr(view, 'get_rbac_perms'):
 | 
						|
            return dict(view.get_rbac_perms())
 | 
						|
        perms = {}
 | 
						|
        if hasattr(view, 'rbac_perms'):
 | 
						|
            perms.update(dict(view.rbac_perms))
 | 
						|
        default_perms = self.get_default_action_perms(model_cls)
 | 
						|
        if '*' not in perms:
 | 
						|
            for k, v in default_perms.items():
 | 
						|
                perms.setdefault(k, v)
 | 
						|
        return perms
 | 
						|
 | 
						|
    def _get_action_perms(self, action, model_cls, view):
 | 
						|
        action_perms_map = self.get_rbac_perms(view, model_cls)
 | 
						|
        if action in action_perms_map:
 | 
						|
            perms = action_perms_map[action]
 | 
						|
        elif '*' in action_perms_map:
 | 
						|
            perms = action_perms_map['*']
 | 
						|
        else:
 | 
						|
            msg = 'Action not allowed: {}, only `{}` supported'.format(
 | 
						|
                action, ','.join(list(action_perms_map.keys()))
 | 
						|
            )
 | 
						|
            logger.error(msg)
 | 
						|
            raise exceptions.PermissionDenied(msg)
 | 
						|
        return perms
 | 
						|
 | 
						|
    def get_model_cls(self, view):
 | 
						|
        if hasattr(view, 'perm_model'):
 | 
						|
            return getattr(view, 'perm_model')
 | 
						|
 | 
						|
        try:
 | 
						|
            queryset = self._queryset(view)
 | 
						|
            if isinstance(queryset, list) and queryset:
 | 
						|
                model_cls = queryset[0].__class__
 | 
						|
            else:
 | 
						|
                model_cls = queryset.model
 | 
						|
        except AssertionError as e:
 | 
						|
            # logger.error(f'Error get model cls: {e}')
 | 
						|
            model_cls = None
 | 
						|
        except AttributeError as e:
 | 
						|
            # logger.error(f'Error get model cls: {e}')
 | 
						|
            model_cls = None
 | 
						|
        except Exception as e:
 | 
						|
            # logger.error('Error get model class: {} of {}'.format(e, view))
 | 
						|
            raise e
 | 
						|
        return model_cls
 | 
						|
 | 
						|
    def get_require_perms(self, request, view):
 | 
						|
        """
 | 
						|
        获取 request, view 需要的 perms
 | 
						|
        :param request:
 | 
						|
        :param view:
 | 
						|
        :return:
 | 
						|
        """
 | 
						|
        model_cls = self.get_model_cls(view)
 | 
						|
        action = getattr(view, 'action', None)
 | 
						|
        if not action:
 | 
						|
            action = request.method
 | 
						|
        perms = self._get_action_perms(action, model_cls, view)
 | 
						|
        return perms
 | 
						|
 | 
						|
    def has_permission(self, request, view):
 | 
						|
        # Workaround to ensure DjangoModelPermissions are not applied
 | 
						|
        # to the root view when using DefaultRouter.
 | 
						|
        if getattr(view, '_ignore_rbac_permissions', False):
 | 
						|
            return True
 | 
						|
        if not request.user:
 | 
						|
            return False
 | 
						|
        if request.user.is_anonymous and self.authenticated_users_only:
 | 
						|
            return False
 | 
						|
 | 
						|
        raw_action = getattr(view, 'raw_action', request.method)
 | 
						|
        if raw_action in ['metadata', 'OPTIONS']:
 | 
						|
            return True
 | 
						|
 | 
						|
        perms = self.get_require_perms(request, view)
 | 
						|
        if isinstance(perms, str):
 | 
						|
            perms = [perms]
 | 
						|
        has = request.user.has_perms(perms)
 | 
						|
        logger.debug('Api require perms: {}, result: {}'.format(perms, has))
 | 
						|
        return has
 | 
						|
 | 
						|
    def has_object_permission(self, request, view, obj):
 | 
						|
        return self.has_permission(request, view)
 |