perf: 修改 ansible

pull/8970/head
ibuler 2022-09-29 20:44:45 +08:00 committed by 老广
parent cd847c483a
commit 41589c5305
29 changed files with 450 additions and 283 deletions

View File

@ -33,7 +33,7 @@ class AccountViewSet(OrgBulkModelViewSet):
@action(methods=['post'], detail=True, url_path='verify') @action(methods=['post'], detail=True, url_path='verify')
def verify_account(self, request, *args, **kwargs): def verify_account(self, request, *args, **kwargs):
account = super().get_object() account = super().get_object()
task = test_accounts_connectivity_manual.delay([account]) task = test_accounts_connectivity_manual.delay([account.id])
return Response(data={'task': task.id}) return Response(data={'task': task.id})
@ -67,8 +67,8 @@ class AccountTaskCreateAPI(CreateAPIView):
return queryset return queryset
def perform_create(self, serializer): def perform_create(self, serializer):
accounts = self.get_accounts() account_ids = self.get_accounts().values_list('id', flat=True)
task = test_accounts_connectivity_manual.delay(accounts) task = test_accounts_connectivity_manual.delay(account_ids)
data = getattr(serializer, '_data', {}) data = getattr(serializer, '_data', {})
data["task"] = task.id data["task"] = task.id
setattr(serializer, '_data', data) setattr(serializer, '_data', data)

View File

@ -1,12 +1,8 @@
from rest_framework.decorators import action
from rest_framework.response import Response
from common.drf.api import JMSModelViewSet from common.drf.api import JMSModelViewSet
from common.drf.serializers import GroupedChoiceSerializer from common.drf.serializers import GroupedChoiceSerializer
from assets.models import Platform from assets.models import Platform
from assets.serializers import PlatformSerializer, PlatformOpsMethodSerializer from assets.serializers import PlatformSerializer
from assets.const import AllTypes
from assets.playbooks import filter_platform_methods
__all__ = ['AssetPlatformViewSet'] __all__ = ['AssetPlatformViewSet']

View File

@ -0,0 +1,4 @@
from .change_secret import *
from .account_discovery import *
from .account_reconcile import *
from .account_verify import *

View File

@ -1,12 +1,12 @@
from django.utils.translation import ugettext_lazy as _ from django.utils.translation import ugettext_lazy as _
from ops.const import StrategyChoice from ops.const import StrategyChoice
from .common import AutomationStrategy from .base import BaseAutomation
class CollectStrategy(AutomationStrategy): class DiscoveryAutomation(BaseAutomation):
class Meta: class Meta:
verbose_name = _("Collect strategy") verbose_name = _("Discovery strategy")
def to_attr_json(self): def to_attr_json(self):
attr_json = super().to_attr_json() attr_json = super().to_attr_json()

View File

@ -1,12 +1,12 @@
from django.utils.translation import ugettext_lazy as _ from django.utils.translation import ugettext_lazy as _
from ops.const import StrategyChoice from ops.const import StrategyChoice
from .common import AutomationStrategy from .base import BaseAutomation
class PushStrategy(AutomationStrategy): class ReconcileAutomation(BaseAutomation):
class Meta: class Meta:
verbose_name = _("Push strategy") verbose_name = _("Reconcile strategy")
def to_attr_json(self): def to_attr_json(self):
attr_json = super().to_attr_json() attr_json = super().to_attr_json()

View File

@ -1,10 +1,10 @@
from django.utils.translation import ugettext_lazy as _ from django.utils.translation import ugettext_lazy as _
from ops.const import StrategyChoice from ops.const import StrategyChoice
from .common import AutomationStrategy from .base import BaseAutomation
class VerifyStrategy(AutomationStrategy): class VerifyAutomation(BaseAutomation):
class Meta: class Meta:
verbose_name = _("Verify strategy") verbose_name = _("Verify strategy")

View File

