mirror of https://github.com/jumpserver/jumpserver
[Update] 修改信号
parent
158678c2db
commit
3804ab532d
|
@ -25,7 +25,7 @@ from .hands import IsSuperUser, IsValidUser, IsSuperUserOrAppUser, \
|
|||
get_user_granted_assets
|
||||
from .models import AssetGroup, Asset, Cluster, SystemUser, AdminUser
|
||||
from . import serializers
|
||||
from .tasks import update_assets_hardware_info_manual, test_admin_user_connectability_util, \
|
||||
from .tasks import update_asset_hardware_info_manual, test_admin_user_connectability_util, \
|
||||
test_asset_connectability_manual, push_system_user_to_cluster_assets_manual, \
|
||||
test_system_user_connectability_manual
|
||||
|
||||
|
@ -222,7 +222,7 @@ class AssetRefreshHardwareApi(generics.RetrieveAPIView):
|
|||
def retrieve(self, request, *args, **kwargs):
|
||||
asset_id = kwargs.get('pk')
|
||||
asset = get_object_or_404(Asset, pk=asset_id)
|
||||
summary = update_assets_hardware_info_manual([asset])[1]
|
||||
summary = update_asset_hardware_info_manual(asset)[1]
|
||||
if summary.get('dark'):
|
||||
return Response(summary['dark'].values(), status=501)
|
||||
else:
|
||||
|
|
|
@ -7,7 +7,5 @@ class AssetsConfig(AppConfig):
|
|||
name = 'assets'
|
||||
|
||||
def ready(self):
|
||||
from .signals import on_app_ready
|
||||
from . import tasks
|
||||
on_app_ready.send(self.__class__)
|
||||
from . import signals_handler
|
||||
super().ready()
|
||||
|
|
|
@ -1,39 +1,19 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
from django.utils.translation import ugettext as _
|
||||
|
||||
# PUSH_SYSTEM_USER_PERIOD_LOCK_KEY = "PUSH_SYSTEM_USER_PERIOD_KEY"
|
||||
PUSH_SYSTEM_USER_PERIOD_TASK_NAME = _("PUSH SYSTEM USER TO CLUSTER PERIOD: {}")
|
||||
PUSH_SYSTEM_USER_MANUAL_TASK_NAME = _("PUSH SYSTEM USER TO CLUSTER MANUALLY: {}")
|
||||
PUSH_SYSTEM_USER_TASK_NAME = _("PUSH SYSTEM USER TO CLUSTER: {}")
|
||||
# PUSH_SYSTEM_USER_LOCK_KEY = "PUSH_SYSTEM_USER_TO_CLUSTER_LOCK_{}"
|
||||
PUSH_SYSTEM_USER_ON_CHANGE_TASK_NAME = _("PUSH SYSTEM USER ON CHANGE: {}")
|
||||
PUSH_SYSTEM_USER_ON_CREATE_TASK_NAME = _("PUSH SYSTEM USER ON CREATE: {}")
|
||||
PUSH_SYSTEM_USERS_ON_ASSET_CREATE_TASK_NAME = _("PUSH SYSTEM USERS ON ASSET CREAT: {}")
|
||||
|
||||
|
||||
UPDATE_ASSETS_HARDWARE_TASK_NAME = _('UPDATE ASSETS HARDWARE INFO')
|
||||
UPDATE_ASSETS_HARDWARE_MANUAL_TASK_NAME = _('UPDATE ASSETS HARDWARE INFO MANUALLY')
|
||||
UPDATE_ASSETS_HARDWARE_ON_CREATE_TASK_NAME = _('UPDATE ASSETS HARDWARE INFO ON CREATE')
|
||||
# UPDATE_ASSETS_HARDWARE_PERIOD_LOCK_KEY = "UPDATE_ASSETS_HARDWARE_PERIOD_LOCK_KEY"
|
||||
UPDATE_ASSETS_HARDWARE_PERIOD_TASK_NAME = _('UPDATE ASSETS HARDWARE INFO PERIOD')
|
||||
UPDATE_ASSETS_HARDWARE_TASKS = [
|
||||
{
|
||||
'name': UPDATE_ASSETS_HARDWARE_TASK_NAME,
|
||||
'name': "setup",
|
||||
'action': {
|
||||
'module': 'setup'
|
||||
}
|
||||
}
|
||||
]
|
||||
|
||||
# TEST_ADMIN_USER_CONN_PERIOD_LOCK_KEY = "TEST_ADMIN_USER_CONN_PERIOD_KEY"
|
||||
TEST_ADMIN_USER_CONN_PERIOD_TASK_NAME = _("TEST ADMIN USER CONN PERIOD: {}")
|
||||
TEST_ADMIN_USER_CONN_MANUAL_TASK_NAME = _("TEST ADMIN USER CONN MANUALLY: {}")
|
||||
TEST_ADMIN_USER_CONN_TASK_NAME = _("TEST ADMIN USER CONN: {}")
|
||||
ADMIN_USER_CONN_CACHE_KEY = "ADMIN_USER_CONN_{}"
|
||||
TEST_ADMIN_USER_CONN_TASKS = [
|
||||
{
|
||||
"name": "TEST_ADMIN_CONNECTIVE",
|
||||
"name": "ping",
|
||||
"action": {
|
||||
"module": "ping",
|
||||
}
|
||||
|
@ -41,15 +21,11 @@ TEST_ADMIN_USER_CONN_TASKS = [
|
|||
]
|
||||
|
||||
ASSET_ADMIN_CONN_CACHE_KEY = "ASSET_ADMIN_USER_CONN_{}"
|
||||
TEST_ASSET_CONN_TASK_NAME = _("ASSET CONN TEST MANUAL")
|
||||
|
||||
TEST_SYSTEM_USER_CONN_PERIOD_LOCK_KEY = "TEST_SYSTEM_USER_CONN_PERIOD_KEY"
|
||||
TEST_SYSTEM_USER_CONN_PERIOD_TASK_NAME = _("TEST SYSTEM USER CONN PERIOD: {}")
|
||||
TEST_SYSTEM_USER_CONN_MANUAL_TASK_NAME = _("TEST SYSTEM USER CONN MANUALLY: {}")
|
||||
SYSTEM_USER_CONN_CACHE_KEY = "SYSTEM_USER_CONN_{}"
|
||||
TEST_SYSTEM_USER_CONN_TASKS = [
|
||||
{
|
||||
"name": "TEST_SYSTEM_USER_CONNECTIVE",
|
||||
"name": "ping",
|
||||
"action": {
|
||||
"module": "ping",
|
||||
}
|
||||
|
|
|
@ -23,6 +23,7 @@ class AssetCreateForm(forms.ModelForm):
|
|||
'groups': forms.SelectMultiple(attrs={'class': 'select2', 'data-placeholder': _('Select asset groups')}),
|
||||
'cluster': forms.Select(attrs={'class': 'select2', 'data-placeholder': _('Select cluster')}),
|
||||
'admin_user': forms.Select(attrs={'class': 'select2', 'data-placeholder': _('Select admin user')}),
|
||||
'port': forms.TextInput()
|
||||
}
|
||||
help_texts = {
|
||||
'hostname': '* required',
|
||||
|
@ -32,6 +33,13 @@ class AssetCreateForm(forms.ModelForm):
|
|||
'admin_user': _('Host level admin user, If not set using cluster admin user default')
|
||||
}
|
||||
|
||||
def clean_admin_user(self):
|
||||
cluster = self.cleaned_data.get('cluster')
|
||||
admin_user = self.cleaned_data.get('admin_user')
|
||||
if not cluster.admin_user and not admin_user:
|
||||
raise forms.ValidationError(_("You need set a admin user if cluster not have"))
|
||||
return self.cleaned_data['admin_user']
|
||||
|
||||
|
||||
class AssetUpdateForm(forms.ModelForm):
|
||||
class Meta:
|
||||
|
@ -53,6 +61,13 @@ class AssetUpdateForm(forms.ModelForm):
|
|||
'admin_user': _('Host level admin user, If not set using cluster admin user default')
|
||||
}
|
||||
|
||||
def clean_admin_user(self):
|
||||
cluster = self.cleaned_data.get('cluster')
|
||||
admin_user = self.cleaned_data.get('admin_user')
|
||||
if not cluster.admin_user and not admin_user:
|
||||
raise forms.ValidationError(_("You need set a admin user if cluster not have"))
|
||||
return self.cleaned_data['admin_user']
|
||||
|
||||
|
||||
class AssetBulkUpdateForm(forms.ModelForm):
|
||||
assets = forms.ModelMultipleChoiceField(
|
||||
|
@ -283,7 +298,6 @@ class SystemUserUpdateForm(SystemUserForm):
|
|||
system_user = super(forms.ModelForm, self).save()
|
||||
|
||||
if private_key_file:
|
||||
print(private_key_file)
|
||||
private_key = private_key_file.read().strip().decode('utf-8')
|
||||
public_key = ssh_pubkey_gen(private_key=private_key)
|
||||
else:
|
||||
|
|
|
@ -207,13 +207,11 @@ class AdminUser(AssetUser):
|
|||
|
||||
|
||||
class SystemUser(AssetUser):
|
||||
SSH_PROTOCOL = 'ssh'
|
||||
PROTOCOL_CHOICES = (
|
||||
('ssh', 'ssh'),
|
||||
)
|
||||
AUTH_METHOD_CHOICES = (
|
||||
('P', 'Password'),
|
||||
('K', 'Public key'),
|
||||
(SSH_PROTOCOL, 'ssh'),
|
||||
)
|
||||
|
||||
cluster = models.ManyToManyField('assets.Cluster', blank=True, verbose_name=_("Cluster"))
|
||||
priority = models.IntegerField(default=10, verbose_name=_("Priority"))
|
||||
protocol = models.CharField(max_length=16, choices=PROTOCOL_CHOICES, default='ssh', verbose_name=_('Protocol'))
|
||||
|
@ -229,6 +227,12 @@ class SystemUser(AssetUser):
|
|||
clusters = self.cluster.all()
|
||||
return Asset.objects.filter(cluster__in=clusters)
|
||||
|
||||
def get_clusters(self):
|
||||
return self.cluster.all()
|
||||
|
||||
def get_clusters_joined(self):
|
||||
return ', '.join([cluster.name for cluster in self.get_clusters()])
|
||||
|
||||
@property
|
||||
def assets_amount(self):
|
||||
return len(self.get_clusters_assets())
|
||||
|
|
|
@ -0,0 +1,130 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
#
|
||||
|
||||
|
||||
from django.db.models.signals import post_save, post_init, m2m_changed, pre_save
|
||||
from django.dispatch import receiver
|
||||
from django.utils.translation import gettext as _
|
||||
|
||||
from common.utils import get_logger
|
||||
from .models import Asset, SystemUser, Cluster
|
||||
from .tasks import update_assets_hardware_info_util, \
|
||||
test_asset_connectability_util, \
|
||||
push_system_user_util
|
||||
|
||||
|
||||
logger = get_logger(__file__)
|
||||
|
||||
|
||||
def update_asset_hardware_info_on_created(asset):
|
||||
logger.debug("Update asset `{}` hardware info".format(asset))
|
||||
update_assets_hardware_info_util.delay([asset])
|
||||
|
||||
|
||||
def test_asset_conn_on_created(asset):
|
||||
logger.debug("Test asset `{}` connectability".format(asset))
|
||||
test_asset_connectability_util.delay(asset)
|
||||
|
||||
|
||||
def push_cluster_system_users_to_asset(asset):
|
||||
logger.info("Push cluster system user to asset: {}".format(asset))
|
||||
task_name = _("Push cluster system users to asset")
|
||||
system_users = asset.cluster.systemuser_set.all()
|
||||
push_system_user_util.delay(system_users, [asset], task_name)
|
||||
|
||||
|
||||
@receiver(post_save, sender=Asset, dispatch_uid="my_unique_identifier")
|
||||
def on_asset_created(sender, instance=None, created=False, **kwargs):
|
||||
if instance and created:
|
||||
logger.info("Asset `` create signal received".format(instance))
|
||||
update_asset_hardware_info_on_created(instance)
|
||||
test_asset_conn_on_created(instance)
|
||||
push_cluster_system_users_to_asset(instance)
|
||||
|
||||
|
||||
@receiver(post_init, sender=Asset)
|
||||
def on_asset_init(sender, instance, created=False, **kwargs):
|
||||
if instance and created is False:
|
||||
instance.__original_cluster = instance.cluster
|
||||
|
||||
|
||||
@receiver(post_save, sender=Asset)
|
||||
def on_asset_cluster_changed(sender, instance=None, created=False, **kwargs):
|
||||
if instance and created is False and instance.cluster != instance.__original_cluster:
|
||||
logger.info("Asset cluster changed signal received")
|
||||
push_cluster_system_users_to_asset(instance)
|
||||
|
||||
|
||||
def push_to_cluster_assets_on_system_user_created_or_update(system_user):
|
||||
if not system_user.auto_push:
|
||||
return
|
||||
logger.debug("Push system user `{}` to cluster assets".format(system_user.name))
|
||||
for cluster in system_user.cluster.all():
|
||||
task_name = _("Push system user to cluster assets: {}->{}").format(
|
||||
cluster.name, system_user.name
|
||||
)
|
||||
assets = cluster.assets.all()
|
||||
push_system_user_util.delay([system_user], assets, task_name)
|
||||
|
||||
|
||||
@receiver(post_save, sender=SystemUser)
|
||||
def on_system_user_created_or_updated(sender, instance=None, **kwargs):
|
||||
if instance and instance.auto_push:
|
||||
logger.info("System user `{}` create or update signal received".format(instance))
|
||||
push_to_cluster_assets_on_system_user_created_or_update(instance)
|
||||
|
||||
|
||||
@receiver(post_init, sender=Cluster, dispatch_uid="my_unique_identifier")
|
||||
def on_cluster_init(sender, instance, **kwargs):
|
||||
logger.debug("On cluster init")
|
||||
instance.__original_assets = tuple(instance.assets.values_list('pk', flat=True))
|
||||
# instance.__origin_system_users = tuple(instance.systemuser_set.all())
|
||||
|
||||
|
||||
@receiver(post_save, sender=Cluster, dispatch_uid="my_unique_identifier")
|
||||
def on_cluster_assets_changed(sender, instance, **kwargs):
|
||||
assets_origin = instance.__original_assets
|
||||
assets_new = instance.assets.values_list('pk', flat=True)
|
||||
assets_added = set(assets_new) - set(assets_origin)
|
||||
if assets_added:
|
||||
logger.debug("Receive cluster change assets signal")
|
||||
logger.debug("Push cluster `{}` system users to: {}".format(
|
||||
instance, ', '.join([str(asset) for asset in assets_added])
|
||||
))
|
||||
assets = []
|
||||
for asset_id in assets_added:
|
||||
try:
|
||||
asset = Asset.objects.get(pk=asset_id)
|
||||
except Asset.DoesNotExist:
|
||||
continue
|
||||
else:
|
||||
assets.append(asset)
|
||||
system_users = [s for s in instance.systemuser_set.all() if s.auto_push]
|
||||
task_name = _("Push system user to assets")
|
||||
push_system_user_util.delay(system_users, assets, task_name)
|
||||
|
||||
|
||||
@receiver(post_save, sender=Cluster, dispatch_uid="my_unique_identifier")
|
||||
def on_cluster_system_user_changed(sender, instance, **kwargs):
|
||||
system_users_origin = instance.__origin_system_users
|
||||
system_user_new = instance.systemuser_set.values_list('pk', flat=True)
|
||||
system_users_added = set(system_user_new) - system_users_origin
|
||||
if system_users_added:
|
||||
logger.debug("Receive cluster change system users signal")
|
||||
system_users = []
|
||||
for system_user_id in system_users_added:
|
||||
try:
|
||||
system_user = SystemUser.objects.get(pk=system_user_id)
|
||||
except SystemUser.DoesNotExist:
|
||||
continue
|
||||
else:
|
||||
system_users.append(system_user)
|
||||
logger.debug("Push new system users `{}` to cluster `{}` assets".format(
|
||||
','.join([s.name for s in system_users]), instance
|
||||
))
|
||||
task_name = _(
|
||||
"Push system user to cluster assets: {}->{}").format(
|
||||
instance.name, ', '.join(s.name for s in system_users)
|
||||
)
|
||||
push_system_user_util.delay(system_users, instance.assets.all(), task_name)
|
||||
|
|
@ -3,15 +3,14 @@ import json
|
|||
|
||||
from celery import shared_task
|
||||
from django.core.cache import cache
|
||||
from django.dispatch import receiver
|
||||
from django.db.models.signals import post_save
|
||||
from django.utils.translation import ugettext as _
|
||||
|
||||
from common.utils import get_object_or_none, capacity_convert, \
|
||||
sum_capacity, encrypt_password, get_logger
|
||||
from common.celery import register_as_period_task, after_app_shutdown_clean, \
|
||||
after_app_ready_start, app as celery_app
|
||||
|
||||
from .models import SystemUser, AdminUser, Asset
|
||||
from .models import SystemUser, AdminUser, Asset, Cluster
|
||||
from . import const
|
||||
|
||||
|
||||
|
@ -22,9 +21,9 @@ CACHE_MAX_TIME = 60*60*60
|
|||
|
||||
|
||||
@shared_task
|
||||
def update_assets_hardware_info(result, **kwargs):
|
||||
def set_assets_hardware_info(result, **kwargs):
|
||||
"""
|
||||
Using ops task run result, to update asset info
|
||||
Unsing ops task run result, to update asset info
|
||||
|
||||
@shared_task must be exit, because we using it as a task callback, is must
|
||||
be a celery task also
|
||||
|
@ -36,7 +35,7 @@ def update_assets_hardware_info(result, **kwargs):
|
|||
assets_updated = []
|
||||
for hostname, info in result_raw.get('ok', {}).items():
|
||||
if info:
|
||||
info = info[const.UPDATE_ASSETS_HARDWARE_TASK_NAME]['ansible_facts']
|
||||
info = info['setup']['ansible_facts']
|
||||
else:
|
||||
continue
|
||||
|
||||
|
@ -78,7 +77,7 @@ def update_assets_hardware_info(result, **kwargs):
|
|||
|
||||
|
||||
@shared_task
|
||||
def update_assets_hardware_info_util(assets, task_name):
|
||||
def update_assets_hardware_info_util(assets, task_name=None):
|
||||
"""
|
||||
Using ansible api to update asset hardware info
|
||||
:param assets: asset seq
|
||||
|
@ -86,34 +85,25 @@ def update_assets_hardware_info_util(assets, task_name):
|
|||
:return: result summary ['contacted': {}, 'dark': {}]
|
||||
"""
|
||||
from ops.utils import update_or_create_ansible_task
|
||||
if task_name is None:
|
||||
task_name = _("Update some assets hardware info")
|
||||
tasks = const.UPDATE_ASSETS_HARDWARE_TASKS
|
||||
hostname_list = [asset.hostname for asset in assets]
|
||||
task, _ = update_or_create_ansible_task(
|
||||
task, created = update_or_create_ansible_task(
|
||||
task_name, hosts=hostname_list, tasks=tasks, pattern='all',
|
||||
options=const.TASK_OPTIONS, run_as_admin=True, created_by='System',
|
||||
)
|
||||
result = task.run()
|
||||
# Todo: may be somewhere using
|
||||
# Manual run callback function
|
||||
assets_updated = update_assets_hardware_info(result)
|
||||
assets_updated = set_assets_hardware_info(result)
|
||||
return result
|
||||
|
||||
|
||||
@shared_task
|
||||
def update_assets_hardware_info_manual(assets):
|
||||
task_name = const.UPDATE_ASSETS_HARDWARE_MANUAL_TASK_NAME
|
||||
return update_assets_hardware_info_util(assets, task_name)
|
||||
|
||||
|
||||
@receiver(post_save, sender=Asset, dispatch_uid="my_unique_identifier")
|
||||
def update_asset_info_on_created(sender, instance=None, created=False, **kwargs):
|
||||
if instance and created:
|
||||
msg = "Receive asset {} create signal, update asset hardware info".format(
|
||||
instance
|
||||
)
|
||||
logger.debug(msg)
|
||||
task_name = const.UPDATE_ASSETS_HARDWARE_ON_CREATE_TASK_NAME
|
||||
update_assets_hardware_info_util.delay([instance], task_name)
|
||||
def update_asset_hardware_info_manual(asset):
|
||||
task_name = _("Update asset hardware info")
|
||||
return update_assets_hardware_info_util([asset], task_name=task_name)
|
||||
|
||||
|
||||
@celery_app.task
|
||||
|
@ -126,28 +116,28 @@ def update_assets_hardware_info_period():
|
|||
:return:
|
||||
"""
|
||||
from ops.utils import update_or_create_ansible_task
|
||||
task_name = const.UPDATE_ASSETS_HARDWARE_PERIOD_TASK_NAME
|
||||
task_name = _("Update assets hardware info period")
|
||||
hostname_list = [asset.hostname for asset in Asset.objects.all()]
|
||||
tasks = const.UPDATE_ASSETS_HARDWARE_TASKS
|
||||
|
||||
# Only create, schedule by celery beat
|
||||
_ = update_or_create_ansible_task(
|
||||
update_or_create_ansible_task(
|
||||
task_name, hosts=hostname_list, tasks=tasks, pattern='all',
|
||||
options=const.TASK_OPTIONS, run_as_admin=True, created_by='System',
|
||||
interval=60*60*24, is_periodic=True, callback=update_assets_hardware_info.name,
|
||||
interval=60*60*24, is_periodic=True, callback=set_assets_hardware_info.name,
|
||||
)
|
||||
|
||||
|
||||
## ADMIN USER CONNECTIVE ##
|
||||
|
||||
@shared_task
|
||||
def update_admin_user_connectability_info(result, **kwargs):
|
||||
def set_admin_user_connectability_info(result, **kwargs):
|
||||
admin_user = kwargs.get("admin_user")
|
||||
task_name = kwargs.get("task_name")
|
||||
if admin_user is None and task_name is not None:
|
||||
admin_user = task_name.split(":")[-1]
|
||||
|
||||
_, summary = result
|
||||
raw, summary = result
|
||||
cache_key = const.ADMIN_USER_CONN_CACHE_KEY.format(admin_user)
|
||||
cache.set(cache_key, summary, CACHE_MAX_TIME)
|
||||
|
||||
|
@ -167,7 +157,6 @@ def test_admin_user_connectability_util(admin_user, task_name):
|
|||
Test asset admin user can connect or not. Using ansible api do that
|
||||
:param admin_user:
|
||||
:param task_name:
|
||||
:param force: Force update
|
||||
:return:
|
||||
"""
|
||||
from ops.utils import update_or_create_ansible_task
|
||||
|
@ -180,7 +169,7 @@ def test_admin_user_connectability_util(admin_user, task_name):
|
|||
options=const.TASK_OPTIONS, run_as_admin=True, created_by='System',
|
||||
)
|
||||
result = task.run()
|
||||
update_admin_user_connectability_info(result, admin_user=admin_user.name)
|
||||
set_admin_user_connectability_info(result, admin_user=admin_user.name)
|
||||
return result
|
||||
|
||||
|
||||
|
@ -195,31 +184,31 @@ def test_admin_user_connectability_period():
|
|||
from ops.utils import update_or_create_ansible_task
|
||||
admin_users = AdminUser.objects.all()
|
||||
for admin_user in admin_users:
|
||||
task_name = const.TEST_ADMIN_USER_CONN_PERIOD_TASK_NAME.format(admin_user.name)
|
||||
task_name = _("Test admin user connectability period: {}").format(admin_user)
|
||||
assets = admin_user.get_related_assets()
|
||||
hosts = [asset.hostname for asset in assets]
|
||||
tasks = const.TEST_ADMIN_USER_CONN_TASKS
|
||||
_ = update_or_create_ansible_task(
|
||||
update_or_create_ansible_task(
|
||||
task_name=task_name, hosts=hosts, tasks=tasks, pattern='all',
|
||||
options=const.TASK_OPTIONS, run_as_admin=True, created_by='System',
|
||||
interval=3600, is_periodic=True,
|
||||
callback=update_admin_user_connectability_info.name,
|
||||
callback=set_admin_user_connectability_info.name,
|
||||
)
|
||||
|
||||
|
||||
@shared_task
|
||||
def test_admin_user_connectability_manual(admin_user):
|
||||
task_name = const.TEST_ADMIN_USER_CONN_MANUAL_TASK_NAME.format(admin_user.name)
|
||||
task_name = _("Test admin user connectability: {}").format(admin_user.name)
|
||||
return test_admin_user_connectability_util.delay(admin_user, task_name)
|
||||
|
||||
|
||||
@shared_task
|
||||
def test_asset_connectability_manual(asset):
|
||||
def test_asset_connectability_util(asset, task_name=None):
|
||||
from ops.utils import update_or_create_ansible_task
|
||||
|
||||
task_name = const.TEST_ASSET_CONN_TASK_NAME
|
||||
assets = [asset]
|
||||
hosts = [asset.hostname for asset in assets]
|
||||
if task_name is None:
|
||||
task_name = "Test asset connectability"
|
||||
hosts = [asset.hostname]
|
||||
tasks = const.TEST_ADMIN_USER_CONN_TASKS
|
||||
task, created = update_or_create_ansible_task(
|
||||
task_name=task_name, hosts=hosts, tasks=tasks, pattern='all',
|
||||
|
@ -228,30 +217,28 @@ def test_asset_connectability_manual(asset):
|
|||
result = task.run()
|
||||
summary = result[1]
|
||||
if summary.get('dark'):
|
||||
cache.set(const.ASSET_ADMIN_CONN_CACHE_KEY.format(asset.hostname), 0, CACHE_MAX_TIME)
|
||||
cache.set(const.ASSET_ADMIN_CONN_CACHE_KEY.format(asset.hostname), 0,
|
||||
CACHE_MAX_TIME)
|
||||
else:
|
||||
cache.set(const.ASSET_ADMIN_CONN_CACHE_KEY.format(asset.hostname), 1,
|
||||
CACHE_MAX_TIME)
|
||||
return summary
|
||||
|
||||
|
||||
@shared_task
|
||||
def test_asset_connectability_manual(asset):
|
||||
summary = test_asset_connectability_util(asset)
|
||||
|
||||
if summary.get('dark'):
|
||||
return False, summary['dark']
|
||||
else:
|
||||
cache.set(const.ASSET_ADMIN_CONN_CACHE_KEY.format(asset.hostname), 1, CACHE_MAX_TIME)
|
||||
return True, ""
|
||||
|
||||
|
||||
@receiver(post_save, sender=Asset, dispatch_uid="my_unique_identifier")
|
||||
def update_asset_conn_info_on_created(sender, instance=None, created=False,
|
||||
**kwargs):
|
||||
if instance and created:
|
||||
task_name = 'TEST-ASSET-CONN-WHEN-CREATED-{}'.format(instance)
|
||||
msg = "Receive asset {} create signal, test asset connectability".format(
|
||||
instance
|
||||
)
|
||||
logger.debug(msg)
|
||||
test_asset_connectability_manual.delay(instance, task_name)
|
||||
|
||||
|
||||
## System user connective ##
|
||||
|
||||
|
||||
@shared_task
|
||||
def update_system_user_connectablity_info(result, **kwargs):
|
||||
def set_system_user_connectablity_info(result, **kwargs):
|
||||
summary = result[1]
|
||||
task_name = kwargs.get("task_name")
|
||||
system_user = kwargs.get("system_user")
|
||||
|
@ -279,13 +266,13 @@ def test_system_user_connectability_util(system_user, task_name):
|
|||
run_as=system_user.name, created_by="System",
|
||||
)
|
||||
result = task.run()
|
||||
update_system_user_connectablity_info(result, system_user=system_user.name)
|
||||
set_system_user_connectablity_info(result, system_user=system_user.name)
|
||||
return result
|
||||
|
||||
|
||||
@shared_task
|
||||
def test_system_user_connectability_manual(system_user):
|
||||
task_name = const.TEST_SYSTEM_USER_CONN_MANUAL_TASK_NAME.format(system_user.name)
|
||||
task_name = "Test system user connectability: {}".format(system_user)
|
||||
return test_system_user_connectability_util(system_user, task_name)
|
||||
|
||||
|
||||
|
@ -297,17 +284,17 @@ def test_system_user_connectability_period():
|
|||
from ops.utils import update_or_create_ansible_task
|
||||
system_users = SystemUser.objects.all()
|
||||
for system_user in system_users:
|
||||
task_name = const.TEST_SYSTEM_USER_CONN_PERIOD_TASK_NAME.format(
|
||||
task_name = _("Test system user connectability period: {}").format(
|
||||
system_user.name
|
||||
)
|
||||
assets = system_user.get_clusters_assets()
|
||||
hosts = [asset.hostname for asset in assets]
|
||||
tasks = const.TEST_SYSTEM_USER_CONN_TASKS
|
||||
_ = update_or_create_ansible_task(
|
||||
update_or_create_ansible_task(
|
||||
task_name=task_name, hosts=hosts, tasks=tasks, pattern='all',
|
||||
options=const.TASK_OPTIONS, run_as_admin=False, run_as=system_user.name,
|
||||
created_by='System', interval=3600, is_periodic=True,
|
||||
callback=update_admin_user_connectability_info.name,
|
||||
callback=set_admin_user_connectability_info.name,
|
||||
)
|
||||
|
||||
|
||||
|
@ -351,13 +338,18 @@ def get_push_system_user_tasks(system_user):
|
|||
|
||||
|
||||
@shared_task
|
||||
def push_system_user_util(system_user, task_name):
|
||||
def push_system_user_util(system_users, assets, task_name):
|
||||
from ops.utils import update_or_create_ansible_task
|
||||
tasks = []
|
||||
for system_user in system_users:
|
||||
tasks.extend(get_push_system_user_tasks(system_user))
|
||||
|
||||
print("Task: ", tasks)
|
||||
if not tasks:
|
||||
return
|
||||
|
||||
tasks = get_push_system_user_tasks(system_user)
|
||||
assets = system_user.get_clusters_assets()
|
||||
hosts = [asset.hostname for asset in assets]
|
||||
task, _ = update_or_create_ansible_task(
|
||||
task, created = update_or_create_ansible_task(
|
||||
task_name=task_name, hosts=hosts, tasks=tasks, pattern='all',
|
||||
options=const.TASK_OPTIONS, run_as_admin=True, created_by='System'
|
||||
)
|
||||
|
@ -366,8 +358,9 @@ def push_system_user_util(system_user, task_name):
|
|||
|
||||
@shared_task
|
||||
def push_system_user_to_cluster_assets_manual(system_user):
|
||||
task_name = const.PUSH_SYSTEM_USER_MANUAL_TASK_NAME.format(system_user.name)
|
||||
return push_system_user_util(system_user, task_name)
|
||||
task_name = _("Push system user to cluster assets: {}").format(system_user.name)
|
||||
assets = system_user.get_clusters_assets()
|
||||
return push_system_user_util([system_user], assets, task_name)
|
||||
|
||||
|
||||
@shared_task
|
||||
|
@ -376,65 +369,22 @@ def push_system_user_to_cluster_assets_manual(system_user):
|
|||
@after_app_shutdown_clean
|
||||
def push_system_user_period():
|
||||
from ops.utils import update_or_create_ansible_task
|
||||
clusters = Cluster.objects.all()
|
||||
|
||||
for system_user in SystemUser.objects.filter(auto_push=True):
|
||||
assets = system_user.get_clusters_assets()
|
||||
task_name = const.PUSH_SYSTEM_USER_PERIOD_TASK_NAME.format(system_user.name)
|
||||
hosts = [asset.hostname for asset in assets]
|
||||
tasks = get_push_system_user_tasks(system_user)
|
||||
for cluster in clusters:
|
||||
tasks = []
|
||||
system_users = [system_user for system_user in cluster.systemuser_set.all() if system_user.auto_push]
|
||||
if not system_users:
|
||||
return
|
||||
for system_user in system_users:
|
||||
tasks.extend(get_push_system_user_tasks(system_user))
|
||||
|
||||
_ = update_or_create_ansible_task(
|
||||
task_name = _("Push system user to cluster assets period: {}->{}").format(
|
||||
cluster.name, ', '.join(s.name for s in system_users)
|
||||
)
|
||||
hosts = [asset.hostname for asset in cluster.assets.all()]
|
||||
update_or_create_ansible_task(
|
||||
task_name=task_name, hosts=hosts, tasks=tasks, pattern='all',
|
||||
options=const.TASK_OPTIONS, run_as_admin=True, created_by='System',
|
||||
interval=60*60*24, is_periodic=True,
|
||||
)
|
||||
|
||||
|
||||
@shared_task
|
||||
def push_asset_system_users_util(asset, task_name, system_users=None):
|
||||
from ops.utils import update_or_create_ansible_task
|
||||
|
||||
if system_users is None:
|
||||
system_users = asset.cluster.systemuser_set.all()
|
||||
|
||||
tasks = []
|
||||
for system_user in system_users:
|
||||
if system_user.auto_push:
|
||||
tasks.extend(get_push_system_user_tasks(system_user))
|
||||
|
||||
hosts = [asset.hostname]
|
||||
task, _ = update_or_create_ansible_task(
|
||||
task_name=task_name, hosts=hosts, tasks=tasks, pattern='all',
|
||||
options=const.TASK_OPTIONS, run_as_admin=True, created_by='System'
|
||||
)
|
||||
return task.run()
|
||||
|
||||
|
||||
@receiver(post_save, sender=Asset, dispatch_uid="my_unique_identifier")
|
||||
def push_system_user_on_created(sender, instance=None, created=False, **kwargs):
|
||||
if instance and created:
|
||||
task_name = const.PUSH_SYSTEM_USERS_ON_ASSET_CREATE_TASK_NAME
|
||||
system_users = instance.cluster.systemuser_set.all()
|
||||
msg = "Receive asset {} create signal, push system users".format(
|
||||
instance
|
||||
)
|
||||
logger.debug(msg)
|
||||
push_asset_system_users_util.delay(instance, system_users, task_name=task_name)
|
||||
|
||||
|
||||
@receiver(post_save, sender=SystemUser)
|
||||
def push_system_user_on_change(sender, instance=None, update_fields=None, **kwargs):
|
||||
if instance and instance.auto_push:
|
||||
logger.debug("System user `{}` changed, push it".format(instance.name))
|
||||
task_name = "PUSH SYSTEM USER ON CREATED: {}".format(instance.name)
|
||||
push_system_user_util.delay(instance, task_name)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
|
|
@ -28,7 +28,6 @@ from common.utils import get_object_or_none, get_logger, is_uuid
|
|||
from .. import forms
|
||||
from ..models import Asset, AssetGroup, AdminUser, Cluster, SystemUser
|
||||
from ..hands import AdminUserRequiredMixin
|
||||
from ..tasks import update_assets_hardware_info_util
|
||||
|
||||
|
||||
__all__ = [
|
||||
|
@ -162,10 +161,6 @@ class AssetUpdateView(AdminUserRequiredMixin, UpdateView):
|
|||
kwargs.update(context)
|
||||
return super(AssetUpdateView, self).get_context_data(**kwargs)
|
||||
|
||||
def form_invalid(self, form):
|
||||
logger.error(form.errors)
|
||||
return super().form_invalid(form)
|
||||
|
||||
|
||||
class AssetDeleteView(AdminUserRequiredMixin, DeleteView):
|
||||
model = Asset
|
||||
|
|
|
@ -336,11 +336,15 @@ CELERY_RESULT_SERIALIZER = 'pickle'
|
|||
CELERY_RESULT_BACKEND = CELERY_BROKER_URL
|
||||
CELERY_ACCEPT_CONTENT = ['json', 'pickle']
|
||||
CELERY_RESULT_EXPIRES = 3600
|
||||
CELERY_WORKER_LOG_FORMAT = '%(asctime)s [%(module)s %(levelname)s] %(message)s'
|
||||
CELERY_WORKER_TASK_LOG_FORMAT = '%(asctime)s [%(module)s %(levelname)s] %(message)s'
|
||||
# CELERY_WORKER_LOG_FORMAT = '%(asctime)s [%(module)s %(levelname)s] %(message)s'
|
||||
CELERY_WORKER_LOG_FORMAT = '%(message)s'
|
||||
# CELERY_WORKER_TASK_LOG_FORMAT = '%(asctime)s [%(module)s %(levelname)s] %(message)s'
|
||||
CELERY_WORKER_TASK_LOG_FORMAT = '%(message)s'
|
||||
# CELERY_WORKER_LOG_FORMAT = '%(asctime)s [%(module)s %(levelname)s] %(message)s'
|
||||
CELERY_TASK_EAGER_PROPAGATES = True
|
||||
# CELERY_TIMEZONE = TIME_ZONE
|
||||
# CELERY_ENABLE_UTC = True
|
||||
CELERY_REDIRECT_STDOUTS = True
|
||||
CELERY_REDIRECT_STDOUTS_LEVEL = "INFO"
|
||||
CELERY_WORKER_HIJACK_ROOT_LOGGER = False
|
||||
|
||||
|
||||
# Cache use redis
|
||||
|
|
|
@ -51,7 +51,6 @@ class AdHocResultCallback(CallbackModule):
|
|||
contacted.remove(host)
|
||||
|
||||
def v2_runner_on_failed(self, result, ignore_errors=False):
|
||||
print("#######RUN FAILED" * 19)
|
||||
self.gather_result("failed", result)
|
||||
super().v2_runner_on_failed(result, ignore_errors=ignore_errors)
|
||||
|
||||
|
|
|
@ -5,8 +5,8 @@ import subprocess
|
|||
import threading
|
||||
import time
|
||||
import argparse
|
||||
import platform
|
||||
import sys
|
||||
import signal
|
||||
|
||||
from apps import __version__
|
||||
|
||||
|
@ -25,9 +25,7 @@ LOG_LEVEL = CONFIG.LOG_LEVEL
|
|||
WORKERS = 4
|
||||
|
||||
EXIT_EVENT = threading.Event()
|
||||
EXIT_MSGS = []
|
||||
|
||||
|
||||
processes = {}
|
||||
|
||||
try:
|
||||
os.makedirs(os.path.join(BASE_DIR, "data", "static"))
|
||||
|
@ -97,7 +95,6 @@ def start_service(services):
|
|||
__version__))
|
||||
print('Quit the server with CONTROL-C.')
|
||||
|
||||
processes = {}
|
||||
services_all = {
|
||||
"gunicorn": start_gunicorn,
|
||||
"celery": start_celery,
|
||||
|
@ -126,6 +123,12 @@ def start_service(services):
|
|||
time.sleep(5)
|
||||
|
||||
|
||||
def stop_service():
|
||||
for name, proc in processes.items():
|
||||
print("Stop service {}".format(name))
|
||||
proc.terminate()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
parser = argparse.ArgumentParser(description="Jumpserver start tools")
|
||||
parser.add_argument("services", type=str, nargs='+', default="all",
|
||||
|
@ -133,6 +136,9 @@ if __name__ == '__main__':
|
|||
help="The service to start",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
start_service(args.services)
|
||||
try:
|
||||
start_service(args.services)
|
||||
except KeyboardInterrupt:
|
||||
stop_service()
|
||||
|
||||
|
||||
|
|
Loading…
Reference in New Issue