Browse Source

perf: ansible runner in isolated mode (#13434)

perf: use new ansible runner

perf: change lock

Co-authored-by: ibuler <ibuler@qq.com>
pull/13474/head
fit2bot 5 months ago committed by GitHub
parent
commit
165d030c8e
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 1
      Dockerfile-ce
  2. 118
      apps/ops/ansible/callback.py
  3. 14
      apps/ops/ansible/runner.py
  4. 19
      poetry.lock
  5. 4
      pyproject.toml
  6. 4
      receptor

1
Dockerfile-ce

@ -94,6 +94,7 @@ ARG TOOLS=" \
sshpass \
telnet \
vim \
bubblewrap \
wget"
ARG APT_MIRROR=http://mirrors.ustc.edu.cn

118
apps/ops/ansible/callback.py

@ -2,16 +2,18 @@ import os
from collections import defaultdict
from functools import reduce
from django.conf import settings
class DefaultCallback:
STATUS_MAPPER = {
'successful': 'success',
'failure': 'failed',
'failed': 'failed',
'running': 'running',
'pending': 'pending',
'timeout': 'timeout',
'unknown': 'unknown'
"successful": "success",
"failure": "failed",
"failed": "failed",
"running": "running",
"pending": "pending",
"timeout": "timeout",
"unknown": "unknown",
}
def __init__(self):
@ -28,7 +30,7 @@ class DefaultCallback:
dark={},
skipped=[],
)
self.status = 'running'
self.status = "running"
self.finished = False
self.local_pid = 0
self.private_data_dir = None
@ -42,61 +44,63 @@ class DefaultCallback:
return results
def is_success(self):
return self.status != 'success'
return self.status != "success"
def event_handler(self, data, **kwargs):
event = data.get('event', None)
event = data.get("event", None)
if not event:
return
pid = data.get('pid', None)
pid = data.get("pid", None)
if pid:
self.write_pid(pid)
event_data = data.get('event_data', {})
host = event_data.get('remote_addr', '')
task = event_data.get('task', '')
res = event_data.get('res', {})
event_data = data.get("event_data", {})
host = event_data.get("remote_addr", "")
task = event_data.get("task", "")
res = event_data.get("res", {})
handler = getattr(self, event, self.on_any)
handler(event_data, host=host, task=task, res=res)
def runner_on_ok(self, event_data, host=None, task=None, res=None):
detail = {
'action': event_data.get('task_action', ''),
'res': res,
'rc': res.get('rc', 0),
'stdout': res.get('stdout', ''),
"action": event_data.get("task_action", ""),
"res": res,
"rc": res.get("rc", 0),
"stdout": res.get("stdout", ""),
}
self.result['ok'][host][task] = detail
self.result["ok"][host][task] = detail
def runner_on_skipped(self, event_data, host=None, task=None, **kwargs):
detail = {
'action': event_data.get('task_action', ''),
'res': {},
'rc': 0,
"action": event_data.get("task_action", ""),
"res": {},
"rc": 0,
}
self.result['skipped'][host][task] = detail
self.result["skipped"][host][task] = detail
def runner_on_failed(self, event_data, host=None, task=None, res=None, **kwargs):
detail = {
'action': event_data.get('task_action', ''),
'res': res,
'rc': res.get('rc', 0),
'stdout': res.get('stdout', ''),
'stderr': ';'.join([res.get('stderr', ''), res.get('msg', '')]).strip(';')
"action": event_data.get("task_action", ""),
"res": res,
"rc": res.get("rc", 0),
"stdout": res.get("stdout", ""),
"stderr": ";".join([res.get("stderr", ""), res.get("msg", "")]).strip(";"),
}
ignore_errors = event_data.get('ignore_errors', False)
error_key = 'ignored' if ignore_errors else 'failures'
ignore_errors = event_data.get("ignore_errors", False)
error_key = "ignored" if ignore_errors else "failures"
self.result[error_key][host][task] = detail
def runner_on_unreachable(self, event_data, host=None, task=None, res=None, **kwargs):
def runner_on_unreachable(
self, event_data, host=None, task=None, res=None, **kwargs
):
detail = {
'action': event_data.get('task_action', ''),
'res': res,
'rc': 255,
'stderr': ';'.join([res.get('stderr', ''), res.get('msg', '')]).strip(';')
"action": event_data.get("task_action", ""),
"res": res,
"rc": 255,
"stderr": ";".join([res.get("stderr", ""), res.get("msg", "")]).strip(";"),
}
self.result['dark'][host][task] = detail
self.result["dark"][host][task] = detail
def runner_on_start(self, event_data, **kwargs):
pass
@ -117,21 +121,26 @@ class DefaultCallback:
pass
def playbook_on_stats(self, event_data, **kwargs):
error_func = lambda err, task_detail: err + f"{task_detail[0]}: {task_detail[1]['stderr']};"
for tp in ['dark', 'failures']:
error_func = (
lambda err, task_detail: err
+ f"{task_detail[0]}: {task_detail[1]['stderr']};"
)
for tp in ["dark", "failures"]:
for host, tasks in self.result[tp].items():
error = reduce(error_func, tasks.items(), '').strip(';')
error = reduce(error_func, tasks.items(), "").strip(";")
self.summary[tp][host] = error
failures = list(self.result['failures'].keys())
dark_or_failures = list(self.result['dark'].keys()) + failures
failures = list(self.result["failures"].keys())
dark_or_failures = list(self.result["dark"].keys()) + failures
for host, tasks in self.result.get('ignored', {}).items():
ignore_errors = reduce(error_func, tasks.items(), '').strip(';')
for host, tasks in self.result.get("ignored", {}).items():
ignore_errors = reduce(error_func, tasks.items(), "").strip(";")
if host in failures:
self.summary['failures'][host] += ignore_errors
self.summary["failures"][host] += ignore_errors
self.summary['ok'] = list(set(self.result['ok'].keys()) - set(dark_or_failures))
self.summary['skipped'] = list(set(self.result['skipped'].keys()) - set(dark_or_failures))
self.summary["ok"] = list(set(self.result["ok"].keys()) - set(dark_or_failures))
self.summary["skipped"] = list(
set(self.result["skipped"].keys()) - set(dark_or_failures)
)
def playbook_on_include(self, event_data, **kwargs):
pass
@ -151,6 +160,13 @@ class DefaultCallback:
def playbook_on_no_hosts_remaining(self, event_data, **kwargs):
pass
def playbook_on_start(self, event_data, **kwargs):
if settings.DEBUG_DEV:
print("DEBUG: delete inventory: ", os.path.join(self.private_data_dir, 'inventory'))
inventory_path = os.path.join(self.private_data_dir, 'inventory', 'hosts')
if os.path.exists(inventory_path):
os.remove(inventory_path)
def warning(self, event_data, **kwargs):
pass
@ -158,11 +174,11 @@ class DefaultCallback:
pass
def status_handler(self, data, **kwargs):
status = data.get('status', '')
self.status = self.STATUS_MAPPER.get(status, 'unknown')
status = data.get("status", "")
self.status = self.STATUS_MAPPER.get(status, "unknown")
self.private_data_dir = data.get("private_data_dir", None)
def write_pid(self, pid):
pid_filepath = os.path.join(self.private_data_dir, 'local.pid')
with open(pid_filepath, 'w') as f:
pid_filepath = os.path.join(self.private_data_dir, "local.pid")
with open(pid_filepath, "w") as f:
f.write(str(pid))

