feat: 系统监控添加 Core/Celery Terminal; 修改检测终端状态逻辑; (#6570)

* feat: 系统监控添加 Core Terminal; 修改检测终端状态逻辑;

* feat: 添加management包

* feat: 添加management包

* feat: 添加 start 模块

* feat: 修改 start 模块

* feat: 修改启动命令目录结构

* feat: 修改启动命令目录结构

* feat: 修改启动命令目录结构

* feat: 修改启动命令目录结构

* feat: 修改启动命令目录结构

* feat: 修改启动命令目录结构

* feat: 修改启动命令目录结构

* feat: 修改启动脚本

* feat: 修改启动脚本

* feat: 修改启动脚本

* feat: 修改启动脚本

* feat: 修改启动脚本

* feat: 修改启动脚本

* feat: 修改启动脚本

* feat: 修改启动脚本

* feat: 修改启动脚本

* feat: 修改启动脚本

Co-authored-by: Bai <bugatti_it@163.com>
pull/6613/head
fit2bot 2021-08-06 19:16:18 +08:00 committed by GitHub
parent 8ad78ffef8
commit 39ce60c93a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 912 additions and 556 deletions

View File

@ -10,3 +10,15 @@ def on_transaction_commit(func):
def inner(*args, **kwargs):
transaction.on_commit(lambda: func(*args, **kwargs))
return inner
class Singleton(object):
""" 单例类 """
def __init__(self, cls):
self._cls = cls
self._instance = {}
def __call__(self):
if self._cls not in self._instance:
self._instance[self._cls] = self._cls()
return self._instance[self._cls]

View File

View File

@ -0,0 +1,6 @@
from .services.command import BaseActionCommand, Action
class Command(BaseActionCommand):
help = 'Restart services'
action = Action.restart.value

View File

@ -0,0 +1,139 @@
from django.core.management.base import BaseCommand, CommandError
from django.db.models import TextChoices
from .utils import ServicesUtil
from .hands import *
class Services(TextChoices):
gunicorn = 'gunicorn', 'gunicorn'
daphne = 'daphne', 'daphne'
celery_ansible = 'celery_ansible', 'celery_ansible'
celery_default = 'celery_default', 'celery_default'
beat = 'beat', 'beat'
flower = 'flower', 'flower'
ws = 'ws', 'ws'
web = 'web', 'web'
celery = 'celery', 'celery'
task = 'task', 'task'
all = 'all', 'all'
@classmethod
def get_service_object_class(cls, name):
from . import services
services_map = {
cls.gunicorn.value: services.GunicornService,
cls.daphne: services.DaphneService,
cls.flower: services.FlowerService,
cls.celery_default: services.CeleryDefaultService,
cls.celery_ansible: services.CeleryAnsibleService,
cls.beat: services.BeatService
}
return services_map.get(name)
@classmethod
def ws_services(cls):
return [cls.daphne]
@classmethod
def web_services(cls):
return [cls.gunicorn, cls.daphne]
@classmethod
def celery_services(cls):
return [cls.celery_ansible, cls.celery_default]
@classmethod
def task_services(cls):
return cls.celery_services() + [cls.beat]
@classmethod
def all_services(cls):
return cls.web_services() + cls.task_services()
@classmethod
def export_services_values(cls):
return [cls.all.value, cls.web.value, cls.task.value]
@classmethod
def get_service_objects(cls, service_names, **kwargs):
services = set()
for name in service_names:
method_name = f'{name}_services'
if hasattr(cls, method_name):
_services = getattr(cls, method_name)()
elif hasattr(cls, name):
_services = [getattr(cls, name)]
else:
continue
services.update(set(_services))
service_objects = []
for s in services:
service_class = cls.get_service_object_class(s.value)
if not service_class:
continue
kwargs.update({
'name': s.value
})
service_object = service_class(**kwargs)
service_objects.append(service_object)
return service_objects
class Action(TextChoices):
start = 'start', 'start'
status = 'status', 'status'
stop = 'stop', 'stop'
restart = 'restart', 'restart'
class BaseActionCommand(BaseCommand):
help = 'Service Base Command'
action = None
util = None
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
def add_arguments(self, parser):
parser.add_argument(
'services', nargs='+', choices=Services.export_services_values(), help='Service',
)
parser.add_argument('-d', '--daemon', nargs="?", const=True)
parser.add_argument('-w', '--worker', type=int, nargs="?", default=4)
parser.add_argument('-f', '--force', nargs="?", const=True)
def initial_util(self, *args, **options):
service_names = options.get('services')
service_kwargs = {
'worker_gunicorn': options.get('worker')
}
services = Services.get_service_objects(service_names=service_names, **service_kwargs)
kwargs = {
'services': services,
'daemon_run': options.get('daemon', False),
'daemon_stop': Services.all.value in service_names,
'force_stop': options.get('force', False),
}
self.util = ServicesUtil(**kwargs)
def handle(self, *args, **options):
self.initial_util(*args, **options)
assert self.action in Action.values, f'The action {self.action} is not in the optional list'
_handle = getattr(self, f'_handle_{self.action}', lambda: None)
_handle()
def _handle_start(self):
self.util.start_and_watch()
os._exit(0)
def _handle_stop(self):
self.util.stop()
def _handle_restart(self):
self.util.restart()
def _handle_status(self):
self.util.show_status()

View File

@ -0,0 +1,26 @@
import os
import sys
import logging
from django.conf import settings
from apps.jumpserver.const import CONFIG
try:
from apps.jumpserver import const
__version__ = const.VERSION
except ImportError as e:
print("Not found __version__: {}".format(e))
print("Python is: ")
logging.info(sys.executable)
__version__ = 'Unknown'
sys.exit(1)
HTTP_HOST = CONFIG.HTTP_BIND_HOST or '127.0.0.1'
HTTP_PORT = CONFIG.HTTP_LISTEN_PORT or 8080
WS_PORT = CONFIG.WS_LISTEN_PORT or 8082
DEBUG = CONFIG.DEBUG or False
BASE_DIR = os.path.dirname(settings.BASE_DIR)
LOG_DIR = os.path.join(BASE_DIR, 'logs')
APPS_DIR = os.path.join(BASE_DIR, 'apps')
TMP_DIR = os.path.join(BASE_DIR, 'tmp')

View File

@ -0,0 +1,6 @@
from .beat import *
from .celery_ansible import *
from .celery_default import *
from .daphne import *
from .flower import *
from .gunicorn import *

View File

@ -0,0 +1,188 @@
import abc
import time
import shutil
import datetime
import threading
import subprocess
from ..hands import *
class BaseService(object):
def __init__(self, **kwargs):
self.name = kwargs['name']
self.process = None
self.STOP_TIMEOUT = 10
self.max_retry = 0
self.retry = 3
self.LOG_KEEP_DAYS = 7
self.EXIT_EVENT = threading.Event()
self.LOCK = threading.Lock()
@property
@abc.abstractmethod
def cmd(self):
return []
@property
@abc.abstractmethod
def cwd(self):
return ''
@property
def is_running(self):
if self.pid == 0:
return False
try:
os.kill(self.pid, 0)
except (OSError, ProcessLookupError):
return False
else:
return True
def show_status(self):
if self.is_running:
msg = f'{self.name} is running: {self.pid}.'
else:
msg = f'{self.name} is stopped.'
print(msg)
# -- log --
@property
def log_filename(self):
return f'{self.name}.log'
@property
def log_filepath(self):
return os.path.join(LOG_DIR, self.log_filename)
@property
def log_file(self):
return open(self.log_filepath, 'a')
@property
def log_dir(self):
return os.path.dirname(self.log_filepath)
# -- end log --
# -- pid --
@property
def pid_filepath(self):
return os.path.join(TMP_DIR, f'{self.name}.pid')
@property
def pid(self):
if not os.path.isfile(self.pid_filepath):
return 0
with open(self.pid_filepath) as f:
try:
pid = int(f.read().strip())
except ValueError:
pid = 0
return pid
def write_pid(self):
with open(self.pid_filepath, 'w') as f:
f.write(str(self.process.pid))
def remove_pid(self):
if os.path.isfile(self.pid_filepath):
os.unlink(self.pid_filepath)
# -- end pid --
# -- action --
def open_subprocess(self):
kwargs = {'cwd': self.cwd, 'stderr': self.log_file, 'stdout': self.log_file}
self.process = subprocess.Popen(self.cmd, **kwargs)
def start(self):
if self.is_running:
self.show_status()
return
self.remove_pid()
self.open_subprocess()
self.write_pid()
self.start_other()
def start_other(self):
pass
def stop(self, force=True):
if not self.is_running:
self.show_status()
# self.remove_pid()
return
print(f'Stop service: {self.name}', end='')
sig = 9 if force else 15
os.kill(self.pid, sig)
try:
self.process.wait(2)
except:
pass
for i in range(self.STOP_TIMEOUT):
if i == self.STOP_TIMEOUT - 1:
print("\033[31m Error\033[0m")
if not self.is_running:
print("\033[32m Ok\033[0m")
self.remove_pid()
break
else:
time.sleep(1)
continue
def watch(self):
with self.LOCK:
self._check()
if not self.is_running:
self._restart()
self._rotate_log()
def _check(self):
now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
print(f"{now} Check service status: {self.name} -> ", end='')
try:
self.process.wait(timeout=1) # 不wait子进程可能无法回收
except subprocess.TimeoutExpired:
pass
if self.is_running:
print(f'running at {self.process.pid}')
else:
print(f'stopped with code: {self.process.returncode}({self.process.pid})')
def _restart(self):
if self.retry > self.max_retry:
logging.info("Service start failed, exit: ", self.name)
self.EXIT_EVENT.set()
return
self.retry += 1
logging.info(f'> Find {self.name} stopped, retry {self.retry}, {self.process.pid}')
self.start()
def _rotate_log(self):
now = datetime.datetime.now()
_time = now.strftime('%H:%M')
if _time != '23:59':
return
backup_date = now.strftime('%Y-%m-%d')
backup_log_dir = os.path.join(self.log_dir, backup_date)
if not os.path.exists(backup_log_dir):
os.mkdir(backup_log_dir)
backup_log_path = os.path.join(backup_log_dir, self.log_filename)
if os.path.isfile(self.log_filepath) and not os.path.isfile(backup_log_path):
logging.info(f'Rotate log file: {self.log_filepath} => {backup_log_path}')
shutil.copy(self.log_filepath, backup_log_path)
with open(self.log_filepath, 'w') as f:
pass
to_delete_date = now - datetime.timedelta(days=self.LOG_KEEP_DAYS)
to_delete_dir = os.path.join(LOG_DIR, to_delete_date.strftime('%Y-%m-%d'))
if os.path.exists(to_delete_dir):
logging.info(f'Remove old log: {to_delete_dir}')
shutil.rmtree(to_delete_dir, ignore_errors=True)
# -- end action --

View File

@ -0,0 +1,25 @@
from ..hands import *
from .base import BaseService
from django.core.cache import cache
__all__ = ['BeatService']
class BeatService(BaseService):
def __init__(self, **kwargs):
super().__init__(**kwargs)
self.lock = cache.lock('beat-distribute-start-lock', expire=60)
@property
def cmd(self):
print("\n- Start Beat as Periodic Task Scheduler")
cmd = [
sys.executable, 'start_celery_beat.py',
]
return cmd
@property
def cwd(self):
return os.path.join(BASE_DIR, 'utils')

View File

@ -0,0 +1,11 @@
from .celery_base import CeleryBaseService
__all__ = ['CeleryAnsibleService']
class CeleryAnsibleService(CeleryBaseService):
def __init__(self, **kwargs):
kwargs['queue'] = 'ansible'
super().__init__(**kwargs)

View File

@ -0,0 +1,38 @@
from ..hands import *
from .base import BaseService
class CeleryBaseService(BaseService):
def __init__(self, queue, num=10, **kwargs):
super().__init__(**kwargs)
self.queue = queue
self.num = num
@property
def cmd(self):
print('\n- Start Celery as Distributed Task Queue: {}'.format(self.queue.capitalize()))
os.environ.setdefault('PYTHONOPTIMIZE', '1')
os.environ.setdefault('ANSIBLE_FORCE_COLOR', 'True')
if os.getuid() == 0:
os.environ.setdefault('C_FORCE_ROOT', '1')
server_hostname = os.environ.get("SERVER_HOSTNAME")
if not server_hostname:
server_hostname = '%h'
cmd = [
'celery', 'worker',
'-P', 'threads',
'-A', 'ops',
'-l', 'INFO',
'-c', str(self.num),
'-Q', self.queue,
'-n', f'{self.queue}@{server_hostname}'
]
return cmd
@property
def cwd(self):
return APPS_DIR

View File

@ -0,0 +1,16 @@
from .celery_base import CeleryBaseService
__all__ = ['CeleryDefaultService']
class CeleryDefaultService(CeleryBaseService):
def __init__(self, **kwargs):
kwargs['queue'] = 'celery'
super().__init__(**kwargs)
def start_other(self):
from terminal.startup import CeleryTerminal
celery_terminal = CeleryTerminal()
celery_terminal.start_heartbeat_thread()

View File

@ -0,0 +1,25 @@
from ..hands import *
from .base import BaseService
__all__ = ['DaphneService']
class DaphneService(BaseService):
def __init__(self, **kwargs):
super().__init__(**kwargs)
@property
def cmd(self):
print("\n- Start Daphne ASGI WS Server")
cmd = [
'daphne', 'jumpserver.asgi:application',
'-b', HTTP_HOST,
'-p', str(WS_PORT),
]
return cmd
@property
def cwd(self):
return APPS_DIR

View File

@ -0,0 +1,31 @@
from ..hands import *
from .base import BaseService
__all__ = ['FlowerService']
class FlowerService(BaseService):
def __init__(self, **kwargs):
super().__init__(**kwargs)
@property
def cmd(self):
print("\n- Start Flower as Task Monitor")
if os.getuid() == 0:
os.environ.setdefault('C_FORCE_ROOT', '1')
cmd = [
'celery', 'flower',
'-A', 'ops',
'-l', 'INFO',
'--url_prefix=/core/flower',
'--auto_refresh=False',
'--max_tasks=1000',
'--tasks_columns=uuid,name,args,state,received,started,runtime,worker'
]
return cmd
@property
def cwd(self):
return APPS_DIR

View File

@ -0,0 +1,40 @@
from ..hands import *
from .base import BaseService
__all__ = ['GunicornService']
class GunicornService(BaseService):
def __init__(self, **kwargs):
self.worker = kwargs['worker_gunicorn']
super().__init__(**kwargs)
@property
def cmd(self):
print("\n- Start Gunicorn WSGI HTTP Server")
log_format = '%(h)s %(t)s %(L)ss "%(r)s" %(s)s %(b)s '
bind = f'{HTTP_HOST}:{HTTP_PORT}'
cmd = [
'gunicorn', 'jumpserver.wsgi',
'-b', bind,
'-k', 'gthread',
'--threads', '10',
'-w', str(self.worker),
'--max-requests', '4096',
'--access-logformat', log_format,
'--access-logfile', '-'
]
if DEBUG:
cmd.append('--reload')
return cmd
@property
def cwd(self):
return APPS_DIR
def start_other(self):
from terminal.startup import CoreTerminal
core_terminal = CoreTerminal()
core_terminal.start_heartbeat_thread()

View File

@ -0,0 +1,141 @@
import threading
import signal
import time
import daemon
from daemon import pidfile
from .hands import *
from .hands import __version__
from .services.base import BaseService
class ServicesUtil(object):
def __init__(self, services, daemon_run=False, force_stop=True, daemon_stop=False):
self._services = services
self.daemon_run = daemon_run
self.force_stop = force_stop
self.daemon_stop = daemon_stop
self.EXIT_EVENT = threading.Event()
self.check_interval = 30
self.files_preserve_map = {}
def restart(self):
self.stop()
time.sleep(5)
self.start_and_watch()
def start_and_watch(self):
logging.info(time.ctime())
logging.info(f'JumpServer version {__version__}, more see https://www.jumpserver.org')
self.start()
if self.daemon_run:
self.show_status()
with self.daemon_context:
self.watch()
else:
self.watch()
def start(self):
for service in self._services:
service: BaseService
service.start()
self.files_preserve_map[service.name] = service.log_file
time.sleep(1)
def stop(self):
for service in self._services:
service: BaseService
service.stop(force=self.force_stop)
if self.daemon_stop:
self.stop_daemon()
# -- watch --
def watch(self):
while not self.EXIT_EVENT.is_set():
try:
_exit = self._watch()
if _exit:
break
time.sleep(self.check_interval)
except KeyboardInterrupt:
print('Start stop services')
break
self.clean_up()
def _watch(self):
for service in self._services:
service: BaseService
service.watch()
if service.EXIT_EVENT.is_set():
self.EXIT_EVENT.set()
return True
return False
# -- end watch --
def clean_up(self):
if not self.EXIT_EVENT.is_set():
self.EXIT_EVENT.set()
for service in self._services:
service: BaseService
service.stop(force=self.force_stop)
def show_status(self):
for service in self._services:
service: BaseService
service.show_status()
# -- daemon --
def stop_daemon(self):
if self.daemon_pid and self.daemon_is_running:
os.kill(self.daemon_pid, 15)
self.remove_daemon_pid()
def remove_daemon_pid(self):
if os.path.isfile(self.daemon_pid_filepath):
os.unlink(self.daemon_pid_filepath)
@property
def daemon_pid(self):
if not os.path.isfile(self.daemon_pid_filepath):
return 0
with open(self.daemon_pid_filepath) as f:
try:
pid = int(f.read().strip())
except ValueError:
pid = 0
return pid
@property
def daemon_is_running(self):
try:
os.kill(self.daemon_pid, 0)
except (OSError, ProcessLookupError):
return False
else:
return True
@property
def daemon_pid_filepath(self):
return os.path.join(TMP_DIR, 'jms.pid')
@property
def daemon_log_filepath(self):
return os.path.join(LOG_DIR, 'jms.log')
@property
def daemon_context(self):
daemon_log_file = open(self.daemon_log_filepath, 'a')
context = daemon.DaemonContext(
pidfile=pidfile.TimeoutPIDLockFile(self.daemon_pid_filepath),
signal_map={
signal.SIGTERM: lambda x, y: self.clean_up(),
signal.SIGHUP: 'terminate',
},
stdout=daemon_log_file,
stderr=daemon_log_file,
files_preserve=list(self.files_preserve_map.values()),
detach_process=True,
)
return context
# -- end daemon --

View File

@ -0,0 +1,6 @@
from .services.command import BaseActionCommand, Action
class Command(BaseActionCommand):
help = 'Start services'
action = Action.start.value

View File

@ -0,0 +1,6 @@
from .services.command import BaseActionCommand, Action
class Command(BaseActionCommand):
help = 'Show services status'
action = Action.status.value

View File

@ -0,0 +1,6 @@
from .services.command import BaseActionCommand, Action
class Command(BaseActionCommand):
help = 'Stop services'
action = Action.stop.value

View File

@ -7,8 +7,6 @@ import logging
import datetime
import uuid
from functools import wraps
import string
import random
import time
import ipaddress
import psutil
@ -242,11 +240,8 @@ class lazyproperty:
return value
def get_disk_usage():
partitions = psutil.disk_partitions()
mount_points = [p.mountpoint for p in partitions]
usages = {p: psutil.disk_usage(p) for p in mount_points}
return usages
def get_disk_usage(path):
return psutil.disk_usage(path=path).percent
def get_cpu_load():
@ -257,7 +252,7 @@ def get_cpu_load():
return float(single_cpu_load_1)
def get_memory_used():
def get_memory_usage():
return psutil.virtual_memory().percent

View File

@ -301,7 +301,6 @@ class Config(dict):
'CONNECTION_TOKEN_ENABLED': False,
'ONLY_ALLOW_EXIST_USER_AUTH': False,
'ONLY_ALLOW_AUTH_FROM_SOURCE': False,
'DISK_CHECK_ENABLED': True,
'SESSION_SAVE_EVERY_REQUEST': True,
'SESSION_EXPIRE_AT_BROWSER_CLOSE_FORCE': False,
'FORGOT_PASSWORD_URL': '',

View File

@ -119,7 +119,6 @@ TICKETS_ENABLED = CONFIG.TICKETS_ENABLED
REFERER_CHECK_ENABLED = CONFIG.REFERER_CHECK_ENABLED
CONNECTION_TOKEN_ENABLED = CONFIG.CONNECTION_TOKEN_ENABLED
DISK_CHECK_ENABLED = CONFIG.DISK_CHECK_ENABLED
FORGOT_PASSWORD_URL = CONFIG.FORGOT_PASSWORD_URL

View File

@ -1,11 +1,9 @@
from django.utils.translation import gettext_lazy as _
from django.conf import settings
from notifications.notifications import SystemMessage
from notifications.models import SystemMsgSubscription
from users.models import User
from notifications.backends import BACKEND
from common.utils import get_disk_usage, get_cpu_load, get_memory_used
from terminal.models import Status, Terminal
__all__ = ('ServerPerformanceMessage', 'ServerPerformanceCheckUtil')
@ -31,114 +29,75 @@ class ServerPerformanceMessage(SystemMessage):
class ServerPerformanceCheckUtil(object):
items_mapper = {
'is_alive': {
'default': False,
'max_threshold': False,
'alarm_msg_format': _('[Alive] The terminal is offline: {name}')
},
'disk_usage': {
'default': 0,
'max_threshold': 80,
'alarm_msg_format': _(
'[Disk] Disk used more than {max_threshold}%: => {value} ({name})'
)
},
'memory_usage': {
'default': 0,
'max_threshold': 85,
'alarm_msg_format': _(
'[Memory] Memory used more than {max_threshold}%: => {value} ({name})'
),
},
'cpu_load': {
'default': 0,
'max_threshold': 5,
'alarm_msg_format': _(
'[CPU] CPU load more than {max_threshold}: => {value} ({name})'
),
},
}
def __init__(self):
self.alarm_messages = []
self.disk_usage_threshold = 20 # 80
self.cpu_load_threshold = 1 # 5
self.memory_usage_threshold = 20 # 85
# checking terminal
self._terminals = []
self._terminal = None
def check_and_publish(self):
self.check()
self.publish()
def check(self):
self.alarm_messages = []
self.initial_terminals()
for item, data in self.items_mapper.items():
for self._terminal in self._terminals:
self.check_item(item, data)
def check_item(self, item, data):
default = data['default']
max_threshold = data['max_threshold']
value = getattr(self._terminal.stat, item, default)
print(value, max_threshold, self._terminal.name, self._terminal.id)
if isinstance(value, bool) and value != max_threshold:
return
elif isinstance(value, (int, float)) and value < max_threshold:
return
msg = data['alarm_msg_format']
msg = msg.format(max_threshold=max_threshold, value=value, name=self._terminal.name)
self.alarm_messages.append(msg)
def publish(self):
if not self.alarm_messages:
return
msg = '<br>'.join(self.alarm_messages)
ServerPerformanceMessage(msg).publish()
def check(self):
check_items = ['disk_usage', 'cpu_load', 'memory_usage']
# Check local
if settings.DISK_CHECK_ENABLED:
self.check_items(check_items)
# Check terminal
check_items += ['is_alive']
terminals = self.get_terminals()
for terminal in terminals:
self._terminal = terminal
self.check_items(check_items)
@staticmethod
def get_terminals():
def initial_terminals(self):
terminals = []
for terminal in Terminal.objects.filter(is_accepted=True, is_deleted=False):
for terminal in Terminal.objects.filter(is_deleted=False):
if not terminal.is_active:
continue
terminal.status = Status.get_terminal_latest_stat(terminal)
terminal.stat = Status.get_terminal_latest_stat(terminal)
terminals.append(terminal)
return terminals
def check_items(self, items):
for item in items:
messages = getattr(self, f'check_{item}', lambda: None)()
self.alarm_messages.extend(messages)
def check_is_alive(self):
message = []
if not self._terminal and not self._terminal.is_alive:
name = self._terminal.name
msg = _('The terminal is offline: {}').format(name)
message.append(msg)
return message
def check_disk_usage(self):
messages = []
if self._terminal:
name = self._terminal.name
disk_used = getattr(self._terminal.status, 'disk_used', None)
disks_used = [['/', disk_used]] if disk_used else []
else:
name = 'Core'
disks_used = self._get_local_disk_usage()
for disk, used in disks_used:
if used <= self.disk_usage_threshold:
continue
msg = _("Disk used more than {}%: {} => {} ({})").format(self.disk_usage_threshold, disk, used, name)
messages.append(msg)
return messages
@staticmethod
def _get_local_disk_usage():
disks_usage = []
usages = get_disk_usage()
uncheck_paths = ['/etc', '/boot']
for path, usage in usages.items():
if len(path) > 4 and path[:4] in uncheck_paths:
continue
disks_usage.append([path, usage.percent])
return disks_usage
def check_cpu_load(self):
messages = []
if self._terminal:
name = self._terminal.name
cpu_load = getattr(self._terminal.status, 'cpu_load', 0)
else:
name = 'Core'
cpu_load = get_cpu_load()
if cpu_load > self.cpu_load_threshold:
msg = _('CPU load more than {}: => {} ({})').format(self.cpu_load_threshold, cpu_load, name)
messages.append(msg)
return messages
def check_memory_usage(self):
messages = []
if self._terminal:
name = self._terminal.name
memory_usage = getattr(self._terminal.status, 'memory_usage', 0)
else:
name = 'Core'
memory_usage = get_memory_used()
if memory_usage > self.memory_usage_threshold:
msg = _('Memory used more than {}%: => {} ({})').format(self.memory_usage_threshold, memory_usage, name)
messages.append(msg)
return messages
self._terminals = terminals

View File

@ -45,6 +45,8 @@ class TerminalTypeChoices(TextChoices):
omnidb = 'omnidb', 'OmniDB'
xrdp = 'xrdp', 'Xrdp'
lion = 'lion', 'Lion'
core = 'core', 'Core'
celery = 'celery', 'Celery'
@classmethod
def types(cls):

View File

@ -0,0 +1,18 @@
# Generated by Django 3.1.6 on 2021-08-05 07:52
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('terminal', '0038_task_kwargs'),
]
operations = [
migrations.AlterField(
model_name='terminal',
name='type',
field=models.CharField(choices=[('koko', 'KoKo'), ('guacamole', 'Guacamole'), ('omnidb', 'OmniDB'), ('xrdp', 'Xrdp'), ('lion', 'Lion'), ('core', 'Core'), ('celery', 'Celery')], default='koko', max_length=64, verbose_name='type'),
),
]

