upgrade worker

pull/330/head
vapao 2021-04-26 10:35:44 +08:00
parent 3553036862
commit 773c2c85fa
6 changed files with 24 additions and 32 deletions

View File

@ -1,7 +1,6 @@
# Copyright: (c) OpenSpug Organization. https://github.com/openspug/spug # Copyright: (c) OpenSpug Organization. https://github.com/openspug/spug
# Copyright: (c) <spug.dev@gmail.com> # Copyright: (c) <spug.dev@gmail.com>
# Released under the AGPL-3.0 License. # Released under the AGPL-3.0 License.
from channels.consumer import SyncConsumer
from django_redis import get_redis_connection from django_redis import get_redis_connection
from libs.ssh import SSH from libs.ssh import SSH
import threading import threading
@ -9,14 +8,13 @@ import socket
import json import json
class SSHExecutor(SyncConsumer): def exec_worker_handler(job):
def exec(self, job): job = Job(**json.loads(job))
job = Job(**job) threading.Thread(target=job.run).start()
threading.Thread(target=job.run).start()
class Job: class Job:
def __init__(self, hostname, port, username, pkey, command, token=None, **kwargs): def __init__(self, hostname, port, username, pkey, command, token=None):
self.ssh_cli = SSH(hostname, port, username, pkey) self.ssh_cli = SSH(hostname, port, username, pkey)
self.key = f'{hostname}:{port}' self.key = f'{hostname}:{port}'
self.command = command self.command = command

View File

@ -7,8 +7,10 @@ from django_redis import get_redis_connection
from concurrent.futures import ThreadPoolExecutor from concurrent.futures import ThreadPoolExecutor
from apps.schedule.executors import schedule_worker_handler from apps.schedule.executors import schedule_worker_handler
from apps.monitor.executors import monitor_worker_handler from apps.monitor.executors import monitor_worker_handler
from apps.exec.executors import exec_worker_handler
import logging import logging
EXEC_WORKER_KEY = settings.EXEC_WORKER_KEY
MONITOR_WORKER_KEY = settings.MONITOR_WORKER_KEY MONITOR_WORKER_KEY = settings.MONITOR_WORKER_KEY
SCHEDULE_WORKER_KEY = settings.SCHEDULE_WORKER_KEY SCHEDULE_WORKER_KEY = settings.SCHEDULE_WORKER_KEY
@ -18,16 +20,19 @@ logging.basicConfig(level=logging.WARNING, format='%(asctime)s %(message)s')
class Worker: class Worker:
def __init__(self): def __init__(self):
self.rds = get_redis_connection() self.rds = get_redis_connection()
self._executor = ThreadPoolExecutor(max_workers=100) self._executor = ThreadPoolExecutor(max_workers=1)
def run(self): def run(self):
logging.warning('Running worker') logging.warning('Running worker')
while True: while True:
key, job = self.rds.blpop([SCHEDULE_WORKER_KEY, MONITOR_WORKER_KEY]) key, job = self.rds.blpop([EXEC_WORKER_KEY, SCHEDULE_WORKER_KEY, MONITOR_WORKER_KEY])
if key.decode() == SCHEDULE_WORKER_KEY: key = key.decode()
if key == SCHEDULE_WORKER_KEY:
self._executor.submit(schedule_worker_handler, job) self._executor.submit(schedule_worker_handler, job)
else: elif key == MONITOR_WORKER_KEY:
self._executor.submit(monitor_worker_handler, job) self._executor.submit(monitor_worker_handler, job)
else:
self._executor.submit(exec_worker_handler, job)
class Command(BaseCommand): class Command(BaseCommand):

View File

