feat: 增加异步任务api

pull/8991/head
Aaron3S 2022-10-24 20:14:18 +08:00
parent 1372d6322d
commit 64e03a4412
10 changed files with 178 additions and 27 deletions

View File

@ -12,7 +12,7 @@ from .mixin import BulkListSerializerMixin, BulkSerializerMixin
__all__ = [ __all__ = [
'MethodSerializer', 'EmptySerializer', 'BulkModelSerializer', 'MethodSerializer', 'EmptySerializer', 'BulkModelSerializer',
'AdaptedBulkListSerializer', 'CeleryTaskSerializer', 'AdaptedBulkListSerializer', 'CeleryTaskExecutionSerializer',
'WritableNestedModelSerializer', 'WritableNestedModelSerializer',
'GroupedChoiceSerializer', 'GroupedChoiceSerializer',
] ]
@ -73,7 +73,7 @@ class AdaptedBulkListSerializer(BulkListSerializerMixin, BulkListSerializer):
pass pass
class CeleryTaskSerializer(serializers.Serializer): class CeleryTaskExecutionSerializer(serializers.Serializer):
task = serializers.CharField(read_only=True) task = serializers.CharField(read_only=True)

View File

@ -5,7 +5,7 @@ from django.shortcuts import get_object_or_404
from rest_framework import viewsets, generics from rest_framework import viewsets, generics
from rest_framework.views import Response from rest_framework.views import Response
from common.drf.serializers import CeleryTaskSerializer from common.drf.serializers import CeleryTaskExecutionSerializer
from ..models import AdHoc, AdHocExecution from ..models import AdHoc, AdHocExecution
from ..serializers import ( from ..serializers import (
AdHocSerializer, AdHocSerializer,

View File

@ -4,6 +4,7 @@
import os import os
import re import re
from celery import current_app
from django.utils.translation import ugettext as _ from django.utils.translation import ugettext as _
from rest_framework import viewsets from rest_framework import viewsets
from celery.result import AsyncResult from celery.result import AsyncResult
@ -12,20 +13,21 @@ from django_celery_beat.models import PeriodicTask
from common.permissions import IsValidUser from common.permissions import IsValidUser
from common.api import LogTailApi from common.api import LogTailApi
from ..models import CeleryTask from ..models import CeleryTaskExecution, CeleryTask
from ..serializers import CeleryResultSerializer, CeleryPeriodTaskSerializer from ..serializers import CeleryResultSerializer, CeleryPeriodTaskSerializer
from ..celery.utils import get_celery_task_log_path from ..celery.utils import get_celery_task_log_path
from ..ansible.utils import get_ansible_task_log_path from ..ansible.utils import get_ansible_task_log_path
from common.mixins.api import CommonApiMixin from common.mixins.api import CommonApiMixin
__all__ = [ __all__ = [
'CeleryTaskLogApi', 'CeleryResultApi', 'CeleryPeriodTaskViewSet', 'CeleryTaskExecutionLogApi', 'CeleryResultApi', 'CeleryPeriodTaskViewSet',
'AnsibleTaskLogApi', 'AnsibleTaskLogApi', 'CeleryTaskViewSet', 'CeleryTaskExecutionViewSet'
] ]
from ..serializers.celery import CeleryTaskSerializer, CeleryTaskExecutionSerializer
class CeleryTaskLogApi(LogTailApi):
class CeleryTaskExecutionLogApi(LogTailApi):
permission_classes = (IsValidUser,) permission_classes = (IsValidUser,)
task = None task = None
task_id = '' task_id = ''
@ -46,8 +48,8 @@ class CeleryTaskLogApi(LogTailApi):
if new_path and os.path.isfile(new_path): if new_path and os.path.isfile(new_path):
return new_path return new_path
try: try:
task = CeleryTask.objects.get(id=self.task_id) task = CeleryTaskExecution.objects.get(id=self.task_id)
except CeleryTask.DoesNotExist: except CeleryTaskExecution.DoesNotExist:
return None return None
return task.full_log_path return task.full_log_path
@ -94,3 +96,15 @@ class CeleryPeriodTaskViewSet(CommonApiMixin, viewsets.ModelViewSet):
queryset = super().get_queryset() queryset = super().get_queryset()
queryset = queryset.exclude(description='') queryset = queryset.exclude(description='')
return queryset return queryset
class CeleryTaskViewSet(CommonApiMixin, viewsets.ModelViewSet):
queryset = CeleryTask.objects.all()
serializer_class = CeleryTaskSerializer
http_method_names = ('get',)
class CeleryTaskExecutionViewSet(CommonApiMixin, viewsets.ModelViewSet):
queryset = CeleryTaskExecution.objects.all()
serializer_class = CeleryTaskExecutionSerializer
http_method_names = ('get',)

View File

@ -0,0 +1,67 @@
# Generated by Django 3.2.14 on 2022-10-24 09:09
from django.db import migrations, models
import uuid
class Migration(migrations.Migration):
dependencies = [
('ops', '0026_auto_20221009_2050'),
]
operations = [
migrations.CreateModel(
name='CeleryTaskExecution',
fields=[
('id', models.UUIDField(default=uuid.uuid4, primary_key=True, serialize=False)),
('name', models.CharField(max_length=1024)),
('args', models.JSONField(verbose_name='Args')),
('kwargs', models.JSONField(verbose_name='Kwargs')),
('state', models.CharField(max_length=16, verbose_name='State')),
('is_finished', models.BooleanField(default=False, verbose_name='Finished')),
('date_published', models.DateTimeField(auto_now_add=True)),
('date_start', models.DateTimeField(null=True)),
('date_finished', models.DateTimeField(null=True)),
],
),
migrations.RenameField(
model_name='celerytask',
old_name='date_finished',
new_name='date_last_published',
),
migrations.RemoveField(
model_name='celerytask',
name='args',
),
migrations.RemoveField(
model_name='celerytask',
name='date_published',
),
migrations.RemoveField(
model_name='celerytask',
name='date_start',
),
migrations.RemoveField(
model_name='celerytask',
name='is_finished',
),
migrations.RemoveField(
model_name='celerytask',
name='kwargs',
),
migrations.RemoveField(
model_name='celerytask',
name='state',
),
migrations.AddField(
model_name='celerytask',
name='description',
field=models.CharField(max_length=2048, null=True),
),
migrations.AddField(
model_name='celerytask',
name='verbose_name',
field=models.CharField(max_length=1024, null=True),
),
]

View File

@ -0,0 +1,25 @@
# Generated by Django 3.2.14 on 2022-10-24 09:12
from django.db import migrations
class Migration(migrations.Migration):
dependencies = [
('ops', '0027_auto_20221024_1709'),
]
operations = [
migrations.RemoveField(
model_name='celerytask',
name='date_last_published',
),
migrations.RemoveField(
model_name='celerytask',
name='description',
),
migrations.RemoveField(
model_name='celerytask',
name='verbose_name',
),
]

View File

@ -7,8 +7,27 @@ from django.utils.translation import gettext_lazy as _
from django.conf import settings from django.conf import settings
from django.db import models from django.db import models
from ops.celery import app
class CeleryTask(models.Model): class CeleryTask(models.Model):
id = models.UUIDField(primary_key=True, default=uuid.uuid4)
name = models.CharField(max_length=1024)
@property
def verbose_name(self):
task = app.tasks.get(self.name, None)
if task:
return getattr(task, 'verbose_name', None)
@property
def description(self):
task = app.tasks.get(self.name, None)
if task:
return getattr(task, 'description', None)
class CeleryTaskExecution(models.Model):
LOG_DIR = os.path.join(settings.PROJECT_DIR, 'data', 'celery') LOG_DIR = os.path.join(settings.PROJECT_DIR, 'data', 'celery')
id = models.UUIDField(primary_key=True, default=uuid.uuid4) id = models.UUIDField(primary_key=True, default=uuid.uuid4)
name = models.CharField(max_length=1024) name = models.CharField(max_length=1024)

View File

@ -5,10 +5,12 @@ from rest_framework import serializers
from django_celery_beat.models import PeriodicTask from django_celery_beat.models import PeriodicTask
__all__ = [ __all__ = [
'CeleryResultSerializer', 'CeleryTaskSerializer', 'CeleryResultSerializer', 'CeleryTaskExecutionSerializer',
'CeleryPeriodTaskSerializer' 'CeleryPeriodTaskSerializer', 'CeleryTaskSerializer'
] ]
from ops.models import CeleryTask, CeleryTaskExecution
class CeleryResultSerializer(serializers.Serializer): class CeleryResultSerializer(serializers.Serializer):
id = serializers.UUIDField() id = serializers.UUIDField()
@ -16,10 +18,6 @@ class CeleryResultSerializer(serializers.Serializer):
state = serializers.CharField(max_length=16) state = serializers.CharField(max_length=16)
class CeleryTaskSerializer(serializers.Serializer):
pass
class CeleryPeriodTaskSerializer(serializers.ModelSerializer): class CeleryPeriodTaskSerializer(serializers.ModelSerializer):
class Meta: class Meta:
model = PeriodicTask model = PeriodicTask
@ -27,3 +25,19 @@ class CeleryPeriodTaskSerializer(serializers.ModelSerializer):
'name', 'task', 'enabled', 'description', 'name', 'task', 'enabled', 'description',
'last_run_at', 'total_run_count' 'last_run_at', 'total_run_count'
] ]
class CeleryTaskSerializer(serializers.ModelSerializer):
class Meta:
model = CeleryTask
fields = [
'name', 'verbose_name', 'description',
]
class CeleryTaskExecutionSerializer(serializers.ModelSerializer):
class Meta:
model = CeleryTaskExecution
fields = [
"name", "args", "kwargs", "state", "is_finished", "date_published", "date_start", "date_finished"
]

