Browse Source

fix: 修复启动脚本beat进程偶尔不会结束的问题

pull/6633/head
Bai 3 years ago committed by Jiangjie.Bai
parent
commit
33236aaa47
  1. 8
      apps/common/management/commands/services/command.py
  2. 37
      apps/common/management/commands/services/services/base.py
  3. 19
      apps/common/management/commands/services/utils.py
  4. 4
      jms
  5. 26
      utils/start_celery_beat.py

8
apps/common/management/commands/services/command.py

@ -44,7 +44,7 @@ class Services(TextChoices):
@classmethod
def task_services(cls):
return cls.celery_services() + [cls.beat]
return cls.celery_services() + [cls.beat, cls.flower]
@classmethod
def all_services(cls):
@ -113,9 +113,9 @@ class BaseActionCommand(BaseCommand):
kwargs = {
'services': services,
'daemon_run': options.get('daemon', False),
'daemon_stop': Services.all.value in service_names,
'force_stop': options.get('force', False),
'run_daemon': options.get('daemon', False),
'stop_daemon': self.action == Action.stop.value and Services.all.value in service_names,
'force_stop': options.get('force') or False,
}
self.util = ServicesUtil(**kwargs)

37
apps/common/management/commands/services/services/base.py

@ -1,6 +1,7 @@
import abc
import time
import shutil
import psutil
import datetime
import threading
import subprocess
@ -11,13 +12,12 @@ class BaseService(object):
def __init__(self, **kwargs):
self.name = kwargs['name']
self.process = None
self._process = None
self.STOP_TIMEOUT = 10
self.max_retry = 0
self.retry = 3
self.LOG_KEEP_DAYS = 7
self.EXIT_EVENT = threading.Event()
self.LOCK = threading.Lock()
@property
@abc.abstractmethod
@ -90,10 +90,22 @@ class BaseService(object):
os.unlink(self.pid_filepath)
# -- end pid --
# -- process --
@property
def process(self):
if not self._process:
try:
self._process = psutil.Process(self.pid)
except:
pass
return self._process
# -- end process --
# -- action --
def open_subprocess(self):
kwargs = {'cwd': self.cwd, 'stderr': self.log_file, 'stdout': self.log_file}
self.process = subprocess.Popen(self.cmd, **kwargs)
self._process = subprocess.Popen(self.cmd, **kwargs)
def start(self):
if self.is_running:
@ -107,7 +119,7 @@ class BaseService(object):
def start_other(self):
pass
def stop(self, force=True):
def stop(self, force=False):
if not self.is_running:
self.show_status()
# self.remove_pid()
@ -117,6 +129,14 @@ class BaseService(object):
sig = 9 if force else 15
os.kill(self.pid, sig)
if self.process is None:
print("\033[31m No process found\033[0m")
return
try:
self.process.wait(1)
except:
pass
for i in range(self.STOP_TIMEOUT):
if i == self.STOP_TIMEOUT - 1:
print("\033[31m Error\033[0m")
@ -129,8 +149,7 @@ class BaseService(object):
continue
def watch(self):
with self.LOCK:
self._check()
self._check()
if not self.is_running:
self._restart()
self._rotate_log()
@ -138,6 +157,12 @@ class BaseService(object):
def _check(self):
now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f"{now} Check service status: {self.name} -> ", end='')
if self.process:
try:
self.process.wait(1) # 不wait,子进程可能无法回收
except subprocess.TimeoutExpired:
pass
if self.is_running:
print(f'running at {self.pid}')
else:

19
apps/common/management/commands/services/utils.py

@ -10,11 +10,11 @@ from .services.base import BaseService
class ServicesUtil(object):
def __init__(self, services, daemon_run=False, force_stop=True, daemon_stop=False):
def __init__(self, services, run_daemon=False, force_stop=False, stop_daemon=False):
self._services = services
self.daemon_run = daemon_run
self.run_daemon = run_daemon
self.force_stop = force_stop
self.daemon_stop = daemon_stop
self.stop_daemon = stop_daemon
self.EXIT_EVENT = threading.Event()
self.check_interval = 30
self.files_preserve_map = {}
@ -28,7 +28,7 @@ class ServicesUtil(object):
logging.info(time.ctime())
logging.info(f'JumpServer version {__version__}, more see https://www.jumpserver.org')
self.start()
if self.daemon_run:
if self.run_daemon:
self.show_status()
with self.daemon_context:
self.watch()
@ -47,8 +47,8 @@ class ServicesUtil(object):
service: BaseService
service.stop(force=self.force_stop)
if self.daemon_stop:
self.stop_daemon()
if self.stop_daemon:
self._stop_daemon()
# -- watch --
def watch(self):
@ -76,9 +76,8 @@ class ServicesUtil(object):
def clean_up(self):
if not self.EXIT_EVENT.is_set():
self.EXIT_EVENT.set()
for service in self._services:
service: BaseService
service.stop(force=self.force_stop)
self.stop()
def show_status(self):
for service in self._services:
@ -86,7 +85,7 @@ class ServicesUtil(object):
service.show_status()
# -- daemon --
def stop_daemon(self):
def _stop_daemon(self):
if self.daemon_pid and self.daemon_is_running:
os.kill(self.daemon_pid, 15)
self.remove_daemon_pid()

4
jms

@ -161,7 +161,7 @@ if __name__ == '__main__':
try:
# processes: main(3s) -> call(0.25s) -> service -> sub-process
code = subprocess.run(cmd, shell=True, cwd=apps_dir)
code = subprocess.call(cmd, shell=True, cwd=apps_dir)
except KeyboardInterrupt:
time.sleep(1)
time.sleep(2)
pass

26
utils/start_celery_beat.py

@ -2,6 +2,7 @@
#
import os
import sys
import signal
import subprocess
import redis_lock
@ -28,7 +29,24 @@ cmd = [
'--max-interval', '60'
]
with redis_lock.Lock(redis, name="beat-distribute-start-lock", expire=60, auto_renewal=True):
print("Get beat lock start to run it")
code = subprocess.call(cmd, cwd=APPS_DIR)
sys.exit(code)
processes = []
def stop_beat_process(sig, frame):
for p in processes:
os.kill(p.pid, 15)
def main():
# 父进程结束通知子进程结束
signal.signal(signal.SIGTERM, stop_beat_process)
with redis_lock.Lock(redis, name="beat-distribute-start-lock", expire=60, auto_renewal=True):
print("Get beat lock start to run it")
process = subprocess.Popen(cmd, cwd=APPS_DIR)
processes.append(process)
process.wait()
if __name__ == '__main__':
main()

Loading…
Cancel
Save