View File

@ -54,6 +54,7 @@ class Status(models.Model):
data.pop('terminal', None)
stat = cls(**data)
stat.terminal = terminal
stat.is_alive = terminal.is_alive
return stat
def save(self, force_insert=False, force_update=False, using=None,

View File

@ -1,3 +1,2 @@
# -*- coding: utf-8 -*-
#

75
apps/terminal/startup.py Normal file
View File

@ -0,0 +1,75 @@
import os
import time
import socket
import threading
from django.conf import settings
from common.decorator import Singleton
from common.utils import get_disk_usage, get_cpu_load, get_memory_usage, get_logger
from .serializers.terminal import TerminalRegistrationSerializer, StatusSerializer
from .const import TerminalTypeChoices
from .models.terminal import Terminal
__all__ = ['CoreTerminal', 'CeleryTerminal']
class BaseTerminal(object):
def __init__(self, suffix_name, _type):
self.server_hostname = os.environ.get('SERVER_HOSTNAME') or socket.gethostname()
self.name = f'[{suffix_name}] {self.server_hostname}'
self.interval = 30
self.remote_addr = socket.gethostbyname(socket.gethostname())
self.type = _type
def start_heartbeat_thread(self):
print(f'- Start heartbeat thread => ({self.name})')
t = threading.Thread(target=self.start_heartbeat)
t.setDaemon(True)
t.start()
def start_heartbeat(self):
while True:
heartbeat_data = {
'cpu_load': get_cpu_load(),
'memory_used': get_memory_usage(),
'disk_used': get_disk_usage(path=settings.BASE_DIR),
'sessions': [],
}
status_serializer = StatusSerializer(data=heartbeat_data)
status_serializer.is_valid()
status_serializer.validated_data.pop('sessions', None)
terminal = self.get_or_register_terminal()
status_serializer.validated_data['terminal'] = terminal
status_serializer.save()
time.sleep(self.interval)
def get_or_register_terminal(self):
terminal = Terminal.objects.filter(name=self.name, type=self.type, is_deleted=False).first()
if not terminal:
terminal = self.register_terminal()
return terminal
def register_terminal(self):
data = {'name': self.name, 'type': self.type, 'remote_addr': self.remote_addr}
serializer = TerminalRegistrationSerializer(data=data)
serializer.is_valid()
terminal = serializer.save()
return terminal
@Singleton
class CoreTerminal(BaseTerminal):
def __init__(self):
super().__init__(
suffix_name=TerminalTypeChoices.core.label, _type=TerminalTypeChoices.core.value
)
@Singleton
class CeleryTerminal(BaseTerminal):
def __init__(self):
super().__init__(
suffix_name=TerminalTypeChoices.celery.label, _type=TerminalTypeChoices.celery.value
)

489
jms
View File

@ -3,19 +3,11 @@
import os
import subprocess
import threading
import datetime
import logging
import logging.handlers
import psutil
import time
import argparse
import sys
import shutil
import signal
from collections import defaultdict
import daemon
from daemon import pidfile
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, BASE_DIR)
@ -40,29 +32,12 @@ except ImportError as e:
sys.exit(1)
os.environ["PYTHONIOENCODING"] = "UTF-8"
APPS_DIR = os.path.join(BASE_DIR, 'apps')
LOG_DIR = os.path.join(BASE_DIR, 'logs')
TMP_DIR = os.path.join(BASE_DIR, 'tmp')
HTTP_HOST = CONFIG.HTTP_BIND_HOST or '127.0.0.1'
HTTP_PORT = CONFIG.HTTP_LISTEN_PORT or 8080
WS_PORT = CONFIG.WS_LISTEN_PORT or 8082
DEBUG = CONFIG.DEBUG or False
LOG_LEVEL = CONFIG.LOG_LEVEL or 'INFO'
START_TIMEOUT = 40
WORKERS = 4
DAEMON = False
LOG_KEEP_DAYS = 7
logging.basicConfig(
format='%(asctime)s %(message)s', level=logging.INFO,
datefmt='%Y-%m-%d %H:%M:%S'
)
EXIT_EVENT = threading.Event()
LOCK = threading.Lock()
files_preserve = []
STOP_TIMEOUT = 10
logger = logging.getLogger()
try:
@ -76,8 +51,8 @@ def check_database_connection():
os.chdir(os.path.join(BASE_DIR, 'apps'))
for i in range(60):
logging.info("Check database connection ...")
code = subprocess.call("python manage.py showmigrations users ", shell=True)
if code == 0:
_code = subprocess.call("python manage.py showmigrations users ", shell=True)
if _code == 0:
logging.info("Database connect success")
return
time.sleep(1)
@ -86,10 +61,11 @@ def check_database_connection():
def check_migrations():
apps_dir = os.path.join(BASE_DIR, 'apps')
code = subprocess.call("python manage.py showmigrations | grep '\[.\]' | grep -v '\[X\]'", shell=True, cwd=apps_dir)
_apps_dir = os.path.join(BASE_DIR, 'apps')
_cmd = "python manage.py showmigrations | grep '\[.\]' | grep -v '\[X\]'"
_code = subprocess.call(_cmd, shell=True, cwd=_apps_dir)
if code == 1:
if _code == 1:
return
# for i in range(3):
# print("!!! Warning: Has SQL migrations not perform, 有 SQL 变更没有执行")
@ -98,10 +74,10 @@ def check_migrations():
def expire_caches():
apps_dir = os.path.join(BASE_DIR, 'apps')
code = subprocess.call("python manage.py expire_caches", shell=True, cwd=apps_dir)
_apps_dir = os.path.join(BASE_DIR, 'apps')
_code = subprocess.call("python manage.py expire_caches", shell=True, cwd=_apps_dir)
if code == 1:
if _code == 1:
return
@ -115,8 +91,8 @@ def perform_db_migrate():
def collect_static():
logging.info("Collect static files")
os.chdir(os.path.join(BASE_DIR, 'apps'))
command = 'python3 manage.py collectstatic --no-input -c &> /dev/null '
subprocess.call(command, shell=True)
_cmd = 'python3 manage.py collectstatic --no-input -c &> /dev/null '
subprocess.call(_cmd, shell=True)
logging.info("Collect static files done")
@ -127,394 +103,6 @@ def prepare():
expire_caches()
def check_pid(pid):
""" Check For the existence of a unix pid. """
try:
os.kill(pid, 0)
except (OSError, ProcessLookupError):
return False
else:
return True
def get_pid_file_path(s):
return os.path.join(TMP_DIR, '{}.pid'.format(s))
def get_log_file_path(s):
return os.path.join(LOG_DIR, '{}.log'.format(s))
def get_pid_from_file(path):
if os.path.isfile(path):
with open(path) as f:
try:
return int(f.read().strip())
except ValueError:
return 0
return 0
def get_pid(s):
pid_file = get_pid_file_path(s)
return get_pid_from_file(pid_file)
def is_running(s, unlink=True):
pid_file = get_pid_file_path(s)
if os.path.isfile(pid_file):
pid = get_pid(s)
if pid == 0:
return False
elif check_pid(pid):
return True
if unlink:
os.unlink(pid_file)
return False
def parse_service(s):
web_services = ['gunicorn', 'flower', 'daphne']
celery_services = [
"celery_ansible", "celery_default"
]
task_services = celery_services + ['beat']
all_services = web_services + task_services
if s == 'all':
return all_services
elif s == "web":
return web_services
elif s == 'ws':
return ['daphne']
elif s == "task":
return task_services
elif s == "celery":
return celery_services
elif "," in s:
services = set()
for i in s.split(','):
services.update(parse_service(i))
return services
else:
return [s]
def get_start_gunicorn_kwargs():
print("\n- Start Gunicorn WSGI HTTP Server")
prepare()
bind = '{}:{}'.format(HTTP_HOST, HTTP_PORT)
log_format = '%(h)s %(t)s %(L)ss "%(r)s" %(s)s %(b)s '
cmd = [
'gunicorn', 'jumpserver.wsgi',
'-b', bind,
'-k', 'gthread',
'--threads', '10',
'-w', str(WORKERS),
'--max-requests', '4096',
'--access-logformat', log_format,
'--access-logfile', '-'
]
if DEBUG:
cmd.append('--reload')
return {'cmd': cmd, 'cwd': APPS_DIR}
def get_start_daphne_kwargs():
print("\n- Start Daphne ASGI WS Server")
cmd = [
'daphne', 'jumpserver.asgi:application',
'-b', HTTP_HOST,
'-p', str(WS_PORT),
]
return {'cmd': cmd, 'cwd': APPS_DIR}
def get_start_celery_ansible_kwargs():
print("\n- Start Celery as Distributed Task Queue: Ansible")
return get_start_worker_kwargs('ansible', 10)
def get_start_celery_default_kwargs():
print("\n- Start Celery as Distributed Task Queue: Celery")
return get_start_worker_kwargs('celery', 10)
def get_start_worker_kwargs(queue, num):
# Todo: Must set this environment, otherwise not no ansible result return
os.environ.setdefault('PYTHONOPTIMIZE', '1')
os.environ.setdefault('ANSIBLE_FORCE_COLOR', 'True')
if os.getuid() == 0:
os.environ.setdefault('C_FORCE_ROOT', '1')
server_hostname = os.environ.get("SERVER_HOSTNAME")
if not server_hostname:
server_hostname = '%h'
cmd = [
'celery', 'worker',
'-P', 'threads',
'-A', 'ops',
'-l', 'INFO',
'-c', str(num),
'-Q', queue,
'-n', '{}@{}'.format(queue, server_hostname)
]
return {"cmd": cmd, "cwd": APPS_DIR}
def get_start_flower_kwargs():
print("\n- Start Flower as Task Monitor")
if os.getuid() == 0:
os.environ.setdefault('C_FORCE_ROOT', '1')
cmd = [
'celery', 'flower',
'-A', 'ops',
'-l', 'INFO',
'--url_prefix=/core/flower',
'--auto_refresh=False',
'--max_tasks=1000',
'--tasks_columns=uuid,name,args,state,received,started,runtime,worker'
]
return {"cmd": cmd, "cwd": APPS_DIR}
def get_start_beat_kwargs():
print("\n- Start Beat as Periodic Task Scheduler")
utils_dir = os.path.join(BASE_DIR, 'utils')
cmd = [
sys.executable, 'start_celery_beat.py',
]
return {"cmd": cmd, 'cwd': utils_dir}
processes = {}
def watch_services():
max_retry = 3
services_retry = defaultdict(int)
stopped_services = {}
def check_services():
now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
for s, p in processes.items():
print("{} Check service status: {} -> ".format(now, s), end='')
try:
p.wait(timeout=1) # 不wait子进程可能无法回收
except subprocess.TimeoutExpired:
pass
ok = is_running(s)
if not ok:
stopped_services[s] = ''
print("stopped with code: {}({})".format(p.returncode, p.pid))
else:
print("running at {}".format(p.pid))
stopped_services.pop(s, None)
services_retry.pop(s, None)
def retry_start_stopped_services():
for s in stopped_services:
if services_retry[s] > max_retry:
logging.info("Service start failed, exit: ", s)
EXIT_EVENT.set()
break
p = start_service(s)
logging.info("> Find {} stopped, retry {}, {}".format(
s, services_retry[s] + 1, p.pid)
)
processes[s] = p
services_retry[s] += 1
def rotate_log_if_need():
now = datetime.datetime.now()
tm = now.strftime('%H:%M')
if tm != '23:59':
return
suffix = now.strftime('%Y-%m-%d')
services = list(processes.keys())
services.append('jms')
for s in services:
log_path = get_log_file_path(s)
log_dir = os.path.dirname(log_path)
filename = os.path.basename(log_path)
pre_log_dir = os.path.join(log_dir, suffix)
if not os.path.exists(pre_log_dir):
os.mkdir(pre_log_dir)
pre_log_path = os.path.join(pre_log_dir, filename)
if os.path.isfile(log_path) and not os.path.isfile(pre_log_path):
logging.info("Rotate log file: {} => {}".format(log_path, pre_log_path))
shutil.copy(log_path, pre_log_path)
with open(log_path, 'w') as f:
pass
some_days_ago = now - datetime.timedelta(days=LOG_KEEP_DAYS)
days_ago_dir = os.path.join(LOG_DIR, some_days_ago.strftime('%Y-%m-%d'))
if os.path.exists(days_ago_dir):
logger.info("Remove old log: {}".format(days_ago_dir))
shutil.rmtree(days_ago_dir, ignore_errors=True)
while not EXIT_EVENT.is_set():
try:
with LOCK:
check_services()
retry_start_stopped_services()
rotate_log_if_need()
time.sleep(30)
except KeyboardInterrupt:
print("Start stop service")
time.sleep(1)
break
clean_up()
def start_service(s):
services_kwargs = {
"gunicorn": get_start_gunicorn_kwargs,
"celery_ansible": get_start_celery_ansible_kwargs,
"celery_default": get_start_celery_default_kwargs,
"beat": get_start_beat_kwargs,
"flower": get_start_flower_kwargs,
"daphne": get_start_daphne_kwargs,
}
kwargs = services_kwargs.get(s)()
pid_file = get_pid_file_path(s)
if os.path.isfile(pid_file):
os.unlink(pid_file)
cmd = kwargs.pop('cmd')
log_file_path = get_log_file_path(s)
log_file_f = open(log_file_path, 'a')
files_preserve.append(log_file_f)
kwargs['stderr'] = log_file_f
kwargs['stdout'] = log_file_f
p = subprocess.Popen(cmd, **kwargs)
with open(pid_file, 'w') as f:
f.write(str(p.pid))
return p
def start_services_and_watch(s):
logging.info(time.ctime())
logging.info('JumpServer version {}, more see https://www.jumpserver.org'.format(
__version__)
)
services_set = parse_service(s)
for i in services_set:
if is_running(i):
show_service_status(i)
continue
p = start_service(i)
time.sleep(2)
processes[i] = p
if not DAEMON:
watch_services()
else:
show_service_status(s)
context = get_daemon_context()
with context:
watch_services()
def get_daemon_context():
daemon_pid_file = get_pid_file_path('jms')
daemon_log_f = open(get_log_file_path('jms'), 'a')
files_preserve.append(daemon_log_f)
context = daemon.DaemonContext(
pidfile=pidfile.TimeoutPIDLockFile(daemon_pid_file),
signal_map={
signal.SIGTERM: lambda x, y: clean_up(),
signal.SIGHUP: 'terminate',
},
stdout=daemon_log_f,
stderr=daemon_log_f,
files_preserve=files_preserve,
detach_process=True,
)
return context
def stop_service(srv, sig=15):
services_set = parse_service(srv)
for s in services_set:
if not is_running(s):
show_service_status(s)
continue
print("Stop service: {}".format(s), end='')
pid = get_pid(s)
os.kill(pid, sig)
with LOCK:
process = processes.pop(s, None)
if process is None:
try:
process = psutil.Process(pid)
except:
pass
if process is None:
print("\033[31m No process found\033[0m")
continue
try:
process.wait(1)
except:
pass
for i in range(STOP_TIMEOUT):
if i == STOP_TIMEOUT - 1:
print("\033[31m Error\033[0m")
if not is_running(s):
print("\033[32m Ok\033[0m")
break
else:
time.sleep(1)
continue
if srv == "all":
stop_daemon_service()
def stop_daemon_service():
pid = get_pid('jms')
if pid and check_pid(pid):
os.kill(pid, 15)
def stop_multi_services(services):
for s in services:
stop_service(s, sig=9)
def stop_service_force(s):
stop_service(s, sig=9)
def clean_up():
if not EXIT_EVENT.is_set():
EXIT_EVENT.set()
processes_dump = {k: v for k, v in processes.items()}
for s1, p1 in processes_dump.items():
stop_service(s1)
p1.wait()
def show_service_status(s):
services_set = parse_service(s)
for ns in services_set:
if is_running(ns):
pid = get_pid(ns)
print("{} is running: {}".format(ns, pid))
else:
print("{} is stopped".format(ns))
def upgrade_db():
collect_static()
perform_db_migrate()
@ -536,38 +124,37 @@ if __name__ == '__main__':
help="Action to run"
)
parser.add_argument(
"service", type=str, default="all", nargs="?",
choices=("all", "web", "task", "gunicorn", "celery", "beat", "celery,beat", "flower", "ws"),
"services", type=str, default='all', nargs="*",
choices=("all", "web", "task"),
help="The service to start",
)
parser.add_argument('-d', '--daemon', nargs="?", const=1)
parser.add_argument('-w', '--worker', type=int, nargs="?", const=4)
parser.add_argument('-f', '--force', nargs="?", const=1)
args = parser.parse_args()
if args.daemon:
DAEMON = True
parser.add_argument('-d', '--daemon', nargs="?", const=True)
parser.add_argument('-w', '--worker', type=int, nargs="?", default=4)
parser.add_argument('-f', '--force', nargs="?", const=True)
if args.worker:
WORKERS = args.worker
args = parser.parse_args()
action = args.action
srv = args.service
if action == "start":
start_services_and_watch(srv)
os._exit(0)
elif action == "stop":
print("Stop service")
if args.force:
stop_service_force(srv)
else:
stop_service(srv)
elif action == "restart":
DAEMON = True
stop_service(srv)
time.sleep(5)
start_services_and_watch(srv)
elif action == "upgrade_db":
if action == "upgrade_db":
upgrade_db()
else:
show_service_status(srv)
services = args.services if isinstance(args.services, list) else [args.services]
if action == 'start' and ({'gunicorn', 'all'} & set(services)):
prepare()
services_string = ' '.join(services)
cmd = f'python manage.py {args.action} {services_string}'
if args.daemon:
cmd += ' --daemon'
if args.worker:
cmd += f' --worker {args.worker}'
if args.force:
cmd += ' --force'
apps_dir = os.path.join(BASE_DIR, 'apps')
try:
# processes: main(3s) -> call(0.25s) -> service -> sub-process
code = subprocess.run(cmd, shell=True, cwd=apps_dir)
except KeyboardInterrupt:
time.sleep(1)
pass