View File

@ -1,12 +1,15 @@
import ast import ast
from django.db import transaction
from django.dispatch import receiver
from django.utils import translation, timezone from django.utils import translation, timezone
from django.core.cache import cache from django.core.cache import cache
from celery import signals from celery import signals, current_app
from common.db.utils import close_old_connections, get_logger from common.db.utils import close_old_connections, get_logger
from .models import CeleryTask from common.signals import django_ready
from .celery import app
from .models import CeleryTaskExecution, CeleryTask
logger = get_logger(__name__) logger = get_logger(__name__)
@ -14,6 +17,14 @@ TASK_LANG_CACHE_KEY = 'TASK_LANG_{}'
TASK_LANG_CACHE_TTL = 1800 TASK_LANG_CACHE_TTL = 1800
@receiver(django_ready)
def sync_registered_tasks(*args, **kwargs):
with transaction.atomic():
CeleryTask.objects.all().delete()
for key in app.tasks:
CeleryTask(name=key).save()
@signals.before_task_publish.connect @signals.before_task_publish.connect
def before_task_publish(headers=None, **kwargs): def before_task_publish(headers=None, **kwargs):
task_id = headers.get('id') task_id = headers.get('id')
@ -25,7 +36,7 @@ def before_task_publish(headers=None, **kwargs):
@signals.task_prerun.connect @signals.task_prerun.connect
def on_celery_task_pre_run(task_id='', **kwargs): def on_celery_task_pre_run(task_id='', **kwargs):
# 更新状态 # 更新状态
CeleryTask.objects.filter(id=task_id).update(state='RUNNING', date_start=timezone.now()) CeleryTaskExecution.objects.filter(id=task_id).update(state='RUNNING', date_start=timezone.now())
# 关闭之前的数据库连接 # 关闭之前的数据库连接
close_old_connections() close_old_connections()
@ -41,7 +52,7 @@ def on_celery_task_post_run(task_id='', state='', **kwargs):
close_old_connections() close_old_connections()
print("Task post run: ", task_id, state) print("Task post run: ", task_id, state)
CeleryTask.objects.filter(id=task_id).update( CeleryTaskExecution.objects.filter(id=task_id).update(
state=state, date_finished=timezone.now(), is_finished=True state=state, date_finished=timezone.now(), is_finished=True
) )
@ -72,4 +83,4 @@ def task_sent_handler(headers=None, body=None, **kwargs):
'args': args, 'args': args,
'kwargs': kwargs 'kwargs': kwargs
} }
CeleryTask.objects.create(**data) CeleryTaskExecution.objects.create(**data)

