From 33236aaa4743a7de6128ff298cc6aa419b966552 Mon Sep 17 00:00:00 2001 From: Bai Date: Thu, 12 Aug 2021 15:27:18 +0800 Subject: [PATCH] =?UTF-8?q?fix:=20=E4=BF=AE=E5=A4=8D=E5=90=AF=E5=8A=A8?= =?UTF-8?q?=E8=84=9A=E6=9C=ACbeat=E8=BF=9B=E7=A8=8B=E5=81=B6=E5=B0=94?= =?UTF-8?q?=E4=B8=8D=E4=BC=9A=E7=BB=93=E6=9D=9F=E7=9A=84=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../management/commands/services/command.py | 8 ++-- .../commands/services/services/base.py | 37 ++++++++++++++++--- .../management/commands/services/utils.py | 19 +++++----- jms | 4 +- utils/start_celery_beat.py | 26 +++++++++++-- 5 files changed, 68 insertions(+), 26 deletions(-) diff --git a/apps/common/management/commands/services/command.py b/apps/common/management/commands/services/command.py index 17a09251b..230d59e10 100644 --- a/apps/common/management/commands/services/command.py +++ b/apps/common/management/commands/services/command.py @@ -44,7 +44,7 @@ class Services(TextChoices): @classmethod def task_services(cls): - return cls.celery_services() + [cls.beat] + return cls.celery_services() + [cls.beat, cls.flower] @classmethod def all_services(cls): @@ -113,9 +113,9 @@ class BaseActionCommand(BaseCommand): kwargs = { 'services': services, - 'daemon_run': options.get('daemon', False), - 'daemon_stop': Services.all.value in service_names, - 'force_stop': options.get('force', False), + 'run_daemon': options.get('daemon', False), + 'stop_daemon': self.action == Action.stop.value and Services.all.value in service_names, + 'force_stop': options.get('force') or False, } self.util = ServicesUtil(**kwargs) diff --git a/apps/common/management/commands/services/services/base.py b/apps/common/management/commands/services/services/base.py index f32f30d99..5063fb92e 100644 --- a/apps/common/management/commands/services/services/base.py +++ b/apps/common/management/commands/services/services/base.py @@ -1,6 +1,7 @@ import abc import time import shutil +import psutil import datetime import threading import subprocess @@ -11,13 +12,12 @@ class BaseService(object): def __init__(self, **kwargs): self.name = kwargs['name'] - self.process = None + self._process = None self.STOP_TIMEOUT = 10 self.max_retry = 0 self.retry = 3 self.LOG_KEEP_DAYS = 7 self.EXIT_EVENT = threading.Event() - self.LOCK = threading.Lock() @property @abc.abstractmethod @@ -90,10 +90,22 @@ class BaseService(object): os.unlink(self.pid_filepath) # -- end pid -- + # -- process -- + @property + def process(self): + if not self._process: + try: + self._process = psutil.Process(self.pid) + except: + pass + return self._process + + # -- end process -- + # -- action -- def open_subprocess(self): kwargs = {'cwd': self.cwd, 'stderr': self.log_file, 'stdout': self.log_file} - self.process = subprocess.Popen(self.cmd, **kwargs) + self._process = subprocess.Popen(self.cmd, **kwargs) def start(self): if self.is_running: @@ -107,7 +119,7 @@ class BaseService(object): def start_other(self): pass - def stop(self, force=True): + def stop(self, force=False): if not self.is_running: self.show_status() # self.remove_pid() @@ -117,6 +129,14 @@ class BaseService(object): sig = 9 if force else 15 os.kill(self.pid, sig) + if self.process is None: + print("\033[31m No process found\033[0m") + return + try: + self.process.wait(1) + except: + pass + for i in range(self.STOP_TIMEOUT): if i == self.STOP_TIMEOUT - 1: print("\033[31m Error\033[0m") @@ -129,8 +149,7 @@ class BaseService(object): continue def watch(self): - with self.LOCK: - self._check() + self._check() if not self.is_running: self._restart() self._rotate_log() @@ -138,6 +157,12 @@ class BaseService(object): def _check(self): now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S') print(f"{now} Check service status: {self.name} -> ", end='') + if self.process: + try: + self.process.wait(1) # 不wait,子进程可能无法回收 + except subprocess.TimeoutExpired: + pass + if self.is_running: print(f'running at {self.pid}') else: diff --git a/apps/common/management/commands/services/utils.py b/apps/common/management/commands/services/utils.py index 769810e7a..a5c34d770 100644 --- a/apps/common/management/commands/services/utils.py +++ b/apps/common/management/commands/services/utils.py @@ -10,11 +10,11 @@ from .services.base import BaseService class ServicesUtil(object): - def __init__(self, services, daemon_run=False, force_stop=True, daemon_stop=False): + def __init__(self, services, run_daemon=False, force_stop=False, stop_daemon=False): self._services = services - self.daemon_run = daemon_run + self.run_daemon = run_daemon self.force_stop = force_stop - self.daemon_stop = daemon_stop + self.stop_daemon = stop_daemon self.EXIT_EVENT = threading.Event() self.check_interval = 30 self.files_preserve_map = {} @@ -28,7 +28,7 @@ class ServicesUtil(object): logging.info(time.ctime()) logging.info(f'JumpServer version {__version__}, more see https://www.jumpserver.org') self.start() - if self.daemon_run: + if self.run_daemon: self.show_status() with self.daemon_context: self.watch() @@ -47,8 +47,8 @@ class ServicesUtil(object): service: BaseService service.stop(force=self.force_stop) - if self.daemon_stop: - self.stop_daemon() + if self.stop_daemon: + self._stop_daemon() # -- watch -- def watch(self): @@ -76,9 +76,8 @@ class ServicesUtil(object): def clean_up(self): if not self.EXIT_EVENT.is_set(): self.EXIT_EVENT.set() - for service in self._services: - service: BaseService - service.stop(force=self.force_stop) + + self.stop() def show_status(self): for service in self._services: @@ -86,7 +85,7 @@ class ServicesUtil(object): service.show_status() # -- daemon -- - def stop_daemon(self): + def _stop_daemon(self): if self.daemon_pid and self.daemon_is_running: os.kill(self.daemon_pid, 15) self.remove_daemon_pid() diff --git a/jms b/jms index 83ad10cba..475ef75ca 100755 --- a/jms +++ b/jms @@ -161,7 +161,7 @@ if __name__ == '__main__': try: # processes: main(3s) -> call(0.25s) -> service -> sub-process - code = subprocess.run(cmd, shell=True, cwd=apps_dir) + code = subprocess.call(cmd, shell=True, cwd=apps_dir) except KeyboardInterrupt: - time.sleep(1) + time.sleep(2) pass diff --git a/utils/start_celery_beat.py b/utils/start_celery_beat.py index 466624e3e..34887fb4d 100644 --- a/utils/start_celery_beat.py +++ b/utils/start_celery_beat.py @@ -2,6 +2,7 @@ # import os import sys +import signal import subprocess import redis_lock @@ -28,7 +29,24 @@ cmd = [ '--max-interval', '60' ] -with redis_lock.Lock(redis, name="beat-distribute-start-lock", expire=60, auto_renewal=True): - print("Get beat lock start to run it") - code = subprocess.call(cmd, cwd=APPS_DIR) - sys.exit(code) +processes = [] + + +def stop_beat_process(sig, frame): + for p in processes: + os.kill(p.pid, 15) + + +def main(): + # 父进程结束通知子进程结束 + signal.signal(signal.SIGTERM, stop_beat_process) + + with redis_lock.Lock(redis, name="beat-distribute-start-lock", expire=60, auto_renewal=True): + print("Get beat lock start to run it") + process = subprocess.Popen(cmd, cwd=APPS_DIR) + processes.append(process) + process.wait() + + +if __name__ == '__main__': + main()