@ -12,8 +12,7 @@ from ops.tasks import execute_automation_strategy
from ops.task_handlers import ExecutionManager from ops.task_handlers import ExecutionManager
class AutomationStrategy(CommonModelMixin, PeriodTaskModelMixin, OrgModelMixin): class BaseAutomation(CommonModelMixin, PeriodTaskModelMixin, OrgModelMixin):
id = models.UUIDField(default=uuid.uuid4, primary_key=True)
accounts = models.JSONField(default=list, verbose_name=_("Accounts")) accounts = models.JSONField(default=list, verbose_name=_("Accounts"))
nodes = models.ManyToManyField( nodes = models.ManyToManyField(
'assets.Node', related_name='automation_strategy', blank=True, verbose_name=_("Nodes") 'assets.Node', related_name='automation_strategy', blank=True, verbose_name=_("Nodes")
@ -21,6 +20,7 @@ class AutomationStrategy(CommonModelMixin, PeriodTaskModelMixin, OrgModelMixin):
assets = models.ManyToManyField( assets = models.ManyToManyField(
'assets.Asset', related_name='automation_strategy', blank=True, verbose_name=_("Assets") 'assets.Asset', related_name='automation_strategy', blank=True, verbose_name=_("Assets")
) )
type = models.CharField(max_length=16, verbose_name=_('Type'))
comment = models.TextField(blank=True, verbose_name=_('Comment')) comment = models.TextField(blank=True, verbose_name=_('Comment'))
def __str__(self): def __str__(self):
@ -67,7 +67,7 @@ class AutomationStrategyExecution(OrgModelMixin):
default=dict, blank=True, null=True, verbose_name=_('Automation snapshot') default=dict, blank=True, null=True, verbose_name=_('Automation snapshot')
) )
strategy = models.ForeignKey( strategy = models.ForeignKey(
'AutomationStrategy', related_name='execution', on_delete=models.CASCADE, 'assets.models.automation.base.BaseAutomation', related_name='execution', on_delete=models.CASCADE,
verbose_name=_('Automation strategy') verbose_name=_('Automation strategy')
) )
trigger = models.CharField( trigger = models.CharField(

View File

@ -1,38 +1,19 @@
from django.db import models from django.db import models
from django.utils.translation import ugettext_lazy as _ from django.utils.translation import ugettext_lazy as _
from common.db import fields
from ops.const import SSHKeyStrategy, PasswordStrategy, StrategyChoice from ops.const import SSHKeyStrategy, PasswordStrategy, StrategyChoice
from ops.utils import generate_random_password from ops.utils import generate_random_password
from common.db.fields import ( from .base import BaseAutomation
EncryptCharField, EncryptTextField, JsonDictCharField
)
from .common import AutomationStrategy
class ChangeAuthStrategy(AutomationStrategy): class ChangePasswordAutomation(BaseAutomation):
is_password = models.BooleanField(default=True) class PasswordStrategy(models.TextChoices):
password_strategy = models.CharField( custom = 'specific', _('Specific')
max_length=128, blank=True, null=True, choices=PasswordStrategy.choices, random_one = 'random_one', _('All assets use the same random password')
verbose_name=_('Password strategy') random_all = 'random_all', _('All assets use different random password')
)
password_rules = JsonDictCharField(
max_length=2048, blank=True, null=True, verbose_name=_('Password rules')
)
password = EncryptCharField(
max_length=256, blank=True, null=True, verbose_name=_('Password')
)
is_ssh_key = models.BooleanField(default=False) password = fields.EncryptTextField(blank=True, null=True, verbose_name=_('Secret'))
ssh_key_strategy = models.CharField(
max_length=128, blank=True, null=True, choices=SSHKeyStrategy.choices,
verbose_name=_('SSH Key strategy')
)
private_key = EncryptTextField(
max_length=4096, blank=True, null=True, verbose_name=_('SSH private key')
)
public_key = EncryptTextField(
max_length=4096, blank=True, null=True, verbose_name=_('SSH public key')
)
recipients = models.ManyToManyField( recipients = models.ManyToManyField(
'users.User', related_name='recipients_change_auth_strategy', blank=True, 'users.User', related_name='recipients_change_auth_strategy', blank=True,
verbose_name=_("Recipient") verbose_name=_("Recipient")

View File

@ -76,6 +76,15 @@ class BaseAccount(OrgModelMixin):
def has_secret(self): def has_secret(self):
return bool(self.secret) return bool(self.secret)
@property
def password(self):
return self.secret
@password.setter
def password(self, value):
self.secret = value
self.secret_type = 'password'
@property @property
def private_key(self): def private_key(self):
if self.secret_type == self.SecretType.ssh_key: if self.secret_type == self.SecretType.ssh_key:
@ -91,15 +100,6 @@ class BaseAccount(OrgModelMixin):
self.secret = value self.secret = value
self.secret_type = 'private_key' self.secret_type = 'private_key'
@property
def password(self):
return self.secret
@password.setter
def password(self, value):
self.secret = value
self.secret_type = 'password'
@property @property
def ssh_key_fingerprint(self): def ssh_key_fingerprint(self):
if self.public_key: if self.public_key:
@ -125,8 +125,8 @@ class BaseAccount(OrgModelMixin):
return None return None
@property @property
def private_key_file(self): def private_key_path(self):
if not self.private_key_obj: if not self.secret_type != 'ssh_key' or not self.secret:
return None return None
project_dir = settings.PROJECT_DIR project_dir = settings.PROJECT_DIR
tmp_dir = os.path.join(project_dir, 'tmp') tmp_dir = os.path.join(project_dir, 'tmp')

View File

@ -40,6 +40,9 @@ class Domain(OrgModelMixin):
def gateways(self): def gateways(self):
return self.gateway_set.filter(is_active=True) return self.gateway_set.filter(is_active=True)
def select_gateway(self):
return self.random_gateway()
def random_gateway(self): def random_gateway(self):
gateways = [gw for gw in self.gateways if gw.is_connective] gateways = [gw for gw in self.gateways if gw.is_connective]
if gateways: if gateways:

View File

@ -5,7 +5,7 @@ from django.utils.translation import ugettext as _, gettext_noop
from common.utils import get_logger from common.utils import get_logger
from orgs.utils import org_aware_func from orgs.utils import org_aware_func
from ..models import Connectivity from ..models import Connectivity, Account
from . import const from . import const
from .utils import check_asset_can_run_ansible from .utils import check_asset_can_run_ansible
@ -99,10 +99,11 @@ def test_account_connectivity_util(account, task_name):
@shared_task(queue="ansible") @shared_task(queue="ansible")
def test_accounts_connectivity_manual(accounts): def test_accounts_connectivity_manual(account_ids):
""" """
:param accounts: <Account>对象 :param accounts: <Account>对象
""" """
accounts = Account.objects.filter(id__in=account_ids)
for account in accounts: for account in accounts:
task_name = gettext_noop("Test account connectivity: ") + str(account) task_name = gettext_noop("Test account connectivity: ") + str(account)
test_account_connectivity_util(account, task_name) test_account_connectivity_util(account, task_name)

View File

@ -5,8 +5,8 @@ from celery import shared_task
from django.utils.translation import gettext_noop from django.utils.translation import gettext_noop
from common.utils import get_logger from common.utils import get_logger
from orgs.utils import org_aware_func from orgs.utils import org_aware_func, tmp_to_root_org
from ..models import Asset, Connectivity, Account from ..models import Asset, Connectivity, Account, Node
from . import const from . import const
from .utils import clean_ansible_task_hosts, group_asset_by_platform from .utils import clean_ansible_task_hosts, group_asset_by_platform
@ -41,8 +41,7 @@ def set_assets_accounts_connectivity(assets, results_summary):
Account.bulk_set_connectivity(accounts_failed, Connectivity.failed) Account.bulk_set_connectivity(accounts_failed, Connectivity.failed)
@shared_task(queue="ansible") @org_aware_func('assets')
@org_aware_func("assets")
def test_asset_connectivity_util(assets, task_name=None): def test_asset_connectivity_util(assets, task_name=None):
from ops.utils import update_or_create_ansible_task from ops.utils import update_or_create_ansible_task
@ -88,7 +87,10 @@ def test_asset_connectivity_util(assets, task_name=None):
@shared_task(queue="ansible") @shared_task(queue="ansible")
def test_asset_connectivity_manual(asset): def test_asset_connectivity_manual(asset_id):
asset = Asset.objects.filter(id=asset_id).first()
if not asset:
return
task_name = gettext_noop("Test assets connectivity: ") + str(asset) task_name = gettext_noop("Test assets connectivity: ") + str(asset)
summary = test_asset_connectivity_util([asset], task_name=task_name) summary = test_asset_connectivity_util([asset], task_name=task_name)
@ -99,7 +101,9 @@ def test_asset_connectivity_manual(asset):
@shared_task(queue="ansible") @shared_task(queue="ansible")
def test_assets_connectivity_manual(assets): def test_assets_connectivity_manual(asset_ids):
with tmp_to_root_org():
assets = Asset.objects.filter(id__in=asset_ids)
task_name = gettext_noop("Test assets connectivity: ") + str([asset.name for asset in assets]) task_name = gettext_noop("Test assets connectivity: ") + str([asset.name for asset in assets])
summary = test_asset_connectivity_util(assets, task_name=task_name) summary = test_asset_connectivity_util(assets, task_name=task_name)
@ -110,7 +114,10 @@ def test_assets_connectivity_manual(assets):
@shared_task(queue="ansible") @shared_task(queue="ansible")
def test_node_assets_connectivity_manual(node): def test_node_assets_connectivity_manual(node_id):
with tmp_to_root_org():
node = Node.objects.get(id=node_id)
task_name = gettext_noop("Test if the assets under the node are connectable: ") + node.name task_name = gettext_noop("Test if the assets under the node are connectable: ") + node.name
assets = node.get_all_assets() assets = node.get_all_assets()
result = test_asset_connectivity_util(assets, task_name=task_name) result = test_asset_connectivity_util(assets, task_name=task_name)

View File

@ -9,8 +9,9 @@ from django.utils.translation import ugettext as _, gettext_noop
from common.utils import ( from common.utils import (
capacity_convert, sum_capacity, get_logger capacity_convert, sum_capacity, get_logger
) )
from orgs.utils import org_aware_func from orgs.utils import org_aware_func, tmp_to_root_org
from . import const from . import const
from ..models import Asset, Node
from .utils import clean_ansible_task_hosts from .utils import clean_ansible_task_hosts
@ -27,7 +28,6 @@ def set_assets_hardware_info(assets, result, **kwargs):
""" """
Using ops task run result, to update asset info Using ops task run result, to update asset info
@shared_task must be exit, because we using it as a task callback, is must
be a celery task also be a celery task also
:param assets: :param assets:
:param result: :param result:
@ -83,15 +83,15 @@ def set_assets_hardware_info(assets, result, **kwargs):
return assets_updated return assets_updated
@shared_task @org_aware_func('assets')
@org_aware_func("assets")
def update_assets_hardware_info_util(assets, task_name=None): def update_assets_hardware_info_util(assets, task_name=None):
""" """
Using ansible api to update asset hardware info Using ansible api to update asset hardware info
:param assets: asset seq :param asset_ids: asset seq
:param task_name: task_name running :param task_name: task_name running
:return: result summary ['contacted': {}, 'dark': {}] :return: result summary ['contacted': {}, 'dark': {}]
""" """
from ops.utils import update_or_create_ansible_task from ops.utils import update_or_create_ansible_task
if task_name is None: if task_name is None:
task_name = gettext_noop("Update some assets hardware info. ") task_name = gettext_noop("Update some assets hardware info. ")
@ -110,15 +110,19 @@ def update_assets_hardware_info_util(assets, task_name=None):
@shared_task(queue="ansible") @shared_task(queue="ansible")
def update_asset_hardware_info_manual(asset): def update_asset_hardware_info_manual(asset_id):
with tmp_to_root_org():
asset = Asset.objects.filter(id=asset_id).first()
if not asset:
return
task_name = gettext_noop("Update asset hardware info: ") + str(asset.name) task_name = gettext_noop("Update asset hardware info: ") + str(asset.name)
update_assets_hardware_info_util([asset], task_name=task_name) update_assets_hardware_info_util([asset], task_name=task_name)
@shared_task(queue="ansible") @shared_task(queue="ansible")
def update_assets_hardware_info_manual(assets): def update_assets_hardware_info_manual(asset_ids):
task_name = gettext_noop("Update assets hardware info: ") + str([asset.name for asset in assets]) task_name = gettext_noop("Update assets hardware info: ") + str([asset.name for asset in assets])
update_assets_hardware_info_util(assets, task_name=task_name) update_assets_hardware_info_util(asset_ids, task_name=task_name)
@shared_task(queue="ansible") @shared_task(queue="ansible")
@ -133,7 +137,12 @@ def update_assets_hardware_info_period():
@shared_task(queue="ansible") @shared_task(queue="ansible")
def update_node_assets_hardware_info_manual(node): def update_node_assets_hardware_info_manual(node_id):
with tmp_to_root_org():
node = Node.objects.filter(id=node_id).first()
if not node:
return
task_name = gettext_noop("Update node asset hardware information: ") + str(node.name) task_name = gettext_noop("Update node asset hardware information: ") + str(node.name)
assets = node.get_all_assets() assets = node.get_all_assets()
result = update_assets_hardware_info_util(assets, task_name=task_name) result = update_assets_hardware_info_util(assets, task_name=task_name)

View File

@ -7,7 +7,7 @@ from celery import shared_task
from django.utils.translation import gettext_noop from django.utils.translation import gettext_noop
from django.utils import timezone from django.utils import timezone
from orgs.utils import tmp_to_org, org_aware_func from orgs.utils import tmp_to_org, org_aware_func, tmp_to_root_org
from common.utils import get_logger from common.utils import get_logger
from ..models import GatheredUser, Node from ..models import GatheredUser, Node
from .utils import clean_ansible_task_hosts from .utils import clean_ansible_task_hosts
@ -103,7 +103,6 @@ def add_asset_users(assets, results):
) )
@shared_task(queue="ansible")
@org_aware_func("assets") @org_aware_func("assets")
def gather_asset_users(assets, task_name=None): def gather_asset_users(assets, task_name=None):
from ops.utils import update_or_create_ansible_task from ops.utils import update_or_create_ansible_task

View File

@ -72,6 +72,15 @@ class AdHocResultCallback(CallbackMixin, CallbackModule, CMDCallBackModule):
Task result Callback Task result Callback
""" """
context = None context = None
events = [
'runner_on_failed', 'runner_on_ok',
'runner_on_skipped', 'runner_on_unreachable',
]
def event_handler(self, data):
event = data.get('event', None)
print("Event: ", event)
print("Event Data: ", json.dumps(data))
def clean_result(self, t, host, task_name, task_result): def clean_result(self, t, host, task_name, task_result):
contacted = self.results_summary["contacted"] contacted = self.results_summary["contacted"]

View File

@ -1,4 +1,7 @@
# ~*~ coding: utf-8 ~*~ # ~*~ coding: utf-8 ~*~
from collections import defaultdict
import json
from ansible.inventory.host import Host from ansible.inventory.host import Host
from ansible.vars.manager import VariableManager from ansible.vars.manager import VariableManager
from ansible.inventory.manager import InventoryManager from ansible.inventory.manager import InventoryManager
@ -21,7 +24,7 @@ class BaseHost(Host):
# behind is not must be required # behind is not must be required
"username": "", "username": "",
"password": "", "password": "",
"private_key": "", "private_key_path": "",
"become": { "become": {
"method": "", "method": "",
"user": "", "user": "",
@ -49,8 +52,8 @@ class BaseHost(Host):
# 添加密码和密钥 # 添加密码和密钥
if host_data.get('password'): if host_data.get('password'):
self.set_variable('ansible_ssh_pass', host_data['password']) self.set_variable('ansible_ssh_pass', host_data['password'])
if host_data.get('private_key'): if host_data.get('private_key_path'):
self.set_variable('ansible_ssh_private_key_file', host_data['private_key']) self.set_variable('ansible_ssh_private_key_file', host_data['private_key_path'])
# 添加become支持 # 添加become支持
become = host_data.get("become", False) become = host_data.get("become", False)
@ -155,13 +158,144 @@ class BaseInventory(InventoryManager):
class JMSInventory: class JMSInventory:
def __init__(self, assets, account=None, ansible_connection='ssh', def __init__(self, assets, account_username=None, account_policy='smart', host_var_callback=None):
account_policy='smart', host_var_callback=None):
""" """
:param assets: :param assets:
:param account: account username name if not set use account_policy :param account_username: account username name if not set use account_policy
:param ansible_connection: ssh, local,
:param account_policy: :param account_policy:
:param host_var_callback: :param host_var_callback:
""" """
pass self.assets = self.clean_assets(assets)
self.account_username = account_username
self.account_policy = account_policy
self.host_var_callback = host_var_callback
@staticmethod
def clean_assets(assets):
from assets.models import Asset
asset_ids = [asset.id for asset in assets]
assets = Asset.objects.filter(id__in=asset_ids)\
.prefetch_related('platform', 'domain', 'accounts')
return assets
@staticmethod
def group_by_platform(assets):
groups = defaultdict(list)
for asset in assets:
groups[asset.platform].append(asset)
return groups
@staticmethod
def make_proxy_command(gateway):
proxy_command_list = [
"ssh", "-o", "Port={}".format(gateway.port),
"-o", "StrictHostKeyChecking=no",
"{}@{}".format(gateway.username, gateway.address),
"-W", "%h:%p", "-q",
]
if gateway.password:
proxy_command_list.insert(
0, "sshpass -p '{}'".format(gateway.password)
)
if gateway.private_key:
proxy_command_list.append("-i {}".format(gateway.private_key_file))
proxy_command = "'-o ProxyCommand={}'".format(
" ".join(proxy_command_list)
)
return {"ansible_ssh_common_args": proxy_command}
def asset_to_host(self, asset, account, automation, protocols):
host = {'name': asset.name, 'vars': {
'asset_id': str(asset.id), 'asset_name': asset.name,
'asset_type': asset.type, 'asset_category': asset.category,
}}
ansible_connection = automation.ansible_config.get('ansible_connection', 'ssh')
gateway = None
if asset.domain:
gateway = asset.domain.select_gateway()
ssh_protocol_matched = list(filter(lambda x: x.name == 'ssh', protocols))
ssh_protocol = ssh_protocol_matched[0] if ssh_protocol_matched else None
if ansible_connection == 'local':
if gateway:
host['ansible_host'] = gateway.address
host['ansible_port'] = gateway.port
host['ansible_user'] = gateway.username
host['ansible_password'] = gateway.password
host['ansible_connection'] = 'smart'
else:
host['ansible_connection'] = 'local'
else:
host['ansible_host'] = asset.address
host['ansible_port'] = ssh_protocol.port if ssh_protocol else 22
if account:
host['ansible_user'] = account.username
if account.secret_type == 'password' and account.secret:
host['ansible_password'] = account.secret
elif account.secret_type == 'private_key' and account.secret:
host['ssh_private_key'] = account.private_key_file
if gateway:
host['vars'].update(self.make_proxy_command(gateway))
if self.host_var_callback:
callback_var = self.host_var_callback(asset)
if isinstance(callback_var, dict):
host['vars'].update(callback_var)
return host
def select_account(self, asset):
accounts = list(asset.accounts.all())
if not accounts:
return None
account_selected = None
account_username = self.account_username
if isinstance(self.account_username, str):
account_username = [self.account_username]
if account_username:
for username in account_username:
account_matched = list(filter(lambda account: account.username == username, accounts))
if account_matched:
account_selected = account_matched[0]
return account_selected
if not account_selected:
if self.account_policy in ['privileged_must', 'privileged_first']:
account_selected = list(filter(lambda account: account.is_privileged, accounts))
account_selected = account_selected[0] if account_selected else None
if not account_selected and self.account_policy == 'privileged_first':
account_selected = accounts[0]
return account_selected
def generate(self):
hosts = []
platform_assets = self.group_by_platform(self.assets)
for platform, assets in platform_assets.items():
automation = platform.automation
protocols = platform.protocols.all()
if not automation.ansible_enabled:
continue
for asset in self.assets:
account = self.select_account(asset)
host = self.asset_to_host(asset, account, automation, protocols)
hosts.append(host)
return hosts
def write_to_file(self, path):
hosts = self.generate()
data = {'all': {'hosts': {}}}
for host in hosts:
name = host.pop('name')
var = host.pop('vars', {})
host.update(var)
data['all']['hosts'][name] = host
with open(path, 'w') as f:
f.write(json.dumps(data, indent=4))

View File

@ -0,0 +1,65 @@
class JMSCallback:
def event_handler(self, data, runner_config):
event = data.get('event', None)
if not event:
return
event_data = data.get('event_data', {})
pass
def runner_on_ok(self, event_data):
pass
def runer_on_failed(self, event_data):
pass
def runner_on_skipped(self, event_data):
pass
def runner_on_unreachable(self, event_data):
pass
def runner_on_start(self, event_data):
pass
def runer_retry(self, event_data):
pass
def runner_on_file_diff(self, event_data):
pass
def runner_item_on_failed(self, event_data):
pass
def runner_item_on_skipped(self, event_data):
pass
def playbook_on_play_start(self, event_data):
pass
def playbook_on_stats(self, event_data):
pass
def playbook_on_include(self, event_data):
pass
def playbook_on_notify(self, event_data):
pass
def playbook_on_vars_prompt(self, event_data):
pass
def playbook_on_handler_task_start(self, event_data):
pass
def playbook_on_no_hosts_matched(self, event_data):
pass
def playbook_on_no_hosts_remaining(self, event_data):
pass
def warning(self):
pass
def status_handler(self):
pass

View File

@ -1,14 +1,43 @@
import uuid
import ansible_runner import ansible_runner
from django.conf import settings
class AnsibleInventory:
def __init__(self, assets, account=None, ansible_connection='ssh'):
self.assets = assets
self.account = account
class AdHocRunner: class AdHocRunner:
pass cmd_modules_choices = ('shell', 'raw', 'command', 'script', 'win_shell')
cmd_blacklist = [
"reboot", 'shutdown', 'poweroff', 'halt', 'dd', 'half', 'top'
]
def __init__(self, inventory, module, module_args, pattern='*', project_dir='/tmp/'):
self.id = uuid.uuid4()
self.inventory = inventory
self.pattern = pattern
self.module = module
self.module_args = module_args
self.project_dir = project_dir
def check_module(self):
if self.module not in self.cmd_modules_choices:
return
if self.module_args and self.module_args.split()[0] in self.cmd_blacklist:
raise Exception("command not allowed: {}".format(self.module_args[0]))
def run(self, verbosity=0, **kwargs):
self.check_module()
if verbosity is None and settings.DEBUG:
verbosity = 1
return ansible_runner.run(
host_pattern=self.pattern,
private_data_dir=self.project_dir,
inventory=self.inventory,
module=self.module,
module_args=self.module_args,
verbosity=verbosity,
**kwargs
)
class PlaybookRunner: class PlaybookRunner:

View File

@ -0,0 +1,44 @@
# Generated by Django 3.2.14 on 2022-09-29 12:25
from django.db import migrations, models
class Migration(migrations.Migration):
dependencies = [
('ops', '0022_auto_20220817_1346'),
]
operations = [
migrations.RemoveField(
model_name='celerytask',
name='log_path',
),
migrations.RemoveField(
model_name='celerytask',
name='status',
),
migrations.AddField(
model_name='celerytask',
name='args',
field=models.JSONField(default=[], verbose_name='Args'),
preserve_default=False,
),
migrations.AddField(
model_name='celerytask',
name='is_finished',
field=models.BooleanField(default=False, verbose_name='Finished'),
),
migrations.AddField(
model_name='celerytask',
name='kwargs',
field=models.JSONField(default={}, verbose_name='Kwargs'),
preserve_default=False,
),
migrations.AddField(
model_name='celerytask',
name='state',
field=models.CharField(default='SUCCESS', max_length=16, verbose_name='State'),
preserve_default=False,
),
]

View File

@ -1,123 +0,0 @@
# Generated by Django 3.2.14 on 2022-09-08 11:58
import common.db.fields
from django.conf import settings
from django.db import migrations, models
import django.db.models.deletion
import uuid
class Migration(migrations.Migration):
dependencies = [
migrations.swappable_dependency(settings.AUTH_USER_MODEL),
('assets', '0105_auto_20220817_1544'),
('ops', '0022_auto_20220817_1346'),
]
operations = [
migrations.CreateModel(
name='AutomationStrategy',
fields=[
('org_id', models.CharField(blank=True, db_index=True, default='', max_length=36, verbose_name='Organization')),
('created_by', models.CharField(blank=True, max_length=32, null=True, verbose_name='Created by')),
('date_created', models.DateTimeField(auto_now_add=True, null=True, verbose_name='Date created')),
('date_updated', models.DateTimeField(auto_now=True, verbose_name='Date updated')),
('name', models.CharField(max_length=128, verbose_name='Name')),
('is_periodic', models.BooleanField(default=False)),
('interval', models.IntegerField(blank=True, default=24, null=True, verbose_name='Cycle perform')),
('crontab', models.CharField(blank=True, max_length=128, null=True, verbose_name='Regularly perform')),
('id', models.UUIDField(default=uuid.uuid4, primary_key=True, serialize=False)),
('accounts', models.JSONField(default=list, verbose_name='Accounts')),
('comment', models.TextField(blank=True, verbose_name='Comment')),
('assets', models.ManyToManyField(blank=True, related_name='automation_strategy', to='assets.Asset', verbose_name='Assets')),
('nodes', models.ManyToManyField(blank=True, related_name='automation_strategy', to='assets.Node', verbose_name='Nodes')),
],
options={
'verbose_name': 'Automation plan',
'unique_together': {('org_id', 'name')},
},
),
migrations.CreateModel(
name='AutomationStrategyExecution',
fields=[
('org_id', models.CharField(blank=True, db_index=True, default='', max_length=36, verbose_name='Organization')),
('id', models.UUIDField(default=uuid.uuid4, primary_key=True, serialize=False)),
('date_created', models.DateTimeField(auto_now_add=True)),
('timedelta', models.FloatField(default=0.0, null=True, verbose_name='Time')),
('date_start', models.DateTimeField(auto_now_add=True, verbose_name='Date start')),
('snapshot', common.db.fields.EncryptJsonDictTextField(blank=True, default=dict, null=True, verbose_name='Automation snapshot')),
('trigger', models.CharField(choices=[('manual', 'Manual trigger'), ('timing', 'Timing trigger')], default='manual', max_length=128, verbose_name='Trigger mode')),
('strategy', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='execution', to='ops.automationstrategy', verbose_name='Automation strategy')),
],
options={
'verbose_name': 'Automation strategy execution',
},
),
migrations.CreateModel(
name='CollectStrategy',
fields=[
('automationstrategy_ptr', models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='ops.automationstrategy')),
],
options={
'verbose_name': 'Collect strategy',
},
bases=('ops.automationstrategy',),
),
migrations.CreateModel(
name='PushStrategy',
fields=[
('automationstrategy_ptr', models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='ops.automationstrategy')),
],
options={
'verbose_name': 'Push strategy',
},
bases=('ops.automationstrategy',),
),
migrations.CreateModel(
name='VerifyStrategy',
fields=[
('automationstrategy_ptr', models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='ops.automationstrategy')),
],
options={
'verbose_name': 'Verify strategy',
},
bases=('ops.automationstrategy',),
),
migrations.CreateModel(
name='AutomationStrategyTask',
fields=[
('org_id', models.CharField(blank=True, db_index=True, default='', max_length=36, verbose_name='Organization')),
('id', models.UUIDField(default=uuid.uuid4, primary_key=True, serialize=False)),
('is_success', models.BooleanField(default=False, verbose_name='Is success')),
('timedelta', models.FloatField(default=0.0, null=True, verbose_name='Time')),
('date_start', models.DateTimeField(auto_now_add=True, verbose_name='Date start')),
('reason', models.CharField(blank=True, max_length=1024, null=True, verbose_name='Reason')),
('account', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='assets.account', verbose_name='Account')),
('asset', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, to='assets.asset', verbose_name='Asset')),
('execution', models.ForeignKey(on_delete=django.db.models.deletion.CASCADE, related_name='task', to='ops.automationstrategyexecution', verbose_name='Automation strategy execution')),
],
options={
'verbose_name': 'Automation strategy task',
},
),
migrations.CreateModel(
name='ChangeAuthStrategy',
fields=[
('automationstrategy_ptr', models.OneToOneField(auto_created=True, on_delete=django.db.models.deletion.CASCADE, parent_link=True, primary_key=True, serialize=False, to='ops.automationstrategy')),
('is_password', models.BooleanField(default=True)),
('password_strategy', models.CharField(blank=True, choices=[('custom', 'Custom password'), ('random_one', 'All assets use the same random password'), ('random_all', 'All assets use different random password')], max_length=128, null=True, verbose_name='Password strategy')),
('password_rules', common.db.fields.JsonDictCharField(blank=True, max_length=2048, null=True, verbose_name='Password rules')),
('password', common.db.fields.EncryptCharField(blank=True, max_length=256, null=True, verbose_name='Password')),
('is_ssh_key', models.BooleanField(default=False)),
('ssh_key_strategy', models.CharField(blank=True, choices=[('add', 'Append SSH KEY'), ('set', 'Empty and append SSH KEY'), ('set_jms', 'Replace (The key generated by JumpServer) ')], max_length=128, null=True, verbose_name='SSH Key strategy')),
('private_key', common.db.fields.EncryptTextField(blank=True, max_length=4096, null=True, verbose_name='SSH private key')),
('public_key', common.db.fields.EncryptTextField(blank=True, max_length=4096, null=True, verbose_name='SSH public key')),
('recipients', models.ManyToManyField(blank=True, related_name='recipients_change_auth_strategy', to=settings.AUTH_USER_MODEL, verbose_name='Recipient')),
],
options={
'verbose_name': 'Change auth strategy',
},
bases=('ops.automationstrategy',),
),
]

