diff --git a/spug_api/consumer/executors.py b/spug_api/apps/exec/executors.py similarity index 90% rename from spug_api/consumer/executors.py rename to spug_api/apps/exec/executors.py index e4bcfa1..a92602b 100644 --- a/spug_api/consumer/executors.py +++ b/spug_api/apps/exec/executors.py @@ -1,7 +1,6 @@ # Copyright: (c) OpenSpug Organization. https://github.com/openspug/spug # Copyright: (c) # Released under the AGPL-3.0 License. -from channels.consumer import SyncConsumer from django_redis import get_redis_connection from libs.ssh import SSH import threading @@ -9,14 +8,13 @@ import socket import json -class SSHExecutor(SyncConsumer): - def exec(self, job): - job = Job(**job) - threading.Thread(target=job.run).start() +def exec_worker_handler(job): + job = Job(**json.loads(job)) + threading.Thread(target=job.run).start() class Job: - def __init__(self, hostname, port, username, pkey, command, token=None, **kwargs): + def __init__(self, hostname, port, username, pkey, command, token=None): self.ssh_cli = SSH(hostname, port, username, pkey) self.key = f'{hostname}:{port}' self.command = command diff --git a/spug_api/apps/exec/management/commands/runexecutor.py b/spug_api/apps/exec/management/commands/runworker.py similarity index 71% rename from spug_api/apps/exec/management/commands/runexecutor.py rename to spug_api/apps/exec/management/commands/runworker.py index d9ef35a..134c26f 100644 --- a/spug_api/apps/exec/management/commands/runexecutor.py +++ b/spug_api/apps/exec/management/commands/runworker.py @@ -7,8 +7,10 @@ from django_redis import get_redis_connection from concurrent.futures import ThreadPoolExecutor from apps.schedule.executors import schedule_worker_handler from apps.monitor.executors import monitor_worker_handler +from apps.exec.executors import exec_worker_handler import logging +EXEC_WORKER_KEY = settings.EXEC_WORKER_KEY MONITOR_WORKER_KEY = settings.MONITOR_WORKER_KEY SCHEDULE_WORKER_KEY = settings.SCHEDULE_WORKER_KEY @@ -18,16 +20,19 @@ logging.basicConfig(level=logging.WARNING, format='%(asctime)s %(message)s') class Worker: def __init__(self): self.rds = get_redis_connection() - self._executor = ThreadPoolExecutor(max_workers=100) + self._executor = ThreadPoolExecutor(max_workers=1) def run(self): logging.warning('Running worker') while True: - key, job = self.rds.blpop([SCHEDULE_WORKER_KEY, MONITOR_WORKER_KEY]) - if key.decode() == SCHEDULE_WORKER_KEY: + key, job = self.rds.blpop([EXEC_WORKER_KEY, SCHEDULE_WORKER_KEY, MONITOR_WORKER_KEY]) + key = key.decode() + if key == SCHEDULE_WORKER_KEY: self._executor.submit(schedule_worker_handler, job) - else: + elif key == MONITOR_WORKER_KEY: self._executor.submit(monitor_worker_handler, job) + else: + self._executor.submit(exec_worker_handler, job) class Command(BaseCommand): diff --git a/spug_api/apps/exec/views.py b/spug_api/apps/exec/views.py index cc6024f..2bb455b 100644 --- a/spug_api/apps/exec/views.py +++ b/spug_api/apps/exec/views.py @@ -2,10 +2,13 @@ # Copyright: (c) # Released under the AGPL-3.0 License. from django.views.generic import View +from django_redis import get_redis_connection +from django.conf import settings from libs import json_response, JsonParser, Argument, human_datetime -from libs.channel import Channel from apps.exec.models import ExecTemplate from apps.host.models import Host +import uuid +import json class TemplateView(View): @@ -49,9 +52,9 @@ def do_task(request): if error is None: if not request.user.has_host_perm(form.host_ids): return json_response(error='无权访问主机,请联系管理员') - token = Channel.get_token() + token, rds = uuid.uuid4().hex, get_redis_connection() for host in Host.objects.filter(id__in=form.host_ids): - Channel.send_ssh_executor( + data = dict( token=token, hostname=host.hostname, port=host.port, @@ -59,5 +62,6 @@ def do_task(request): command=form.command, pkey=host.private_key, ) + rds.rpush(settings.EXEC_WORKER_KEY, json.dumps(data)) return json_response(token) return json_response(error=error) diff --git a/spug_api/libs/channel.py b/spug_api/libs/channel.py index 5b66a95..d42ce3f 100644 --- a/spug_api/libs/channel.py +++ b/spug_api/libs/channel.py @@ -13,19 +13,6 @@ class Channel: def get_token(): return uuid.uuid4().hex - @staticmethod - def send_ssh_executor(hostname, port, username, command, pkey, token=None): - message = { - 'type': 'exec', - 'token': token, - 'hostname': hostname, - 'port': port, - 'username': username, - 'command': command, - 'pkey': pkey - } - async_to_sync(layer.send)('ssh_exec', message) - @staticmethod def send_notify(title, content): message = { diff --git a/spug_api/spug/routing.py b/spug_api/spug/routing.py index 76314f1..2a90bb7 100644 --- a/spug_api/spug/routing.py +++ b/spug_api/spug/routing.py @@ -1,12 +1,9 @@ # Copyright: (c) OpenSpug Organization. https://github.com/openspug/spug # Copyright: (c) # Released under the AGPL-3.0 License. -from channels.routing import ProtocolTypeRouter, ChannelNameRouter -from consumer import routing, executors +from channels.routing import ProtocolTypeRouter +from consumer import routing application = ProtocolTypeRouter({ - 'channel': ChannelNameRouter({ - 'ssh_exec': executors.SSHExecutor, - }), 'websocket': routing.ws_router }) diff --git a/spug_api/spug/settings.py b/spug_api/spug/settings.py index 3fed071..785ed0e 100644 --- a/spug_api/spug/settings.py +++ b/spug_api/spug/settings.py @@ -34,7 +34,6 @@ ALLOWED_HOSTS = ['127.0.0.1'] # Application definition INSTALLED_APPS = [ - 'channels', 'apps.account', 'apps.host', 'apps.setting', @@ -48,6 +47,7 @@ INSTALLED_APPS = [ 'apps.notify', 'apps.repository', 'apps.home', + 'channels', ] MIDDLEWARE = [ @@ -106,6 +106,7 @@ SCHEDULE_KEY = 'spug:schedule' SCHEDULE_WORKER_KEY = 'spug:schedule:worker' MONITOR_KEY = 'spug:monitor' MONITOR_WORKER_KEY = 'spug:monitor:worker' +EXEC_WORKER_KEY = 'spug:exec:worker' REQUEST_KEY = 'spug:request' BUILD_KEY = 'spug:build' REPOS_DIR = os.path.join(BASE_DIR, 'repos')