diff --git a/apps/assets/automations/base/manager.py b/apps/assets/automations/base/manager.py index 817952d46..4ebed3118 100644 --- a/apps/assets/automations/base/manager.py +++ b/apps/assets/automations/base/manager.py @@ -17,6 +17,61 @@ from ops.ansible import JMSInventory, PlaybookRunner, DefaultCallback logger = get_logger(__name__) +class SSHTunnelManager: + def __init__(self, *args, **kwargs): + self.gateway_servers = dict() + + @staticmethod + def file_to_json(path): + with open(path, 'r') as f: + d = json.load(f) + return d + + @staticmethod + def json_to_file(path, data): + with open(path, 'w') as f: + json.dump(data, f, indent=4, sort_keys=True) + + def local_gateway_prepare(self, runner): + info = self.file_to_json(runner.inventory) + servers, not_valid = [], [] + for k, host in info['all']['hosts'].items(): + jms_asset, jms_gateway = host.get('jms_asset'), host.get('gateway') + if not jms_gateway: + continue + try: + server = SSHTunnelForwarder( + (jms_gateway['address'], jms_gateway['port']), + ssh_username=jms_gateway['username'], + ssh_password=jms_gateway['secret'], + ssh_pkey=jms_gateway['private_key_path'], + remote_bind_address=(jms_asset['address'], jms_asset['port']) + ) + server.start() + except Exception as e: + err_msg = 'Gateway is not active: %s' % jms_asset.get('name', '') + print(f'\033[31m {err_msg} 原因: {e} \033[0m\n') + not_valid.append(k) + else: + host['ansible_host'] = jms_asset['address'] = host['login_host'] = '127.0.0.1' + host['ansible_port'] = jms_asset['port'] = host['login_port'] = server.local_bind_port + servers.append(server) + + # 网域不可连接的,就不继续执行此资源的后续任务了 + for a in set(not_valid): + info['all']['hosts'].pop(a) + self.json_to_file(runner.inventory, info) + self.gateway_servers[runner.id] = servers + + def local_gateway_clean(self, runner): + servers = self.gateway_servers.get(runner.id, []) + for s in servers: + try: + s.stop() + except Exception: + pass + + class PlaybookCallback(DefaultCallback): def playbook_on_stats(self, event_data, **kwargs): super().playbook_on_stats(event_data, **kwargs) @@ -37,7 +92,6 @@ class BasePlaybookManager: # 根据执行方式就行分组, 不同资产的改密、推送等操作可能会使用不同的执行方式 # 然后根据执行方式分组, 再根据 bulk_size 分组, 生成不同的 playbook self.playbooks = [] - self.gateway_servers = dict() params = self.execution.snapshot.get('params') self.params = params or {} @@ -247,66 +301,10 @@ class BasePlaybookManager: def on_runner_failed(self, runner, e): print("Runner failed: {} {}".format(e, self)) - @staticmethod - def file_to_json(path): - with open(path, 'r') as f: - d = json.load(f) - return d - @staticmethod def json_dumps(data): return json.dumps(data, indent=4, sort_keys=True) - @staticmethod - def json_to_file(path, data): - with open(path, 'w') as f: - json.dump(data, f, indent=4, sort_keys=True) - - def local_gateway_prepare(self, runner): - info = self.file_to_json(runner.inventory) - servers, not_valid = [], [] - for k, host in info['all']['hosts'].items(): - jms_asset, jms_gateway = host.get('jms_asset'), host.get('gateway') - if not jms_gateway: - continue - try: - server = SSHTunnelForwarder( - (jms_gateway['address'], jms_gateway['port']), - ssh_username=jms_gateway['username'], - ssh_password=jms_gateway['secret'], - ssh_pkey=jms_gateway['private_key_path'], - remote_bind_address=(jms_asset['address'], jms_asset['port']) - ) - server.start() - except Exception as e: - err_msg = 'Gateway is not active: %s' % jms_asset.get('name', '') - print(f'\033[31m {err_msg} 原因: {e} \033[0m\n') - not_valid.append(k) - else: - host['ansible_host'] = jms_asset['address'] = '127.0.0.1' - host['ansible_port'] = jms_asset['port'] = server.local_bind_port - servers.append(server) - - # 网域不可连接的,就不继续执行此资源的后续任务了 - for a in set(not_valid): - info['all']['hosts'].pop(a) - self.json_to_file(runner.inventory, info) - self.gateway_servers[runner.id] = servers - - def local_gateway_clean(self, runner): - servers = self.gateway_servers.get(runner.id, []) - for s in servers: - try: - s.stop() - except Exception: - pass - - def before_runner_start(self, runner): - self.local_gateway_prepare(runner) - - def after_runner_end(self, runner): - self.local_gateway_clean(runner) - def delete_runtime_dir(self): if settings.DEBUG_DEV: return @@ -326,14 +324,15 @@ class BasePlaybookManager: for i, runner in enumerate(runners, start=1): if len(runners) > 1: print(">>> 开始执行第 {} 批任务".format(i)) - self.before_runner_start(runner) + ssh_tunnel = SSHTunnelManager() + ssh_tunnel.local_gateway_prepare(runner) try: cb = runner.run(**kwargs) self.on_runner_success(runner, cb) except Exception as e: self.on_runner_failed(runner, e) finally: - self.after_runner_end(runner) + ssh_tunnel.local_gateway_clean(runner) print('\n') self.execution.status = 'success' self.execution.date_finished = timezone.now() diff --git a/apps/ops/models/job.py b/apps/ops/models/job.py index 5dea9a73e..d339dbf7f 100644 --- a/apps/ops/models/job.py +++ b/apps/ops/models/job.py @@ -20,6 +20,7 @@ from simple_history.models import HistoricalRecords from accounts.models import Account from acls.models import CommandFilterACL from assets.models import Asset +from assets.automations.base.manager import SSHTunnelManager from common.db.encoder import ModelJSONFieldEncoder from ops.ansible import JMSInventory, AdHocRunner, PlaybookRunner, CommandInBlackListException from ops.mixin import PeriodTaskModelMixin @@ -79,6 +80,13 @@ class JMSPermedInventory(JMSInventory): host['login_password'] = account.secret host['login_db'] = asset.spec_info.get('db_name', '') host['ansible_python_interpreter'] = sys.executable + if gateway: + host['gateway'] = { + 'address': gateway.address, 'port': gateway.port, + 'username': gateway.username, 'secret': gateway.password, + 'private_key_path': gateway.private_key_path + } + host['jms_asset']['port'] = protocol.port return host return super().make_account_vars(host, asset, account, automation, protocol, platform, gateway) @@ -529,6 +537,8 @@ class JobExecution(JMSOrgBaseModel): self.before_start() runner = self.get_runner() + ssh_tunnel = SSHTunnelManager() + ssh_tunnel.local_gateway_prepare(runner) try: cb = runner.run(**kwargs) self.set_result(cb) @@ -539,6 +549,8 @@ class JobExecution(JMSOrgBaseModel): except Exception as e: logging.error(e, exc_info=True) self.set_error(e) + finally: + ssh_tunnel.local_gateway_clean(runner) class Meta: verbose_name = _("Job Execution")