View File

@ -4,4 +4,3 @@
from .adhoc import * from .adhoc import *
from .celery import * from .celery import *
from .command import * from .command import *
from .automation import *

View File

@ -1,5 +0,0 @@
from .change_auth import *
from .collect import *
from .push import *
from .verify import *
from .common import *

View File

@ -1,2 +0,0 @@
# -*- coding: utf-8 -*-
#

View File

@ -9,32 +9,16 @@ from django.db import models
class CeleryTask(models.Model): class CeleryTask(models.Model):
WAITING = "waiting"
RUNNING = "running"
FINISHED = "finished"
LOG_DIR = os.path.join(settings.PROJECT_DIR, 'data', 'celery') LOG_DIR = os.path.join(settings.PROJECT_DIR, 'data', 'celery')
STATUS_CHOICES = (
(WAITING, WAITING),
(RUNNING, RUNNING),
(FINISHED, FINISHED),
)
id = models.UUIDField(primary_key=True, default=uuid.uuid4) id = models.UUIDField(primary_key=True, default=uuid.uuid4)
name = models.CharField(max_length=1024) name = models.CharField(max_length=1024)
status = models.CharField(max_length=128, choices=STATUS_CHOICES, db_index=True) args = models.JSONField(verbose_name=_("Args"))
log_path = models.CharField(max_length=256, blank=True, null=True) kwargs = models.JSONField(verbose_name=_("Kwargs"))
state = models.CharField(max_length=16, verbose_name=_("State"))
is_finished = models.BooleanField(default=False, verbose_name=_("Finished"))
date_published = models.DateTimeField(auto_now_add=True) date_published = models.DateTimeField(auto_now_add=True)
date_start = models.DateTimeField(null=True) date_start = models.DateTimeField(null=True)
date_finished = models.DateTimeField(null=True) date_finished = models.DateTimeField(null=True)
def __str__(self): def __str__(self):
return "{}: {}".format(self.name, self.id) return "{}: {}".format(self.name, self.id)
def is_finished(self):
return self.status == self.FINISHED
@property
def full_log_path(self):
if not self.log_path:
return None
return os.path.join(self.LOG_DIR, self.log_path)