14
apps/ops/ansible/runner.py

@ -4,9 +4,10 @@ import uuid
from django.conf import settings
from django.utils._os import safe_join
from .interface import interface
from .callback import DefaultCallback
from .exception import CommandInBlackListException
from .interface import interface
from ..utils import get_ansible_log_verbosity
__all__ = ['AdHocRunner', 'PlaybookRunner', 'SuperPlaybookRunner', 'UploadFileRunner']
@ -44,7 +45,7 @@ class AdHocRunner:
def set_local_connection(self):
if self.job_module in self.need_local_connection_modules_choices:
self.envs.update({"LOCAL_CONNECTION_ENABLED": "1"})
self.envs.update({"ANSIBLE_SUPER_MODE": "1"})
def run(self, verbosity=0, **kwargs):
self.check_module()
@ -84,6 +85,7 @@ class PlaybookRunner:
if not callback:
callback = DefaultCallback()
self.cb = callback
self.isolate = True
self.envs = {}
def copy_playbook(self):
@ -101,6 +103,11 @@ class PlaybookRunner:
if os.path.exists(private_env):
shutil.rmtree(private_env)
kwargs = dict(kwargs)
if self.isolate:
kwargs['process_isolation'] = True
kwargs['process_isolation_executable'] = 'bwrap'
interface.run(
private_data_dir=self.project_dir,
inventory=self.inventory,
@ -118,7 +125,8 @@ class PlaybookRunner:
class SuperPlaybookRunner(PlaybookRunner):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.envs = {"LOCAL_CONNECTION_ENABLED": "1"}
self.envs = {"ANSIBLE_SUPER_MODE": "1"}
self.isolate = False
class UploadFileRunner:

19
poetry.lock

@ -480,7 +480,7 @@ description = "Radically simple IT automation"
optional = false
python-versions = ">=3.9"
files = [
{file = "v2.14.1.4.zip", hash = "sha256:1805e06391223ac6198229a18f501f0e001e66c3b334cb7c5061e0ac810297d6"},
{file = "v2.14.1.5.zip", hash = "sha256:3f05961f2902bc5228ebd44875aadc18cff02f3190791f214a40b2adb07735e1"},
]
[package.dependencies]
@ -492,17 +492,16 @@ resolvelib = ">=0.5.3,<0.9.0"
[package.source]
type = "url"
url = "https://github.com/jumpserver/ansible/archive/refs/tags/v2.14.1.4.zip"
url = "https://github.com/jumpserver/ansible/archive/refs/tags/v2.14.1.5.zip"
[[package]]
name = "ansible-runner"
version = "2.3.3"
version = "2.4.0.2"
description = "\"Consistent Ansible Python API and CLI with container and process isolation runtime capabilities\""
optional = false
python-versions = ">=3.8"
python-versions = ">=3.9"
files = [
{file = "ansible-runner-2.3.3.tar.gz", hash = "sha256:38ff635e4b94791de2956c81e265836ec4965b30e9ee35d72fcf3271dc46b98b"},
{file = "ansible_runner-2.3.3-py3-none-any.whl", hash = "sha256:c57ae0d096760d66b2897b0f9009856c7b83fd5428dcb831f470cba348346396"},
{file = "2.4.0.2.zip", hash = "sha256:e8c995ab977a8cb354b64ac84c238699dbfcd28f7157bd169572a05d2edffe6c"},
]
[package.dependencies]
@ -510,12 +509,10 @@ packaging = "*"
pexpect = ">=4.5"
python-daemon = "*"
pyyaml = "*"
six = "*"
[package.source]
type = "legacy"
url = "https://pypi.tuna.tsinghua.edu.cn/simple"
reference = "tsinghua"
type = "url"
url = "https://github.com/jumpserver-dev/ansible-runner/archive/refs/tags/2.4.0.2.zip"
[[package]]
name = "anyio"
@ -2877,6 +2874,7 @@ googleapis-common-protos = ">=1.56.2,<2.0.dev0"
grpcio = {version = ">=1.49.1,<2.0dev", optional = true, markers = "python_version >= \"3.11\" and extra == \"grpc\""}
grpcio-status = {version = ">=1.49.1,<2.0.dev0", optional = true, markers = "python_version >= \"3.11\" and extra == \"grpc\""}
proto-plus = ">=1.22.3,<2.0.0dev"
protobuf = ">=3.19.5,<3.20.0 || >3.20.0,<3.20.1 || >3.20.1,<4.21.0 || >4.21.0,<4.21.1 || >4.21.1,<4.21.2 || >4.21.2,<4.21.3 || >4.21.3,<4.21.4 || >4.21.4,<4.21.5 || >4.21.5,<5.0.0.dev0"
requests = ">=2.18.0,<3.0.0.dev0"
@ -8023,3 +8021,4 @@ reference = "tsinghua"
lock-version = "2.0"
python-versions = "^3.11"
content-hash = "de1b3ef35e2119f1ea8a2a2aee7a9586d055ed04837bb9f6f84083aa24c85b85"

4
pyproject.toml

@ -11,9 +11,9 @@ python = "^3.11"
cython = "3.0.0"
aiofiles = "23.1.0"
amqp = "5.1.1"
ansible-core = { url = "https://github.com/jumpserver/ansible/archive/refs/tags/v2.14.1.4.zip" }
ansible-core = { url = "https://github.com/jumpserver-dev/ansible/archive/refs/tags/v2.14.1.6.zip" }
ansible = "7.1.0"
ansible-runner = "2.3.3"
ansible-runner = { url = "https://github.com/jumpserver-dev/ansible-runner/archive/refs/tags/2.4.0.1.zip" }
asn1crypto = "1.5.1"
bcrypt = "4.0.1"
billiard = "4.1.0"

4
receptor

@ -3,10 +3,10 @@
import argparse
import logging
import shutil
import subprocess
import os
import shutil
import signal
import subprocess
import tempfile
from apps.libs.process.ssh import kill_ansible_ssh_process

Loading…
Cancel
Save