View File

@ -20,7 +20,7 @@ from .celery.utils import (
create_or_update_celery_periodic_tasks, get_celery_periodic_task, create_or_update_celery_periodic_tasks, get_celery_periodic_task,
disable_celery_periodic_task, delete_celery_periodic_task disable_celery_periodic_task, delete_celery_periodic_task
) )
from .models import CeleryTask, AdHoc, Playbook from .models import CeleryTaskExecution, AdHoc, Playbook
from .notifications import ServerPerformanceCheckUtil from .notifications import ServerPerformanceCheckUtil
logger = get_logger(__file__) logger = get_logger(__file__)
@ -94,9 +94,9 @@ def clean_celery_tasks_period():
logger.debug("Start clean celery task history") logger.debug("Start clean celery task history")
expire_days = get_log_keep_day('TASK_LOG_KEEP_DAYS') expire_days = get_log_keep_day('TASK_LOG_KEEP_DAYS')
days_ago = timezone.now() - timezone.timedelta(days=expire_days) days_ago = timezone.now() - timezone.timedelta(days=expire_days)
tasks = CeleryTask.objects.filter(date_start__lt=days_ago) tasks = CeleryTaskExecution.objects.filter(date_start__lt=days_ago)
tasks.delete() tasks.delete()
tasks = CeleryTask.objects.filter(date_start__isnull=True) tasks = CeleryTaskExecution.objects.filter(date_start__isnull=True)
tasks.delete() tasks.delete()
command = "find %s -mtime +%s -name '*.log' -type f -exec rm -f {} \\;" % ( command = "find %s -mtime +%s -name '*.log' -type f -exec rm -f {} \\;" % (
settings.CELERY_LOG_DIR, expire_days settings.CELERY_LOG_DIR, expire_days

View File

@ -6,7 +6,6 @@ from rest_framework.routers import DefaultRouter
from rest_framework_bulk.routes import BulkRouter from rest_framework_bulk.routes import BulkRouter
from .. import api from .. import api
app_name = "ops" app_name = "ops"
router = DefaultRouter() router = DefaultRouter()
@ -15,9 +14,11 @@ bulk_router = BulkRouter()
router.register(r'adhoc', api.AdHocViewSet, 'adhoc') router.register(r'adhoc', api.AdHocViewSet, 'adhoc')
router.register(r'adhoc-executions', api.AdHocExecutionViewSet, 'execution') router.register(r'adhoc-executions', api.AdHocExecutionViewSet, 'execution')
router.register(r'celery/period-tasks', api.CeleryPeriodTaskViewSet, 'celery-period-task') router.register(r'celery/period-tasks', api.CeleryPeriodTaskViewSet, 'celery-period-task')
router.register(r'celery/tasks', api.CeleryTaskViewSet, 'celery-task')
router.register(r'celery/task-executions', api.CeleryTaskExecutionViewSet, 'task-execution')
urlpatterns = [ urlpatterns = [
path('celery/task/<uuid:pk>/log/', api.CeleryTaskLogApi.as_view(), name='celery-task-log'), path('celery/task/<uuid:pk>/log/', api.CeleryTaskExecutionLogApi.as_view(), name='celery-task-log'),
path('celery/task/<uuid:pk>/result/', api.CeleryResultApi.as_view(), name='celery-result'), path('celery/task/<uuid:pk>/result/', api.CeleryResultApi.as_view(), name='celery-result'),
path('ansible/task/<uuid:pk>/log/', api.AnsibleTaskLogApi.as_view(), name='ansible-task-log'), path('ansible/task/<uuid:pk>/log/', api.AnsibleTaskLogApi.as_view(), name='ansible-task-log'),