View File

@ -0,0 +1,16 @@
from django.db import models
from django.utils.translation import gettext_lazy as _
from orgs.mixins.models import JMSOrgBaseModel
from ..mixin import PeriodTaskModelMixin
class PlaybookTask(PeriodTaskModelMixin, JMSOrgBaseModel):
assets = models.ManyToManyField('assets.Asset', verbose_name=_("Assets"))
account = models.CharField(max_length=128, default='root', verbose_name=_('Account'))
playbook = models.FilePathField(max_length=1024, verbose_name=_("Playbook"))
owner = models.CharField(max_length=1024, verbose_name=_("Owner"))
comment = models.TextField(blank=True, verbose_name=_("Comment"))
def get_register_task(self):
pass

View File

@ -1,15 +1,20 @@
from django.utils import translation import ast
from django.utils import translation, timezone
from django.core.cache import cache from django.core.cache import cache
from celery.signals import task_prerun, task_postrun, before_task_publish from celery import signals
from common.db.utils import close_old_connections from common.db.utils import close_old_connections, get_logger
from .models import CeleryTask
logger = get_logger(__name__)
TASK_LANG_CACHE_KEY = 'TASK_LANG_{}' TASK_LANG_CACHE_KEY = 'TASK_LANG_{}'
TASK_LANG_CACHE_TTL = 1800 TASK_LANG_CACHE_TTL = 1800
@before_task_publish.connect() @signals.before_task_publish.connect
def before_task_publish(headers=None, **kwargs): def before_task_publish(headers=None, **kwargs):
task_id = headers.get('id') task_id = headers.get('id')
current_lang = translation.get_language() current_lang = translation.get_language()
@ -17,8 +22,10 @@ def before_task_publish(headers=None, **kwargs):
cache.set(key, current_lang, 1800) cache.set(key, current_lang, 1800)
@task_prerun.connect() @signals.task_prerun.connect
def on_celery_task_pre_run(task_id='', **kwargs): def on_celery_task_pre_run(task_id='', **kwargs):
# 更新状态
CeleryTask.objects.filter(id=task_id).update(state='RUNNING', date_start=timezone.now())
# 关闭之前的数据库连接 # 关闭之前的数据库连接
close_old_connections() close_old_connections()
@ -29,6 +36,40 @@ def on_celery_task_pre_run(task_id='', **kwargs):
translation.activate(task_lang) translation.activate(task_lang)
@task_postrun.connect() @signals.task_postrun.connect
def on_celery_task_post_run(**kwargs): def on_celery_task_post_run(task_id='', state='', **kwargs):
close_old_connections() close_old_connections()
print("Task post run: ", task_id, state)
CeleryTask.objects.filter(id=task_id).update(
state=state, date_finished=timezone.now(), is_finished=True
)
@signals.after_task_publish.connect
def task_sent_handler(headers=None, body=None, **kwargs):
info = headers if 'task' in headers else body
task = info.get('task')
i = info.get('id')
if not i or not task:
logger.error("Not found task id or name: {}".format(info))
return
args = info.get('argsrepr', '()')
kwargs = info.get('kwargsrepr', '{}')
try:
args = list(ast.literal_eval(args))
kwargs = ast.literal_eval(kwargs)
except (ValueError, SyntaxError):
args = []
kwargs = {}
data = {
'id': i,
'name': task,
'state': 'PENDING',
'is_finished': False,
'args': args,
'kwargs': kwargs
}
CeleryTask.objects.create(**data)

