mirror of https://github.com/jumpserver/jumpserver
				
				
				
			
		
			
				
	
	
		
			191 lines
		
	
	
		
			6.3 KiB
		
	
	
	
		
			Python
		
	
	
			
		
		
	
	
			191 lines
		
	
	
		
			6.3 KiB
		
	
	
	
		
			Python
		
	
	
# -*- coding: utf-8 -*-
 | 
						|
#
 | 
						|
from django.utils.translation import gettext_lazy as _
 | 
						|
from rest_framework import serializers
 | 
						|
 | 
						|
from audits.backends.db import OperateLogStore
 | 
						|
from common.serializers.fields import LabeledChoiceField, ObjectRelatedField
 | 
						|
from common.utils import reverse, i18n_trans
 | 
						|
from common.utils.timezone import as_current_tz
 | 
						|
from ops.serializers.job import JobExecutionSerializer
 | 
						|
from orgs.mixins.serializers import BulkOrgResourceModelSerializer
 | 
						|
from terminal.models import Session
 | 
						|
from users.models import User
 | 
						|
from . import models
 | 
						|
from .const import (
 | 
						|
    ActionChoices, OperateChoices,
 | 
						|
    MFAChoices, LoginStatusChoices,
 | 
						|
    LoginTypeChoices, ActivityChoices,
 | 
						|
)
 | 
						|
 | 
						|
 | 
						|
class JobLogSerializer(JobExecutionSerializer):
 | 
						|
    class Meta:
 | 
						|
        model = models.JobLog
 | 
						|
        read_only_fields = [
 | 
						|
            "id", "material", "time_cost", 'date_start',
 | 
						|
            'date_finished', 'date_created',
 | 
						|
            'is_finished', 'is_success', 'created_by',
 | 
						|
            'task_id'
 | 
						|
        ]
 | 
						|
        fields = read_only_fields + []
 | 
						|
 | 
						|
 | 
						|
class FTPLogSerializer(serializers.ModelSerializer):
 | 
						|
    operate = LabeledChoiceField(choices=OperateChoices.choices, label=_("Operate"))
 | 
						|
 | 
						|
    class Meta:
 | 
						|
        model = models.FTPLog
 | 
						|
        fields_mini = ["id"]
 | 
						|
        fields_small = fields_mini + [
 | 
						|
            "user", "remote_addr", "asset", "account",
 | 
						|
            "org_id", "operate", "filename", "date_start",
 | 
						|
            "is_success", "has_file",
 | 
						|
        ]
 | 
						|
        fields = fields_small
 | 
						|
 | 
						|
 | 
						|
class UserLoginLogSerializer(serializers.ModelSerializer):
 | 
						|
    mfa = LabeledChoiceField(choices=MFAChoices.choices, label=_("MFA"))
 | 
						|
    type = LabeledChoiceField(choices=LoginTypeChoices.choices, label=_("Type"))
 | 
						|
    status = LabeledChoiceField(choices=LoginStatusChoices.choices, label=_("Status"))
 | 
						|
 | 
						|
    class Meta:
 | 
						|
        model = models.UserLoginLog
 | 
						|
        fields_mini = ["id"]
 | 
						|
        fields_small = fields_mini + [
 | 
						|
            "username", "type", "ip",
 | 
						|
            "city", "user_agent", "mfa",
 | 
						|
            "reason", "reason_display",
 | 
						|
            "backend", "backend_display",
 | 
						|
            "status", "datetime",
 | 
						|
        ]
 | 
						|
        fields = fields_small
 | 
						|
        extra_kwargs = {
 | 
						|
            "user_agent": {"label": _("User agent")},
 | 
						|
            "reason_display": {"label": _("Reason display")},
 | 
						|
            "backend_display": {"label": _("Authentication backend")},
 | 
						|
        }
 | 
						|
 | 
						|
 | 
						|
class OperateLogActionDetailSerializer(serializers.ModelSerializer):
 | 
						|
    class Meta:
 | 
						|
        model = models.OperateLog
 | 
						|
        fields = ('diff',)
 | 
						|
 | 
						|
    def to_representation(self, instance):
 | 
						|
        data = super().to_representation(instance)
 | 
						|
        diff = OperateLogStore.convert_diff_friendly(data['diff'])
 | 
						|
        data['diff'] = diff
 | 
						|
        return data
 | 
						|
 | 
						|
 | 
						|