@ -2,10 +2,13 @@
# Copyright: (c) <spug.dev@gmail.com> # Copyright: (c) <spug.dev@gmail.com>
# Released under the AGPL-3.0 License. # Released under the AGPL-3.0 License.
from django.views.generic import View from django.views.generic import View
from django_redis import get_redis_connection
from django.conf import settings
from libs import json_response, JsonParser, Argument, human_datetime from libs import json_response, JsonParser, Argument, human_datetime
from libs.channel import Channel
from apps.exec.models import ExecTemplate from apps.exec.models import ExecTemplate
from apps.host.models import Host from apps.host.models import Host
import uuid
import json
class TemplateView(View): class TemplateView(View):
@ -49,9 +52,9 @@ def do_task(request):
if error is None: if error is None:
if not request.user.has_host_perm(form.host_ids): if not request.user.has_host_perm(form.host_ids):
return json_response(error='无权访问主机,请联系管理员') return json_response(error='无权访问主机,请联系管理员')
token = Channel.get_token() token, rds = uuid.uuid4().hex, get_redis_connection()
for host in Host.objects.filter(id__in=form.host_ids): for host in Host.objects.filter(id__in=form.host_ids):
Channel.send_ssh_executor( data = dict(
token=token, token=token,
hostname=host.hostname, hostname=host.hostname,
port=host.port, port=host.port,
@ -59,5 +62,6 @@ def do_task(request):
command=form.command, command=form.command,
pkey=host.private_key, pkey=host.private_key,
) )
rds.rpush(settings.EXEC_WORKER_KEY, json.dumps(data))
return json_response(token) return json_response(token)
return json_response(error=error) return json_response(error=error)

View File

@ -13,19 +13,6 @@ class Channel:
def get_token(): def get_token():
return uuid.uuid4().hex return uuid.uuid4().hex
@staticmethod
def send_ssh_executor(hostname, port, username, command, pkey, token=None):
message = {
'type': 'exec',
'token': token,
'hostname': hostname,
'port': port,
'username': username,
'command': command,
'pkey': pkey
}
async_to_sync(layer.send)('ssh_exec', message)
@staticmethod @staticmethod
def send_notify(title, content): def send_notify(title, content):
message = { message = {

View File

@ -1,12 +1,9 @@
# Copyright: (c) OpenSpug Organization. https://github.com/openspug/spug # Copyright: (c) OpenSpug Organization. https://github.com/openspug/spug
# Copyright: (c) <spug.dev@gmail.com> # Copyright: (c) <spug.dev@gmail.com>
# Released under the AGPL-3.0 License. # Released under the AGPL-3.0 License.
from channels.routing import ProtocolTypeRouter, ChannelNameRouter from channels.routing import ProtocolTypeRouter
from consumer import routing, executors from consumer import routing
application = ProtocolTypeRouter({ application = ProtocolTypeRouter({
'channel': ChannelNameRouter({
'ssh_exec': executors.SSHExecutor,
}),
'websocket': routing.ws_router 'websocket': routing.ws_router
}) })

View File

@ -34,7 +34,6 @@ ALLOWED_HOSTS = ['127.0.0.1']
# Application definition # Application definition
INSTALLED_APPS = [ INSTALLED_APPS = [
'channels',
'apps.account', 'apps.account',
'apps.host', 'apps.host',
'apps.setting', 'apps.setting',
@ -48,6 +47,7 @@ INSTALLED_APPS = [
'apps.notify', 'apps.notify',
'apps.repository', 'apps.repository',
'apps.home', 'apps.home',
'channels',
] ]
MIDDLEWARE = [ MIDDLEWARE = [
@ -106,6 +106,7 @@ SCHEDULE_KEY = 'spug:schedule'
SCHEDULE_WORKER_KEY = 'spug:schedule:worker' SCHEDULE_WORKER_KEY = 'spug:schedule:worker'
MONITOR_KEY = 'spug:monitor' MONITOR_KEY = 'spug:monitor'
MONITOR_WORKER_KEY = 'spug:monitor:worker' MONITOR_WORKER_KEY = 'spug:monitor:worker'
EXEC_WORKER_KEY = 'spug:exec:worker'
REQUEST_KEY = 'spug:request' REQUEST_KEY = 'spug:request'
BUILD_KEY = 'spug:build' BUILD_KEY = 'spug:build'
REPOS_DIR = os.path.join(BASE_DIR, 'repos') REPOS_DIR = os.path.join(BASE_DIR, 'repos')