From a4c843ff13265d4ce9d280df62732fdbd8e11169 Mon Sep 17 00:00:00 2001 From: ibuler Date: Mon, 2 Apr 2018 13:19:31 +0800 Subject: [PATCH] =?UTF-8?q?[Update]=20=E8=BF=81=E7=A7=BBcelery=E5=88=B0ops?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/assets/tasks.py | 6 +- apps/common/__init__.py | 2 +- apps/common/api.py | 26 ++-- apps/common/const.py | 3 +- apps/common/models.py | 9 ++ apps/common/tasks.py | 4 +- .../{tail_file.html => celery_task_log.html} | 4 +- apps/common/urls/api_urls.py | 2 +- apps/common/urls/view_urls.py | 3 +- apps/common/utils.py | 18 +++ apps/common/views.py | 32 ++--- apps/ops/__init__.py | 2 +- apps/ops/celery/__init__.py | 18 +++ apps/ops/celery/const.py | 3 + apps/ops/celery/signal_handler.py | 89 ++++++++++++++ .../{common/celery.py => ops/celery/utils.py} | 115 +++++++----------- apps/ops/models.py | 12 +- apps/ops/templates/ops/task_list.html | 3 +- apps/terminal/signals_handler.py | 2 - apps/terminal/tasks.py | 2 +- jms | 4 +- 21 files changed, 224 insertions(+), 135 deletions(-) rename apps/common/templates/common/{tail_file.html => celery_task_log.html} (95%) create mode 100644 apps/ops/celery/__init__.py create mode 100644 apps/ops/celery/const.py create mode 100644 apps/ops/celery/signal_handler.py rename apps/{common/celery.py => ops/celery/utils.py} (64%) diff --git a/apps/assets/tasks.py b/apps/assets/tasks.py index e47d15d41..bfdafe734 100644 --- a/apps/assets/tasks.py +++ b/apps/assets/tasks.py @@ -3,15 +3,15 @@ import json import re import os -import paramiko from celery import shared_task from django.core.cache import cache from django.utils.translation import ugettext as _ from common.utils import get_object_or_none, capacity_convert, \ sum_capacity, encrypt_password, get_logger -from common.celery import register_as_period_task, after_app_shutdown_clean, \ - after_app_ready_start, app as celery_app +from ops.celery.utils import register_as_period_task, after_app_shutdown_clean, \ + after_app_ready_start +from ops.celery import app as celery_app from .models import SystemUser, AdminUser, Asset from . import const diff --git a/apps/common/__init__.py b/apps/common/__init__.py index b64e43e83..fdb34b225 100644 --- a/apps/common/__init__.py +++ b/apps/common/__init__.py @@ -2,4 +2,4 @@ from __future__ import absolute_import # This will make sure the app is always imported when # Django starts so that shared_task will use this app. -from .celery import app as celery_app + diff --git a/apps/common/api.py b/apps/common/api.py index 0e626cacd..11828a81e 100644 --- a/apps/common/api.py +++ b/apps/common/api.py @@ -5,8 +5,8 @@ import json import uuid from django.core.cache import cache -from rest_framework.views import APIView -from rest_framework.views import Response +from rest_framework.generics import RetrieveAPIView +from rest_framework.views import Response, APIView from ldap3 import Server, Connection from django.core.mail import get_connection, send_mail from django.utils.translation import ugettext_lazy as _ @@ -14,7 +14,8 @@ from django.conf import settings from .permissions import IsSuperUser, IsAppUser from .serializers import MailTestSerializer, LDAPTestSerializer -from .const import FILE_END_GUARD +from .celery import FINISHED +from .const import FILE_END_GUARD, celery_task_pre_key class MailTestingAPI(APIView): @@ -111,28 +112,27 @@ class DjangoSettingsAPI(APIView): return Response(configs) -class FileTailApi(APIView): +class CeleryTaskLogApi(APIView): permission_classes = (IsSuperUser,) - default_buff_size = 1024 * 10 + buff_size = 1024 * 10 end = False - buff_size = None def get(self, request, *args, **kwargs): - file_path = request.query_params.get("file") - self.buff_size = request.query_params.get('buffer') or self.default_buff_size + task_id = kwargs.get('pk') + info = cache.get(celery_task_pre_key + task_id, {}) + log_path = info.get("log_path") mark = request.query_params.get("mark") or str(uuid.uuid4()) - if not os.path.isfile(file_path): + if not log_path or not os.path.isfile(log_path): return Response({"data": _("Waiting ...")}, status=203) - with open(file_path, 'r') as f: + with open(log_path, 'r') as f: offset = cache.get(mark, 0) f.seek(offset) data = f.read(self.buff_size).replace('\n', '\r\n') mark = str(uuid.uuid4()) cache.set(mark, f.tell(), 5) - if FILE_END_GUARD in data: - data = data.replace(FILE_END_GUARD, '') + if data == '' and info["status"] == FINISHED: self.end = True - return Response({"data": data, 'end': self.end, 'mark': mark}) \ No newline at end of file + return Response({"data": data, 'end': self.end, 'mark': mark}) diff --git a/apps/common/const.py b/apps/common/const.py index f3669c49d..6652593cb 100644 --- a/apps/common/const.py +++ b/apps/common/const.py @@ -1,8 +1,9 @@ # -*- coding: utf-8 -*- # -from django.utils.translation import ugettext as _ +from django.utils.translation import ugettext_lazy as _ create_success_msg = _("%(name)s was created successfully") update_success_msg = _("%(name)s was updated successfully") FILE_END_GUARD = ">>> Content End <<<" +celery_task_pre_key = "CELERY_" diff --git a/apps/common/models.py b/apps/common/models.py index 1f634bce2..24922300f 100644 --- a/apps/common/models.py +++ b/apps/common/models.py @@ -79,3 +79,12 @@ class Setting(models.Model): class Meta: db_table = "settings" + + +class CeleryTask(models.Model): + id = models.UUIDField() + name = models.CharField(max_length=1024) + status = models.CharField(max_length=128) + date_published = models.DateTimeField(auto_now_add=True) + date_start = models.DateTimeField(null=True) + date_finished = models.DateTimeField(null=True) \ No newline at end of file diff --git a/apps/common/tasks.py b/apps/common/tasks.py index e8d6ba8b0..dec738921 100644 --- a/apps/common/tasks.py +++ b/apps/common/tasks.py @@ -1,13 +1,13 @@ from django.core.mail import send_mail from django.conf import settings -from .celery import app +from celery import shared_task from .utils import get_logger logger = get_logger(__file__) -@app.task +@shared_task def send_mail_async(*args, **kwargs): """ Using celery to send email async diff --git a/apps/common/templates/common/tail_file.html b/apps/common/templates/common/celery_task_log.html similarity index 95% rename from apps/common/templates/common/tail_file.html rename to apps/common/templates/common/celery_task_log.html index 8c24fa707..23b676dd2 100644 --- a/apps/common/templates/common/tail_file.html +++ b/apps/common/templates/common/celery_task_log.html @@ -35,7 +35,7 @@ var rowHeight = 1; var colWidth = 1; var mark = ''; - var url = "{% url 'api-common:tail-file' %}?file={{ file_path }}"; + var url = "{% url 'api-common:celery-task-log' pk=task_id %}"; var term; var end = false; var error = false; @@ -54,7 +54,7 @@ function requestAndWrite() { if (!end) { $.ajax({ - url: url + '&mark=' + mark, + url: url + '?mark=' + mark, method: "GET", contentType: "application/json; charset=utf-8" }).done(function(data, textStatue, jqXHR) { diff --git a/apps/common/urls/api_urls.py b/apps/common/urls/api_urls.py index e1e6c19ee..37629b801 100644 --- a/apps/common/urls/api_urls.py +++ b/apps/common/urls/api_urls.py @@ -10,5 +10,5 @@ urlpatterns = [ url(r'^v1/mail/testing/$', api.MailTestingAPI.as_view(), name='mail-testing'), url(r'^v1/ldap/testing/$', api.LDAPTestingAPI.as_view(), name='ldap-testing'), url(r'^v1/django-settings/$', api.DjangoSettingsAPI.as_view(), name='django-settings'), - url(r'^v1/tail-file/$', api.FileTailApi.as_view(), name='tail-file'), + url(r'^v1/celery/task/(?P[0-9a-zA-Z\-]{36})/log/$', api.CeleryTaskLogApi.as_view(), name='celery-task-log'), ] diff --git a/apps/common/urls/view_urls.py b/apps/common/urls/view_urls.py index d2135f2d4..0483d4dbb 100644 --- a/apps/common/urls/view_urls.py +++ b/apps/common/urls/view_urls.py @@ -12,6 +12,5 @@ urlpatterns = [ url(r'^ldap/$', views.LDAPSettingView.as_view(), name='ldap-setting'), url(r'^terminal/$', views.TerminalSettingView.as_view(), name='terminal-setting'), - url(r'^tail-file/$', views.TailFileView.as_view(), name='tail-file'), - url(r'^celery/task/log/$', views.CeleryTaskLogView.as_view(), name='celery-task-log'), + url(r'^celery/task/(?P[0-9a-zA-Z\-]{36})/log/$', views.CeleryTaskLogView.as_view(), name='celery-task-log'), ] diff --git a/apps/common/utils.py b/apps/common/utils.py index 7e7aa9aae..b4dd0aef8 100644 --- a/apps/common/utils.py +++ b/apps/common/utils.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- # import re +import sys from collections import OrderedDict from six import string_types import base64 @@ -360,3 +361,20 @@ def get_signer(): signer = Signer(settings.SECRET_KEY) return signer + +class TeeObj: + origin_stdout = sys.stdout + + def __init__(self, file_obj): + self.file_obj = file_obj + + def write(self, msg): + self.origin_stdout.write(msg) + self.file_obj.write(msg.replace('*', '')) + + def flush(self): + self.origin_stdout.flush() + self.file_obj.flush() + + def close(self): + self.file_obj.close() diff --git a/apps/common/views.py b/apps/common/views.py index a57834a77..e51400d12 100644 --- a/apps/common/views.py +++ b/apps/common/views.py @@ -1,6 +1,6 @@ from django.core.cache import cache -from django.views.generic import TemplateView, View +from django.views.generic import TemplateView, View, DetailView from django.shortcuts import render, redirect, Http404, reverse from django.contrib import messages from django.utils.translation import ugettext as _ @@ -8,7 +8,6 @@ from django.conf import settings from .forms import EmailSettingForm, LDAPSettingForm, BasicSettingForm, \ TerminalSettingForm -from .models import Setting from .mixins import AdminUserRequiredMixin from .signals import ldap_auth_enable @@ -123,33 +122,18 @@ class TerminalSettingView(AdminUserRequiredMixin, TemplateView): return render(request, self.template_name, context) -class TailFileView(AdminUserRequiredMixin, TemplateView): - template_name = 'common/tail_file.html' - - def get_context_data(self, **kwargs): - file_path = self.request.GET.get("file") - context = super().get_context_data(**kwargs) - context.update({"file_path": file_path}) - return context - - class CeleryTaskLogView(AdminUserRequiredMixin, TemplateView): - template_name = 'common/tail_file.html' + template_name = 'common/celery_task_log.html' task_log_path = None - def get(self, request, *args, **kwargs): - task = self.request.GET.get('task') - if not task: - raise Http404("Not found task") - - self.task_log_path = cache.get(task) - if not self.task_log_path: - raise Http404("Not found task log file") - return super().get(request, *args, **kwargs) - def get_context_data(self, **kwargs): context = super().get_context_data(**kwargs) + task_id = self.kwargs.get("pk") + + if cache.get(celery_task_pre_key+task_id) is None: + raise Http404() + context.update({ - 'file_path': self.task_log_path + "task_id": self.kwargs.get("pk") }) return context diff --git a/apps/ops/__init__.py b/apps/ops/__init__.py index 8b1378917..cf2e85f6d 100644 --- a/apps/ops/__init__.py +++ b/apps/ops/__init__.py @@ -1 +1 @@ - +from .celery import app as celery_app diff --git a/apps/ops/celery/__init__.py b/apps/ops/celery/__init__.py new file mode 100644 index 000000000..9cdcb3063 --- /dev/null +++ b/apps/ops/celery/__init__.py @@ -0,0 +1,18 @@ +# -*- coding: utf-8 -*- + +import os + +from celery import Celery + +# set the default Django settings module for the 'celery' program. +os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'jumpserver.settings') + +from django.conf import settings + +app = Celery('jumpserver') + +# Using a string here means the worker will not have to +# pickle the object when using Windows. +app.config_from_object('django.conf:settings', namespace='CELERY') +app.autodiscover_tasks(lambda: [app_config.split('.')[0] for app_config in settings.INSTALLED_APPS]) + diff --git a/apps/ops/celery/const.py b/apps/ops/celery/const.py new file mode 100644 index 000000000..3d98261b1 --- /dev/null +++ b/apps/ops/celery/const.py @@ -0,0 +1,3 @@ +# -*- coding: utf-8 -*- +# + diff --git a/apps/ops/celery/signal_handler.py b/apps/ops/celery/signal_handler.py new file mode 100644 index 000000000..6ce6c5d9d --- /dev/null +++ b/apps/ops/celery/signal_handler.py @@ -0,0 +1,89 @@ +# -*- coding: utf-8 -*- +# +import os +import datetime +import sys + +from django.conf import settings +from django.core.cache import cache +from celery import subtask +from celery.signals import worker_ready, worker_shutdown, task_prerun, \ + task_postrun, after_task_publish + +from django_celery_beat.models import PeriodicTask + +from common.utils import get_logger, TeeObj +from common.const import celery_task_pre_key + +from .utils import get_after_app_ready_tasks, get_after_app_shutdown_clean_tasks + + +logger = get_logger(__file__) + +WAITING = "waiting" +RUNNING = "running" +FINISHED = "finished" + +EXPIRE_TIME = 3600 + + +@worker_ready.connect +def on_app_ready(sender=None, headers=None, body=None, **kwargs): + if cache.get("CELERY_APP_READY", 0) == 1: + return + cache.set("CELERY_APP_READY", 1, 10) + logger.debug("App ready signal recv") + tasks = get_after_app_ready_tasks() + logger.debug("Start need start task: [{}]".format( + ", ".join(tasks)) + ) + for task in tasks: + subtask(task).delay() + + +@worker_shutdown.connect +def after_app_shutdown(sender=None, headers=None, body=None, **kwargs): + if cache.get("CELERY_APP_SHUTDOWN", 0) == 1: + return + cache.set("CELERY_APP_SHUTDOWN", 1, 10) + tasks = get_after_app_shutdown_clean_tasks() + logger.debug("App shutdown signal recv") + logger.debug("Clean need cleaned period tasks: [{}]".format( + ', '.join(tasks)) + ) + PeriodicTask.objects.filter(name__in=tasks).delete() + + +@task_prerun.connect +def pre_run_task_signal_handler(sender, task_id=None, task=None, **kwargs): + task_key = celery_task_pre_key + task_id + info = cache.get(task_key, {}) + now = datetime.datetime.now().strftime("%Y-%m-%d") + log_dir = os.path.join(settings.PROJECT_DIR, "data", "celery", now) + if not os.path.exists(log_dir): + os.makedirs(log_dir) + log_path = os.path.join(log_dir, task_id + '.log') + info.update({"status": RUNNING, "log_path": log_path}) + cache.set(task_key, info, EXPIRE_TIME) + f = open(log_path, 'w') + tee = TeeObj(f) + sys.stdout = tee + task.log_f = tee + + +@task_postrun.connect +def post_run_task_signal_handler(sender, task_id=None, task=None, **kwargs): + task_key = celery_task_pre_key + task_id + info = cache.get(task_key, {}) + info.update({"status": FINISHED}) + cache.set(task_key, info, EXPIRE_TIME) + task.log_f.flush() + sys.stdout = task.log_f.origin_stdout + task.log_f.close() + + +@after_task_publish.connect +def after_task_publish_signal_handler(sender, headers=None, **kwargs): + task_id = headers["id"] + key = celery_task_pre_key + task_id + cache.set(key, {"status": WAITING}, EXPIRE_TIME) \ No newline at end of file diff --git a/apps/common/celery.py b/apps/ops/celery/utils.py similarity index 64% rename from apps/common/celery.py rename to apps/ops/celery/utils.py index 352f1ff45..b4f5a80db 100644 --- a/apps/common/celery.py +++ b/apps/ops/celery/utils.py @@ -1,33 +1,50 @@ -# ~*~ coding: utf-8 ~*~ - -import os +# -*- coding: utf-8 -*- +# import json from functools import wraps -from celery import Celery, subtask -from celery.signals import worker_ready, worker_shutdown, task_prerun, task_postrun from django.db.utils import ProgrammingError, OperationalError - -from .utils import get_logger - -logger = get_logger(__file__) - -# set the default Django settings module for the 'celery' program. -os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'jumpserver.settings') - -from django.conf import settings from django.core.cache import cache +from django_celery_beat.models import PeriodicTask, IntervalSchedule, CrontabSchedule -app = Celery('jumpserver') -# Using a string here means the worker will not have to -# pickle the object when using Windows. -app.config_from_object('django.conf:settings', namespace='CELERY') -app.autodiscover_tasks(lambda: [app_config.split('.')[0] for app_config in settings.INSTALLED_APPS]) +def add_register_period_task(name): + key = "__REGISTER_PERIODIC_TASKS" + value = cache.get(key, []) + value.append(name) + cache.set(key, value) + + +def get_register_period_tasks(): + key = "__REGISTER_PERIODIC_TASKS" + return cache.get(key, []) + + +def add_after_app_shutdown_clean_task(name): + key = "__AFTER_APP_SHUTDOWN_CLEAN_TASKS" + value = cache.get(key, []) + value.append(name) + cache.set(key, value) + + +def get_after_app_shutdown_clean_tasks(): + key = "__AFTER_APP_SHUTDOWN_CLEAN_TASKS" + return cache.get(key, []) + + +def add_after_app_ready_task(name): + key = "__AFTER_APP_READY_RUN_TASKS" + value = cache.get(key, []) + value.append(name) + cache.set(key, value) + + +def get_after_app_ready_tasks(): + key = "__AFTER_APP_READY_RUN_TASKS" + return cache.get(key, []) def create_or_update_celery_periodic_tasks(tasks): - from django_celery_beat.models import PeriodicTask, IntervalSchedule, CrontabSchedule """ :param tasks: { 'add-every-monday-morning': { @@ -106,11 +123,6 @@ def delete_celery_periodic_task(task_name): PeriodicTask.objects.filter(name=task_name).delete() -__REGISTER_PERIODIC_TASKS = [] -__AFTER_APP_SHUTDOWN_CLEAN_TASKS = [] -__AFTER_APP_READY_RUN_TASKS = [] - - def register_as_period_task(crontab=None, interval=None): """ Warning: Task must be have not any args and kwargs @@ -128,7 +140,7 @@ def register_as_period_task(crontab=None, interval=None): # Because when this decorator run, the task was not created, # So we can't use func.name name = '{func.__module__}.{func.__name__}'.format(func=func) - if name not in __REGISTER_PERIODIC_TASKS: + if name not in get_register_period_tasks(): create_or_update_celery_periodic_tasks({ name: { 'task': name, @@ -138,7 +150,7 @@ def register_as_period_task(crontab=None, interval=None): 'enabled': True, } }) - __REGISTER_PERIODIC_TASKS.append(name) + add_register_period_task(name) @wraps(func) def wrapper(*args, **kwargs): @@ -151,13 +163,12 @@ def after_app_ready_start(func): # Because when this decorator run, the task was not created, # So we can't use func.name name = '{func.__module__}.{func.__name__}'.format(func=func) - if name not in __AFTER_APP_READY_RUN_TASKS: - __AFTER_APP_READY_RUN_TASKS.append(name) + if name not in get_after_app_ready_tasks(): + add_after_app_ready_task(name) @wraps(func) def decorate(*args, **kwargs): return func(*args, **kwargs) - return decorate @@ -165,50 +176,10 @@ def after_app_shutdown_clean(func): # Because when this decorator run, the task was not created, # So we can't use func.name name = '{func.__module__}.{func.__name__}'.format(func=func) - if name not in __AFTER_APP_READY_RUN_TASKS: - __AFTER_APP_SHUTDOWN_CLEAN_TASKS.append(name) + if name not in get_after_app_shutdown_clean_tasks(): + add_after_app_shutdown_clean_task(name) @wraps(func) def decorate(*args, **kwargs): return func(*args, **kwargs) - return decorate - - -@worker_ready.connect -def on_app_ready(sender=None, headers=None, body=None, **kwargs): - if cache.get("CELERY_APP_READY", 0) == 1: - return - cache.set("CELERY_APP_READY", 1, 10) - logger.debug("App ready signal recv") - logger.debug("Start need start task: [{}]".format( - ", ".join(__AFTER_APP_READY_RUN_TASKS)) - ) - for task in __AFTER_APP_READY_RUN_TASKS: - subtask(task).delay() - - -@worker_shutdown.connect -def after_app_shutdown(sender=None, headers=None, body=None, **kwargs): - if cache.get("CELERY_APP_SHUTDOWN", 0) == 1: - return - cache.set("CELERY_APP_SHUTDOWN", 1, 10) - from django_celery_beat.models import PeriodicTask - logger.debug("App shutdown signal recv") - logger.debug("Clean need cleaned period tasks: [{}]".format( - ', '.join(__AFTER_APP_SHUTDOWN_CLEAN_TASKS)) - ) - PeriodicTask.objects.filter(name__in=__AFTER_APP_SHUTDOWN_CLEAN_TASKS).delete() - - -@task_prerun.connect -def pre_run_task_signal_handler(sender, task, *args, **kwargs): - - print("Sender: {}".format(sender)) - print("Task: {}".format(task)) - - -@task_postrun.connect -def post_run_task_signal_handler(sender, task, *args, **kwargs): - print("Sender: {}".format(sender)) - print("Task: {}".format(task)) \ No newline at end of file diff --git a/apps/ops/models.py b/apps/ops/models.py index 62bd7d084..636d27b0c 100644 --- a/apps/ops/models.py +++ b/apps/ops/models.py @@ -15,7 +15,7 @@ from django_celery_beat.models import CrontabSchedule, IntervalSchedule, \ PeriodicTask from common.utils import get_signer, get_logger -from common.celery import delete_celery_periodic_task, \ +from .celery.utils import delete_celery_periodic_task, \ create_or_update_celery_periodic_tasks, \ disable_celery_periodic_task from .ansible import AdHocRunner, AnsibleError @@ -218,14 +218,12 @@ class AdHoc(models.Model): hid = str(uuid.uuid4()) history = AdHocRunHistory(id=hid, adhoc=self, task=self.task) time_start = time.time() - # f = open(history.log_path, 'w') try: - date_start = timezone.now().strftime('%Y-%m-%d %H:%M:%S') - # f.write("{} {}\r\n\r\n".format(date_start, self.task.name)) + date_start = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') + print("{} Start task: {}\r\n".format(date_start, self.task.name)) raw, summary = self._run_only() - # raw, summary = self._run_only(file_obj=f) - date_end = timezone.now().strftime('%Y-%m-%d %H:%M:%S') - # f.write("\r\n{} Task finish\r\n".format(date_end)) + date_end = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') + print("\r\n{} Task finished".format(date_end)) history.is_finished = True if summary.get('dark'): history.is_success = False diff --git a/apps/ops/templates/ops/task_list.html b/apps/ops/templates/ops/task_list.html index 37a73b4c0..af4c8ea8f 100644 --- a/apps/ops/templates/ops/task_list.html +++ b/apps/ops/templates/ops/task_list.html @@ -113,7 +113,8 @@ $(document).ready(function() { }; var success = function(data) { var task_id = data.task; - window.location = "{% url 'ops:adhoc-history-output' pk=DEFAULT_PK %}".replace("{{ DEFAULT_PK }}", task_id); + var url = '{% url "common:celery-task-log" pk=DEFAULT_PK %}'.replace("{{ DEFAULT_PK }}", task_id); + window.open(url, '', 'width=800,height=800') }; APIUpdateAttr({ url: the_url, diff --git a/apps/terminal/signals_handler.py b/apps/terminal/signals_handler.py index 3926a5751..883dd51f2 100644 --- a/apps/terminal/signals_handler.py +++ b/apps/terminal/signals_handler.py @@ -5,8 +5,6 @@ from django.core.cache import cache from django.db.utils import ProgrammingError, OperationalError from common.utils import get_logger -from common.celery import after_app_ready_start, register_as_period_task, \ - after_app_shutdown_clean from .const import ASSETS_CACHE_KEY, USERS_CACHE_KEY, SYSTEM_USER_CACHE_KEY RUNNING = False diff --git a/apps/terminal/tasks.py b/apps/terminal/tasks.py index e267b30b7..4e57c5f5e 100644 --- a/apps/terminal/tasks.py +++ b/apps/terminal/tasks.py @@ -6,7 +6,7 @@ import datetime from celery import shared_task from django.utils import timezone -from common.celery import register_as_period_task, after_app_ready_start, \ +from ops.celery.utils import register_as_period_task, after_app_ready_start, \ after_app_shutdown_clean from .models import Status, Session diff --git a/jms b/jms index ea48ab6a7..0f07f0760 100755 --- a/jms +++ b/jms @@ -155,7 +155,7 @@ def start_celery(): cmd = [ 'celery', 'worker', - '-A', 'common', + '-A', 'ops', '-l', LOG_LEVEL.lower(), '--pidfile', pid_file, '-c', str(WORKERS), @@ -182,7 +182,7 @@ def start_beat(): scheduler = "django_celery_beat.schedulers:DatabaseScheduler" cmd = [ 'celery', 'beat', - '-A', 'common', + '-A', 'ops', '--pidfile', pid_file, '-l', LOG_LEVEL, '--scheduler', scheduler,