mirror of https://github.com/openspug/spug
73 lines
2.6 KiB
Python
73 lines
2.6 KiB
Python
# Copyright: (c) OpenSpug Organization. https://github.com/openspug/spug
|
|
# Copyright: (c) <spug.dev@gmail.com>
|
|
# Released under the AGPL-3.0 License.
|
|
from django_redis import get_redis_connection
|
|
from libs.utils import human_seconds_time
|
|
from libs.ssh import SSH
|
|
import threading
|
|
import socket
|
|
import json
|
|
import time
|
|
|
|
|
|
def exec_worker_handler(job):
|
|
job = Job(**json.loads(job))
|
|
threading.Thread(target=job.run).start()
|
|
|
|
|
|
class Job:
|
|
def __init__(self, token, key, name, hostname, port, username, pkey, command, interpreter, params=None, term=None):
|
|
self.ssh = SSH(hostname, port, username, pkey, term=term)
|
|
self.key = key
|
|
self.command = self._handle_command(command, interpreter)
|
|
self.token = token
|
|
self.rds = get_redis_connection()
|
|
self.rds_key = f'PID:{self.token}:{self.key}'
|
|
self.env = dict(
|
|
SPUG_HOST_ID=str(self.key),
|
|
SPUG_HOST_NAME=name,
|
|
SPUG_HOST_HOSTNAME=hostname,
|
|
SPUG_SSH_PORT=str(port),
|
|
SPUG_SSH_USERNAME=username,
|
|
SPUG_INTERPRETER=interpreter
|
|
)
|
|
if isinstance(params, dict):
|
|
self.env.update({f'_SPUG_{k}': str(v) for k, v in params.items()})
|
|
|
|
def _send(self, message):
|
|
self.rds.publish(self.token, json.dumps(message))
|
|
|
|
def _handle_command(self, command, interpreter):
|
|
if interpreter == 'python':
|
|
attach = 'INTERPRETER=python\ncommand -v python3 &> /dev/null && INTERPRETER=python3'
|
|
return f'{attach}\n$INTERPRETER << EOF\n# -*- coding: UTF-8 -*-\n{command}\nEOF'
|
|
return command
|
|
|
|
def send(self, data):
|
|
self._send({'key': self.key, 'data': data})
|
|
|
|
def send_status(self, code):
|
|
self._send({'key': self.key, 'status': code})
|
|
|
|
def run(self):
|
|
flag = time.time()
|
|
self.send('\r\n\x1b[36m### Executing ...\x1b[0m\r\n')
|
|
code = -1
|
|
try:
|
|
with self.ssh:
|
|
self.rds.set(self.rds_key, self.ssh.get_pid(), 3600)
|
|
for code, out in self.ssh.exec_command_with_stream(self.command, self.env):
|
|
self.send(out)
|
|
human_time = human_seconds_time(time.time() - flag)
|
|
self.send(f'\r\n\x1b[36m** 执行结束,耗时:{human_time} **\x1b[0m')
|
|
except socket.timeout:
|
|
code = 130
|
|
self.send('\r\n\x1b[31m### Time out\x1b[0m')
|
|
except Exception as e:
|
|
code = 131
|
|
self.send(f'\r\n\x1b[31m### Exception {e}\x1b[0m')
|
|
raise e
|
|
finally:
|
|
self.rds.delete(self.rds_key)
|
|
self.send_status(code)
|