feat: 作业中心数据库支持网域命令执行 (#12117)

Co-authored-by: jiangweidong <weidong.jiang@fit2cloud.com>
pull/12193/head^2
fit2bot 1 year ago committed by GitHub
parent 18670d493e
commit 6d611bbbbd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -17,6 +17,61 @@ from ops.ansible import JMSInventory, PlaybookRunner, DefaultCallback
logger = get_logger(__name__) logger = get_logger(__name__)
class SSHTunnelManager:
def __init__(self, *args, **kwargs):
self.gateway_servers = dict()
@staticmethod
def file_to_json(path):
with open(path, 'r') as f:
d = json.load(f)
return d
@staticmethod
def json_to_file(path, data):
with open(path, 'w') as f:
json.dump(data, f, indent=4, sort_keys=True)
def local_gateway_prepare(self, runner):
info = self.file_to_json(runner.inventory)
servers, not_valid = [], []
for k, host in info['all']['hosts'].items():
jms_asset, jms_gateway = host.get('jms_asset'), host.get('gateway')
if not jms_gateway:
continue
try:
server = SSHTunnelForwarder(
(jms_gateway['address'], jms_gateway['port']),
ssh_username=jms_gateway['username'],
ssh_password=jms_gateway['secret'],
ssh_pkey=jms_gateway['private_key_path'],
remote_bind_address=(jms_asset['address'], jms_asset['port'])
)
server.start()
except Exception as e:
err_msg = 'Gateway is not active: %s' % jms_asset.get('name', '')
print(f'\033[31m {err_msg} 原因: {e} \033[0m\n')
not_valid.append(k)
else:
host['ansible_host'] = jms_asset['address'] = host['login_host'] = '127.0.0.1'
host['ansible_port'] = jms_asset['port'] = host['login_port'] = server.local_bind_port
servers.append(server)
# 网域不可连接的,就不继续执行此资源的后续任务了
for a in set(not_valid):
info['all']['hosts'].pop(a)
self.json_to_file(runner.inventory, info)
self.gateway_servers[runner.id] = servers
def local_gateway_clean(self, runner):
servers = self.gateway_servers.get(runner.id, [])
for s in servers:
try:
s.stop()
except Exception:
pass
class PlaybookCallback(DefaultCallback): class PlaybookCallback(DefaultCallback):
def playbook_on_stats(self, event_data, **kwargs): def playbook_on_stats(self, event_data, **kwargs):
super().playbook_on_stats(event_data, **kwargs) super().playbook_on_stats(event_data, **kwargs)
@ -37,7 +92,6 @@ class BasePlaybookManager:
# 根据执行方式就行分组, 不同资产的改密、推送等操作可能会使用不同的执行方式 # 根据执行方式就行分组, 不同资产的改密、推送等操作可能会使用不同的执行方式
# 然后根据执行方式分组, 再根据 bulk_size 分组, 生成不同的 playbook # 然后根据执行方式分组, 再根据 bulk_size 分组, 生成不同的 playbook
self.playbooks = [] self.playbooks = []
self.gateway_servers = dict()
params = self.execution.snapshot.get('params') params = self.execution.snapshot.get('params')
self.params = params or {} self.params = params or {}
@ -247,66 +301,10 @@ class BasePlaybookManager:
def on_runner_failed(self, runner, e): def on_runner_failed(self, runner, e):
print("Runner failed: {} {}".format(e, self)) print("Runner failed: {} {}".format(e, self))
@staticmethod
def file_to_json(path):
with open(path, 'r') as f:
d = json.load(f)
return d
@staticmethod @staticmethod
def json_dumps(data): def json_dumps(data):
return json.dumps(data, indent=4, sort_keys=True) return json.dumps(data, indent=4, sort_keys=True)
@staticmethod
def json_to_file(path, data):
with open(path, 'w') as f:
json.dump(data, f, indent=4, sort_keys=True)
def local_gateway_prepare(self, runner):
info = self.file_to_json(runner.inventory)
servers, not_valid = [], []
for k, host in info['all']['hosts'].items():
jms_asset, jms_gateway = host.get('jms_asset'), host.get('gateway')
if not jms_gateway:
continue
try:
server = SSHTunnelForwarder(
(jms_gateway['address'], jms_gateway['port']),
ssh_username=jms_gateway['username'],
ssh_password=jms_gateway['secret'],
ssh_pkey=jms_gateway['private_key_path'],
remote_bind_address=(jms_asset['address'], jms_asset['port'])
)
server.start()
except Exception as e:
err_msg = 'Gateway is not active: %s' % jms_asset.get('name', '')
print(f'\033[31m {err_msg} 原因: {e} \033[0m\n')
not_valid.append(k)
else:
host['ansible_host'] = jms_asset['address'] = '127.0.0.1'
host['ansible_port'] = jms_asset['port'] = server.local_bind_port
servers.append(server)
# 网域不可连接的,就不继续执行此资源的后续任务了
for a in set(not_valid):
info['all']['hosts'].pop(a)
self.json_to_file(runner.inventory, info)
self.gateway_servers[runner.id] = servers
def local_gateway_clean(self, runner):
servers = self.gateway_servers.get(runner.id, [])
for s in servers:
try:
s.stop()
except Exception:
pass
def before_runner_start(self, runner):
self.local_gateway_prepare(runner)
def after_runner_end(self, runner):
self.local_gateway_clean(runner)
def delete_runtime_dir(self): def delete_runtime_dir(self):
if settings.DEBUG_DEV: if settings.DEBUG_DEV:
return return
@ -326,14 +324,15 @@ class BasePlaybookManager:
for i, runner in enumerate(runners, start=1): for i, runner in enumerate(runners, start=1):
if len(runners) > 1: if len(runners) > 1:
print(">>> 开始执行第 {} 批任务".format(i)) print(">>> 开始执行第 {} 批任务".format(i))
self.before_runner_start(runner) ssh_tunnel = SSHTunnelManager()
ssh_tunnel.local_gateway_prepare(runner)
try: try:
cb = runner.run(**kwargs) cb = runner.run(**kwargs)
self.on_runner_success(runner, cb) self.on_runner_success(runner, cb)
except Exception as e: except Exception as e:
self.on_runner_failed(runner, e) self.on_runner_failed(runner, e)
finally: finally:
self.after_runner_end(runner) ssh_tunnel.local_gateway_clean(runner)
print('\n') print('\n')
self.execution.status = 'success' self.execution.status = 'success'
self.execution.date_finished = timezone.now() self.execution.date_finished = timezone.now()

@ -20,6 +20,7 @@ from simple_history.models import HistoricalRecords
from accounts.models import Account from accounts.models import Account
from acls.models import CommandFilterACL from acls.models import CommandFilterACL
from assets.models import Asset from assets.models import Asset
from assets.automations.base.manager import SSHTunnelManager
from common.db.encoder import ModelJSONFieldEncoder from common.db.encoder import ModelJSONFieldEncoder
from ops.ansible import JMSInventory, AdHocRunner, PlaybookRunner, CommandInBlackListException from ops.ansible import JMSInventory, AdHocRunner, PlaybookRunner, CommandInBlackListException
from ops.mixin import PeriodTaskModelMixin from ops.mixin import PeriodTaskModelMixin
@ -79,6 +80,13 @@ class JMSPermedInventory(JMSInventory):
host['login_password'] = account.secret host['login_password'] = account.secret
host['login_db'] = asset.spec_info.get('db_name', '') host['login_db'] = asset.spec_info.get('db_name', '')
host['ansible_python_interpreter'] = sys.executable host['ansible_python_interpreter'] = sys.executable
if gateway:
host['gateway'] = {
'address': gateway.address, 'port': gateway.port,
'username': gateway.username, 'secret': gateway.password,
'private_key_path': gateway.private_key_path
}
host['jms_asset']['port'] = protocol.port
return host return host
return super().make_account_vars(host, asset, account, automation, protocol, platform, gateway) return super().make_account_vars(host, asset, account, automation, protocol, platform, gateway)
@ -529,6 +537,8 @@ class JobExecution(JMSOrgBaseModel):
self.before_start() self.before_start()
runner = self.get_runner() runner = self.get_runner()
ssh_tunnel = SSHTunnelManager()
ssh_tunnel.local_gateway_prepare(runner)
try: try:
cb = runner.run(**kwargs) cb = runner.run(**kwargs)
self.set_result(cb) self.set_result(cb)
@ -539,6 +549,8 @@ class JobExecution(JMSOrgBaseModel):
except Exception as e: except Exception as e:
logging.error(e, exc_info=True) logging.error(e, exc_info=True)
self.set_error(e) self.set_error(e)
finally:
ssh_tunnel.local_gateway_clean(runner)
class Meta: class Meta:
verbose_name = _("Job Execution") verbose_name = _("Job Execution")

Loading…
Cancel
Save