View File

@ -1,10 +1,10 @@
# coding: utf-8 # coding: utf-8
import os import os
import subprocess import subprocess
import time
from django.conf import settings from django.conf import settings
from celery import shared_task, subtask from celery import shared_task, subtask
from celery import signals
from celery.exceptions import SoftTimeLimitExceeded from celery.exceptions import SoftTimeLimitExceeded
from django.utils import timezone from django.utils import timezone
@ -30,7 +30,7 @@ def rerun_task():
pass pass
@shared_task(queue="ansible") @shared_task(queue="ansible", verbose_name=_("Run ansible task"))
def run_ansible_task(tid, callback=None, **kwargs): def run_ansible_task(tid, callback=None, **kwargs):
""" """
:param tid: is the tasks serialized data :param tid: is the tasks serialized data
@ -49,7 +49,7 @@ def run_ansible_task(tid, callback=None, **kwargs):
return result return result
@shared_task(soft_time_limit=60, queue="ansible") @shared_task(soft_time_limit=60, queue="ansible", verbose_name=_("Run ansible command"))
def run_command_execution(cid, **kwargs): def run_command_execution(cid, **kwargs):
with tmp_to_root_org(): with tmp_to_root_org():
execution = get_object_or_none(CommandExecution, id=cid) execution = get_object_or_none(CommandExecution, id=cid)
@ -136,7 +136,7 @@ def check_server_performance_period():
ServerPerformanceCheckUtil().check_and_publish() ServerPerformanceCheckUtil().check_and_publish()
@shared_task(queue="ansible") @shared_task(queue="ansible", verbose_name=_("Hello"))
def hello(name, callback=None): def hello(name, callback=None):
from users.models import User from users.models import User
import time import time
@ -148,38 +148,12 @@ def hello(name, callback=None):
return gettext("Hello") return gettext("Hello")
@shared_task
# @after_app_shutdown_clean_periodic
# @register_as_period_task(interval=30)
def hello123():
return None
@shared_task @shared_task
def hello_callback(result): def hello_callback(result):
print(result) print(result)
print("Hello callback") print("Hello callback")
@shared_task
def add(a, b):
time.sleep(5)
return a + b
@shared_task
def add_m(x):
from celery import chain
a = range(x)
b = [a[i:i + 10] for i in range(0, len(a), 10)]
s = list()
s.append(add.s(b[0], b[1]))
for i in b[1:]:
s.append(add.s(i))
res = chain(*tuple(s))()
return res
@shared_task @shared_task
def execute_automation_strategy(pid, trigger): def execute_automation_strategy(pid, trigger):
from .models import AutomationStrategy from .models import AutomationStrategy
@ -191,3 +165,4 @@ def execute_automation_strategy(pid, trigger):
with tmp_to_org(instance.org): with tmp_to_org(instance.org):
instance.execute(trigger) instance.execute(trigger)

