Browse Source

feat: celery task api

pull/8997/head
Aaron3S 2 years ago
parent
commit
da911651aa
  1. 2
      apps/ops/api/celery.py
  2. 18
      apps/ops/migrations/0028_celerytask_last_published_time.py
  3. 33
      apps/ops/models/celery.py
  4. 2
      apps/ops/serializers/celery.py
  5. 1
      apps/ops/signal_handlers.py
  6. 21
      apps/ops/tasks.py
  7. 1
      apps/rbac/const.py

2
apps/ops/api/celery.py

@ -99,7 +99,7 @@ class CeleryPeriodTaskViewSet(CommonApiMixin, viewsets.ModelViewSet):
class CeleryTaskViewSet(CommonApiMixin, viewsets.ReadOnlyModelViewSet):
queryset = CeleryTask.objects.all()
queryset = CeleryTask.objects.filter(name__in=['ops.tasks.hello', 'ops.tasks.hello_error', 'ops.tasks.hello_random'])
serializer_class = CeleryTaskSerializer
http_method_names = ('get', 'head', 'options',)

18
apps/ops/migrations/0028_celerytask_last_published_time.py

@ -0,0 +1,18 @@
# Generated by Django 3.2.14 on 2022-10-27 06:35
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('ops', '0027_auto_20221024_1709'),
]
operations = [
migrations.AddField(
model_name='celerytask',
name='last_published_time',
field=models.DateTimeField(null=True),
),
]

33
apps/ops/models/celery.py

@ -13,18 +13,37 @@ from ops.celery import app
class CeleryTask(models.Model):
id = models.UUIDField(primary_key=True, default=uuid.uuid4)
name = models.CharField(max_length=1024)
last_published_time = models.DateTimeField(null=True)
@property
def verbose_name(self):
def meta(self):
task = app.tasks.get(self.name, None)
if task:
return getattr(task, 'verbose_name', None)
return {
"verbose_name": getattr(task, 'verbose_name', None),
"comment": getattr(task, 'comment', None),
"queue": getattr(task, 'queue', 'default')
}
@property
def description(self):
task = app.tasks.get(self.name, None)
if task:
return getattr(task, 'description', None)
def success_count(self):
return CeleryTaskExecution.objects.filter(name=self.name, state='SUCCESS').count()
@property
def publish_count(self):
return CeleryTaskExecution.objects.filter(name=self.name).count()
@property
def state(self):
last_five_executions = CeleryTaskExecution.objects.filter(name=self.name).order_by('-date_published')[:5]
if len(last_five_executions) > 0:
if last_five_executions[0].state == 'FAILURE':
return "red"
for execution in last_five_executions:
if execution.state == 'FAILURE':
return "yellow"
return "green"
class CeleryTaskExecution(models.Model):

2
apps/ops/serializers/celery.py

@ -31,7 +31,7 @@ class CeleryTaskSerializer(serializers.ModelSerializer):
class Meta:
model = CeleryTask
fields = [
'id', 'name', 'verbose_name', 'description',
'id', 'name', 'meta', 'publish_count', 'state', 'success_count', 'last_published_time',
]

1
apps/ops/signal_handlers.py

@ -92,3 +92,4 @@ def task_sent_handler(headers=None, body=None, **kwargs):
'kwargs': kwargs
}
CeleryTaskExecution.objects.create(**data)
CeleryTask.objects.filter(name=task).update(last_published_time=timezone.now())

21
apps/ops/tasks.py

@ -1,5 +1,6 @@
# coding: utf-8
import os
import random
import subprocess
from django.conf import settings
@ -76,7 +77,7 @@ def run_playbook(pid, **kwargs):
@shared_task
@after_app_shutdown_clean_periodic
@register_as_period_task(interval=3600*24, description=_("Clean task history period"))
@register_as_period_task(interval=3600 * 24, description=_("Clean task history period"))
def clean_tasks_adhoc_period():
logger.debug("Start clean task adhoc and run history")
tasks = Task.objects.all()
@ -89,7 +90,7 @@ def clean_tasks_adhoc_period():
@shared_task
@after_app_shutdown_clean_periodic
@register_as_period_task(interval=3600*24, description=_("Clean celery log period"))
@register_as_period_task(interval=3600 * 24, description=_("Clean celery log period"))
def clean_celery_tasks_period():
logger.debug("Start clean celery task history")
expire_days = get_log_keep_day('TASK_LOG_KEEP_DAYS')
@ -143,7 +144,7 @@ def check_server_performance_period():
ServerPerformanceCheckUtil().check_and_publish()
@shared_task(queue="ansible", verbose_name=_("Hello"))
@shared_task(queue="ansible", verbose_name=_("Hello"), comment="an test shared task")
def hello(name, callback=None):
from users.models import User
import time
@ -155,6 +156,18 @@ def hello(name, callback=None):
return gettext("Hello")
@shared_task(verbose_name="Hello Error", comment="an test shared task error")
def hello_error():
raise Exception("must be error")
@shared_task(verbose_name="Hello Random", comment="some time error and some time success")
def hello_random():
i = random.randint(0, 1)
if i == 1:
raise Exception("must be error")
@shared_task
def hello_callback(result):
print(result)
@ -171,5 +184,3 @@ def execute_automation_strategy(pid, trigger):
return
with tmp_to_org(instance.org):
instance.execute(trigger)

1
apps/rbac/const.py

@ -57,7 +57,6 @@ exclude_permissions = (
('rbac', 'role', '*', '*'),
('ops', 'adhoc', 'delete,change', '*'),
('ops', 'adhocexecution', 'add,delete,change', '*'),
('ops', 'celerytask', '*', '*'),
('ops', 'task', 'add,change', 'task'),
('ops', 'commandexecution', 'delete,change', 'commandexecution'),
('orgs', 'organizationmember', '*', '*'),

Loading…
Cancel
Save