mirror of https://github.com/jumpserver/jumpserver
perf: 修复事务中任务执行
parent
b023ca0c69
commit
cdbe5d31e9
|
@ -2,7 +2,6 @@ import uuid
|
||||||
from copy import deepcopy
|
from copy import deepcopy
|
||||||
|
|
||||||
from django.db import IntegrityError
|
from django.db import IntegrityError
|
||||||
from django.db import transaction
|
|
||||||
from django.db.models import Q
|
from django.db.models import Q
|
||||||
from django.utils.translation import gettext_lazy as _
|
from django.utils.translation import gettext_lazy as _
|
||||||
from rest_framework import serializers
|
from rest_framework import serializers
|
||||||
|
@ -121,10 +120,6 @@ class AccountCreateUpdateSerializerMixin(serializers.Serializer):
|
||||||
def push_account_if_need(self, instance, push_now, params, stat):
|
def push_account_if_need(self, instance, push_now, params, stat):
|
||||||
if not push_now or stat not in ['created', 'updated']:
|
if not push_now or stat not in ['created', 'updated']:
|
||||||
return
|
return
|
||||||
transaction.on_commit(lambda: self.start_push(instance, params))
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def start_push(instance, params):
|
|
||||||
push_accounts_to_assets_task.delay([str(instance.id)], params)
|
push_accounts_to_assets_task.delay([str(instance.id)], params)
|
||||||
|
|
||||||
def get_validators(self):
|
def get_validators(self):
|
||||||
|
|
|
@ -3,6 +3,7 @@ from django.utils.translation import gettext_noop, gettext_lazy as _
|
||||||
|
|
||||||
from accounts.const import AutomationTypes
|
from accounts.const import AutomationTypes
|
||||||
from accounts.tasks.common import quickstart_automation_by_snapshot
|
from accounts.tasks.common import quickstart_automation_by_snapshot
|
||||||
|
from common.decorators import on_transaction_commit
|
||||||
from common.utils import get_logger
|
from common.utils import get_logger
|
||||||
|
|
||||||
logger = get_logger(__file__)
|
logger = get_logger(__file__)
|
||||||
|
@ -15,6 +16,7 @@ __all__ = [
|
||||||
queue="ansible", verbose_name=_('Push accounts to assets'),
|
queue="ansible", verbose_name=_('Push accounts to assets'),
|
||||||
activity_callback=lambda self, account_ids, *args, **kwargs: (account_ids, None)
|
activity_callback=lambda self, account_ids, *args, **kwargs: (account_ids, None)
|
||||||
)
|
)
|
||||||
|
@on_transaction_commit
|
||||||
def push_accounts_to_assets_task(account_ids, params=None):
|
def push_accounts_to_assets_task(account_ids, params=None):
|
||||||
from accounts.models import PushAccountAutomation
|
from accounts.models import PushAccountAutomation
|
||||||
from accounts.models import Account
|
from accounts.models import Account
|
||||||
|
|
|
@ -18,6 +18,7 @@ def on_transaction_commit(func):
|
||||||
如果不调用on_commit, 对象创建时添加多对多字段值失败
|
如果不调用on_commit, 对象创建时添加多对多字段值失败
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
@wraps(func)
|
||||||
def inner(*args, **kwargs):
|
def inner(*args, **kwargs):
|
||||||
transaction.on_commit(lambda: func(*args, **kwargs))
|
transaction.on_commit(lambda: func(*args, **kwargs))
|
||||||
|
|
||||||
|
|
|
@ -1,7 +1,5 @@
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
from django.db import transaction
|
|
||||||
from django.db.models import Count
|
from django.db.models import Count
|
||||||
from django.db.transaction import atomic
|
|
||||||
from django.shortcuts import get_object_or_404
|
from django.shortcuts import get_object_or_404
|
||||||
from rest_framework.response import Response
|
from rest_framework.response import Response
|
||||||
from rest_framework.views import APIView
|
from rest_framework.views import APIView
|
||||||
|
@ -88,8 +86,7 @@ class JobViewSet(OrgBulkModelViewSet):
|
||||||
execution.save()
|
execution.save()
|
||||||
|
|
||||||
set_task_to_serializer_data(serializer, execution.id)
|
set_task_to_serializer_data(serializer, execution.id)
|
||||||
transaction.on_commit(
|
run_ops_job_execution.apply_async((str(execution.id),), task_id=str(execution.id))
|
||||||
lambda: run_ops_job_execution.apply_async((str(execution.id),), task_id=str(execution.id)))
|
|
||||||
|
|
||||||
|
|
||||||
class JobExecutionViewSet(OrgBulkModelViewSet):
|
class JobExecutionViewSet(OrgBulkModelViewSet):
|
||||||
|
@ -112,8 +109,7 @@ class JobExecutionViewSet(OrgBulkModelViewSet):
|
||||||
instance.save()
|
instance.save()
|
||||||
|
|
||||||
set_task_to_serializer_data(serializer, instance.id)
|
set_task_to_serializer_data(serializer, instance.id)
|
||||||
transaction.on_commit(
|
run_ops_job_execution.apply_async((str(instance.id),), task_id=str(instance.id))
|
||||||
lambda: run_ops_job_execution.apply_async((str(instance.id),), task_id=str(instance.id)))
|
|
||||||
|
|
||||||
def get_queryset(self):
|
def get_queryset(self):
|
||||||
queryset = super().get_queryset()
|
queryset = super().get_queryset()
|
||||||
|
|
|
@ -5,6 +5,7 @@ from celery.exceptions import SoftTimeLimitExceeded
|
||||||
from django.utils.translation import gettext_lazy as _
|
from django.utils.translation import gettext_lazy as _
|
||||||
from django_celery_beat.models import PeriodicTask
|
from django_celery_beat.models import PeriodicTask
|
||||||
|
|
||||||
|
from common.decorators import on_transaction_commit
|
||||||
from common.utils import get_logger, get_object_or_none
|
from common.utils import get_logger, get_object_or_none
|
||||||
from ops.celery import app
|
from ops.celery import app
|
||||||
from orgs.utils import tmp_to_org, tmp_to_root_org
|
from orgs.utils import tmp_to_org, tmp_to_root_org
|
||||||
|
@ -68,6 +69,7 @@ def job_execution_task_activity_callback(self, execution_id, *args, **kwargs):
|
||||||
soft_time_limit=60, queue="ansible", verbose_name=_("Run ansible task execution"),
|
soft_time_limit=60, queue="ansible", verbose_name=_("Run ansible task execution"),
|
||||||
activity_callback=job_execution_task_activity_callback
|
activity_callback=job_execution_task_activity_callback
|
||||||
)
|
)
|
||||||
|
@on_transaction_commit
|
||||||
def run_ops_job_execution(execution_id, **kwargs):
|
def run_ops_job_execution(execution_id, **kwargs):
|
||||||
with tmp_to_root_org():
|
with tmp_to_root_org():
|
||||||
execution = get_object_or_none(JobExecution, id=execution_id)
|
execution = get_object_or_none(JobExecution, id=execution_id)
|
||||||
|
|
|
@ -1,6 +1,5 @@
|
||||||
import uuid
|
import uuid
|
||||||
|
|
||||||
from django.db import transaction
|
|
||||||
from rest_framework import status
|
from rest_framework import status
|
||||||
from rest_framework import viewsets
|
from rest_framework import viewsets
|
||||||
from rest_framework.decorators import action
|
from rest_framework.decorators import action
|
||||||
|
@ -58,17 +57,13 @@ class AppletHostDeploymentViewSet(viewsets.ModelViewSet):
|
||||||
('applets', 'terminal.view_AppletHostDeployment'),
|
('applets', 'terminal.view_AppletHostDeployment'),
|
||||||
)
|
)
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def start_deploy(instance):
|
|
||||||
task = run_applet_host_deployment.apply_async((instance.id,), task_id=str(instance.id))
|
|
||||||
|
|
||||||
def create(self, request, *args, **kwargs):
|
def create(self, request, *args, **kwargs):
|
||||||
serializer = self.get_serializer(data=request.data)
|
serializer = self.get_serializer(data=request.data)
|
||||||
serializer.is_valid(raise_exception=True)
|
serializer.is_valid(raise_exception=True)
|
||||||
instance = serializer.save()
|
instance = serializer.save()
|
||||||
|
task = run_applet_host_deployment.delay(instance.id)
|
||||||
instance.save_task(instance.id)
|
instance.save_task(instance.id)
|
||||||
transaction.on_commit(lambda: self.start_deploy(instance))
|
return Response({'task': str(task.id)}, status=201)
|
||||||
return Response({'task': str(instance.id)}, status=201)
|
|
||||||
|
|
||||||
@action(methods=['post'], detail=False)
|
@action(methods=['post'], detail=False)
|
||||||
def applets(self, request, *args, **kwargs):
|
def applets(self, request, *args, **kwargs):
|
||||||
|
@ -84,10 +79,5 @@ class AppletHostDeploymentViewSet(viewsets.ModelViewSet):
|
||||||
applet_host_deployment_ids = [str(obj.id) for obj in applet_host_deployments]
|
applet_host_deployment_ids = [str(obj.id) for obj in applet_host_deployments]
|
||||||
task_id = str(uuid.uuid4())
|
task_id = str(uuid.uuid4())
|
||||||
model.objects.filter(id__in=applet_host_deployment_ids).update(task=task_id)
|
model.objects.filter(id__in=applet_host_deployment_ids).update(task=task_id)
|
||||||
transaction.on_commit(lambda: self.start_install_applet(applet_host_deployment_ids, applet_id, task_id))
|
run_applet_host_deployment_install_applet.delay(applet_host_deployment_ids, applet_id)
|
||||||
return Response({'task': task_id}, status=201)
|
return Response({'task': task_id}, status=201)
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def start_install_applet(applet_host_deployment_ids, applet_id, task_id):
|
|
||||||
run_applet_host_deployment_install_applet.apply_async((applet_host_deployment_ids, applet_id),
|
|
||||||
task_id=str(task_id))
|
|
||||||
|
|
|
@ -10,6 +10,7 @@ from django.core.files.storage import default_storage
|
||||||
from django.utils import timezone
|
from django.utils import timezone
|
||||||
from django.utils.translation import gettext_lazy as _
|
from django.utils.translation import gettext_lazy as _
|
||||||
|
|
||||||
|
from common.decorators import on_transaction_commit
|
||||||
from common.storage.replay import ReplayStorageHandler
|
from common.storage.replay import ReplayStorageHandler
|
||||||
from ops.celery.decorator import (
|
from ops.celery.decorator import (
|
||||||
register_as_period_task, after_app_ready_start,
|
register_as_period_task, after_app_ready_start,
|
||||||
|
@ -90,6 +91,7 @@ def upload_session_replay_to_external_storage(session_id):
|
||||||
verbose_name=_('Run applet host deployment'),
|
verbose_name=_('Run applet host deployment'),
|
||||||
activity_callback=lambda self, did, *args, **kwargs: ([did],)
|
activity_callback=lambda self, did, *args, **kwargs: ([did],)
|
||||||
)
|
)
|
||||||
|
@on_transaction_commit
|
||||||
def run_applet_host_deployment(did):
|
def run_applet_host_deployment(did):
|
||||||
with tmp_to_builtin_org(system=1):
|
with tmp_to_builtin_org(system=1):
|
||||||
deployment = AppletHostDeployment.objects.get(id=did)
|
deployment = AppletHostDeployment.objects.get(id=did)
|
||||||
|
@ -100,6 +102,7 @@ def run_applet_host_deployment(did):
|
||||||
verbose_name=_('Install applet'),
|
verbose_name=_('Install applet'),
|
||||||
activity_callback=lambda self, ids, applet_id, *args, **kwargs: (ids,)
|
activity_callback=lambda self, ids, applet_id, *args, **kwargs: (ids,)
|
||||||
)
|
)
|
||||||
|
@on_transaction_commit
|
||||||
def run_applet_host_deployment_install_applet(ids, applet_id):
|
def run_applet_host_deployment_install_applet(ids, applet_id):
|
||||||
with tmp_to_builtin_org(system=1):
|
with tmp_to_builtin_org(system=1):
|
||||||
for did in ids:
|
for did in ids:
|
||||||
|
|
Loading…
Reference in New Issue