mirror of https://github.com/jumpserver/jumpserver
perf: 基本完成 adhoc runner
parent
0fb4b52232
commit
a543a2ee37
|
@ -1,6 +1,7 @@
|
||||||
# ~*~ coding: utf-8 ~*~
|
# ~*~ coding: utf-8 ~*~
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
import json
|
import json
|
||||||
|
import os
|
||||||
|
|
||||||
|
|
||||||
__all__ = ['JMSInventory']
|
__all__ = ['JMSInventory']
|
||||||
|
@ -136,15 +137,19 @@ class JMSInventory:
|
||||||
account = self.select_account(asset)
|
account = self.select_account(asset)
|
||||||
host = self.asset_to_host(asset, account, automation, protocols)
|
host = self.asset_to_host(asset, account, automation, protocols)
|
||||||
hosts.append(host)
|
hosts.append(host)
|
||||||
return hosts
|
|
||||||
|
|
||||||
def write_to_file(self, path):
|
|
||||||
hosts = self.generate()
|
|
||||||
data = {'all': {'hosts': {}}}
|
data = {'all': {'hosts': {}}}
|
||||||
for host in hosts:
|
for host in hosts:
|
||||||
name = host.pop('name')
|
name = host.pop('name')
|
||||||
var = host.pop('vars', {})
|
var = host.pop('vars', {})
|
||||||
host.update(var)
|
host.update(var)
|
||||||
data['all']['hosts'][name] = host
|
data['all']['hosts'][name] = host
|
||||||
|
return data
|
||||||
|
|
||||||
|
def write_to_file(self, path):
|
||||||
|
data = self.generate()
|
||||||
|
path_dir = os.path.dirname(path)
|
||||||
|
if not os.path.exists(path_dir):
|
||||||
|
os.makedirs(path_dir, 0o700, True)
|
||||||
with open(path, 'w') as f:
|
with open(path, 'w') as f:
|
||||||
f.write(json.dumps(data, indent=4))
|
f.write(json.dumps(data, indent=4))
|
||||||
|
|
|
@ -1,7 +1,9 @@
|
||||||
import uuid
|
import uuid
|
||||||
import ansible_runner
|
import os
|
||||||
|
|
||||||
|
import ansible_runner
|
||||||
from django.conf import settings
|
from django.conf import settings
|
||||||
|
|
||||||
from .callback import DefaultCallback
|
from .callback import DefaultCallback
|
||||||
|
|
||||||
|
|
||||||
|
@ -11,7 +13,7 @@ class AdHocRunner:
|
||||||
"reboot", 'shutdown', 'poweroff', 'halt', 'dd', 'half', 'top'
|
"reboot", 'shutdown', 'poweroff', 'halt', 'dd', 'half', 'top'
|
||||||
]
|
]
|
||||||
|
|
||||||
def __init__(self, inventory, module, module_args, pattern='*', project_dir='/tmp/'):
|
def __init__(self, inventory, module, module_args='', pattern='*', project_dir='/tmp/'):
|
||||||
self.id = uuid.uuid4()
|
self.id = uuid.uuid4()
|
||||||
self.inventory = inventory
|
self.inventory = inventory
|
||||||
self.pattern = pattern
|
self.pattern = pattern
|
||||||
|
@ -32,6 +34,12 @@ class AdHocRunner:
|
||||||
if verbosity is None and settings.DEBUG:
|
if verbosity is None and settings.DEBUG:
|
||||||
verbosity = 1
|
verbosity = 1
|
||||||
|
|
||||||
|
if not os.path.exists(self.project_dir):
|
||||||
|
os.mkdir(self.project_dir, 0o755)
|
||||||
|
|
||||||
|
print("inventory: ")
|
||||||
|
print(self.inventory)
|
||||||
|
|
||||||
ansible_runner.run(
|
ansible_runner.run(
|
||||||
host_pattern=self.pattern,
|
host_pattern=self.pattern,
|
||||||
private_data_dir=self.project_dir,
|
private_data_dir=self.project_dir,
|
||||||
|
|
|
@ -3,3 +3,4 @@
|
||||||
|
|
||||||
from .adhoc import *
|
from .adhoc import *
|
||||||
from .celery import *
|
from .celery import *
|
||||||
|
from .playbook import *
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
# ~*~ coding: utf-8 ~*~
|
# ~*~ coding: utf-8 ~*~
|
||||||
|
import os.path
|
||||||
|
|
||||||
from django.db import models
|
from django.db import models
|
||||||
from django.utils.translation import ugettext_lazy as _
|
from django.utils.translation import ugettext_lazy as _
|
||||||
|
@ -18,7 +18,12 @@ class AdHoc(BaseAnsibleTask):
|
||||||
pattern = models.CharField(max_length=1024, verbose_name=_("Pattern"), default='all')
|
pattern = models.CharField(max_length=1024, verbose_name=_("Pattern"), default='all')
|
||||||
module = models.CharField(max_length=128, default='shell', verbose_name=_('Module'))
|
module = models.CharField(max_length=128, default='shell', verbose_name=_('Module'))
|
||||||
args = models.CharField(max_length=1024, default='', verbose_name=_('Args'))
|
args = models.CharField(max_length=1024, default='', verbose_name=_('Args'))
|
||||||
last_execution = models.ForeignKey('AdHocExecution', verbose_name=_("Last execution"), on_delete=models.SET_NULL, null=True, blank=True)
|
last_execution = models.ForeignKey('AdHocExecution', verbose_name=_("Last execution"),
|
||||||
|
on_delete=models.SET_NULL, null=True, blank=True)
|
||||||
|
|
||||||
|
def get_register_task(self):
|
||||||
|
from ops.tasks import run_adhoc
|
||||||
|
return "run_adhoc_{}".format(self.id), run_adhoc, (str(self.id),), {}
|
||||||
|
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
return "{}: {}".format(self.module, self.args)
|
return "{}: {}".format(self.module, self.args)
|
||||||
|
@ -31,10 +36,14 @@ class AdHocExecution(BaseAnsibleExecution):
|
||||||
task = models.ForeignKey('AdHoc', verbose_name=_("Adhoc"), related_name='executions', on_delete=models.CASCADE)
|
task = models.ForeignKey('AdHoc', verbose_name=_("Adhoc"), related_name='executions', on_delete=models.CASCADE)
|
||||||
|
|
||||||
def get_runner(self):
|
def get_runner(self):
|
||||||
return AdHocRunner(
|
inv = self.task.inventory
|
||||||
self.task.inventory, self.task.module, self.task.args,
|
inv.write_to_file(self.inventory_path)
|
||||||
|
|
||||||
|
runner = AdHocRunner(
|
||||||
|
self.inventory_path, self.task.module, module_args=self.task.args,
|
||||||
pattern=self.task.pattern, project_dir=self.private_dir
|
pattern=self.task.pattern, project_dir=self.private_dir
|
||||||
)
|
)
|
||||||
|
return runner
|
||||||
|
|
||||||
class Meta:
|
class Meta:
|
||||||
db_table = "ops_adhoc_execution"
|
db_table = "ops_adhoc_execution"
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
import os.path
|
import os.path
|
||||||
import uuid
|
import uuid
|
||||||
|
import logging
|
||||||
|
|
||||||
from django.db import models
|
from django.db import models
|
||||||
from django.utils.translation import gettext_lazy as _
|
from django.utils.translation import gettext_lazy as _
|
||||||
|
@ -25,7 +26,7 @@ class BaseAnsibleTask(PeriodTaskModelMixin, JMSOrgBaseModel):
|
||||||
@property
|
@property
|
||||||
def inventory(self):
|
def inventory(self):
|
||||||
inv = JMSInventory(self.assets.all(), self.account, self.account_policy)
|
inv = JMSInventory(self.assets.all(), self.account, self.account_policy)
|
||||||
return inv.generate()
|
return inv
|
||||||
|
|
||||||
def get_register_task(self):
|
def get_register_task(self):
|
||||||
raise NotImplemented
|
raise NotImplemented
|
||||||
|
@ -33,11 +34,19 @@ class BaseAnsibleTask(PeriodTaskModelMixin, JMSOrgBaseModel):
|
||||||
def to_json(self):
|
def to_json(self):
|
||||||
raise NotImplemented
|
raise NotImplemented
|
||||||
|
|
||||||
|
def create_execution(self):
|
||||||
|
execution = self.executions.create()
|
||||||
|
return execution
|
||||||
|
|
||||||
|
def run(self, *args, **kwargs):
|
||||||
|
execution = self.create_execution()
|
||||||
|
return execution.start()
|
||||||
|
|
||||||
|
|
||||||
class BaseAnsibleExecution(models.Model):
|
class BaseAnsibleExecution(models.Model):
|
||||||
id = models.UUIDField(primary_key=True, default=uuid.uuid4)
|
id = models.UUIDField(primary_key=True, default=uuid.uuid4)
|
||||||
status = models.CharField(max_length=16, verbose_name=_('Status'), default='running')
|
status = models.CharField(max_length=16, verbose_name=_('Status'), default='running')
|
||||||
task = models.ForeignKey(BaseAnsibleTask, on_delete=models.CASCADE, null=True)
|
task = models.ForeignKey(BaseAnsibleTask, on_delete=models.CASCADE, related_name='executions', null=True)
|
||||||
result = models.JSONField(blank=True, null=True, verbose_name=_('Result'))
|
result = models.JSONField(blank=True, null=True, verbose_name=_('Result'))
|
||||||
summary = models.JSONField(default=dict, verbose_name=_('Summary'))
|
summary = models.JSONField(default=dict, verbose_name=_('Summary'))
|
||||||
creator = models.ForeignKey('users.User', verbose_name=_("Creator"), on_delete=models.SET_NULL, null=True)
|
creator = models.ForeignKey('users.User', verbose_name=_("Creator"), on_delete=models.SET_NULL, null=True)
|
||||||
|
@ -52,13 +61,40 @@ class BaseAnsibleExecution(models.Model):
|
||||||
def __str__(self):
|
def __str__(self):
|
||||||
return str(self.id)
|
return str(self.id)
|
||||||
|
|
||||||
|
@property
|
||||||
def private_dir(self):
|
def private_dir(self):
|
||||||
uniq = self.date_created.strftime('%Y%m%d_%H%M%S') + '_' + self.short_id
|
uniq = self.date_created.strftime('%Y%m%d_%H%M%S') + '_' + self.short_id
|
||||||
return os.path.join(settings.ANSIBLE_DIR, self.task.name, uniq)
|
return os.path.join(settings.ANSIBLE_DIR, self.task.name, uniq)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def inventory_path(self):
|
||||||
|
return os.path.join(self.private_dir, 'inventory', 'hosts')
|
||||||
|
|
||||||
def get_runner(self):
|
def get_runner(self):
|
||||||
raise NotImplemented
|
raise NotImplemented
|
||||||
|
|
||||||
|
def finish_task(self):
|
||||||
|
self.date_finished = timezone.now()
|
||||||
|
self.save(update_fields=['result', 'status', 'summary', 'date_finished'])
|
||||||
|
self.update_task()
|
||||||
|
|
||||||
|
def set_error(self, error):
|
||||||
|
this = self.__class__.objects.get(id=self.id) # 重新获取一次,避免数据库超时连接超时
|
||||||
|
this.status = 'failed'
|
||||||
|
this.summary['error'] = str(error)
|
||||||
|
this.finish_task()
|
||||||
|
|
||||||
|
def set_result(self, cb):
|
||||||
|
status_mapper = {
|
||||||
|
'successful': 'succeeded',
|
||||||
|
}
|
||||||
|
this = self.__class__.objects.get(id=self.id)
|
||||||
|
this.status = status_mapper.get(cb.status, cb.status)
|
||||||
|
this.summary = cb.summary
|
||||||
|
this.result = cb.result
|
||||||
|
this.finish_task()
|
||||||
|
print("Finished")
|
||||||
|
|
||||||
def update_task(self):
|
def update_task(self):
|
||||||
self.task.last_execution = self
|
self.task.last_execution = self
|
||||||
self.task.date_last_run = timezone.now()
|
self.task.date_last_run = timezone.now()
|
||||||
|
@ -68,16 +104,11 @@ class BaseAnsibleExecution(models.Model):
|
||||||
runner = self.get_runner()
|
runner = self.get_runner()
|
||||||
try:
|
try:
|
||||||
cb = runner.run(**kwargs)
|
cb = runner.run(**kwargs)
|
||||||
self.status = cb.status
|
self.set_result(cb)
|
||||||
self.summary = cb.summary
|
return cb
|
||||||
self.result = cb.result
|
|
||||||
self.date_finished = timezone.now()
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.status = 'failed'
|
logging.error(e, exc_info=True)
|
||||||
self.summary = {'error': str(e)}
|
self.set_error(e)
|
||||||
finally:
|
|
||||||
self.save()
|
|
||||||
self.update_task()
|
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def is_finished(self):
|
def is_finished(self):
|
||||||
|
|
|
@ -27,7 +27,11 @@ class Playbook(BaseAnsibleTask):
|
||||||
last_execution = models.ForeignKey('PlaybookExecution', verbose_name=_("Last execution"), on_delete=models.SET_NULL, null=True, blank=True)
|
last_execution = models.ForeignKey('PlaybookExecution', verbose_name=_("Last execution"), on_delete=models.SET_NULL, null=True, blank=True)
|
||||||
|
|
||||||
def get_register_task(self):
|
def get_register_task(self):
|
||||||
pass
|
name = "automation_strategy_period_{}".format(str(self.id)[:8])
|
||||||
|
task = execute_automation_strategy.name
|
||||||
|
args = (str(self.id), Trigger.timing)
|
||||||
|
kwargs = {}
|
||||||
|
return name, task, args, kwargs
|
||||||
|
|
||||||
|
|
||||||
class PlaybookExecution(BaseAnsibleExecution):
|
class PlaybookExecution(BaseAnsibleExecution):
|
||||||
|
|
|
@ -20,7 +20,7 @@ from .celery.utils import (
|
||||||
create_or_update_celery_periodic_tasks, get_celery_periodic_task,
|
create_or_update_celery_periodic_tasks, get_celery_periodic_task,
|
||||||
disable_celery_periodic_task, delete_celery_periodic_task
|
disable_celery_periodic_task, delete_celery_periodic_task
|
||||||
)
|
)
|
||||||
from .models import CommandExecution, CeleryTask
|
from .models import CeleryTask, AdHoc, Playbook
|
||||||
from .notifications import ServerPerformanceCheckUtil
|
from .notifications import ServerPerformanceCheckUtil
|
||||||
|
|
||||||
logger = get_logger(__file__)
|
logger = get_logger(__file__)
|
||||||
|
@ -30,41 +30,48 @@ def rerun_task():
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|
||||||
@shared_task(queue="ansible", verbose_name=_("Run ansible task"))
|
@shared_task(soft_time_limit=60, queue="ansible", verbose_name=_("Run ansible task"))
|
||||||
def run_ansible_task(tid, callback=None, **kwargs):
|
def run_adhoc(tid, **kwargs):
|
||||||
"""
|
"""
|
||||||
:param tid: is the tasks serialized data
|
:param tid: is the tasks serialized data
|
||||||
:param callback: callback function name
|
:param callback: callback function name
|
||||||
:return:
|
:return:
|
||||||
"""
|
"""
|
||||||
with tmp_to_root_org():
|
with tmp_to_root_org():
|
||||||
task = get_object_or_none(Task, id=tid)
|
task = get_object_or_none(AdHoc, id=tid)
|
||||||
if not task:
|
if not task:
|
||||||
logger.error("No task found")
|
logger.error("No task found")
|
||||||
return
|
return
|
||||||
with tmp_to_org(task.org):
|
with tmp_to_org(task.org):
|
||||||
result = task.run()
|
execution = task.create_execution()
|
||||||
if callback is not None:
|
try:
|
||||||
subtask(callback).delay(result, task_name=task.name)
|
execution.start(**kwargs)
|
||||||
return result
|
except SoftTimeLimitExceeded:
|
||||||
|
execution.set_error('Run timeout')
|
||||||
|
logger.error("Run adhoc timeout")
|
||||||
|
except Exception as e:
|
||||||
|
execution.set_error(e)
|
||||||
|
logger.error("Start adhoc execution error: {}".format(e))
|
||||||
|
|
||||||
|
|
||||||
@shared_task(soft_time_limit=60, queue="ansible", verbose_name=_("Run ansible command"))
|
@shared_task(soft_time_limit=60, queue="ansible", verbose_name=_("Run ansible command"))
|
||||||
def run_command_execution(cid, **kwargs):
|
def run_playbook(pid, **kwargs):
|
||||||
with tmp_to_root_org():
|
with tmp_to_root_org():
|
||||||
execution = get_object_or_none(CommandExecution, id=cid)
|
task = get_object_or_none(Playbook, id=pid)
|
||||||
if not execution:
|
if not task:
|
||||||
logger.error("Not found the execution id: {}".format(cid))
|
logger.error("No task found")
|
||||||
return
|
return
|
||||||
with tmp_to_org(execution.run_as.org):
|
|
||||||
|
with tmp_to_org(task.org):
|
||||||
|
execution = task.create_execution()
|
||||||
try:
|
try:
|
||||||
os.environ.update({
|
execution.start(**kwargs)
|
||||||
"TERM_ROWS": kwargs.get("rows", ""),
|
|
||||||
"TERM_COLS": kwargs.get("cols", ""),
|
|
||||||
})
|
|
||||||
execution.run()
|
|
||||||
except SoftTimeLimitExceeded:
|
except SoftTimeLimitExceeded:
|
||||||
logger.error("Run time out")
|
execution.set_error('Run timeout')
|
||||||
|
logger.error("Run playbook timeout")
|
||||||
|
except Exception as e:
|
||||||
|
execution.set_error(e)
|
||||||
|
logger.error("Run playbook execution error: {}".format(e))
|
||||||
|
|
||||||
|
|
||||||
@shared_task
|
@shared_task
|
||||||
|
|
Loading…
Reference in New Issue