mirror of https://github.com/openspug/spug
U api add ssh batch execution
parent
c7a27d81b1
commit
33ac2eff6e
|
@ -0,0 +1,22 @@
|
|||
from channels.generic.websocket import WebsocketConsumer
|
||||
from django_redis import get_redis_connection
|
||||
|
||||
|
||||
class ExecConsumer(WebsocketConsumer):
|
||||
def __init__(self, *args, **kwargs):
|
||||
super().__init__(*args, **kwargs)
|
||||
self.token = self.scope['url_route']['kwargs']['token']
|
||||
self.rds = get_redis_connection()
|
||||
|
||||
def connect(self):
|
||||
self.accept()
|
||||
|
||||
def disconnect(self, code):
|
||||
self.rds.close()
|
||||
|
||||
def receive(self, **kwargs):
|
||||
response = self.rds.blpop(self.token, timeout=5)
|
||||
while response:
|
||||
self.send(text_data=response[1].decode())
|
||||
response = self.rds.blpop(self.token, timeout=5)
|
||||
self.send(text_data='pong')
|
|
@ -0,0 +1,58 @@
|
|||
from channels.consumer import SyncConsumer
|
||||
from apps.setting.utils import AppSetting
|
||||
from django_redis import get_redis_connection
|
||||
from libs.ssh import SSH
|
||||
import threading
|
||||
import socket
|
||||
import json
|
||||
|
||||
|
||||
class SSHExecutor(SyncConsumer):
|
||||
def exec(self, job):
|
||||
pkey = AppSetting.get('private_key')
|
||||
job = Job(pkey=pkey, **job)
|
||||
threading.Thread(target=job.run).start()
|
||||
|
||||
|
||||
class Job:
|
||||
def __init__(self, hostname, port, username, pkey, command, token=None, **kwargs):
|
||||
self.ssh_cli = SSH(hostname, port, username, pkey)
|
||||
self.key = f'{hostname}:{port}'
|
||||
self.command = command
|
||||
self.token = token
|
||||
self.rds_cli = None
|
||||
|
||||
def _send(self, message):
|
||||
if self.rds_cli is None:
|
||||
self.rds_cli = get_redis_connection()
|
||||
self.rds_cli.rpush(self.token, json.dumps(message))
|
||||
|
||||
def send(self, data):
|
||||
message = {'key': self.key, 'type': 'info', 'data': data}
|
||||
self._send(message)
|
||||
|
||||
def send_system(self, data):
|
||||
message = {'key': self.key, 'type': 'system', 'data': data}
|
||||
self._send(message)
|
||||
|
||||
def send_error(self, data):
|
||||
message = {'key': self.key, 'type': 'error', 'data': data}
|
||||
self._send(message)
|
||||
|
||||
def send_status(self, code):
|
||||
message = {'key': self.key, 'status': code}
|
||||
self._send(message)
|
||||
|
||||
def run(self):
|
||||
if not self.token:
|
||||
return self.ssh_cli.exec_command(self.command)
|
||||
self.send_system('### Executing')
|
||||
code = -1
|
||||
try:
|
||||
for code, out in self.ssh_cli.exec_command_with_stream(self.command, timeout=5):
|
||||
self.send(out)
|
||||
except socket.timeout:
|
||||
code = 130
|
||||
self.send_error('### Time out')
|
||||
finally:
|
||||
self.send_status(code)
|
|
@ -0,0 +1,6 @@
|
|||
from django.urls import path
|
||||
from .consumers import *
|
||||
|
||||
websocket_urlpatterns = [
|
||||
path('ws/exec/<str:token>/', ExecConsumer),
|
||||
]
|
|
@ -4,4 +4,5 @@ from .views import *
|
|||
|
||||
urlpatterns = [
|
||||
url(r'template/$', TemplateView.as_view()),
|
||||
url(r'do/$', do_task),
|
||||
]
|
||||
|
|
|
@ -1,6 +1,8 @@
|
|||
from django.views.generic import View
|
||||
from libs import json_response, JsonParser, Argument, human_time
|
||||
from libs.channel import Channel
|
||||
from apps.exec.models import ExecTemplate
|
||||
from apps.host.models import Host
|
||||
|
||||
|
||||
class TemplateView(View):
|
||||
|
@ -34,3 +36,22 @@ class TemplateView(View):
|
|||
if error is None:
|
||||
ExecTemplate.objects.filter(pk=form.id).delete()
|
||||
return json_response(error=error)
|
||||
|
||||
|
||||
def do_task(request):
|
||||
form, error = JsonParser(
|
||||
Argument('host_ids', type=list, filter=lambda x: len(x), help='请选择执行主机'),
|
||||
Argument('command', help='请输入执行命令内容')
|
||||
).parse(request.body)
|
||||
if error is None:
|
||||
token = Channel.get_token()
|
||||
for host in Host.objects.filter(id__in=form.host_ids):
|
||||
Channel.send_ssh_executor(
|
||||
token=token,
|
||||
hostname=host.hostname,
|
||||
port=host.port,
|
||||
username=host.username,
|
||||
command=form.command
|
||||
)
|
||||
return json_response(token)
|
||||
return json_response(error=error)
|
||||
|
|
|
@ -0,0 +1,23 @@
|
|||
from channels.layers import get_channel_layer
|
||||
from asgiref.sync import async_to_sync
|
||||
import uuid
|
||||
|
||||
layer = get_channel_layer()
|
||||
|
||||
|
||||
class Channel:
|
||||
@staticmethod
|
||||
def get_token():
|
||||
return uuid.uuid4().hex
|
||||
|
||||
@staticmethod
|
||||
def send_ssh_executor(hostname, port, username, command, token=None):
|
||||
message = {
|
||||
'type': 'exec',
|
||||
'token': token,
|
||||
'hostname': hostname,
|
||||
'port': port,
|
||||
'username': username,
|
||||
'command': command
|
||||
}
|
||||
async_to_sync(layer.send)('ssh_exec', message)
|
|
@ -30,27 +30,39 @@ class SSH:
|
|||
command = f'mkdir -p -m 700 ~/.ssh && \
|
||||
echo {public_key!r} >> ~/.ssh/authorized_keys && \
|
||||
chmod 600 ~/.ssh/authorized_keys'
|
||||
code, stdout, stderr = self.exec_command(command)
|
||||
code, out = self.exec_command(command)
|
||||
if code != 0:
|
||||
raise Exception(stdout + stderr)
|
||||
raise Exception(out)
|
||||
|
||||
def ping(self):
|
||||
with self:
|
||||
return True
|
||||
|
||||
def exec_command(self, command):
|
||||
def exec_command(self, command, timeout=1800, environment=None):
|
||||
with self as cli:
|
||||
_, stdout, stderr = cli.exec_command(command)
|
||||
return stdout.channel.recv_exit_status(), ''.join(stdout), ''.join(stderr)
|
||||
chan = cli.get_transport().open_session()
|
||||
chan.settimeout(timeout)
|
||||
chan.set_combine_stderr(True)
|
||||
if environment:
|
||||
chan.update_environment(environment)
|
||||
chan.exec_command(command)
|
||||
out = chan.makefile("r", -1)
|
||||
return chan.recv_exit_status(), out.read()
|
||||
|
||||
def exec_command_with_stream(self, command):
|
||||
def exec_command_with_stream(self, command, timeout=1800, environment=None):
|
||||
with self as cli:
|
||||
_, stdout, _ = cli.exec_command(command, get_pty=True)
|
||||
while True:
|
||||
message = stdout.readline()
|
||||
if not message:
|
||||
break
|
||||
yield message
|
||||
chan = cli.get_transport().open_session()
|
||||
chan.settimeout(timeout)
|
||||
chan.set_combine_stderr(True)
|
||||
if environment:
|
||||
chan.update_environment(environment)
|
||||
chan.exec_command(command)
|
||||
stdout = chan.makefile("r", -1)
|
||||
out = stdout.readline()
|
||||
while out:
|
||||
yield chan.exit_status, out
|
||||
out = stdout.readline()
|
||||
return chan.exit_status, out
|
||||
|
||||
def __enter__(self):
|
||||
if self.client is not None:
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
Django==2.2.7
|
||||
channels==2.3.1
|
||||
paramiko==2.6.0
|
||||
django-redis==4.10.0
|
|
@ -1,4 +1,11 @@
|
|||
from channels.routing import ProtocolTypeRouter, URLRouter
|
||||
from channels.routing import ProtocolTypeRouter, ChannelNameRouter, URLRouter
|
||||
from apps.consumer import routing, executors
|
||||
|
||||
application = ProtocolTypeRouter({
|
||||
'channel': ChannelNameRouter({
|
||||
'ssh_exec': executors.SSHExecutor,
|
||||
}),
|
||||
'websocket': URLRouter(
|
||||
routing.websocket_urlpatterns
|
||||
)
|
||||
})
|
||||
|
|
|
@ -58,6 +58,25 @@ DATABASES = {
|
|||
}
|
||||
}
|
||||
|
||||
CACHES = {
|
||||
"default": {
|
||||
"BACKEND": "django_redis.cache.RedisCache",
|
||||
"LOCATION": "redis://127.0.0.1:6379/1",
|
||||
"OPTIONS": {
|
||||
"CLIENT_CLASS": "django_redis.client.DefaultClient",
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
CHANNEL_LAYERS = {
|
||||
"default": {
|
||||
"BACKEND": "channels_redis.core.RedisChannelLayer",
|
||||
"CONFIG": {
|
||||
"hosts": [("127.0.0.1", 6379)],
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
# Internationalization
|
||||
# https://docs.djangoproject.com/en/2.2/topics/i18n/
|
||||
|
||||
|
|
Loading…
Reference in New Issue