From 33ac2eff6e6481c8f850422b8f4b2f8556daecce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=9B=B7=E4=BA=8C=E7=8C=9B?= Date: Sun, 24 Nov 2019 17:05:58 +0800 Subject: [PATCH] U api add ssh batch execution --- spug_api/apps/consumer/__init__.py | 0 spug_api/apps/consumer/consumers.py | 22 +++++++++++ spug_api/apps/consumer/executors.py | 58 +++++++++++++++++++++++++++++ spug_api/apps/consumer/routing.py | 6 +++ spug_api/apps/exec/urls.py | 1 + spug_api/apps/exec/views.py | 21 +++++++++++ spug_api/libs/channel.py | 23 ++++++++++++ spug_api/libs/ssh.py | 36 ++++++++++++------ spug_api/requirements.txt | 3 +- spug_api/spug/routing.py | 9 ++++- spug_api/spug/settings.py | 19 ++++++++++ 11 files changed, 184 insertions(+), 14 deletions(-) create mode 100644 spug_api/apps/consumer/__init__.py create mode 100644 spug_api/apps/consumer/consumers.py create mode 100644 spug_api/apps/consumer/executors.py create mode 100644 spug_api/apps/consumer/routing.py create mode 100644 spug_api/libs/channel.py diff --git a/spug_api/apps/consumer/__init__.py b/spug_api/apps/consumer/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/spug_api/apps/consumer/consumers.py b/spug_api/apps/consumer/consumers.py new file mode 100644 index 0000000..2bb042a --- /dev/null +++ b/spug_api/apps/consumer/consumers.py @@ -0,0 +1,22 @@ +from channels.generic.websocket import WebsocketConsumer +from django_redis import get_redis_connection + + +class ExecConsumer(WebsocketConsumer): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.token = self.scope['url_route']['kwargs']['token'] + self.rds = get_redis_connection() + + def connect(self): + self.accept() + + def disconnect(self, code): + self.rds.close() + + def receive(self, **kwargs): + response = self.rds.blpop(self.token, timeout=5) + while response: + self.send(text_data=response[1].decode()) + response = self.rds.blpop(self.token, timeout=5) + self.send(text_data='pong') diff --git a/spug_api/apps/consumer/executors.py b/spug_api/apps/consumer/executors.py new file mode 100644 index 0000000..029e204 --- /dev/null +++ b/spug_api/apps/consumer/executors.py @@ -0,0 +1,58 @@ +from channels.consumer import SyncConsumer +from apps.setting.utils import AppSetting +from django_redis import get_redis_connection +from libs.ssh import SSH +import threading +import socket +import json + + +class SSHExecutor(SyncConsumer): + def exec(self, job): + pkey = AppSetting.get('private_key') + job = Job(pkey=pkey, **job) + threading.Thread(target=job.run).start() + + +class Job: + def __init__(self, hostname, port, username, pkey, command, token=None, **kwargs): + self.ssh_cli = SSH(hostname, port, username, pkey) + self.key = f'{hostname}:{port}' + self.command = command + self.token = token + self.rds_cli = None + + def _send(self, message): + if self.rds_cli is None: + self.rds_cli = get_redis_connection() + self.rds_cli.rpush(self.token, json.dumps(message)) + + def send(self, data): + message = {'key': self.key, 'type': 'info', 'data': data} + self._send(message) + + def send_system(self, data): + message = {'key': self.key, 'type': 'system', 'data': data} + self._send(message) + + def send_error(self, data): + message = {'key': self.key, 'type': 'error', 'data': data} + self._send(message) + + def send_status(self, code): + message = {'key': self.key, 'status': code} + self._send(message) + + def run(self): + if not self.token: + return self.ssh_cli.exec_command(self.command) + self.send_system('### Executing') + code = -1 + try: + for code, out in self.ssh_cli.exec_command_with_stream(self.command, timeout=5): + self.send(out) + except socket.timeout: + code = 130 + self.send_error('### Time out') + finally: + self.send_status(code) diff --git a/spug_api/apps/consumer/routing.py b/spug_api/apps/consumer/routing.py new file mode 100644 index 0000000..f58379d --- /dev/null +++ b/spug_api/apps/consumer/routing.py @@ -0,0 +1,6 @@ +from django.urls import path +from .consumers import * + +websocket_urlpatterns = [ + path('ws/exec//', ExecConsumer), +] diff --git a/spug_api/apps/exec/urls.py b/spug_api/apps/exec/urls.py index 639d609..5019c92 100644 --- a/spug_api/apps/exec/urls.py +++ b/spug_api/apps/exec/urls.py @@ -4,4 +4,5 @@ from .views import * urlpatterns = [ url(r'template/$', TemplateView.as_view()), + url(r'do/$', do_task), ] diff --git a/spug_api/apps/exec/views.py b/spug_api/apps/exec/views.py index 7b46c23..8eedb7c 100644 --- a/spug_api/apps/exec/views.py +++ b/spug_api/apps/exec/views.py @@ -1,6 +1,8 @@ from django.views.generic import View from libs import json_response, JsonParser, Argument, human_time +from libs.channel import Channel from apps.exec.models import ExecTemplate +from apps.host.models import Host class TemplateView(View): @@ -34,3 +36,22 @@ class TemplateView(View): if error is None: ExecTemplate.objects.filter(pk=form.id).delete() return json_response(error=error) + + +def do_task(request): + form, error = JsonParser( + Argument('host_ids', type=list, filter=lambda x: len(x), help='请选择执行主机'), + Argument('command', help='请输入执行命令内容') + ).parse(request.body) + if error is None: + token = Channel.get_token() + for host in Host.objects.filter(id__in=form.host_ids): + Channel.send_ssh_executor( + token=token, + hostname=host.hostname, + port=host.port, + username=host.username, + command=form.command + ) + return json_response(token) + return json_response(error=error) diff --git a/spug_api/libs/channel.py b/spug_api/libs/channel.py new file mode 100644 index 0000000..2ad3e8f --- /dev/null +++ b/spug_api/libs/channel.py @@ -0,0 +1,23 @@ +from channels.layers import get_channel_layer +from asgiref.sync import async_to_sync +import uuid + +layer = get_channel_layer() + + +class Channel: + @staticmethod + def get_token(): + return uuid.uuid4().hex + + @staticmethod + def send_ssh_executor(hostname, port, username, command, token=None): + message = { + 'type': 'exec', + 'token': token, + 'hostname': hostname, + 'port': port, + 'username': username, + 'command': command + } + async_to_sync(layer.send)('ssh_exec', message) diff --git a/spug_api/libs/ssh.py b/spug_api/libs/ssh.py index af4e9c1..9a8d3d8 100644 --- a/spug_api/libs/ssh.py +++ b/spug_api/libs/ssh.py @@ -30,27 +30,39 @@ class SSH: command = f'mkdir -p -m 700 ~/.ssh && \ echo {public_key!r} >> ~/.ssh/authorized_keys && \ chmod 600 ~/.ssh/authorized_keys' - code, stdout, stderr = self.exec_command(command) + code, out = self.exec_command(command) if code != 0: - raise Exception(stdout + stderr) + raise Exception(out) def ping(self): with self: return True - def exec_command(self, command): + def exec_command(self, command, timeout=1800, environment=None): with self as cli: - _, stdout, stderr = cli.exec_command(command) - return stdout.channel.recv_exit_status(), ''.join(stdout), ''.join(stderr) + chan = cli.get_transport().open_session() + chan.settimeout(timeout) + chan.set_combine_stderr(True) + if environment: + chan.update_environment(environment) + chan.exec_command(command) + out = chan.makefile("r", -1) + return chan.recv_exit_status(), out.read() - def exec_command_with_stream(self, command): + def exec_command_with_stream(self, command, timeout=1800, environment=None): with self as cli: - _, stdout, _ = cli.exec_command(command, get_pty=True) - while True: - message = stdout.readline() - if not message: - break - yield message + chan = cli.get_transport().open_session() + chan.settimeout(timeout) + chan.set_combine_stderr(True) + if environment: + chan.update_environment(environment) + chan.exec_command(command) + stdout = chan.makefile("r", -1) + out = stdout.readline() + while out: + yield chan.exit_status, out + out = stdout.readline() + return chan.exit_status, out def __enter__(self): if self.client is not None: diff --git a/spug_api/requirements.txt b/spug_api/requirements.txt index 6495ab2..ed53582 100644 --- a/spug_api/requirements.txt +++ b/spug_api/requirements.txt @@ -1,3 +1,4 @@ Django==2.2.7 channels==2.3.1 -paramiko==2.6.0 \ No newline at end of file +paramiko==2.6.0 +django-redis==4.10.0 \ No newline at end of file diff --git a/spug_api/spug/routing.py b/spug_api/spug/routing.py index 0d2daae..0dcac66 100644 --- a/spug_api/spug/routing.py +++ b/spug_api/spug/routing.py @@ -1,4 +1,11 @@ -from channels.routing import ProtocolTypeRouter, URLRouter +from channels.routing import ProtocolTypeRouter, ChannelNameRouter, URLRouter +from apps.consumer import routing, executors application = ProtocolTypeRouter({ + 'channel': ChannelNameRouter({ + 'ssh_exec': executors.SSHExecutor, + }), + 'websocket': URLRouter( + routing.websocket_urlpatterns + ) }) diff --git a/spug_api/spug/settings.py b/spug_api/spug/settings.py index 5f8343f..ea4233b 100644 --- a/spug_api/spug/settings.py +++ b/spug_api/spug/settings.py @@ -58,6 +58,25 @@ DATABASES = { } } +CACHES = { + "default": { + "BACKEND": "django_redis.cache.RedisCache", + "LOCATION": "redis://127.0.0.1:6379/1", + "OPTIONS": { + "CLIENT_CLASS": "django_redis.client.DefaultClient", + } + } +} + +CHANNEL_LAYERS = { + "default": { + "BACKEND": "channels_redis.core.RedisChannelLayer", + "CONFIG": { + "hosts": [("127.0.0.1", 6379)], + }, + }, +} + # Internationalization # https://docs.djangoproject.com/en/2.2/topics/i18n/