View File

@ -6,7 +6,7 @@ from orgs.utils import current_org, tmp_to_org
from common.cache import Cache, IntegerField from common.cache import Cache, IntegerField
from common.utils import get_logger from common.utils import get_logger
from users.models import UserGroup, User from users.models import UserGroup, User
from assets.models import Node, Domain, Gateway, Asset from assets.models import Node, Domain, Gateway, Asset, Account
from terminal.models import Session from terminal.models import Session
from perms.models import AssetPermission from perms.models import AssetPermission
@ -52,6 +52,7 @@ class OrgResourceStatisticsCache(OrgRelatedCache):
assets_amount = IntegerField() assets_amount = IntegerField()
nodes_amount = IntegerField(queryset=Node.objects) nodes_amount = IntegerField(queryset=Node.objects)
accounts_amount = IntegerField(queryset=Account.objects)
domains_amount = IntegerField(queryset=Domain.objects) domains_amount = IntegerField(queryset=Domain.objects)
gateways_amount = IntegerField(queryset=Gateway.objects) gateways_amount = IntegerField(queryset=Gateway.objects)
asset_perms_amount = IntegerField(queryset=AssetPermission.objects) asset_perms_amount = IntegerField(queryset=AssetPermission.objects)

View File

@ -6,6 +6,6 @@ logger = get_logger(__file__)
@shared_task @shared_task
def refresh_org_cache_task(cache, *fields): def refresh_org_cache_task(*fields):
logger.info(f'CACHE: refresh <org: {cache.get_current_org()}> {cache.key}.{fields}') from .caches import OrgResourceStatisticsCache
cache.refresh(*fields) OrgResourceStatisticsCache.refresh(*fields)