perf: 优化 celery task log 权限控制

pull/12835/head
ibuler 2024-03-18 18:53:22 +08:00 committed by Bryan
parent e9f591b33b
commit b6ab3df038
5 changed files with 55 additions and 5 deletions

View File

@ -4,7 +4,7 @@ path_perms_map = {
'xpack': '*', 'xpack': '*',
'settings': '*', 'settings': '*',
'img': '*', 'img': '*',
'replay': 'default', 'replay': 'terminal.view_sessionreplay',
'applets': 'terminal.view_applet', 'applets': 'terminal.view_applet',
'virtual_apps': 'terminal.view_virtualapp', 'virtual_apps': 'terminal.view_virtualapp',
'playbooks': 'ops.view_playbook' 'playbooks': 'ops.view_playbook'

View File

@ -0,0 +1,21 @@
# Generated by Django 4.1.13 on 2024-03-18 06:47
from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion
class Migration(migrations.Migration):
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
('ops', '0027_alter_celerytaskexecution_options'),
]
operations = [
migrations.AddField(
model_name='celerytaskexecution',
name='creator',
field=models.ForeignKey(db_constraint=False, default=None, null=True, on_delete=django.db.models.deletion.SET_NULL, to=settings.AUTH_USER_MODEL, verbose_name='Creator'),
),
]

View File

@ -82,6 +82,8 @@ class CeleryTaskExecution(models.Model):
kwargs = models.JSONField(verbose_name=_("Kwargs")) kwargs = models.JSONField(verbose_name=_("Kwargs"))
state = models.CharField(max_length=16, verbose_name=_("State")) state = models.CharField(max_length=16, verbose_name=_("State"))
is_finished = models.BooleanField(default=False, verbose_name=_("Finished")) is_finished = models.BooleanField(default=False, verbose_name=_("Finished"))
creator = models.ForeignKey('users.User', on_delete=models.SET_NULL, default=None, null=True,
verbose_name=_('Creator'), db_constraint=False)
date_published = models.DateTimeField(auto_now_add=True, verbose_name=_('Date published')) date_published = models.DateTimeField(auto_now_add=True, verbose_name=_('Date published'))
date_start = models.DateTimeField(null=True, verbose_name=_('Date start')) date_start = models.DateTimeField(null=True, verbose_name=_('Date start'))
date_finished = models.DateTimeField(null=True, verbose_name=_('Date finished')) date_finished = models.DateTimeField(null=True, verbose_name=_('Date finished'))

View File

@ -1,8 +1,7 @@
import ast import ast
import psutil
from psutil import NoSuchProcess
import time import time
import psutil
from celery import signals from celery import signals
from django.core.cache import cache from django.core.cache import cache
from django.db import transaction from django.db import transaction
@ -11,10 +10,12 @@ from django.db.utils import ProgrammingError
from django.dispatch import receiver from django.dispatch import receiver
from django.utils import translation, timezone from django.utils import translation, timezone
from django.utils.functional import LazyObject from django.utils.functional import LazyObject
from psutil import NoSuchProcess
from common.db.utils import close_old_connections, get_logger from common.db.utils import close_old_connections, get_logger
from common.signals import django_ready from common.signals import django_ready
from common.utils.connection import RedisPubSub from common.utils.connection import RedisPubSub
from jumpserver.utils import get_current_request
from orgs.utils import get_current_org_id, set_current_org from orgs.utils import get_current_org_id, set_current_org
from .celery import app from .celery import app
from .models import CeleryTaskExecution, CeleryTask, Job from .models import CeleryTaskExecution, CeleryTask, Job
@ -146,6 +147,9 @@ def task_sent_handler(headers=None, body=None, **kwargs):
'args': args, 'args': args,
'kwargs': kwargs 'kwargs': kwargs
} }
request = get_current_request()
if request and request.user.is_authenticated:
data['creator'] = request.user
CeleryTaskExecution.objects.create(**data) CeleryTaskExecution.objects.create(**data)
CeleryTask.objects.filter(name=task).update(date_last_publish=timezone.now()) CeleryTask.objects.filter(name=task).update(date_last_publish=timezone.now())

View File

@ -2,6 +2,7 @@ import asyncio
import os import os
import aiofiles import aiofiles
from asgiref.sync import sync_to_async
from channels.generic.websocket import AsyncJsonWebsocketConsumer from channels.generic.websocket import AsyncJsonWebsocketConsumer
from common.db.utils import close_old_connections from common.db.utils import close_old_connections
@ -9,12 +10,17 @@ from common.utils import get_logger
from .ansible.utils import get_ansible_task_log_path from .ansible.utils import get_ansible_task_log_path
from .celery.utils import get_celery_task_log_path from .celery.utils import get_celery_task_log_path
from .const import CELERY_LOG_MAGIC_MARK from .const import CELERY_LOG_MAGIC_MARK
from .models import CeleryTaskExecution
logger = get_logger(__name__) logger = get_logger(__name__)
class TaskLogWebsocket(AsyncJsonWebsocketConsumer): class TaskLogWebsocket(AsyncJsonWebsocketConsumer):
disconnected = False disconnected = False
user_tasks = (
'ops.tasks.run_ops_job',
'ops.tasks.run_ops_job_execution',
)
log_types = { log_types = {
'celery': get_celery_task_log_path, 'celery': get_celery_task_log_path,
@ -33,10 +39,27 @@ class TaskLogWebsocket(AsyncJsonWebsocketConsumer):
if func: if func:
return func(task_id) return func(task_id)
@sync_to_async
def get_task(self, task_id):
task = CeleryTaskExecution.objects.filter(id=task_id).first()
# task.creator 是 foreign key, 会异步去查询的,在下面的 if task.creator 会报错, 所以这里先取出来
if task and task.creator != ' ':
return task
else:
return None
async def receive_json(self, content, **kwargs): async def receive_json(self, content, **kwargs):
task_id = content.get('task') task_id = content.get('task')
task_typ = content.get('type', 'celery') task = await self.get_task(task_id)
log_path = self.get_log_path(task_id, task_typ) if not task:
await self.send_json({'message': 'Task not found', 'task': task_id})
return
if task.name in self.user_tasks and task.creator != self.scope['user']:
await self.send_json({'message': 'No permission', 'task': task_id})
return
task_type = content.get('type', 'celery')
log_path = self.get_log_path(task_id, task_type)
await self.async_handle_task(task_id, log_path) await self.async_handle_task(task_id, log_path)
async def async_handle_task(self, task_id, log_path): async def async_handle_task(self, task_id, log_path):