diff --git a/apps/common/decorator.py b/apps/common/decorator.py
index 6edc4f3c3..1fc2b2a88 100644
--- a/apps/common/decorator.py
+++ b/apps/common/decorator.py
@@ -10,3 +10,15 @@ def on_transaction_commit(func):
     def inner(*args, **kwargs):
         transaction.on_commit(lambda: func(*args, **kwargs))
     return inner
+
+
+class Singleton(object):
+    """ Singleton class decorator """
+    def __init__(self, cls):
+        self._cls = cls
+        self._instance = {}
+
+    def __call__(self):
+        if self._cls not in self._instance:
+            self._instance[self._cls] = self._cls()
+        return self._instance[self._cls]
diff --git a/apps/common/management/__init__.py b/apps/common/management/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/apps/common/management/commands/__init__.py b/apps/common/management/commands/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/apps/common/management/commands/restart.py b/apps/common/management/commands/restart.py
new file mode 100644
index 000000000..57285f9c9
--- /dev/null
+++ b/apps/common/management/commands/restart.py
@@ -0,0 +1,6 @@
+from .services.command import BaseActionCommand, Action
+
+
+class Command(BaseActionCommand):
+    help = 'Restart services'
+    action = Action.restart.value
diff --git a/apps/common/management/commands/services/__init__.py b/apps/common/management/commands/services/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/apps/common/management/commands/services/command.py b/apps/common/management/commands/services/command.py
new file mode 100644
index 000000000..17a09251b
--- /dev/null
+++ b/apps/common/management/commands/services/command.py
@@ -0,0 +1,139 @@
+from django.core.management.base import BaseCommand, CommandError
+from django.db.models import TextChoices
+from .utils import ServicesUtil
+from .hands import *
+
+
+class Services(TextChoices):
+    gunicorn = 'gunicorn', 'gunicorn'
+    daphne = 'daphne', 'daphne'
+    celery_ansible = 'celery_ansible', 'celery_ansible'
+    celery_default = 'celery_default', 'celery_default'
+    beat = 'beat', 'beat'
+    flower = 'flower', 'flower'
+    ws = 'ws', 'ws'
+    web = 'web', 'web'
+    celery = 'celery', 'celery'
+    task = 'task', 'task'
+    all = 'all', 'all'
+
+    @classmethod
+    def get_service_object_class(cls, name):
+        from . 
import services + services_map = { + cls.gunicorn.value: services.GunicornService, + cls.daphne: services.DaphneService, + cls.flower: services.FlowerService, + cls.celery_default: services.CeleryDefaultService, + cls.celery_ansible: services.CeleryAnsibleService, + cls.beat: services.BeatService + } + return services_map.get(name) + + @classmethod + def ws_services(cls): + return [cls.daphne] + + @classmethod + def web_services(cls): + return [cls.gunicorn, cls.daphne] + + @classmethod + def celery_services(cls): + return [cls.celery_ansible, cls.celery_default] + + @classmethod + def task_services(cls): + return cls.celery_services() + [cls.beat] + + @classmethod + def all_services(cls): + return cls.web_services() + cls.task_services() + + @classmethod + def export_services_values(cls): + return [cls.all.value, cls.web.value, cls.task.value] + + @classmethod + def get_service_objects(cls, service_names, **kwargs): + services = set() + for name in service_names: + method_name = f'{name}_services' + if hasattr(cls, method_name): + _services = getattr(cls, method_name)() + elif hasattr(cls, name): + _services = [getattr(cls, name)] + else: + continue + services.update(set(_services)) + + service_objects = [] + for s in services: + service_class = cls.get_service_object_class(s.value) + if not service_class: + continue + kwargs.update({ + 'name': s.value + }) + service_object = service_class(**kwargs) + service_objects.append(service_object) + return service_objects + + +class Action(TextChoices): + start = 'start', 'start' + status = 'status', 'status' + stop = 'stop', 'stop' + restart = 'restart', 'restart' + + +class BaseActionCommand(BaseCommand): + help = 'Service Base Command' + + action = None + util = None + + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + + def add_arguments(self, parser): + parser.add_argument( + 'services', nargs='+', choices=Services.export_services_values(), help='Service', + ) + parser.add_argument('-d', '--daemon', nargs="?", const=True) + parser.add_argument('-w', '--worker', type=int, nargs="?", default=4) + parser.add_argument('-f', '--force', nargs="?", const=True) + + def initial_util(self, *args, **options): + service_names = options.get('services') + service_kwargs = { + 'worker_gunicorn': options.get('worker') + } + services = Services.get_service_objects(service_names=service_names, **service_kwargs) + + kwargs = { + 'services': services, + 'daemon_run': options.get('daemon', False), + 'daemon_stop': Services.all.value in service_names, + 'force_stop': options.get('force', False), + } + self.util = ServicesUtil(**kwargs) + + def handle(self, *args, **options): + self.initial_util(*args, **options) + assert self.action in Action.values, f'The action {self.action} is not in the optional list' + _handle = getattr(self, f'_handle_{self.action}', lambda: None) + _handle() + + def _handle_start(self): + self.util.start_and_watch() + os._exit(0) + + def _handle_stop(self): + self.util.stop() + + def _handle_restart(self): + self.util.restart() + + def _handle_status(self): + self.util.show_status() diff --git a/apps/common/management/commands/services/hands.py b/apps/common/management/commands/services/hands.py new file mode 100644 index 000000000..eb6aba418 --- /dev/null +++ b/apps/common/management/commands/services/hands.py @@ -0,0 +1,26 @@ +import os +import sys +import logging +from django.conf import settings + +from apps.jumpserver.const import CONFIG + +try: + from apps.jumpserver import const + __version__ = 
const.VERSION +except ImportError as e: + print("Not found __version__: {}".format(e)) + print("Python is: ") + logging.info(sys.executable) + __version__ = 'Unknown' + sys.exit(1) + + +HTTP_HOST = CONFIG.HTTP_BIND_HOST or '127.0.0.1' +HTTP_PORT = CONFIG.HTTP_LISTEN_PORT or 8080 +WS_PORT = CONFIG.WS_LISTEN_PORT or 8082 +DEBUG = CONFIG.DEBUG or False +BASE_DIR = os.path.dirname(settings.BASE_DIR) +LOG_DIR = os.path.join(BASE_DIR, 'logs') +APPS_DIR = os.path.join(BASE_DIR, 'apps') +TMP_DIR = os.path.join(BASE_DIR, 'tmp') diff --git a/apps/common/management/commands/services/services/__init__.py b/apps/common/management/commands/services/services/__init__.py new file mode 100644 index 000000000..cceb9627c --- /dev/null +++ b/apps/common/management/commands/services/services/__init__.py @@ -0,0 +1,6 @@ +from .beat import * +from .celery_ansible import * +from .celery_default import * +from .daphne import * +from .flower import * +from .gunicorn import * diff --git a/apps/common/management/commands/services/services/base.py b/apps/common/management/commands/services/services/base.py new file mode 100644 index 000000000..e158dacb7 --- /dev/null +++ b/apps/common/management/commands/services/services/base.py @@ -0,0 +1,188 @@ +import abc +import time +import shutil +import datetime +import threading +import subprocess +from ..hands import * + + +class BaseService(object): + + def __init__(self, **kwargs): + self.name = kwargs['name'] + self.process = None + self.STOP_TIMEOUT = 10 + self.max_retry = 0 + self.retry = 3 + self.LOG_KEEP_DAYS = 7 + self.EXIT_EVENT = threading.Event() + self.LOCK = threading.Lock() + + @property + @abc.abstractmethod + def cmd(self): + return [] + + @property + @abc.abstractmethod + def cwd(self): + return '' + + @property + def is_running(self): + if self.pid == 0: + return False + try: + os.kill(self.pid, 0) + except (OSError, ProcessLookupError): + return False + else: + return True + + def show_status(self): + if self.is_running: + msg = f'{self.name} is running: {self.pid}.' + else: + msg = f'{self.name} is stopped.' 
+ print(msg) + + # -- log -- + @property + def log_filename(self): + return f'{self.name}.log' + + @property + def log_filepath(self): + return os.path.join(LOG_DIR, self.log_filename) + + @property + def log_file(self): + return open(self.log_filepath, 'a') + + @property + def log_dir(self): + return os.path.dirname(self.log_filepath) + # -- end log -- + + # -- pid -- + @property + def pid_filepath(self): + return os.path.join(TMP_DIR, f'{self.name}.pid') + + @property + def pid(self): + if not os.path.isfile(self.pid_filepath): + return 0 + with open(self.pid_filepath) as f: + try: + pid = int(f.read().strip()) + except ValueError: + pid = 0 + return pid + + def write_pid(self): + with open(self.pid_filepath, 'w') as f: + f.write(str(self.process.pid)) + + def remove_pid(self): + if os.path.isfile(self.pid_filepath): + os.unlink(self.pid_filepath) + # -- end pid -- + + # -- action -- + def open_subprocess(self): + kwargs = {'cwd': self.cwd, 'stderr': self.log_file, 'stdout': self.log_file} + self.process = subprocess.Popen(self.cmd, **kwargs) + + def start(self): + if self.is_running: + self.show_status() + return + self.remove_pid() + self.open_subprocess() + self.write_pid() + self.start_other() + + def start_other(self): + pass + + def stop(self, force=True): + if not self.is_running: + self.show_status() + # self.remove_pid() + return + + print(f'Stop service: {self.name}', end='') + sig = 9 if force else 15 + os.kill(self.pid, sig) + + try: + self.process.wait(2) + except: + pass + + for i in range(self.STOP_TIMEOUT): + if i == self.STOP_TIMEOUT - 1: + print("\033[31m Error\033[0m") + if not self.is_running: + print("\033[32m Ok\033[0m") + self.remove_pid() + break + else: + time.sleep(1) + continue + + def watch(self): + with self.LOCK: + self._check() + if not self.is_running: + self._restart() + self._rotate_log() + + def _check(self): + now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') + print(f"{now} Check service status: {self.name} -> ", end='') + try: + self.process.wait(timeout=1) # 不wait,子进程可能无法回收 + except subprocess.TimeoutExpired: + pass + if self.is_running: + print(f'running at {self.process.pid}') + else: + print(f'stopped with code: {self.process.returncode}({self.process.pid})') + + def _restart(self): + if self.retry > self.max_retry: + logging.info("Service start failed, exit: ", self.name) + self.EXIT_EVENT.set() + return + self.retry += 1 + logging.info(f'> Find {self.name} stopped, retry {self.retry}, {self.process.pid}') + self.start() + + def _rotate_log(self): + now = datetime.datetime.now() + _time = now.strftime('%H:%M') + if _time != '23:59': + return + + backup_date = now.strftime('%Y-%m-%d') + backup_log_dir = os.path.join(self.log_dir, backup_date) + if not os.path.exists(backup_log_dir): + os.mkdir(backup_log_dir) + + backup_log_path = os.path.join(backup_log_dir, self.log_filename) + if os.path.isfile(self.log_filepath) and not os.path.isfile(backup_log_path): + logging.info(f'Rotate log file: {self.log_filepath} => {backup_log_path}') + shutil.copy(self.log_filepath, backup_log_path) + with open(self.log_filepath, 'w') as f: + pass + + to_delete_date = now - datetime.timedelta(days=self.LOG_KEEP_DAYS) + to_delete_dir = os.path.join(LOG_DIR, to_delete_date.strftime('%Y-%m-%d')) + if os.path.exists(to_delete_dir): + logging.info(f'Remove old log: {to_delete_dir}') + shutil.rmtree(to_delete_dir, ignore_errors=True) + # -- end action -- + diff --git a/apps/common/management/commands/services/services/beat.py 
b/apps/common/management/commands/services/services/beat.py new file mode 100644 index 000000000..de1f9f268 --- /dev/null +++ b/apps/common/management/commands/services/services/beat.py @@ -0,0 +1,25 @@ +from ..hands import * +from .base import BaseService +from django.core.cache import cache + + +__all__ = ['BeatService'] + + +class BeatService(BaseService): + + def __init__(self, **kwargs): + super().__init__(**kwargs) + self.lock = cache.lock('beat-distribute-start-lock', expire=60) + + @property + def cmd(self): + print("\n- Start Beat as Periodic Task Scheduler") + cmd = [ + sys.executable, 'start_celery_beat.py', + ] + return cmd + + @property + def cwd(self): + return os.path.join(BASE_DIR, 'utils') diff --git a/apps/common/management/commands/services/services/celery_ansible.py b/apps/common/management/commands/services/services/celery_ansible.py new file mode 100644 index 000000000..a6c6608b7 --- /dev/null +++ b/apps/common/management/commands/services/services/celery_ansible.py @@ -0,0 +1,11 @@ +from .celery_base import CeleryBaseService + +__all__ = ['CeleryAnsibleService'] + + +class CeleryAnsibleService(CeleryBaseService): + + def __init__(self, **kwargs): + kwargs['queue'] = 'ansible' + super().__init__(**kwargs) + diff --git a/apps/common/management/commands/services/services/celery_base.py b/apps/common/management/commands/services/services/celery_base.py new file mode 100644 index 000000000..5542fd72f --- /dev/null +++ b/apps/common/management/commands/services/services/celery_base.py @@ -0,0 +1,38 @@ +from ..hands import * +from .base import BaseService + + +class CeleryBaseService(BaseService): + + def __init__(self, queue, num=10, **kwargs): + super().__init__(**kwargs) + self.queue = queue + self.num = num + + @property + def cmd(self): + print('\n- Start Celery as Distributed Task Queue: {}'.format(self.queue.capitalize())) + + os.environ.setdefault('PYTHONOPTIMIZE', '1') + os.environ.setdefault('ANSIBLE_FORCE_COLOR', 'True') + + if os.getuid() == 0: + os.environ.setdefault('C_FORCE_ROOT', '1') + server_hostname = os.environ.get("SERVER_HOSTNAME") + if not server_hostname: + server_hostname = '%h' + + cmd = [ + 'celery', 'worker', + '-P', 'threads', + '-A', 'ops', + '-l', 'INFO', + '-c', str(self.num), + '-Q', self.queue, + '-n', f'{self.queue}@{server_hostname}' + ] + return cmd + + @property + def cwd(self): + return APPS_DIR diff --git a/apps/common/management/commands/services/services/celery_default.py b/apps/common/management/commands/services/services/celery_default.py new file mode 100644 index 000000000..ad3d69fe1 --- /dev/null +++ b/apps/common/management/commands/services/services/celery_default.py @@ -0,0 +1,16 @@ +from .celery_base import CeleryBaseService + +__all__ = ['CeleryDefaultService'] + + +class CeleryDefaultService(CeleryBaseService): + + def __init__(self, **kwargs): + kwargs['queue'] = 'celery' + super().__init__(**kwargs) + + def start_other(self): + from terminal.startup import CeleryTerminal + celery_terminal = CeleryTerminal() + celery_terminal.start_heartbeat_thread() + diff --git a/apps/common/management/commands/services/services/daphne.py b/apps/common/management/commands/services/services/daphne.py new file mode 100644 index 000000000..09dd337a6 --- /dev/null +++ b/apps/common/management/commands/services/services/daphne.py @@ -0,0 +1,25 @@ +from ..hands import * +from .base import BaseService + +__all__ = ['DaphneService'] + + +class DaphneService(BaseService): + + def __init__(self, **kwargs): + super().__init__(**kwargs) + + 
@property + def cmd(self): + print("\n- Start Daphne ASGI WS Server") + + cmd = [ + 'daphne', 'jumpserver.asgi:application', + '-b', HTTP_HOST, + '-p', str(WS_PORT), + ] + return cmd + + @property + def cwd(self): + return APPS_DIR diff --git a/apps/common/management/commands/services/services/flower.py b/apps/common/management/commands/services/services/flower.py new file mode 100644 index 000000000..df2230776 --- /dev/null +++ b/apps/common/management/commands/services/services/flower.py @@ -0,0 +1,31 @@ +from ..hands import * +from .base import BaseService + +__all__ = ['FlowerService'] + + +class FlowerService(BaseService): + + def __init__(self, **kwargs): + super().__init__(**kwargs) + + @property + def cmd(self): + print("\n- Start Flower as Task Monitor") + + if os.getuid() == 0: + os.environ.setdefault('C_FORCE_ROOT', '1') + cmd = [ + 'celery', 'flower', + '-A', 'ops', + '-l', 'INFO', + '--url_prefix=/core/flower', + '--auto_refresh=False', + '--max_tasks=1000', + '--tasks_columns=uuid,name,args,state,received,started,runtime,worker' + ] + return cmd + + @property + def cwd(self): + return APPS_DIR diff --git a/apps/common/management/commands/services/services/gunicorn.py b/apps/common/management/commands/services/services/gunicorn.py new file mode 100644 index 000000000..bfaeea8c4 --- /dev/null +++ b/apps/common/management/commands/services/services/gunicorn.py @@ -0,0 +1,40 @@ +from ..hands import * +from .base import BaseService + +__all__ = ['GunicornService'] + + +class GunicornService(BaseService): + + def __init__(self, **kwargs): + self.worker = kwargs['worker_gunicorn'] + super().__init__(**kwargs) + + @property + def cmd(self): + print("\n- Start Gunicorn WSGI HTTP Server") + + log_format = '%(h)s %(t)s %(L)ss "%(r)s" %(s)s %(b)s ' + bind = f'{HTTP_HOST}:{HTTP_PORT}' + cmd = [ + 'gunicorn', 'jumpserver.wsgi', + '-b', bind, + '-k', 'gthread', + '--threads', '10', + '-w', str(self.worker), + '--max-requests', '4096', + '--access-logformat', log_format, + '--access-logfile', '-' + ] + if DEBUG: + cmd.append('--reload') + return cmd + + @property + def cwd(self): + return APPS_DIR + + def start_other(self): + from terminal.startup import CoreTerminal + core_terminal = CoreTerminal() + core_terminal.start_heartbeat_thread() diff --git a/apps/common/management/commands/services/utils.py b/apps/common/management/commands/services/utils.py new file mode 100644 index 000000000..769810e7a --- /dev/null +++ b/apps/common/management/commands/services/utils.py @@ -0,0 +1,141 @@ +import threading +import signal +import time +import daemon +from daemon import pidfile +from .hands import * +from .hands import __version__ +from .services.base import BaseService + + +class ServicesUtil(object): + + def __init__(self, services, daemon_run=False, force_stop=True, daemon_stop=False): + self._services = services + self.daemon_run = daemon_run + self.force_stop = force_stop + self.daemon_stop = daemon_stop + self.EXIT_EVENT = threading.Event() + self.check_interval = 30 + self.files_preserve_map = {} + + def restart(self): + self.stop() + time.sleep(5) + self.start_and_watch() + + def start_and_watch(self): + logging.info(time.ctime()) + logging.info(f'JumpServer version {__version__}, more see https://www.jumpserver.org') + self.start() + if self.daemon_run: + self.show_status() + with self.daemon_context: + self.watch() + else: + self.watch() + + def start(self): + for service in self._services: + service: BaseService + service.start() + self.files_preserve_map[service.name] = 
service.log_file + time.sleep(1) + + def stop(self): + for service in self._services: + service: BaseService + service.stop(force=self.force_stop) + + if self.daemon_stop: + self.stop_daemon() + + # -- watch -- + def watch(self): + while not self.EXIT_EVENT.is_set(): + try: + _exit = self._watch() + if _exit: + break + time.sleep(self.check_interval) + except KeyboardInterrupt: + print('Start stop services') + break + self.clean_up() + + def _watch(self): + for service in self._services: + service: BaseService + service.watch() + if service.EXIT_EVENT.is_set(): + self.EXIT_EVENT.set() + return True + return False + # -- end watch -- + + def clean_up(self): + if not self.EXIT_EVENT.is_set(): + self.EXIT_EVENT.set() + for service in self._services: + service: BaseService + service.stop(force=self.force_stop) + + def show_status(self): + for service in self._services: + service: BaseService + service.show_status() + + # -- daemon -- + def stop_daemon(self): + if self.daemon_pid and self.daemon_is_running: + os.kill(self.daemon_pid, 15) + self.remove_daemon_pid() + + def remove_daemon_pid(self): + if os.path.isfile(self.daemon_pid_filepath): + os.unlink(self.daemon_pid_filepath) + + @property + def daemon_pid(self): + if not os.path.isfile(self.daemon_pid_filepath): + return 0 + with open(self.daemon_pid_filepath) as f: + try: + pid = int(f.read().strip()) + except ValueError: + pid = 0 + return pid + + @property + def daemon_is_running(self): + try: + os.kill(self.daemon_pid, 0) + except (OSError, ProcessLookupError): + return False + else: + return True + + @property + def daemon_pid_filepath(self): + return os.path.join(TMP_DIR, 'jms.pid') + + @property + def daemon_log_filepath(self): + return os.path.join(LOG_DIR, 'jms.log') + + @property + def daemon_context(self): + daemon_log_file = open(self.daemon_log_filepath, 'a') + context = daemon.DaemonContext( + pidfile=pidfile.TimeoutPIDLockFile(self.daemon_pid_filepath), + signal_map={ + signal.SIGTERM: lambda x, y: self.clean_up(), + signal.SIGHUP: 'terminate', + }, + stdout=daemon_log_file, + stderr=daemon_log_file, + files_preserve=list(self.files_preserve_map.values()), + detach_process=True, + ) + return context + # -- end daemon -- diff --git a/apps/common/management/commands/start.py b/apps/common/management/commands/start.py new file mode 100644 index 000000000..4c078a876 --- /dev/null +++ b/apps/common/management/commands/start.py @@ -0,0 +1,6 @@ +from .services.command import BaseActionCommand, Action + + +class Command(BaseActionCommand): + help = 'Start services' + action = Action.start.value diff --git a/apps/common/management/commands/status.py b/apps/common/management/commands/status.py new file mode 100644 index 000000000..36f0d3608 --- /dev/null +++ b/apps/common/management/commands/status.py @@ -0,0 +1,6 @@ +from .services.command import BaseActionCommand, Action + + +class Command(BaseActionCommand): + help = 'Show services status' + action = Action.status.value diff --git a/apps/common/management/commands/stop.py b/apps/common/management/commands/stop.py new file mode 100644 index 000000000..a79a5335c --- /dev/null +++ b/apps/common/management/commands/stop.py @@ -0,0 +1,6 @@ +from .services.command import BaseActionCommand, Action + + +class Command(BaseActionCommand): + help = 'Stop services' + action = Action.stop.value diff --git a/apps/common/utils/common.py b/apps/common/utils/common.py index ac2bc0c90..6aaebaab1 100644 --- a/apps/common/utils/common.py +++ b/apps/common/utils/common.py @@ -7,8 +7,6 @@ import logging 
import datetime import uuid from functools import wraps -import string -import random import time import ipaddress import psutil @@ -242,11 +240,8 @@ class lazyproperty: return value -def get_disk_usage(): - partitions = psutil.disk_partitions() - mount_points = [p.mountpoint for p in partitions] - usages = {p: psutil.disk_usage(p) for p in mount_points} - return usages +def get_disk_usage(path): + return psutil.disk_usage(path=path).percent def get_cpu_load(): @@ -257,7 +252,7 @@ def get_cpu_load(): return float(single_cpu_load_1) -def get_memory_used(): +def get_memory_usage(): return psutil.virtual_memory().percent diff --git a/apps/jumpserver/conf.py b/apps/jumpserver/conf.py index 495ff5d50..b24b1f460 100644 --- a/apps/jumpserver/conf.py +++ b/apps/jumpserver/conf.py @@ -301,7 +301,6 @@ class Config(dict): 'CONNECTION_TOKEN_ENABLED': False, 'ONLY_ALLOW_EXIST_USER_AUTH': False, 'ONLY_ALLOW_AUTH_FROM_SOURCE': False, - 'DISK_CHECK_ENABLED': True, 'SESSION_SAVE_EVERY_REQUEST': True, 'SESSION_EXPIRE_AT_BROWSER_CLOSE_FORCE': False, 'FORGOT_PASSWORD_URL': '', diff --git a/apps/jumpserver/settings/custom.py b/apps/jumpserver/settings/custom.py index cc6875083..722682bb0 100644 --- a/apps/jumpserver/settings/custom.py +++ b/apps/jumpserver/settings/custom.py @@ -119,7 +119,6 @@ TICKETS_ENABLED = CONFIG.TICKETS_ENABLED REFERER_CHECK_ENABLED = CONFIG.REFERER_CHECK_ENABLED CONNECTION_TOKEN_ENABLED = CONFIG.CONNECTION_TOKEN_ENABLED -DISK_CHECK_ENABLED = CONFIG.DISK_CHECK_ENABLED FORGOT_PASSWORD_URL = CONFIG.FORGOT_PASSWORD_URL diff --git a/apps/ops/notifications.py b/apps/ops/notifications.py index 5ca460c8f..289871d74 100644 --- a/apps/ops/notifications.py +++ b/apps/ops/notifications.py @@ -1,11 +1,9 @@ from django.utils.translation import gettext_lazy as _ -from django.conf import settings from notifications.notifications import SystemMessage from notifications.models import SystemMsgSubscription from users.models import User from notifications.backends import BACKEND -from common.utils import get_disk_usage, get_cpu_load, get_memory_used from terminal.models import Status, Terminal __all__ = ('ServerPerformanceMessage', 'ServerPerformanceCheckUtil') @@ -31,114 +29,75 @@ class ServerPerformanceMessage(SystemMessage): class ServerPerformanceCheckUtil(object): + items_mapper = { + 'is_alive': { + 'default': False, + 'max_threshold': False, + 'alarm_msg_format': _('[Alive] The terminal is offline: {name}') + }, + 'disk_usage': { + 'default': 0, + 'max_threshold': 80, + 'alarm_msg_format': _( + '[Disk] Disk used more than {max_threshold}%: => {value} ({name})' + ) + }, + 'memory_usage': { + 'default': 0, + 'max_threshold': 85, + 'alarm_msg_format': _( + '[Memory] Memory used more than {max_threshold}%: => {value} ({name})' + ), + }, + 'cpu_load': { + 'default': 0, + 'max_threshold': 5, + 'alarm_msg_format': _( + '[CPU] CPU load more than {max_threshold}: => {value} ({name})' + ), + }, + } def __init__(self): self.alarm_messages = [] - self.disk_usage_threshold = 20 # 80 - self.cpu_load_threshold = 1 # 5 - self.memory_usage_threshold = 20 # 85 - # checking terminal + self._terminals = [] self._terminal = None def check_and_publish(self): self.check() self.publish() + def check(self): + self.alarm_messages = [] + self.initial_terminals() + for item, data in self.items_mapper.items(): + for self._terminal in self._terminals: + self.check_item(item, data) + + def check_item(self, item, data): + default = data['default'] + max_threshold = data['max_threshold'] + value = getattr(self._terminal.stat, 
item, default)
+        if isinstance(value, bool) and value != max_threshold:
+            return
+        elif isinstance(value, (int, float)) and value < max_threshold:
+            return
+        msg = data['alarm_msg_format']
+        msg = msg.format(max_threshold=max_threshold, value=value, name=self._terminal.name)
+        self.alarm_messages.append(msg)
+
     def publish(self):
         if not self.alarm_messages:
             return
         msg = '
'.join(self.alarm_messages) ServerPerformanceMessage(msg).publish() - def check(self): - check_items = ['disk_usage', 'cpu_load', 'memory_usage'] - - # Check local - if settings.DISK_CHECK_ENABLED: - self.check_items(check_items) - - # Check terminal - check_items += ['is_alive'] - terminals = self.get_terminals() - for terminal in terminals: - self._terminal = terminal - self.check_items(check_items) - - @staticmethod - def get_terminals(): + def initial_terminals(self): terminals = [] - for terminal in Terminal.objects.filter(is_accepted=True, is_deleted=False): + for terminal in Terminal.objects.filter(is_deleted=False): if not terminal.is_active: continue - terminal.status = Status.get_terminal_latest_stat(terminal) + terminal.stat = Status.get_terminal_latest_stat(terminal) terminals.append(terminal) - return terminals - - def check_items(self, items): - for item in items: - messages = getattr(self, f'check_{item}', lambda: None)() - self.alarm_messages.extend(messages) - - def check_is_alive(self): - message = [] - if not self._terminal and not self._terminal.is_alive: - name = self._terminal.name - msg = _('The terminal is offline: {}').format(name) - message.append(msg) - return message - - def check_disk_usage(self): - messages = [] - if self._terminal: - name = self._terminal.name - disk_used = getattr(self._terminal.status, 'disk_used', None) - disks_used = [['/', disk_used]] if disk_used else [] - else: - name = 'Core' - disks_used = self._get_local_disk_usage() - - for disk, used in disks_used: - if used <= self.disk_usage_threshold: - continue - msg = _("Disk used more than {}%: {} => {} ({})").format(self.disk_usage_threshold, disk, used, name) - messages.append(msg) - return messages - - @staticmethod - def _get_local_disk_usage(): - disks_usage = [] - usages = get_disk_usage() - uncheck_paths = ['/etc', '/boot'] - for path, usage in usages.items(): - if len(path) > 4 and path[:4] in uncheck_paths: - continue - disks_usage.append([path, usage.percent]) - return disks_usage - - def check_cpu_load(self): - messages = [] - if self._terminal: - name = self._terminal.name - cpu_load = getattr(self._terminal.status, 'cpu_load', 0) - else: - name = 'Core' - cpu_load = get_cpu_load() - - if cpu_load > self.cpu_load_threshold: - msg = _('CPU load more than {}: => {} ({})').format(self.cpu_load_threshold, cpu_load, name) - messages.append(msg) - return messages - - def check_memory_usage(self): - messages = [] - if self._terminal: - name = self._terminal.name - memory_usage = getattr(self._terminal.status, 'memory_usage', 0) - else: - name = 'Core' - memory_usage = get_memory_used() - - if memory_usage > self.memory_usage_threshold: - msg = _('Memory used more than {}%: => {} ({})').format(self.memory_usage_threshold, memory_usage, name) - messages.append(msg) - return messages + self._terminals = terminals diff --git a/apps/terminal/const.py b/apps/terminal/const.py index c2512e024..74844d714 100644 --- a/apps/terminal/const.py +++ b/apps/terminal/const.py @@ -45,6 +45,8 @@ class TerminalTypeChoices(TextChoices): omnidb = 'omnidb', 'OmniDB' xrdp = 'xrdp', 'Xrdp' lion = 'lion', 'Lion' + core = 'core', 'Core' + celery = 'celery', 'Celery' @classmethod def types(cls): diff --git a/apps/terminal/migrations/0039_auto_20210805_1552.py b/apps/terminal/migrations/0039_auto_20210805_1552.py new file mode 100644 index 000000000..060e22548 --- /dev/null +++ b/apps/terminal/migrations/0039_auto_20210805_1552.py @@ -0,0 +1,18 @@ +# Generated by Django 3.1.6 on 2021-08-05 07:52 + +from django.db 
import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('terminal', '0038_task_kwargs'), + ] + + operations = [ + migrations.AlterField( + model_name='terminal', + name='type', + field=models.CharField(choices=[('koko', 'KoKo'), ('guacamole', 'Guacamole'), ('omnidb', 'OmniDB'), ('xrdp', 'Xrdp'), ('lion', 'Lion'), ('core', 'Core'), ('celery', 'Celery')], default='koko', max_length=64, verbose_name='type'), + ), + ] diff --git a/apps/terminal/models/status.py b/apps/terminal/models/status.py index dddf8f350..cece6b5ce 100644 --- a/apps/terminal/models/status.py +++ b/apps/terminal/models/status.py @@ -54,6 +54,7 @@ class Status(models.Model): data.pop('terminal', None) stat = cls(**data) stat.terminal = terminal + stat.is_alive = terminal.is_alive return stat def save(self, force_insert=False, force_update=False, using=None, diff --git a/apps/terminal/signals_handler.py b/apps/terminal/signals_handler.py index 3d98261b1..ec51c5a2b 100644 --- a/apps/terminal/signals_handler.py +++ b/apps/terminal/signals_handler.py @@ -1,3 +1,2 @@ # -*- coding: utf-8 -*- # - diff --git a/apps/terminal/startup.py b/apps/terminal/startup.py new file mode 100644 index 000000000..1e77c71a0 --- /dev/null +++ b/apps/terminal/startup.py @@ -0,0 +1,75 @@ +import os +import time +import socket +import threading +from django.conf import settings +from common.decorator import Singleton +from common.utils import get_disk_usage, get_cpu_load, get_memory_usage, get_logger +from .serializers.terminal import TerminalRegistrationSerializer, StatusSerializer +from .const import TerminalTypeChoices +from .models.terminal import Terminal + +__all__ = ['CoreTerminal', 'CeleryTerminal'] + + +class BaseTerminal(object): + + def __init__(self, suffix_name, _type): + self.server_hostname = os.environ.get('SERVER_HOSTNAME') or socket.gethostname() + self.name = f'[{suffix_name}] {self.server_hostname}' + self.interval = 30 + self.remote_addr = socket.gethostbyname(socket.gethostname()) + self.type = _type + + def start_heartbeat_thread(self): + print(f'- Start heartbeat thread => ({self.name})') + t = threading.Thread(target=self.start_heartbeat) + t.setDaemon(True) + t.start() + + def start_heartbeat(self): + while True: + heartbeat_data = { + 'cpu_load': get_cpu_load(), + 'memory_used': get_memory_usage(), + 'disk_used': get_disk_usage(path=settings.BASE_DIR), + 'sessions': [], + } + status_serializer = StatusSerializer(data=heartbeat_data) + status_serializer.is_valid() + status_serializer.validated_data.pop('sessions', None) + terminal = self.get_or_register_terminal() + status_serializer.validated_data['terminal'] = terminal + status_serializer.save() + + time.sleep(self.interval) + + def get_or_register_terminal(self): + terminal = Terminal.objects.filter(name=self.name, type=self.type, is_deleted=False).first() + if not terminal: + terminal = self.register_terminal() + return terminal + + def register_terminal(self): + data = {'name': self.name, 'type': self.type, 'remote_addr': self.remote_addr} + serializer = TerminalRegistrationSerializer(data=data) + serializer.is_valid() + terminal = serializer.save() + return terminal + + +@Singleton +class CoreTerminal(BaseTerminal): + + def __init__(self): + super().__init__( + suffix_name=TerminalTypeChoices.core.label, _type=TerminalTypeChoices.core.value + ) + + +@Singleton +class CeleryTerminal(BaseTerminal): + def __init__(self): + super().__init__( + suffix_name=TerminalTypeChoices.celery.label, _type=TerminalTypeChoices.celery.value + ) diff 
--git a/jms b/jms index 6bd71849d..0899f054d 100755 --- a/jms +++ b/jms @@ -3,19 +3,11 @@ import os import subprocess -import threading -import datetime import logging import logging.handlers -import psutil import time import argparse import sys -import shutil -import signal -from collections import defaultdict -import daemon -from daemon import pidfile BASE_DIR = os.path.dirname(os.path.abspath(__file__)) sys.path.insert(0, BASE_DIR) @@ -40,29 +32,12 @@ except ImportError as e: sys.exit(1) os.environ["PYTHONIOENCODING"] = "UTF-8" -APPS_DIR = os.path.join(BASE_DIR, 'apps') -LOG_DIR = os.path.join(BASE_DIR, 'logs') -TMP_DIR = os.path.join(BASE_DIR, 'tmp') -HTTP_HOST = CONFIG.HTTP_BIND_HOST or '127.0.0.1' -HTTP_PORT = CONFIG.HTTP_LISTEN_PORT or 8080 -WS_PORT = CONFIG.WS_LISTEN_PORT or 8082 -DEBUG = CONFIG.DEBUG or False -LOG_LEVEL = CONFIG.LOG_LEVEL or 'INFO' -START_TIMEOUT = 40 -WORKERS = 4 -DAEMON = False -LOG_KEEP_DAYS = 7 logging.basicConfig( format='%(asctime)s %(message)s', level=logging.INFO, datefmt='%Y-%m-%d %H:%M:%S' ) -EXIT_EVENT = threading.Event() -LOCK = threading.Lock() -files_preserve = [] -STOP_TIMEOUT = 10 - logger = logging.getLogger() try: @@ -76,8 +51,8 @@ def check_database_connection(): os.chdir(os.path.join(BASE_DIR, 'apps')) for i in range(60): logging.info("Check database connection ...") - code = subprocess.call("python manage.py showmigrations users ", shell=True) - if code == 0: + _code = subprocess.call("python manage.py showmigrations users ", shell=True) + if _code == 0: logging.info("Database connect success") return time.sleep(1) @@ -86,10 +61,11 @@ def check_database_connection(): def check_migrations(): - apps_dir = os.path.join(BASE_DIR, 'apps') - code = subprocess.call("python manage.py showmigrations | grep '\[.\]' | grep -v '\[X\]'", shell=True, cwd=apps_dir) + _apps_dir = os.path.join(BASE_DIR, 'apps') + _cmd = "python manage.py showmigrations | grep '\[.\]' | grep -v '\[X\]'" + _code = subprocess.call(_cmd, shell=True, cwd=_apps_dir) - if code == 1: + if _code == 1: return # for i in range(3): # print("!!! Warning: Has SQL migrations not perform, 有 SQL 变更没有执行") @@ -98,10 +74,10 @@ def check_migrations(): def expire_caches(): - apps_dir = os.path.join(BASE_DIR, 'apps') - code = subprocess.call("python manage.py expire_caches", shell=True, cwd=apps_dir) + _apps_dir = os.path.join(BASE_DIR, 'apps') + _code = subprocess.call("python manage.py expire_caches", shell=True, cwd=_apps_dir) - if code == 1: + if _code == 1: return @@ -115,8 +91,8 @@ def perform_db_migrate(): def collect_static(): logging.info("Collect static files") os.chdir(os.path.join(BASE_DIR, 'apps')) - command = 'python3 manage.py collectstatic --no-input -c &> /dev/null ' - subprocess.call(command, shell=True) + _cmd = 'python3 manage.py collectstatic --no-input -c &> /dev/null ' + subprocess.call(_cmd, shell=True) logging.info("Collect static files done") @@ -127,394 +103,6 @@ def prepare(): expire_caches() -def check_pid(pid): - """ Check For the existence of a unix pid. 
""" - try: - os.kill(pid, 0) - except (OSError, ProcessLookupError): - return False - else: - return True - - -def get_pid_file_path(s): - return os.path.join(TMP_DIR, '{}.pid'.format(s)) - - -def get_log_file_path(s): - return os.path.join(LOG_DIR, '{}.log'.format(s)) - - -def get_pid_from_file(path): - if os.path.isfile(path): - with open(path) as f: - try: - return int(f.read().strip()) - except ValueError: - return 0 - return 0 - - -def get_pid(s): - pid_file = get_pid_file_path(s) - return get_pid_from_file(pid_file) - - -def is_running(s, unlink=True): - pid_file = get_pid_file_path(s) - - if os.path.isfile(pid_file): - pid = get_pid(s) - if pid == 0: - return False - elif check_pid(pid): - return True - - if unlink: - os.unlink(pid_file) - return False - - -def parse_service(s): - web_services = ['gunicorn', 'flower', 'daphne'] - celery_services = [ - "celery_ansible", "celery_default" - ] - task_services = celery_services + ['beat'] - all_services = web_services + task_services - if s == 'all': - return all_services - elif s == "web": - return web_services - elif s == 'ws': - return ['daphne'] - elif s == "task": - return task_services - elif s == "celery": - return celery_services - elif "," in s: - services = set() - for i in s.split(','): - services.update(parse_service(i)) - return services - else: - return [s] - - -def get_start_gunicorn_kwargs(): - print("\n- Start Gunicorn WSGI HTTP Server") - prepare() - bind = '{}:{}'.format(HTTP_HOST, HTTP_PORT) - log_format = '%(h)s %(t)s %(L)ss "%(r)s" %(s)s %(b)s ' - - cmd = [ - 'gunicorn', 'jumpserver.wsgi', - '-b', bind, - '-k', 'gthread', - '--threads', '10', - '-w', str(WORKERS), - '--max-requests', '4096', - '--access-logformat', log_format, - '--access-logfile', '-' - ] - - if DEBUG: - cmd.append('--reload') - return {'cmd': cmd, 'cwd': APPS_DIR} - - -def get_start_daphne_kwargs(): - print("\n- Start Daphne ASGI WS Server") - cmd = [ - 'daphne', 'jumpserver.asgi:application', - '-b', HTTP_HOST, - '-p', str(WS_PORT), - ] - return {'cmd': cmd, 'cwd': APPS_DIR} - - -def get_start_celery_ansible_kwargs(): - print("\n- Start Celery as Distributed Task Queue: Ansible") - return get_start_worker_kwargs('ansible', 10) - - -def get_start_celery_default_kwargs(): - print("\n- Start Celery as Distributed Task Queue: Celery") - return get_start_worker_kwargs('celery', 10) - - -def get_start_worker_kwargs(queue, num): - # Todo: Must set this environment, otherwise not no ansible result return - os.environ.setdefault('PYTHONOPTIMIZE', '1') - os.environ.setdefault('ANSIBLE_FORCE_COLOR', 'True') - - if os.getuid() == 0: - os.environ.setdefault('C_FORCE_ROOT', '1') - server_hostname = os.environ.get("SERVER_HOSTNAME") - if not server_hostname: - server_hostname = '%h' - - cmd = [ - 'celery', 'worker', - '-P', 'threads', - '-A', 'ops', - '-l', 'INFO', - '-c', str(num), - '-Q', queue, - '-n', '{}@{}'.format(queue, server_hostname) - ] - return {"cmd": cmd, "cwd": APPS_DIR} - - -def get_start_flower_kwargs(): - print("\n- Start Flower as Task Monitor") - if os.getuid() == 0: - os.environ.setdefault('C_FORCE_ROOT', '1') - - cmd = [ - 'celery', 'flower', - '-A', 'ops', - '-l', 'INFO', - '--url_prefix=/core/flower', - '--auto_refresh=False', - '--max_tasks=1000', - '--tasks_columns=uuid,name,args,state,received,started,runtime,worker' - ] - return {"cmd": cmd, "cwd": APPS_DIR} - - -def get_start_beat_kwargs(): - print("\n- Start Beat as Periodic Task Scheduler") - utils_dir = os.path.join(BASE_DIR, 'utils') - cmd = [ - sys.executable, 
'start_celery_beat.py', - ] - return {"cmd": cmd, 'cwd': utils_dir} - - -processes = {} - - -def watch_services(): - max_retry = 3 - services_retry = defaultdict(int) - stopped_services = {} - - def check_services(): - now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') - for s, p in processes.items(): - print("{} Check service status: {} -> ".format(now, s), end='') - try: - p.wait(timeout=1) # 不wait,子进程可能无法回收 - except subprocess.TimeoutExpired: - pass - ok = is_running(s) - if not ok: - stopped_services[s] = '' - print("stopped with code: {}({})".format(p.returncode, p.pid)) - else: - print("running at {}".format(p.pid)) - stopped_services.pop(s, None) - services_retry.pop(s, None) - - def retry_start_stopped_services(): - for s in stopped_services: - if services_retry[s] > max_retry: - logging.info("Service start failed, exit: ", s) - EXIT_EVENT.set() - break - - p = start_service(s) - logging.info("> Find {} stopped, retry {}, {}".format( - s, services_retry[s] + 1, p.pid) - ) - processes[s] = p - services_retry[s] += 1 - - def rotate_log_if_need(): - now = datetime.datetime.now() - tm = now.strftime('%H:%M') - if tm != '23:59': - return - suffix = now.strftime('%Y-%m-%d') - services = list(processes.keys()) - services.append('jms') - - for s in services: - log_path = get_log_file_path(s) - log_dir = os.path.dirname(log_path) - filename = os.path.basename(log_path) - pre_log_dir = os.path.join(log_dir, suffix) - if not os.path.exists(pre_log_dir): - os.mkdir(pre_log_dir) - - pre_log_path = os.path.join(pre_log_dir, filename) - if os.path.isfile(log_path) and not os.path.isfile(pre_log_path): - logging.info("Rotate log file: {} => {}".format(log_path, pre_log_path)) - shutil.copy(log_path, pre_log_path) - with open(log_path, 'w') as f: - pass - some_days_ago = now - datetime.timedelta(days=LOG_KEEP_DAYS) - days_ago_dir = os.path.join(LOG_DIR, some_days_ago.strftime('%Y-%m-%d')) - if os.path.exists(days_ago_dir): - logger.info("Remove old log: {}".format(days_ago_dir)) - shutil.rmtree(days_ago_dir, ignore_errors=True) - - while not EXIT_EVENT.is_set(): - try: - with LOCK: - check_services() - retry_start_stopped_services() - rotate_log_if_need() - time.sleep(30) - except KeyboardInterrupt: - print("Start stop service") - time.sleep(1) - break - clean_up() - - -def start_service(s): - services_kwargs = { - "gunicorn": get_start_gunicorn_kwargs, - "celery_ansible": get_start_celery_ansible_kwargs, - "celery_default": get_start_celery_default_kwargs, - "beat": get_start_beat_kwargs, - "flower": get_start_flower_kwargs, - "daphne": get_start_daphne_kwargs, - } - - kwargs = services_kwargs.get(s)() - pid_file = get_pid_file_path(s) - - if os.path.isfile(pid_file): - os.unlink(pid_file) - cmd = kwargs.pop('cmd') - - log_file_path = get_log_file_path(s) - log_file_f = open(log_file_path, 'a') - files_preserve.append(log_file_f) - kwargs['stderr'] = log_file_f - kwargs['stdout'] = log_file_f - p = subprocess.Popen(cmd, **kwargs) - with open(pid_file, 'w') as f: - f.write(str(p.pid)) - return p - - -def start_services_and_watch(s): - logging.info(time.ctime()) - logging.info('JumpServer version {}, more see https://www.jumpserver.org'.format( - __version__) - ) - - services_set = parse_service(s) - for i in services_set: - if is_running(i): - show_service_status(i) - continue - p = start_service(i) - time.sleep(2) - processes[i] = p - - if not DAEMON: - watch_services() - else: - show_service_status(s) - context = get_daemon_context() - with context: - watch_services() - - -def 
get_daemon_context(): - daemon_pid_file = get_pid_file_path('jms') - daemon_log_f = open(get_log_file_path('jms'), 'a') - files_preserve.append(daemon_log_f) - context = daemon.DaemonContext( - pidfile=pidfile.TimeoutPIDLockFile(daemon_pid_file), - signal_map={ - signal.SIGTERM: lambda x, y: clean_up(), - signal.SIGHUP: 'terminate', - }, - stdout=daemon_log_f, - stderr=daemon_log_f, - files_preserve=files_preserve, - detach_process=True, - ) - return context - - -def stop_service(srv, sig=15): - services_set = parse_service(srv) - for s in services_set: - if not is_running(s): - show_service_status(s) - continue - print("Stop service: {}".format(s), end='') - pid = get_pid(s) - os.kill(pid, sig) - with LOCK: - process = processes.pop(s, None) - if process is None: - try: - process = psutil.Process(pid) - except: - pass - if process is None: - print("\033[31m No process found\033[0m") - continue - try: - process.wait(1) - except: - pass - for i in range(STOP_TIMEOUT): - if i == STOP_TIMEOUT - 1: - print("\033[31m Error\033[0m") - if not is_running(s): - print("\033[32m Ok\033[0m") - break - else: - time.sleep(1) - continue - - if srv == "all": - stop_daemon_service() - - -def stop_daemon_service(): - pid = get_pid('jms') - if pid and check_pid(pid): - os.kill(pid, 15) - - -def stop_multi_services(services): - for s in services: - stop_service(s, sig=9) - - -def stop_service_force(s): - stop_service(s, sig=9) - - -def clean_up(): - if not EXIT_EVENT.is_set(): - EXIT_EVENT.set() - processes_dump = {k: v for k, v in processes.items()} - for s1, p1 in processes_dump.items(): - stop_service(s1) - p1.wait() - - -def show_service_status(s): - services_set = parse_service(s) - for ns in services_set: - if is_running(ns): - pid = get_pid(ns) - print("{} is running: {}".format(ns, pid)) - else: - print("{} is stopped".format(ns)) - - def upgrade_db(): collect_static() perform_db_migrate() @@ -536,38 +124,37 @@ if __name__ == '__main__': help="Action to run" ) parser.add_argument( - "service", type=str, default="all", nargs="?", - choices=("all", "web", "task", "gunicorn", "celery", "beat", "celery,beat", "flower", "ws"), + "services", type=str, default='all', nargs="*", + choices=("all", "web", "task"), help="The service to start", ) - parser.add_argument('-d', '--daemon', nargs="?", const=1) - parser.add_argument('-w', '--worker', type=int, nargs="?", const=4) - parser.add_argument('-f', '--force', nargs="?", const=1) - args = parser.parse_args() - if args.daemon: - DAEMON = True + parser.add_argument('-d', '--daemon', nargs="?", const=True) + parser.add_argument('-w', '--worker', type=int, nargs="?", default=4) + parser.add_argument('-f', '--force', nargs="?", const=True) - if args.worker: - WORKERS = args.worker + args = parser.parse_args() action = args.action - srv = args.service - - if action == "start": - start_services_and_watch(srv) - os._exit(0) - elif action == "stop": - print("Stop service") - if args.force: - stop_service_force(srv) - else: - stop_service(srv) - elif action == "restart": - DAEMON = True - stop_service(srv) - time.sleep(5) - start_services_and_watch(srv) - elif action == "upgrade_db": + if action == "upgrade_db": upgrade_db() else: - show_service_status(srv) + services = args.services if isinstance(args.services, list) else [args.services] + if action == 'start' and ({'gunicorn', 'all'} & set(services)): + prepare() + + services_string = ' '.join(services) + cmd = f'python manage.py {args.action} {services_string}' + if args.daemon: + cmd += ' --daemon' + if args.worker: 
+            cmd += f' --worker {args.worker}'
+        if args.force:
+            cmd += ' --force'
+        apps_dir = os.path.join(BASE_DIR, 'apps')
+
+        try:
+            # processes: main(3s) -> call(0.25s) -> service -> sub-process
+            subprocess.run(cmd, shell=True, cwd=apps_dir)
+        except KeyboardInterrupt:
+            time.sleep(1)
+            pass
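
Note on the new Singleton decorator in apps/common/decorator.py: it caches a single instance per decorated class, which is how CoreTerminal and CeleryTerminal in apps/terminal/startup.py avoid registering duplicate heartbeat terminals. Below is a minimal standalone sketch of that behaviour; DemoTerminal is a made-up stand-in, and the decorator only supports classes whose __init__ takes no arguments, because __call__ forwards none.

class Singleton(object):
    """ Singleton class decorator: keep one cached instance per class. """
    def __init__(self, cls):
        self._cls = cls
        self._instance = {}

    def __call__(self):
        # Construct on first call, return the cached instance afterwards.
        if self._cls not in self._instance:
            self._instance[self._cls] = self._cls()
        return self._instance[self._cls]


@Singleton
class DemoTerminal:
    # Hypothetical stand-in for CoreTerminal / CeleryTerminal.
    def __init__(self):
        self.heartbeat_started = False


a = DemoTerminal()
b = DemoTerminal()
assert a is b  # the second call returns the same cached object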
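
For readers following Services.get_service_objects in command.py: a requested name is resolved first through a '<name>_services' classmethod (a group such as web, task, celery or all) and only then as a single service member. The sketch below models that lookup order in isolation, using plain strings instead of TextChoices members and service classes; ServiceGroups and resolve are illustrative names, not part of the patch.

class ServiceGroups:
    members = ('gunicorn', 'daphne', 'celery_default', 'celery_ansible', 'beat', 'flower')

    @classmethod
    def web_services(cls):
        return ['gunicorn', 'daphne']

    @classmethod
    def celery_services(cls):
        return ['celery_ansible', 'celery_default']

    @classmethod
    def task_services(cls):
        return cls.celery_services() + ['beat']

    @classmethod
    def all_services(cls):
        return cls.web_services() + cls.task_services()

    @classmethod
    def resolve(cls, names):
        # Try the '<name>_services' group method first, then a single member, else skip.
        resolved = set()
        for name in names:
            group = getattr(cls, f'{name}_services', None)
            if callable(group):
                resolved.update(group())
            elif name in cls.members:
                resolved.add(name)
        return resolved


print(ServiceGroups.resolve(['web']))   # {'gunicorn', 'daphne'}
print(ServiceGroups.resolve(['task']))  # {'celery_ansible', 'celery_default', 'beat'}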