class OperateLogSerializer(BulkOrgResourceModelSerializer):
 | 
						|
    action = LabeledChoiceField(choices=ActionChoices.choices, label=_("Action"))
 | 
						|
    resource = serializers.SerializerMethodField(label=_("Resource"))
 | 
						|
    resource_type = serializers.SerializerMethodField(label=_('Resource Type'))
 | 
						|
 | 
						|
    class Meta:
 | 
						|
        model = models.OperateLog
 | 
						|
        fields_mini = ["id"]
 | 
						|
        fields_small = fields_mini + [
 | 
						|
            "user", "action", "resource_type",
 | 
						|
            "resource", "remote_addr", "datetime",
 | 
						|
            "org_id",
 | 
						|
        ]
 | 
						|
        fields = fields_small
 | 
						|
 | 
						|
    @staticmethod
 | 
						|
    def get_resource_type(instance):
 | 
						|
        return _(instance.resource_type)
 | 
						|
 | 
						|
    @staticmethod
 | 
						|
    def get_resource(instance):
 | 
						|
        return i18n_trans(instance.resource)
 | 
						|
 | 
						|
 | 
						|
class PasswordChangeLogSerializer(serializers.ModelSerializer):
 | 
						|
    class Meta:
 | 
						|
        model = models.PasswordChangeLog
 | 
						|
        fields = ("id", "user", "change_by", "remote_addr", "datetime")
 | 
						|
 | 
						|
 | 
						|
class SessionAuditSerializer(serializers.ModelSerializer):
 | 
						|
    class Meta:
 | 
						|
        model = Session
 | 
						|
        fields = "__all__"
 | 
						|
 | 
						|
 | 
						|
class ActivityUnionLogSerializer(serializers.Serializer):
 | 
						|
    id = serializers.CharField()
 | 
						|
    timestamp = serializers.SerializerMethodField()
 | 
						|
    detail_url = serializers.SerializerMethodField()
 | 
						|
    content = serializers.SerializerMethodField()
 | 
						|
    r_type = serializers.CharField(read_only=True)
 | 
						|
 | 
						|
    @staticmethod
 | 
						|
    def get_timestamp(obj):
 | 
						|
        return as_current_tz(obj['datetime']).strftime('%Y-%m-%d %H:%M:%S')
 | 
						|
 | 
						|
    @staticmethod
 | 
						|
    def get_content(obj):
 | 
						|
        if not obj['r_detail']:
 | 
						|
            action = obj['r_action'].replace('_', ' ').capitalize()
 | 
						|
            ctn = _('User %s %s this resource') % (obj['r_user'], _(action))
 | 
						|
        else:
 | 
						|
            ctn = i18n_trans(obj['r_detail'])
 | 
						|
        return ctn
 | 
						|
 | 
						|
    @staticmethod
 | 
						|
    def get_detail_url(obj):
 | 
						|
        detail_url = ''
 | 
						|
        detail_id, obj_type = obj['r_detail_id'], obj['r_type']
 | 
						|
        if not detail_id:
 | 
						|
            return detail_url
 | 
						|
 | 
						|
        if obj_type == ActivityChoices.operate_log:
 | 
						|
            detail_url = '%s?%s' % (
 | 
						|
                reverse(
 | 
						|
                    'audits:operate-log-detail',
 | 
						|
                    kwargs={'pk': obj['id']},
 | 
						|
                ), 'type=action_detail')
 | 
						|
        elif obj_type == ActivityChoices.task:
 | 
						|
            detail_url = reverse(
 | 
						|
                'ops:celery-task-log', kwargs={'pk': detail_id}
 | 
						|
            )
 | 
						|
        elif obj_type == ActivityChoices.login_log:
 | 
						|
            detail_url = reverse(
 | 
						|
                'audits:login-log-detail',
 | 
						|
                kwargs={'pk': detail_id},
 | 
						|
                api_to_ui=True, is_audit=True
 | 
						|
            )
 | 
						|
        return detail_url
 | 
						|
 | 
						|
 | 
						|
class FileSerializer(serializers.Serializer):
 | 
						|
    file = serializers.FileField(allow_empty_file=True)
 | 
						|
 | 
						|
 | 
						|
class UserSessionSerializer(serializers.ModelSerializer):
 | 
						|
    type = LabeledChoiceField(choices=LoginTypeChoices.choices, label=_("Type"))
 | 
						|
    user = ObjectRelatedField(required=False, queryset=User.objects, label=_('User'))
 | 
						|
    is_current_user_session = serializers.SerializerMethodField()
 | 
						|
 | 
						|
    class Meta:
 | 
						|
        model = models.UserSession
 | 
						|
        fields_mini = ['id']
 | 
						|
        fields_small = fields_mini + [
 | 
						|
            'type', 'ip', 'city', 'user_agent', 'user', 'is_current_user_session',
 | 
						|
            'backend', 'backend_display', 'date_created', 'date_expired'
 | 
						|
        ]
 | 
						|
        fields = fields_small
 | 
						|
        extra_kwargs = {
 | 
						|
            "backend_display": {"label": _("Authentication backend")},
 | 
						|
        }
 | 
						|
 | 
						|
    def get_is_current_user_session(self, obj):
 | 
						|
        request = self.context.get('request')
 | 
						|
        if not request:
 | 
						|
            return False
 | 
						|
        return request.session.session_key == obj.key
 |