perf: 修改 ansible 逻辑

pull/8970/head
ibuler 2022-09-30 18:49:45 +08:00 committed by 老广
parent 41589c5305
commit df5e63b3be
8 changed files with 151 additions and 998 deletions

View File

@ -1,334 +1,126 @@
# ~*~ coding: utf-8 ~*~
import datetime
import json
import os
from collections import defaultdict
import ansible.constants as C
from ansible.plugins.callback import CallbackBase
from ansible.plugins.callback.default import CallbackModule
from ansible.plugins.callback.minimal import CallbackModule as CMDCallBackModule
from common.utils.strings import safe_str
class CallbackMixin:
def __init__(self, display=None):
# result_raw example: {
# "ok": {"hostname": {"task_name": {}...},..},
# "failed": {"hostname": {"task_name": {}..}, ..},
# "unreachable: {"hostname": {"task_name": {}, ..}},
# "skipped": {"hostname": {"task_name": {}, ..}, ..},
# }
# results_summary example: {
# "contacted": {"hostname": {"task_name": {}}, "hostname": {}},
# "dark": {"hostname": {"task_name": {}, "task_name": {}},...,},
# "success": True
# }
self.results_raw = dict(
class DefaultCallback:
def __init__(self):
self.result = dict(
ok=defaultdict(dict),
failed=defaultdict(dict),
unreachable=defaultdict(dict),
skippe=defaultdict(dict),
)
self.results_summary = dict(
contacted=defaultdict(dict),
failures=defaultdict(dict),
dark=defaultdict(dict),
success=True
skipped=defaultdict(dict),
)
self.results = {
'raw': self.results_raw,
'summary': self.results_summary,
}
super().__init__()
if display:
self._display = display
self.summary = dict(
ok=[],
failures={},
dark={},
skipped=[],
)
self.status = 'starting'
self.finished = False
cols = os.environ.get("TERM_COLS", None)
self._display.columns = 79
if cols and cols.isdigit():
self._display.columns = int(cols) - 1
def is_success(self):
return self.status != 'successful'
def display(self, msg):
self._display.display(msg)
def gather_result(self, t, result):
self._clean_results(result._result, result._task.action)
host = result._host.get_name()
task_name = result.task_name
task_result = result._result
self.results_raw[t][host][task_name] = task_result
self.clean_result(t, host, task_name, task_result)
def close(self):
if hasattr(self._display, 'close'):
self._display.close()
class AdHocResultCallback(CallbackMixin, CallbackModule, CMDCallBackModule):
"""
Task result Callback
"""
context = None
events = [
'runner_on_failed', 'runner_on_ok',
'runner_on_skipped', 'runner_on_unreachable',
]
def event_handler(self, data):
def event_handler(self, data, **kwargs):
event = data.get('event', None)
print("Event: ", event)
print("Event Data: ", json.dumps(data))
if not event:
return
event_data = data.get('event_data', {})
host = event_data.get('remote_addr', '')
task = event_data.get('task', '')
res = event_data.get('res', {})
handler = getattr(self, event, self.on_any)
handler(event_data, host=host, task=task, res=res)
def clean_result(self, t, host, task_name, task_result):
contacted = self.results_summary["contacted"]
dark = self.results_summary["dark"]
if task_result.get('rc') is not None:
cmd = task_result.get('cmd')
if isinstance(cmd, list):
cmd = " ".join(cmd)
else:
cmd = str(cmd)
detail = {
'cmd': cmd,
'stderr': task_result.get('stderr'),
'stdout': safe_str(str(task_result.get('stdout', ''))),
'rc': task_result.get('rc'),
'delta': task_result.get('delta'),
'msg': task_result.get('msg', '')
}
else:
detail = {
"changed": task_result.get('changed', False),
"msg": task_result.get('msg', '')
}
if t in ("ok", "skipped"):
contacted[host][task_name] = detail
else:
dark[host][task_name] = detail
def v2_runner_on_failed(self, result, ignore_errors=False):
self.results_summary['success'] = False
self.gather_result("failed", result)
if result._task.action in C.MODULE_NO_JSON:
CMDCallBackModule.v2_runner_on_failed(self,
result, ignore_errors=ignore_errors
)
else:
super().v2_runner_on_failed(
result, ignore_errors=ignore_errors
)
def v2_runner_on_ok(self, result):
self.gather_result("ok", result)
if result._task.action in C.MODULE_NO_JSON:
CMDCallBackModule.v2_runner_on_ok(self, result)
else:
super().v2_runner_on_ok(result)
def v2_runner_on_skipped(self, result):
self.gather_result("skipped", result)
super().v2_runner_on_skipped(result)
def v2_runner_on_unreachable(self, result):
self.results_summary['success'] = False
self.gather_result("unreachable", result)
super().v2_runner_on_unreachable(result)
def v2_runner_on_start(self, *args, **kwargs):
pass
def display_skipped_hosts(self):
pass
def display_ok_hosts(self):
pass
def display_failed_stderr(self):
pass
def set_play_context(self, context):
# for k, v in context._attributes.items():
# print("{} ==> {}".format(k, v))
if self.context and isinstance(self.context, dict):
for k, v in self.context.items():
setattr(context, k, v)
class CommandResultCallback(AdHocResultCallback):
"""
Command result callback
results_command: {
"cmd": "",
"stderr": "",
"stdout": "",
"rc": 0,
"delta": 0:0:0.123
}
"""
def __init__(self, display=None, **kwargs):
self.results_command = dict()
super().__init__(display)
def gather_result(self, t, res):
super().gather_result(t, res)
self.gather_cmd(t, res)
def v2_playbook_on_play_start(self, play):
now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')
msg = '$ {} ({})'.format(play.name, now)
self._play = play
self._display.banner(msg)
def v2_runner_on_unreachable(self, result):
self.results_summary['success'] = False
self.gather_result("unreachable", result)
msg = result._result.get("msg")
if not msg:
msg = json.dumps(result._result, indent=4)
self._display.display("%s | FAILED! => \n%s" % (
result._host.get_name(),
msg,
), color=C.COLOR_ERROR)
def v2_runner_on_failed(self, result, ignore_errors=False):
self.results_summary['success'] = False
self.gather_result("failed", result)
msg = result._result.get("msg", '')
stderr = result._result.get("stderr")
if stderr:
msg += '\n' + stderr
module_stdout = result._result.get("module_stdout")
if module_stdout:
msg += '\n' + module_stdout
if not msg:
msg = json.dumps(result._result, indent=4)
self._display.display("%s | FAILED! => \n%s" % (
result._host.get_name(),
msg,
), color=C.COLOR_ERROR)
def v2_playbook_on_stats(self, stats):
pass
def _print_task_banner(self, task):
pass
def gather_cmd(self, t, res):
host = res._host.get_name()
cmd = {}
if t == "ok":
cmd['cmd'] = res._result.get('cmd')
cmd['stderr'] = res._result.get('stderr')
cmd['stdout'] = safe_str(str(res._result.get('stdout', '')))
cmd['rc'] = res._result.get('rc')
cmd['delta'] = res._result.get('delta')
else:
cmd['err'] = "Error: {}".format(res)
self.results_command[host] = cmd
class PlaybookResultCallBack(CallbackBase):
"""
Custom callback model for handlering the output data of
execute playbook file,
Base on the build-in callback plugins of ansible which named `json`.
"""
CALLBACK_VERSION = 2.0
CALLBACK_TYPE = 'stdout'
CALLBACK_NAME = 'Dict'
def __init__(self, display=None):
super(PlaybookResultCallBack, self).__init__(display)
self.results = []
self.output = ""
self.item_results = {} # {"host": []}
def _new_play(self, play):
return {
'play': {
'name': play.name,
'id': str(play._uuid)
},
'tasks': []
def runner_on_ok(self, event_data, host=None, task=None, res=None):
detail = {
'action': event_data.get('task_action', ''),
'res': res,
'rc': res.get('rc', 0),
'stdout': res.get('stdout', ''),
}
self.result['ok'][host][task] = detail
def _new_task(self, task):
return {
'task': {
'name': task.get_name(),
},
'hosts': {}
def runer_on_failed(self, event_data, host=None, task=None, res=None, **kwargs):
detail = {
'action': event_data.get('task_action', ''),
'res': res,
'rc': res.get('rc', 0),
'stdout': res.get('stdout', ''),
'stderr': ';'.join([res.get('stderr', ''), res.get('msg', '')]).strip(';')
}
self.result['failures'][host][task] = detail
def v2_playbook_on_no_hosts_matched(self):
self.output = "skipping: No match hosts."
def runner_on_skipped(self, event_data, host=None, task=None, **kwargs):
detail = {
'action': event_data.get('task_action', ''),
'res': {},
'rc': 0,
}
self.result['skipped'][host][task] = detail
def v2_playbook_on_no_hosts_remaining(self):
def runner_on_unreachable(self, event_data, host=None, task=None, res=None, **kwargs):
detail = {
'action': event_data.get('task_action', ''),
'res': res,
'rc': 255,
'stderr': ';'.join([res.get('stderr', ''), res.get('msg', '')]).strip(';')
}
self.result['dark'][host][task] = detail
def runner_on_start(self, event_data, **kwargs):
pass
def v2_playbook_on_task_start(self, task, is_conditional):
self.results[-1]['tasks'].append(self._new_task(task))
def runer_retry(self, event_data, **kwargs):
pass
def v2_playbook_on_play_start(self, play):
self.results.append(self._new_play(play))
def runner_on_file_diff(self, event_data, **kwargs):
pass
def v2_playbook_on_stats(self, stats):
hosts = sorted(stats.processed.keys())
summary = {}
for h in hosts:
s = stats.summarize(h)
summary[h] = s
def runner_item_on_failed(self, event_data, **kwargs):
pass
if self.output:
pass
else:
self.output = {
'plays': self.results,
'stats': summary
}
def runner_item_on_skipped(self, event_data, **kwargs):
pass
def gather_result(self, res):
if res._task.loop and "results" in res._result and res._host.name in self.item_results:
res._result.update({"results": self.item_results[res._host.name]})
del self.item_results[res._host.name]
def playbook_on_play_start(self, event_data, **kwargs):
pass
self.results[-1]['tasks'][-1]['hosts'][res._host.name] = res._result
def playbook_on_stats(self, event_data, **kwargs):
failed = []
for i in ['dark', 'failures']:
for host, tasks in self.result[i].items():
failed.append(host)
error = ''
for task, detail in tasks.items():
error += f'{task}: {detail["stderr"]};'
self.summary[i][host] = error.strip(';')
self.summary['ok'] = list(set(self.result['ok'].keys()) - set(failed))
self.summary['skipped'] = list(set(self.result['skipped'].keys()) - set(failed))
def v2_runner_on_ok(self, res, **kwargs):
if "ansible_facts" in res._result:
del res._result["ansible_facts"]
def playbook_on_include(self, event_data, **kwargs):
pass
self.gather_result(res)
def playbook_on_notify(self, event_data, **kwargs):
pass
def v2_runner_on_failed(self, res, **kwargs):
self.gather_result(res)
def playbook_on_vars_prompt(self, event_data, **kwargs):
pass
def v2_runner_on_unreachable(self, res, **kwargs):
self.gather_result(res)
def playbook_on_handler_task_start(self, event_data, **kwargs):
pass
def v2_runner_on_skipped(self, res, **kwargs):
self.gather_result(res)
def playbook_on_no_hosts_matched(self, event_data, **kwargs):
pass
def gather_item_result(self, res):
self.item_results.setdefault(res._host.name, []).append(res._result)
def v2_runner_item_on_ok(self, res):
self.gather_item_result(res)
def v2_runner_item_on_failed(self, res):
self.gather_item_result(res)
def v2_runner_item_on_skipped(self, res):
self.gather_item_result(res)
def playbook_on_no_hosts_remaining(self, event_data, **kwargs):
pass
def warning(self, event_data, **kwargs):
pass
def on_any(self, event_data, **kwargs):
pass
def status_handler(self, data, **kwargs):
self.status = data.get('status', 'unknown')

View File

@ -1,69 +0,0 @@
import errno
import sys
import os
from ansible.utils.display import Display
from ansible.utils.color import stringc
from ansible.utils.singleton import Singleton
from .utils import get_ansible_task_log_path
class UnSingleton(Singleton):
def __init__(cls, name, bases, dct):
type.__init__(cls, name, bases, dct)
def __call__(cls, *args, **kwargs):
return type.__call__(cls, *args, **kwargs)
class AdHocDisplay(Display, metaclass=UnSingleton):
def __init__(self, execution_id, verbosity=0):
super().__init__(verbosity=verbosity)
if execution_id:
log_path = get_ansible_task_log_path(execution_id)
else:
log_path = os.devnull
self.log_file = open(log_path, mode='a')
def close(self):
self.log_file.close()
def set_cowsay_info(self):
# 中断 cowsay 的测试,会频繁开启子进程
return
def _write_to_screen(self, msg, stderr):
if not stderr:
screen = sys.stdout
else:
screen = sys.stderr
screen.write(msg)
try:
screen.flush()
except IOError as e:
# Ignore EPIPE in case fileobj has been prematurely closed, eg.
# when piping to "head -n1"
if e.errno != errno.EPIPE:
raise
def _write_to_log_file(self, msg):
# 这里先不 flushlog 文件不需要那么及时。
self.log_file.write(msg)
def display(self, msg, color=None, stderr=False, screen_only=False, log_only=False, newline=True):
if log_only:
return
if color:
msg = stringc(msg, color)
if not msg.endswith(u'\n'):
msg2 = msg + u'\n'
else:
msg2 = msg
self._write_to_log_file(msg2)
self._write_to_screen(msg2, stderr)

View File

@ -2,161 +2,12 @@
from collections import defaultdict
import json
from ansible.inventory.host import Host
from ansible.vars.manager import VariableManager
from ansible.inventory.manager import InventoryManager
from ansible.parsing.dataloader import DataLoader
__all__ = [
'BaseHost', 'BaseInventory'
'JMSInventory',
]
class BaseHost(Host):
def __init__(self, host_data):
"""
初始化
:param host_data: {
"name": "",
"ip": "",
"port": "",
# behind is not must be required
"username": "",
"password": "",
"private_key_path": "",
"become": {
"method": "",
"user": "",
"pass": "",
}
"groups": [],
"vars": {},
}
"""
self.host_data = host_data
hostname = host_data.get('name') or host_data.get('ip')
port = host_data.get('port') or 22
super().__init__(hostname, port)
self.__set_required_variables()
self.__set_extra_variables()
def __set_required_variables(self):
host_data = self.host_data
self.set_variable('ansible_host', host_data['address'])
self.set_variable('ansible_port', host_data['port'])
if host_data.get('username'):
self.set_variable('ansible_user', host_data['username'])
# 添加密码和密钥
if host_data.get('password'):
self.set_variable('ansible_ssh_pass', host_data['password'])
if host_data.get('private_key_path'):
self.set_variable('ansible_ssh_private_key_file', host_data['private_key_path'])
# 添加become支持
become = host_data.get("become", False)
if become:
self.set_variable("ansible_become", True)
self.set_variable("ansible_become_method", become.get('method', 'sudo'))
self.set_variable("ansible_become_user", become.get('user', 'root'))
self.set_variable("ansible_become_pass", become.get('pass', ''))
else:
self.set_variable("ansible_become", False)
def __set_extra_variables(self):
for k, v in self.host_data.get('vars', {}).items():
self.set_variable(k, v)
def __repr__(self):
return self.name
class BaseInventory(InventoryManager):
"""
提供生成Ansible inventory对象的方法
"""
loader_class = DataLoader
variable_manager_class = VariableManager
host_manager_class = BaseHost
def __init__(self, host_list=None, group_list=None):
"""
用于生成动态构建Ansible Inventory. super().__init__ 会自动调用
host_list: [{
"name": "",
"address": "",
"port": "",
"username": "",
"password": "",
"private_key": "",
"become": {
"method": "",
"user": "",
"pass": "",
},
"groups": [],
"vars": {},
},
]
group_list: [
{"name: "", children: [""]},
]
:param host_list:
:param group_list
"""
self.host_list = host_list or []
self.group_list = group_list or []
assert isinstance(host_list, list)
self.loader = self.loader_class()
self.variable_manager = self.variable_manager_class()
super().__init__(self.loader)
def get_groups(self):
return self._inventory.groups
def get_group(self, name):
return self._inventory.groups.get(name, None)
def get_or_create_group(self, name):
group = self.get_group(name)
if not group:
self.add_group(name)
return self.get_or_create_group(name)
else:
return group
def parse_groups(self):
for g in self.group_list:
parent = self.get_or_create_group(g.get("name"))
children = [self.get_or_create_group(n) for n in g.get('children', [])]
for child in children:
parent.add_child_group(child)
def parse_hosts(self):
group_all = self.get_or_create_group('all')
ungrouped = self.get_or_create_group('ungrouped')
for host_data in self.host_list:
host = self.host_manager_class(host_data=host_data)
self.hosts[host_data['name']] = host
groups_data = host_data.get('groups')
if groups_data:
for group_name in groups_data:
group = self.get_or_create_group(group_name)
group.add_host(host)
else:
ungrouped.add_host(host)
group_all.add_host(host)
def parse_sources(self, cache=False):
self.parse_groups()
self.parse_hosts()
def get_matched_hosts(self, pattern):
return self.get_hosts(pattern)
class JMSInventory:
def __init__(self, assets, account_username=None, account_policy='smart', host_var_callback=None):
"""

View File

@ -1,65 +0,0 @@
class JMSCallback:
def event_handler(self, data, runner_config):
event = data.get('event', None)
if not event:
return
event_data = data.get('event_data', {})
pass
def runner_on_ok(self, event_data):
pass
def runer_on_failed(self, event_data):
pass
def runner_on_skipped(self, event_data):
pass
def runner_on_unreachable(self, event_data):
pass
def runner_on_start(self, event_data):
pass
def runer_retry(self, event_data):
pass
def runner_on_file_diff(self, event_data):
pass
def runner_item_on_failed(self, event_data):
pass
def runner_item_on_skipped(self, event_data):
pass
def playbook_on_play_start(self, event_data):
pass
def playbook_on_stats(self, event_data):
pass
def playbook_on_include(self, event_data):
pass
def playbook_on_notify(self, event_data):
pass
def playbook_on_vars_prompt(self, event_data):
pass
def playbook_on_handler_task_start(self, event_data):
pass
def playbook_on_no_hosts_matched(self, event_data):
pass
def playbook_on_no_hosts_remaining(self, event_data):
pass
def warning(self):
pass
def status_handler(self):
pass

View File

@ -1,44 +0,0 @@
import uuid
import ansible_runner
from django.conf import settings
class AdHocRunner:
cmd_modules_choices = ('shell', 'raw', 'command', 'script', 'win_shell')
cmd_blacklist = [
"reboot", 'shutdown', 'poweroff', 'halt', 'dd', 'half', 'top'
]
def __init__(self, inventory, module, module_args, pattern='*', project_dir='/tmp/'):
self.id = uuid.uuid4()
self.inventory = inventory
self.pattern = pattern
self.module = module
self.module_args = module_args
self.project_dir = project_dir
def check_module(self):
if self.module not in self.cmd_modules_choices:
return
if self.module_args and self.module_args.split()[0] in self.cmd_blacklist:
raise Exception("command not allowed: {}".format(self.module_args[0]))
def run(self, verbosity=0, **kwargs):
self.check_module()
if verbosity is None and settings.DEBUG:
verbosity = 1
return ansible_runner.run(
host_pattern=self.pattern,
private_data_dir=self.project_dir,
inventory=self.inventory,
module=self.module,
module_args=self.module_args,
verbosity=verbosity,
**kwargs
)
class PlaybookRunner:
pass

View File

@ -1,261 +1,70 @@
# ~*~ coding: utf-8 ~*~
import uuid
import ansible_runner
import os
import shutil
from collections import namedtuple
from ansible import context
from ansible.playbook import Playbook
from ansible.module_utils.common.collections import ImmutableDict
from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.vars.manager import VariableManager
from ansible.parsing.dataloader import DataLoader
from ansible.executor.playbook_executor import PlaybookExecutor
from ansible.playbook.play import Play
import ansible.constants as C
from .callback import (
AdHocResultCallback, PlaybookResultCallBack, CommandResultCallback
)
from common.utils import get_logger
from .exceptions import AnsibleError
from .display import AdHocDisplay
__all__ = ["AdHocRunner", "PlayBookRunner", "CommandRunner"]
C.HOST_KEY_CHECKING = False
logger = get_logger(__name__)
Options = namedtuple('Options', [
'listtags', 'listtasks', 'listhosts', 'syntax', 'connection',
'module_path', 'forks', 'remote_user', 'private_key_file', 'timeout',
'ssh_common_args', 'ssh_extra_args', 'sftp_extra_args',
'scp_extra_args', 'become', 'become_method', 'become_user',
'verbosity', 'check', 'extra_vars', 'playbook_path', 'passwords',
'diff', 'gathering', 'remote_tmp',
])
def get_default_options():
options = dict(
syntax=False,
timeout=30,
connection='ssh',
forks=10,
remote_user='root',
private_key_file=None,
become=None,
become_method=None,
become_user=None,
verbosity=1,
check=False,
diff=False,
gathering='implicit',
remote_tmp='/tmp/.ansible'
)
return options
# JumpServer not use playbook
class PlayBookRunner:
"""
用于执行AnsiblePlaybook的接口.简化Playbook对象的使用.
"""
# Default results callback
results_callback_class = PlaybookResultCallBack
loader_class = DataLoader
variable_manager_class = VariableManager
options = get_default_options()
def __init__(self, inventory=None, options=None):
"""
:param options: Ansible options like ansible.cfg
:param inventory: Ansible inventory
"""
if options:
self.options = options
C.RETRY_FILES_ENABLED = False
self.inventory = inventory
self.loader = self.loader_class()
self.results_callback = self.results_callback_class()
self.playbook_path = options.playbook_path
self.variable_manager = self.variable_manager_class(
loader=self.loader, inventory=self.inventory
)
self.passwords = options.passwords
self.__check()
def __check(self):
if self.options.playbook_path is None or \
not os.path.exists(self.options.playbook_path):
raise AnsibleError(
"Not Found the playbook file: {}.".format(self.options.playbook_path)
)
if not self.inventory.list_hosts('all'):
raise AnsibleError('Inventory is empty')
def run(self):
executor = PlaybookExecutor(
playbooks=[self.playbook_path],
inventory=self.inventory,
variable_manager=self.variable_manager,
loader=self.loader,
passwords={"conn_pass": self.passwords}
)
context.CLIARGS = ImmutableDict(self.options)
if executor._tqm:
executor._tqm._stdout_callback = self.results_callback
executor.run()
executor._tqm.cleanup()
return self.results_callback.output
from django.conf import settings
from .callback import DefaultCallback
class AdHocRunner:
"""
ADHoc Runner接口
"""
results_callback_class = AdHocResultCallback
results_callback = None
loader_class = DataLoader
variable_manager_class = VariableManager
default_options = get_default_options()
command_modules_choices = ('shell', 'raw', 'command', 'script', 'win_shell')
cmd_modules_choices = ('shell', 'raw', 'command', 'script', 'win_shell')
cmd_blacklist = [
"reboot", 'shutdown', 'poweroff', 'halt', 'dd', 'half', 'top'
]
def __init__(self, inventory, options=None):
self.options = self.update_options(options)
def __init__(self, inventory, module, module_args, pattern='*', project_dir='/tmp/'):
self.id = uuid.uuid4()
self.inventory = inventory
self.loader = DataLoader()
self.variable_manager = VariableManager(
loader=self.loader, inventory=self.inventory
)
self.pattern = pattern
self.module = module
self.module_args = module_args
self.project_dir = project_dir
self.cb = DefaultCallback()
self.runner = None
def get_result_callback(self, execution_id=None):
return self.__class__.results_callback_class(display=AdHocDisplay(execution_id))
def check_module(self):
if self.module not in self.cmd_modules_choices:
return
if self.module_args and self.module_args.split()[0] in self.cmd_blacklist:
raise Exception("command not allowed: {}".format(self.module_args[0]))
@staticmethod
def check_module_args(module_name, module_args=''):
if module_name in C.MODULE_REQUIRE_ARGS and not module_args:
err = "No argument passed to '%s' module." % module_name
raise AnsibleError(err)
def run(self, verbosity=0, **kwargs):
self.check_module()
if verbosity is None and settings.DEBUG:
verbosity = 1
def check_pattern(self, pattern):
if not pattern:
raise AnsibleError("Pattern `{}` is not valid!".format(pattern))
if not self.inventory.list_hosts("all"):
raise AnsibleError("Inventory is empty.")
if not self.inventory.list_hosts(pattern):
raise AnsibleError(
"pattern: %s dose not match any hosts." % pattern
)
def clean_args(self, module, args):
if not args:
return ''
if module not in self.command_modules_choices:
return args
if isinstance(args, str):
if args.startswith('executable='):
_args = args.split(' ')
executable, command = _args[0].split('=')[1], ' '.join(_args[1:])
args = {'executable': executable, '_raw_params': command}
else:
args = {'_raw_params': args}
return args
else:
return args
def clean_tasks(self, tasks):
cleaned_tasks = []
for task in tasks:
module = task['action']['module']
args = task['action'].get('args')
cleaned_args = self.clean_args(module, args)
task['action']['args'] = cleaned_args
self.check_module_args(module, cleaned_args)
cleaned_tasks.append(task)
return cleaned_tasks
def update_options(self, options):
_options = {k: v for k, v in self.default_options.items()}
if options and isinstance(options, dict):
_options.update(options)
return _options
def set_control_master_if_need(self, cleaned_tasks):
modules = [task.get('action', {}).get('module') for task in cleaned_tasks]
if {'ping', 'win_ping'} & set(modules):
self.results_callback.context = {
'ssh_args': '-C -o ControlMaster=no'
}
def run(self, tasks, pattern, play_name='Ansible Ad-hoc', gather_facts='no', execution_id=None):
"""
:param tasks: [{'action': {'module': 'shell', 'args': 'ls'}, ...}, ]
:param pattern: all, *, or others
:param play_name: The play name
:param gather_facts:
:return:
"""
self.check_pattern(pattern)
self.results_callback = self.get_result_callback(execution_id)
cleaned_tasks = self.clean_tasks(tasks)
self.set_control_master_if_need(cleaned_tasks)
context.CLIARGS = ImmutableDict(self.options)
play_source = dict(
name=play_name,
hosts=pattern,
gather_facts=gather_facts,
tasks=cleaned_tasks
)
play = Play().load(
play_source,
variable_manager=self.variable_manager,
loader=self.loader,
)
loader = DataLoader()
# used in start callback
playbook = Playbook(loader)
playbook._entries.append(play)
playbook._file_name = '__adhoc_playbook__'
tqm = TaskQueueManager(
ansible_runner.run(
host_pattern=self.pattern,
private_data_dir=self.project_dir,
inventory=self.inventory,
variable_manager=self.variable_manager,
loader=self.loader,
stdout_callback=self.results_callback,
passwords={"conn_pass": self.options.get("password", "")}
module=self.module,
module_args=self.module_args,
verbosity=verbosity,
event_handler=self.cb.event_handler,
status_handler=self.cb.status_handler,
**kwargs
)
try:
tqm.send_callback('v2_playbook_on_start', playbook)
tqm.run(play)
tqm.send_callback('v2_playbook_on_stats', tqm._stats)
return self.results_callback
except Exception as e:
raise AnsibleError(e)
finally:
if tqm is not None:
tqm.cleanup()
shutil.rmtree(C.DEFAULT_LOCAL_TMP, True)
self.results_callback.close()
return self.cb
class CommandRunner(AdHocRunner):
results_callback_class = CommandResultCallback
modules_choices = ('shell', 'raw', 'command', 'script', 'win_shell')
class PlaybookRunner:
def __init__(self, inventory, playbook, project_dir='/tmp/'):
self.id = uuid.uuid4()
self.inventory = inventory
self.playbook = playbook
self.project_dir = project_dir
self.cb = DefaultCallback()
def execute(self, cmd, pattern, module='shell'):
if module and module not in self.modules_choices:
raise AnsibleError("Module should in {}".format(self.modules_choices))
tasks = [
{"action": {"module": module, "args": cmd}}
]
return self.run(tasks, pattern, play_name=cmd)
def run(self, verbosity=0, **kwargs):
if verbosity is None and settings.DEBUG:
verbosity = 1
ansible_runner.run(
private_data_dir=self.project_dir,
inventory=self.inventory,
playbook=self.playbook,
verbosity=verbosity,
event_handler=self.cb.event_handler,
status_handler=self.cb.status_handler,
**kwargs
)
return self.cb

View File

@ -1,63 +0,0 @@
# -*- coding: utf-8 -*-
#
import sys
import unittest
sys.path.insert(0, '../..')
from ops.ansible.inventory import BaseInventory
class TestJMSInventory(unittest.TestCase):
def setUp(self):
host_list = [{
"name": "testserver1",
"ip": "102.1.1.1",
"port": 22,
"username": "root",
"password": "password",
"private_key": "/tmp/private_key",
"become": {
"method": "sudo",
"user": "root",
"pass": None,
},
"groups": ["group1", "group2"],
"vars": {"sexy": "yes"},
}, {
"name": "testserver2",
"ip": "8.8.8.8",
"port": 2222,
"username": "root",
"password": "password",
"private_key": "/tmp/private_key",
"become": {
"method": "su",
"user": "root",
"pass": "123",
},
"groups": ["group3", "group4"],
"vars": {"love": "yes"},
}]
self.inventory = BaseInventory(host_list=host_list)
def test_hosts(self):
print("#"*10 + "Hosts" + "#"*10)
for host in self.inventory.hosts:
print(host)
def test_groups(self):
print("#" * 10 + "Groups" + "#" * 10)
for group in self.inventory.groups:
print(group)
def test_group_all(self):
print("#" * 10 + "all group hosts" + "#" * 10)
group = self.inventory.get_group('all')
print(group.hosts)
if __name__ == '__main__':
unittest.main()

View File

@ -1,58 +0,0 @@
# -*- coding: utf-8 -*-
#
import unittest
import sys
sys.path.insert(0, "../..")
from ops.ansible.runner import AdHocRunner, CommandRunner
from ops.ansible.inventory import BaseInventory
class TestAdHocRunner(unittest.TestCase):
def setUp(self):
host_data = [
{
"name": "testserver",
"ip": "192.168.244.185",
"port": 22,
"username": "root",
"password": "redhat",
},
]
inventory = BaseInventory(host_data)
self.runner = AdHocRunner(inventory)
def test_run(self):
tasks = [
{"action": {"module": "shell", "args": "ls"}, "name": "run_cmd"},
{"action": {"module": "shell", "args": "whoami"}, "name": "run_whoami"},
]
ret = self.runner.run(tasks, "all")
print(ret.results_summary)
print(ret.results_raw)
class TestCommandRunner(unittest.TestCase):
def setUp(self):
host_data = [
{
"name": "testserver",
"ip": "192.168.244.168",
"port": 22,
"username": "root",
"password": "redhat",
},
]
inventory = BaseInventory(host_data)
self.runner = CommandRunner(inventory)
def test_execute(self):
res = self.runner.execute('ls', 'all')
print(res.results_command)
print(res.results_raw)
if __name__ == "__main__":
unittest.main()