perf: 优化 ops task

pull/12835/head
ibuler 2024-03-18 14:19:15 +08:00 committed by 老广
parent 90d4914280
commit e9f591b33b
4 changed files with 46 additions and 33 deletions

View File

@ -17,9 +17,6 @@ class AdHocViewSet(OrgBulkModelViewSet):
search_fields = ('name', 'comment') search_fields = ('name', 'comment')
model = AdHoc model = AdHoc
def allow_bulk_destroy(self, qs, filtered):
return True
def get_queryset(self): def get_queryset(self):
queryset = super().get_queryset() queryset = super().get_queryset()
return queryset.filter(creator=self.request.user) return queryset.filter(creator=self.request.user)

View File

@ -1,6 +1,5 @@
import json import json
import os import os
from psutil import NoSuchProcess
from celery.result import AsyncResult from celery.result import AsyncResult
from django.conf import settings from django.conf import settings
@ -20,7 +19,9 @@ from common.permissions import IsValidUser
from ops.celery import app from ops.celery import app
from ops.const import Types from ops.const import Types
from ops.models import Job, JobExecution from ops.models import Job, JobExecution
from ops.serializers.job import JobSerializer, JobExecutionSerializer, FileSerializer, JobTaskStopSerializer from ops.serializers.job import (
JobSerializer, JobExecutionSerializer, FileSerializer, JobTaskStopSerializer
)
__all__ = [ __all__ = [
'JobViewSet', 'JobExecutionViewSet', 'JobRunVariableHelpAPIView', 'JobExecutionTaskDetail', 'UsernameHintsAPI' 'JobViewSet', 'JobExecutionViewSet', 'JobRunVariableHelpAPIView', 'JobExecutionTaskDetail', 'UsernameHintsAPI'
@ -43,7 +44,8 @@ def set_task_to_serializer_data(serializer, task_id):
def merge_nodes_and_assets(nodes, assets, user): def merge_nodes_and_assets(nodes, assets, user):
if nodes: if not nodes:
return assets
perm_util = UserPermAssetUtil(user=user) perm_util = UserPermAssetUtil(user=user)
for node_id in nodes: for node_id in nodes:
if node_id == PermNode.FAVORITE_NODE_KEY: if node_id == PermNode.FAVORITE_NODE_KEY:
@ -51,7 +53,7 @@ def merge_nodes_and_assets(nodes, assets, user):
elif node_id == PermNode.UNGROUPED_NODE_KEY: elif node_id == PermNode.UNGROUPED_NODE_KEY:
node_assets = perm_util.get_ungroup_assets() node_assets = perm_util.get_ungroup_assets()
else: else:
node, node_assets = perm_util.get_node_all_assets(node_id) _, node_assets = perm_util.get_node_all_assets(node_id)
assets.extend(node_assets.exclude(id__in=[asset.id for asset in assets])) assets.extend(node_assets.exclude(id__in=[asset.id for asset in assets]))
return assets return assets
@ -70,12 +72,13 @@ class JobViewSet(OrgBulkModelViewSet):
return self.permission_denied(request, "Command execution disabled") return self.permission_denied(request, "Command execution disabled")
return super().check_permissions(request) return super().check_permissions(request)
def allow_bulk_destroy(self, qs, filtered):
return True
def get_queryset(self): def get_queryset(self):
queryset = super().get_queryset() queryset = super().get_queryset()
queryset = queryset.filter(creator=self.request.user).exclude(type=Types.upload_file) queryset = queryset \
.filter(creator=self.request.user) \
.exclude(type=Types.upload_file)
# Job 列表不显示 adhoc, retrieve 要取状态
if self.action != 'retrieve': if self.action != 'retrieve':
return queryset.filter(instant=False) return queryset.filter(instant=False)
return queryset return queryset
@ -83,10 +86,11 @@ class JobViewSet(OrgBulkModelViewSet):
def perform_create(self, serializer): def perform_create(self, serializer):
run_after_save = serializer.validated_data.pop('run_after_save', False) run_after_save = serializer.validated_data.pop('run_after_save', False)
node_ids = serializer.validated_data.pop('nodes', []) node_ids = serializer.validated_data.pop('nodes', [])
assets = serializer.validated_data.__getitem__('assets') assets = serializer.validated_data.get('assets')
assets = merge_nodes_and_assets(node_ids, assets, self.request.user) assets = merge_nodes_and_assets(node_ids, assets, self.request.user)
serializer.validated_data.__setitem__('assets', assets) serializer.validated_data['assets'] = assets
instance = serializer.save() instance = serializer.save()
if instance.instant or run_after_save: if instance.instant or run_after_save:
self.run_job(instance, serializer) self.run_job(instance, serializer)
@ -103,7 +107,10 @@ class JobViewSet(OrgBulkModelViewSet):
set_task_to_serializer_data(serializer, execution.id) set_task_to_serializer_data(serializer, execution.id)
transaction.on_commit( transaction.on_commit(
lambda: run_ops_job_execution.apply_async((str(execution.id),), task_id=str(execution.id))) lambda: run_ops_job_execution.apply_async(
(str(execution.id),), task_id=str(execution.id)
)
)
@staticmethod @staticmethod
def get_duplicates_files(files): def get_duplicates_files(files):
@ -124,8 +131,8 @@ class JobViewSet(OrgBulkModelViewSet):
exceeds_limit_files.append(file) exceeds_limit_files.append(file)
return exceeds_limit_files return exceeds_limit_files
@action(methods=[POST], detail=False, serializer_class=FileSerializer, permission_classes=[IsValidUser, ], @action(methods=[POST], detail=False, serializer_class=FileSerializer,
url_path='upload') permission_classes=[IsValidUser, ], url_path='upload')
def upload(self, request, *args, **kwargs): def upload(self, request, *args, **kwargs):
uploaded_files = request.FILES.getlist('files') uploaded_files = request.FILES.getlist('files')
serializer = self.get_serializer(data=request.data) serializer = self.get_serializer(data=request.data)

View File

@ -52,16 +52,16 @@ class PlaybookViewSet(OrgBulkModelViewSet):
if 'multipart/form-data' in self.request.headers['Content-Type']: if 'multipart/form-data' in self.request.headers['Content-Type']:
src_path = safe_join(settings.MEDIA_ROOT, instance.path.name) src_path = safe_join(settings.MEDIA_ROOT, instance.path.name)
dest_path = safe_join(settings.DATA_DIR, "ops", "playbook", instance.id.__str__()) dest_path = safe_join(settings.DATA_DIR, "ops", "playbook", instance.id.__str__())
try: try:
unzip_playbook(src_path, dest_path) unzip_playbook(src_path, dest_path)
except RuntimeError as e: except RuntimeError:
raise JMSException(code='invalid_playbook_file', detail={"msg": "Unzip failed"}) raise JMSException(code='invalid_playbook_file', detail={"msg": "Unzip failed"})
if 'main.yml' not in os.listdir(dest_path): if 'main.yml' not in os.listdir(dest_path):
raise PlaybookNoValidEntry raise PlaybookNoValidEntry
else: elif instance.create_method == 'blank':
if instance.create_method == 'blank':
dest_path = safe_join(settings.DATA_DIR, "ops", "playbook", instance.id.__str__()) dest_path = safe_join(settings.DATA_DIR, "ops", "playbook", instance.id.__str__())
os.makedirs(dest_path) os.makedirs(dest_path)
with open(safe_join(dest_path, 'main.yml'), 'w') as f: with open(safe_join(dest_path, 'main.yml'), 'w') as f:
@ -71,7 +71,7 @@ class PlaybookViewSet(OrgBulkModelViewSet):
class PlaybookFileBrowserAPIView(APIView): class PlaybookFileBrowserAPIView(APIView):
permission_classes = (RBACPermission,) permission_classes = (RBACPermission,)
rbac_perms = { rbac_perms = {
'GET': 'ops.change_playbook', 'GET': 'ops.view_playbook',
'POST': 'ops.change_playbook', 'POST': 'ops.change_playbook',
'DELETE': 'ops.change_playbook', 'DELETE': 'ops.change_playbook',
'PATCH': 'ops.change_playbook', 'PATCH': 'ops.change_playbook',

View File

@ -1,10 +1,11 @@
# coding: utf-8 # coding: utf-8
import datetime import datetime
import time
from celery import shared_task from celery import shared_task
from celery.exceptions import SoftTimeLimitExceeded from celery.exceptions import SoftTimeLimitExceeded
from django.utils.translation import gettext_lazy as _
from django.utils import timezone from django.utils import timezone
from django.utils.translation import gettext_lazy as _
from django_celery_beat.models import PeriodicTask from django_celery_beat.models import PeriodicTask
from common.const.crontab import CRONTAB_AT_AM_TWO from common.const.crontab import CRONTAB_AT_AM_TWO
@ -143,3 +144,11 @@ def clean_job_execution_period():
with tmp_to_root_org(): with tmp_to_root_org():
del_res = JobExecution.objects.filter(date_created__lt=expired_day).delete() del_res = JobExecution.objects.filter(date_created__lt=expired_day).delete()
logger.info(f"clean job_execution db record success! delete {days} days {del_res[0]} records") logger.info(f"clean job_execution db record success! delete {days} days {del_res[0]} records")
@shared_task
def longtime_add(x, y):
print('long time task begins')
time.sleep(50)
print('long time task finished')
return x + y