spug/spug_api/apps/schedule/executors.py

71 lines
2.5 KiB
Python

# Copyright: (c) OpenSpug Organization. https://github.com/openspug/spug
# Copyright: (c) <spug.dev@gmail.com>
# Released under the AGPL-3.0 License.
from libs.ssh import AuthenticationException
from django.db import close_old_connections, transaction
from apps.host.models import Host
from apps.schedule.models import History, Task
from apps.schedule.utils import send_fail_notify
import subprocess
import socket
import time
import json
def local_executor(command):
code, out, now = 1, None, time.time()
task = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
try:
code = task.wait(3600)
out = task.stdout.read() + task.stderr.read()
out = out.decode()
except subprocess.TimeoutExpired:
# task.kill()
out = 'timeout, wait more than 1 hour'
return code, round(time.time() - now, 3), out
def host_executor(host, command):
code, out, now = 1, None, time.time()
try:
with host.get_ssh() as ssh:
code, out = ssh.exec_command_raw(command)
except AuthenticationException:
out = 'ssh authentication fail'
except socket.error as e:
out = f'network error {e}'
return code, round(time.time() - now, 3), out
def dispatch_job(host_id, interpreter, command):
if interpreter == 'python':
attach = 'INTERPRETER=python\ncommand -v python3 &> /dev/null && INTERPRETER=python3'
command = f'{attach}\n$INTERPRETER << EOF\n# -*- coding: UTF-8 -*-\n{command}\nEOF'
if host_id == 'local':
code, duration, out = local_executor(command)
else:
host = Host.objects.filter(pk=host_id).first()
if not host:
code, duration, out = 1, 0, f'unknown host id for {host_id!r}'
else:
code, duration, out = host_executor(host, command)
return code, duration, out
def schedule_worker_handler(job):
history_id, host_id, interpreter, command = json.loads(job)
code, duration, out = dispatch_job(host_id, interpreter, command)
close_old_connections()
with transaction.atomic():
history = History.objects.select_for_update().get(pk=history_id)
output = json.loads(history.output)
output[str(host_id)] = [code, duration, out]
history.output = json.dumps(output)
if all(output.values()):
history.status = '1' if sum(x[0] for x in output.values()) == 0 else '2'
history.save()
if history.status == '2':
task = Task.objects.get(pk=history.task_id)
send_fail_notify(task)