mirror of https://github.com/openspug/spug
154 lines
5.1 KiB
Python
154 lines
5.1 KiB
Python
# Copyright: (c) OpenSpug Organization. https://github.com/openspug/spug
|
|
# Copyright: (c) <spug.dev@gmail.com>
|
|
# Released under the AGPL-3.0 License.
|
|
from django.conf import settings
|
|
from django_redis import get_redis_connection
|
|
from asgiref.sync import async_to_sync
|
|
from apps.host.models import Host
|
|
from consumer.utils import BaseConsumer
|
|
from apps.account.utils import has_host_perm
|
|
from libs.utils import str_decode
|
|
from threading import Thread
|
|
import time
|
|
import json
|
|
|
|
|
|
class ComConsumer(BaseConsumer):
|
|
def __init__(self, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
token = self.scope['url_route']['kwargs']['token']
|
|
module = self.scope['url_route']['kwargs']['module']
|
|
if module == 'build':
|
|
self.key = f'{settings.BUILD_KEY}:{token}'
|
|
elif module == 'request':
|
|
self.key = f'{settings.REQUEST_KEY}:{token}'
|
|
elif module == 'pipeline':
|
|
self.key = token
|
|
elif module == 'host':
|
|
self.key = token
|
|
else:
|
|
raise TypeError(f'unknown module for {module}')
|
|
self.rds = get_redis_connection()
|
|
|
|
def disconnect(self, code):
|
|
self.rds.close()
|
|
|
|
def get_response(self, index):
|
|
counter = 0
|
|
while counter < 30:
|
|
response = self.rds.lindex(self.key, index)
|
|
if response:
|
|
return response.decode()
|
|
counter += 1
|
|
time.sleep(0.2)
|
|
|
|
def receive(self, text_data='', **kwargs):
|
|
if text_data.isdigit():
|
|
index = int(text_data)
|
|
response = self.get_response(index)
|
|
while response:
|
|
index += 1
|
|
self.send(text_data=response)
|
|
response = self.get_response(index)
|
|
self.send(text_data='pong')
|
|
|
|
|
|
class SSHConsumer(BaseConsumer):
|
|
def __init__(self, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
self.id = self.scope['url_route']['kwargs']['id']
|
|
self.chan = None
|
|
self.ssh = None
|
|
|
|
def loop_read(self):
|
|
is_ready, buf_size = False, 4096
|
|
while True:
|
|
data = self.chan.recv(buf_size)
|
|
if not data:
|
|
self.close(3333)
|
|
break
|
|
while self.chan.recv_ready():
|
|
data += self.chan.recv(buf_size)
|
|
try:
|
|
text = data.decode()
|
|
except UnicodeDecodeError:
|
|
try:
|
|
text = data.decode(encoding='GBK')
|
|
except UnicodeDecodeError:
|
|
text = data.decode(errors='ignore')
|
|
|
|
if not is_ready:
|
|
self.send(text_data='\033[2J\033[3J\033[1;1H')
|
|
is_ready = True
|
|
self.send(text_data=text)
|
|
|
|
def receive(self, text_data=None, bytes_data=None):
|
|
data = text_data or bytes_data
|
|
if data and self.chan:
|
|
data = json.loads(data)
|
|
# print('write: {!r}'.format(data))
|
|
resize = data.get('resize')
|
|
if resize and len(resize) == 2:
|
|
self.chan.resize_pty(*resize)
|
|
else:
|
|
self.chan.send(data['data'])
|
|
|
|
def disconnect(self, code):
|
|
if self.chan:
|
|
self.chan.close()
|
|
if self.ssh:
|
|
self.ssh.close()
|
|
|
|
def init(self):
|
|
if has_host_perm(self.user, self.id):
|
|
self.send(text_data='\r\n正在连接至主机 ...')
|
|
host = Host.objects.filter(pk=self.id).first()
|
|
if not host:
|
|
return self.close_with_message('未找到指定主机,请刷新页面重试。')
|
|
|
|
try:
|
|
self.ssh = host.get_ssh().get_client()
|
|
except Exception as e:
|
|
return self.close_with_message(f'连接主机失败: {e}')
|
|
|
|
self.chan = self.ssh.invoke_shell(term='xterm')
|
|
self.chan.transport.set_keepalive(30)
|
|
Thread(target=self.loop_read).start()
|
|
else:
|
|
self.close_with_message('你当前无权限操作该主机,请联系管理员授权。')
|
|
|
|
|
|
class NotifyConsumer(BaseConsumer):
|
|
def init(self):
|
|
async_to_sync(self.channel_layer.group_add)('notify', self.channel_name)
|
|
|
|
def disconnect(self, code):
|
|
async_to_sync(self.channel_layer.group_discard)('notify', self.channel_name)
|
|
|
|
def receive(self, **kwargs):
|
|
self.send(text_data='pong')
|
|
|
|
def notify_message(self, event):
|
|
self.send(text_data=json.dumps(event))
|
|
|
|
|
|
class PubSubConsumer(BaseConsumer):
|
|
def __init__(self, *args, **kwargs):
|
|
super().__init__(*args, **kwargs)
|
|
self.token = self.scope['url_route']['kwargs']['token']
|
|
self.rds = get_redis_connection()
|
|
self.p = self.rds.pubsub(ignore_subscribe_messages=True)
|
|
self.p.subscribe(self.token)
|
|
|
|
def disconnect(self, code):
|
|
self.p.close()
|
|
self.rds.close()
|
|
|
|
def receive(self, **kwargs):
|
|
response = self.p.get_message(timeout=10)
|
|
while response:
|
|
data = str_decode(response['data'])
|
|
self.send(text_data=data)
|
|
response = self.p.get_message(timeout=10)
|
|
self.send(text_data='pong')
|