[Update] Rework how ops tasks are run

pull/828/merge
ibuler 2017-12-24 18:53:07 +08:00
parent 30efec1b09
commit bf9bb1b973
22 changed files with 503 additions and 452 deletions

View File

@@ -25,9 +25,9 @@ from .hands import IsSuperUser, IsValidUser, IsSuperUserOrAppUser, \
    get_user_granted_assets
from .models import AssetGroup, Asset, Cluster, SystemUser, AdminUser
from . import serializers
-from .tasks import update_assets_hardware_info, test_admin_user_connectability, \
-    test_admin_user_connectability_manual, push_system_user_to_cluster_assets, \
-    test_system_user_connectability
+from .tasks import update_assets_hardware_info_manual, test_admin_user_connectability_util, \
+    test_asset_connectability_manual, push_system_user_to_cluster_assets_manual, \
+    test_system_user_connectability_manual

class AssetViewSet(IDInFilterMixin, BulkModelViewSet):

@@ -222,7 +222,7 @@ class AssetRefreshHardwareApi(generics.RetrieveAPIView):
    def retrieve(self, request, *args, **kwargs):
        asset_id = kwargs.get('pk')
        asset = get_object_or_404(Asset, pk=asset_id)
-        summary = update_assets_hardware_info([asset])
+        summary = update_assets_hardware_info_manual([asset])
        if summary.get('dark'):
            return Response(summary['dark'].values(), status=501)
        else:

@@ -239,7 +239,7 @@ class AssetAdminUserTestApi(generics.RetrieveAPIView):
    def retrieve(self, request, *args, **kwargs):
        asset_id = kwargs.get('pk')
        asset = get_object_or_404(Asset, pk=asset_id)
-        ok, msg = test_admin_user_connectability_manual(asset)
+        ok, msg = test_asset_connectability_manual(asset)
        if ok:
            return Response({"msg": "pong"})
        else:

@@ -255,7 +255,7 @@ class AdminUserTestConnectiveApi(generics.RetrieveAPIView):
    def retrieve(self, request, *args, **kwargs):
        admin_user = self.get_object()
-        test_admin_user_connectability.delay(admin_user, force=True)
+        test_admin_user_connectability_util.delay(admin_user, force=True)
        return Response({"msg": "Task created"})

@@ -268,7 +268,7 @@ class SystemUserPushApi(generics.RetrieveAPIView):
    def retrieve(self, request, *args, **kwargs):
        system_user = self.get_object()
-        push_system_user_to_cluster_assets.delay(system_user, force=True)
+        push_system_user_to_cluster_assets_manual.delay(system_user, force=True)
        return Response({"msg": "Task created"})

@@ -281,5 +281,5 @@ class SystemUserTestConnectiveApi(generics.RetrieveAPIView):
    def retrieve(self, request, *args, **kwargs):
        system_user = self.get_object()
-        test_system_user_connectability.delay(system_user, force=True)
+        test_system_user_connectability_manual.delay(system_user, force=True)
        return Response({"msg": "Task created"})

View File

@@ -2,14 +2,20 @@
#
from django.utils.translation import ugettext as _

-PUSH_SYSTEM_USER_PERIOD_LOCK_KEY = "PUSH_SYSTEM_USER_PERIOD_KEY"
-PUSH_SYSTEM_USER_PERIOD_TASK_NAME = _("PUSH SYSTEM USER TO CLUSTER PERIOD TASK")
+# PUSH_SYSTEM_USER_PERIOD_LOCK_KEY = "PUSH_SYSTEM_USER_PERIOD_KEY"
+PUSH_SYSTEM_USER_PERIOD_TASK_NAME = _("PUSH SYSTEM USER TO CLUSTER PERIOD: {}")
+PUSH_SYSTEM_USER_MANUAL_TASK_NAME = _("PUSH SYSTEM USER TO CLUSTER MANUALLY: {}")
PUSH_SYSTEM_USER_TASK_NAME = _("PUSH SYSTEM USER TO CLUSTER: {}")
-PUSH_SYSTEM_USER_LOCK_KEY = "PUSH_SYSTEM_USER_TO_CLUSTER_LOCK_{}"
+# PUSH_SYSTEM_USER_LOCK_KEY = "PUSH_SYSTEM_USER_TO_CLUSTER_LOCK_{}"
+PUSH_SYSTEM_USER_ON_CHANGE_TASK_NAME = _("PUSH SYSTEM USER ON CHANGE: {}")
+PUSH_SYSTEM_USER_ON_CREATE_TASK_NAME = _("PUSH SYSTEM USER ON CREATE: {}")
+PUSH_SYSTEM_USERS_ON_ASSET_CREATE_TASK_NAME = _("PUSH SYSTEM USERS ON ASSET CREAT: {}")

UPDATE_ASSETS_HARDWARE_TASK_NAME = _('UPDATE ASSETS HARDWARE INFO')
-UPDATE_ASSETS_HARDWARE_PERIOD_LOCK_KEY = "UPDATE_ASSETS_HARDWARE_PERIOD_LOCK_KEY"
+UPDATE_ASSETS_HARDWARE_MANUAL_TASK_NAME = _('UPDATE ASSETS HARDWARE INFO MANUALLY')
+UPDATE_ASSETS_HARDWARE_ON_CREATE_TASK_NAME = _('UPDATE ASSETS HARDWARE INFO ON CREATE')
+# UPDATE_ASSETS_HARDWARE_PERIOD_LOCK_KEY = "UPDATE_ASSETS_HARDWARE_PERIOD_LOCK_KEY"
UPDATE_ASSETS_HARDWARE_PERIOD_TASK_NAME = _('UPDATE ASSETS HARDWARE INFO PERIOD')
UPDATE_ASSETS_HARDWARE_TASKS = [
    {

@@ -20,10 +26,10 @@ UPDATE_ASSETS_HARDWARE_TASKS = [
    }
]
-TEST_ADMIN_USER_CONN_PERIOD_LOCK_KEY = "TEST_ADMIN_USER_CONN_PERIOD_KEY"
-TEST_ADMIN_USER_CONN_PERIOD_TASK_NAME = _("TEST ADMIN USER CONN PERIOD TASK")
+# TEST_ADMIN_USER_CONN_PERIOD_LOCK_KEY = "TEST_ADMIN_USER_CONN_PERIOD_KEY"
+TEST_ADMIN_USER_CONN_PERIOD_TASK_NAME = _("TEST ADMIN USER CONN PERIOD: {}")
+TEST_ADMIN_USER_CONN_MANUAL_TASK_NAME = _("TEST ADMIN USER CONN MANUALLY: {}")
TEST_ADMIN_USER_CONN_TASK_NAME = _("TEST ADMIN USER CONN: {}")
-TEST_ADMIN_USER_CONN_LOCK_KEY = TEST_ADMIN_USER_CONN_TASK_NAME
ADMIN_USER_CONN_CACHE_KEY = "ADMIN_USER_CONN_{}"
TEST_ADMIN_USER_CONN_TASKS = [
    {

@@ -38,10 +44,8 @@ ASSET_ADMIN_CONN_CACHE_KEY = "ASSET_ADMIN_USER_CONN_{}"
TEST_ASSET_CONN_TASK_NAME = _("ASSET CONN TEST MANUAL")
TEST_SYSTEM_USER_CONN_PERIOD_LOCK_KEY = "TEST_SYSTEM_USER_CONN_PERIOD_KEY"
-TEST_SYSTEM_USER_CONN_PERIOD_TASK_NAME = _("TEST SYSTEM USER CONN PERIOD TASK")
-TEST_SYSTEM_USER_CONN_CACHE_KEY_PREFIX = "SYSTEM_USER_CONN_"
+TEST_SYSTEM_USER_CONN_PERIOD_TASK_NAME = _("TEST SYSTEM USER CONN PERIOD: {}")
+TEST_SYSTEM_USER_CONN_MANUAL_TASK_NAME = _("TEST SYSTEM USER CONN MANUALLY: {}")
TEST_SYSTEM_USER_CONN_TASK_NAME = _("TEST SYSTEM USER CONN: {}")
-TEST_SYSTEM_USER_CONN_LOCK_KEY = "TEST_SYSTEM_USER_CONN_{}"
SYSTEM_USER_CONN_CACHE_KEY = "SYSTEM_USER_CONN_{}"
TEST_SYSTEM_USER_CONN_TASKS = [
    {
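The new task-name constants embed a "{}" placeholder so that every admin user, system user, or asset gets its own named task, and the callback tasks added in assets/tasks.py recover the object name from that task name again. A minimal illustration of the convention (plain strings stand in for the ugettext-wrapped constants, and the admin user name is made up):

    TEST_ADMIN_USER_CONN_PERIOD_TASK_NAME = "TEST ADMIN USER CONN PERIOD: {}"

    task_name = TEST_ADMIN_USER_CONN_PERIOD_TASK_NAME.format("web_admin")
    print(task_name)         # TEST ADMIN USER CONN PERIOD: web_admin

    # assets/tasks.py later does task_name.split(":")[-1] to get the object back:
    admin_user = task_name.split(":")[-1]
    print(repr(admin_user))  # ' web_admin' -- the leading space survives unless stripped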

View File

@@ -13,13 +13,14 @@ from django.db import models
from django.utils.translation import ugettext_lazy as _
from django.conf import settings

-from common.utils import signer, ssh_key_string_to_obj, ssh_key_gen
+from common.utils import get_signer, ssh_key_string_to_obj, ssh_key_gen
from .utils import private_key_validator
from ..const import SYSTEM_USER_CONN_CACHE_KEY

__all__ = ['AdminUser', 'SystemUser',]
logger = logging.getLogger(__name__)
+signer = get_signer()


class AssetUser(models.Model):

View File

@@ -8,9 +8,11 @@ from django.db.models.signals import post_save

from common.utils import get_object_or_none, capacity_convert, \
    sum_capacity, encrypt_password, get_logger
+from common.celery import register_as_period_task, after_app_shutdown_clean, \
+    after_app_ready_start, app as celery_app
from .models import SystemUser, AdminUser, Asset
from . import const
-from ops.decorators import register_as_period_task

FORKS = 10
@@ -19,7 +21,18 @@ logger = get_logger(__file__)
CACHE_MAX_TIME = 60*60*60

-def _update_asset_info(result_raw):
+@shared_task
+def update_assets_hardware_info(result, **kwargs):
+    """
+    Use the ops task run result to update asset info
+
+    @shared_task must be kept here because we use this function as a task
+    callback, so it must be a celery task as well
+    :param result:
+    :param kwargs: {task_name: ""}
+    :return:
+    """
+    result_raw = result[0]
    assets_updated = []
    for hostname, info in result_raw['ok'].items():
        if info:
@@ -65,173 +78,240 @@ def _update_asset_info(result_raw):

@shared_task
-def update_assets_hardware_info(assets, task_name=None):
+def update_assets_hardware_info_util(assets, task_name):
    """
    Using ansible api to update asset hardware info
    :param assets: asset seq
    :param task_name: task_name running
    :return: result summary ['contacted': {}, 'dark': {}]
    """
-    from ops.utils import create_or_update_ansible_task
-    if task_name is None:
-        task_name = const.UPDATE_ASSETS_HARDWARE_TASK_NAME
+    from ops.utils import update_or_create_ansible_task
    tasks = const.UPDATE_ASSETS_HARDWARE_TASKS
    hostname_list = [asset.hostname for asset in assets]
-    task = create_or_update_ansible_task(
+    task, _ = update_or_create_ansible_task(
        task_name, hosts=hostname_list, tasks=tasks, pattern='all',
        options=const.TASK_OPTIONS, run_as_admin=True, created_by='System',
    )
    result = task.run()
-    summary, result_raw = result.results_summary, result.results_raw
-    # TOdo: may be somewhere using
-    assets_updated = _update_asset_info(result_raw)
-    return summary
+    # Todo: may be somewhere using
+    # Manual run callback function
+    assets_updated = update_assets_hardware_info(result)
+    return result

@shared_task
-@register_as_period_task(interval=60*60*60*24)
-def update_assets_hardware_period():
+def update_assets_hardware_info_manual(assets):
+    task_name = const.UPDATE_ASSETS_HARDWARE_MANUAL_TASK_NAME
+    return update_assets_hardware_info_util(assets, task_name)
+
+
+@receiver(post_save, sender=Asset, dispatch_uid="my_unique_identifier")
+def update_asset_info_on_created(sender, instance=None, created=False, **kwargs):
+    if instance and created:
+        msg = "Receive asset {} create signal, update asset hardware info".format(
+            instance
+        )
+        logger.debug(msg)
+        task_name = const.UPDATE_ASSETS_HARDWARE_ON_CREATE_TASK_NAME
+        update_assets_hardware_info_util.delay([instance], task_name)
+
+
+@celery_app.task
+@register_as_period_task(interval=3600)
+@after_app_ready_start
+@after_app_shutdown_clean
+def update_assets_hardware_info_period():
    """
    Update asset hardware period task
    :return:
    """
-    from ops.utils import create_or_update_ansible_task
+    from ops.utils import update_or_create_ansible_task
    task_name = const.UPDATE_ASSETS_HARDWARE_PERIOD_TASK_NAME
-    if cache.get(const.UPDATE_ASSETS_HARDWARE_PERIOD_LOCK_KEY) == 1:
-        msg = "Task {} is running or before long, passed this time".format(
-            task_name
-        )
-        logger.debug(msg)
-        return {}
-    # Todo: set cache but not update, because we want also set it to as a
-    # minimum update time too
-    cache.set(const.UPDATE_ASSETS_HARDWARE_PERIOD_LOCK_KEY, 1, CACHE_MAX_TIME)
-    assets = Asset.objects.filter(type__in=['Server', 'VM'])
-    return update_assets_hardware_info(assets, task_name=task_name)
+    hostname_list = [asset.hostname for asset in Asset.objects.all()]
+    tasks = const.UPDATE_ASSETS_HARDWARE_TASKS
+
+    # Only create, schedule by celery beat
+    _ = update_or_create_ansible_task(
+        task_name, hosts=hostname_list, tasks=tasks, pattern='all',
+        options=const.TASK_OPTIONS, run_as_admin=True, created_by='System',
+        interval=60*60*24, is_periodic=True, callback=update_assets_hardware_info.name,
+    )
+
+
+## ADMIN USER CONNECTIVE ##

@shared_task
-def test_admin_user_connectability(admin_user, force=False):
-    """
-    Test asset admin user can connect or not. Using ansible api do that
-    :param admin_user:
-    :param force: Force update
-    :return:
-    """
-    from ops.utils import create_or_update_ansible_task
-    task_name = const.TEST_ADMIN_USER_CONN_TASK_NAME.format(admin_user.name)
-    lock_key = const.TEST_ADMIN_USER_CONN_LOCK_KEY.format(admin_user.name)
-    if cache.get(lock_key, 0) == 1 and not force:
-        logger.debug("Task {} is running or before along, passed this time")
-        return {}
-    assets = admin_user.get_related_assets()
-    hosts = [asset.hostname for asset in assets]
-    tasks = const.TEST_ADMIN_USER_CONN_TASKS
-    task = create_or_update_ansible_task(
-        task_name=task_name, hosts=hosts, tasks=tasks, pattern='all',
-        options=const.TASK_OPTIONS, run_as_admin=True, created_by='System',
-    )
-    cache.set(lock_key, 1, CACHE_MAX_TIME)
-    result = task.run()
-    cache_key = const.ADMIN_USER_CONN_CACHE_KEY.format(admin_user.name)
-    cache.set(cache_key, result.results_summary, CACHE_MAX_TIME)
-    for i in result.results_summary.get('contacted', []):
+def update_admin_user_connectability_info(result, **kwargs):
+    admin_user = kwargs.get("admin_user")
+    task_name = kwargs.get("task_name")
+    if admin_user is None and task_name is not None:
+        admin_user = task_name.split(":")[-1]
+
+    _, summary = result
+    cache_key = const.ADMIN_USER_CONN_CACHE_KEY.format(admin_user)
+    cache.set(cache_key, summary, CACHE_MAX_TIME)
+    for i in summary.get('contacted', []):
        asset_conn_cache_key = const.ASSET_ADMIN_CONN_CACHE_KEY.format(i)
        cache.set(asset_conn_cache_key, 1, CACHE_MAX_TIME)
-    for i, msg in result.results_summary.get('dark', {}).items():
+    for i, msg in summary.get('dark', {}).items():
        asset_conn_cache_key = const.ASSET_ADMIN_CONN_CACHE_KEY.format(i)
        cache.set(asset_conn_cache_key, 0, CACHE_MAX_TIME)
        logger.error(msg)
-    return result.results_summary

@shared_task
-def test_admin_user_connectability_period():
-    if cache.get(const.TEST_ADMIN_USER_CONN_PERIOD_LOCK_KEY) == 1:
-        msg = "{} task is running or before long, passed this time".format(
-            const.TEST_ADMIN_USER_CONN_PERIOD_TASK_NAME
-        )
-        logger.debug(msg)
-        return
-    logger.debug("Task {} start".format(const.TEST_ADMIN_USER_CONN_TASK_NAME))
-    cache.set(const.TEST_ADMIN_USER_CONN_PERIOD_LOCK_KEY, 1, CACHE_MAX_TIME)
-    admin_users = AdminUser.objects.all()
-    for admin_user in admin_users:
-        test_admin_user_connectability(admin_user)
-
-
-@shared_task
-def test_admin_user_connectability_manual(asset, task_name=None):
-    from ops.utils import create_or_update_ansible_task
-    if task_name is None:
-        task_name = const.TEST_ASSET_CONN_TASK_NAME
-    hosts = [asset.hostname]
+def test_admin_user_connectability_util(admin_user, task_name):
+    """
+    Test asset admin user can connect or not. Using ansible api do that
+    :param admin_user:
+    :param task_name:
+    :param force: Force update
+    :return:
+    """
+    from ops.utils import update_or_create_ansible_task
+    assets = admin_user.get_related_assets()
+    hosts = [asset.hostname for asset in assets]
    tasks = const.TEST_ADMIN_USER_CONN_TASKS
-    task = create_or_update_ansible_task(
-        task_name, tasks=tasks, hosts=hosts, run_as_admin=True,
-        created_by='System', options=const.TASK_OPTIONS, pattern='all',
+    task, created = update_or_create_ansible_task(
+        task_name=task_name, hosts=hosts, tasks=tasks, pattern='all',
+        options=const.TASK_OPTIONS, run_as_admin=True, created_by='System',
    )
    result = task.run()
-    if result.results_summary['dark']:
+    update_admin_user_connectability_info(result, admin_user=admin_user.name)
+    return result
+
+
+@celery_app.task
+@register_as_period_task(interval=3600)
+@after_app_ready_start
+@after_app_shutdown_clean
+def test_admin_user_connectability_period():
+    """
+    A period task that update the ansible task period
+    """
+    from ops.utils import update_or_create_ansible_task
+    admin_users = AdminUser.objects.all()
+    for admin_user in admin_users:
+        task_name = const.TEST_ADMIN_USER_CONN_PERIOD_TASK_NAME.format(admin_user.name)
+        assets = admin_user.get_related_assets()
+        hosts = [asset.hostname for asset in assets]
+        tasks = const.TEST_ADMIN_USER_CONN_TASKS
+        _ = update_or_create_ansible_task(
+            task_name=task_name, hosts=hosts, tasks=tasks, pattern='all',
+            options=const.TASK_OPTIONS, run_as_admin=True, created_by='System',
+            interval=3600, is_periodic=True,
+            callback=update_admin_user_connectability_info.name,
+        )
+
+
+@shared_task
+def test_admin_user_connectability_manual(admin_user):
+    task_name = const.TEST_ADMIN_USER_CONN_MANUAL_TASK_NAME.format(admin_user.name)
+    return test_admin_user_connectability_util.delay(admin_user, task_name)
+
+
+@shared_task
+def test_asset_connectability_manual(asset):
+    from ops.utils import update_or_create_ansible_task
+    task_name = const.TEST_ASSET_CONN_TASK_NAME
+    assets = [asset]
+    hosts = [asset.hostname for asset in assets]
+    tasks = const.TEST_ADMIN_USER_CONN_TASKS
+    task, created = update_or_create_ansible_task(
+        task_name=task_name, hosts=hosts, tasks=tasks, pattern='all',
+        options=const.TASK_OPTIONS, run_as_admin=True, created_by='System',
+    )
+    result = task.run()
+    summary = result[1]
+    if summary.get('dark'):
        cache.set(const.ASSET_ADMIN_CONN_CACHE_KEY.format(asset.hostname), 0, CACHE_MAX_TIME)
-        return False, result.results_summary['dark']
+        return False, summary['dark']
    else:
        cache.set(const.ASSET_ADMIN_CONN_CACHE_KEY.format(asset.hostname), 1, CACHE_MAX_TIME)
        return True, ""
+
+
+@receiver(post_save, sender=Asset, dispatch_uid="my_unique_identifier")
+def update_asset_conn_info_on_created(sender, instance=None, created=False,
+                                      **kwargs):
+    if instance and created:
+        task_name = 'TEST-ASSET-CONN-WHEN-CREATED-{}'.format(instance)
+        msg = "Receive asset {} create signal, test asset connectability".format(
+            instance
+        )
+        logger.debug(msg)
+        test_asset_connectability_manual.delay(instance, task_name)
+
+
+## System user connective ##

@shared_task
-def test_system_user_connectability(system_user, force=False):
+def update_system_user_connectablity_info(result, **kwargs):
+    summary = result[1]
+    task_name = kwargs.get("task_name")
+    system_user = kwargs.get("system_user")
+    if system_user is None:
+        system_user = task_name.split(":")[-1]
+    cache_key = const.SYSTEM_USER_CONN_CACHE_KEY.format(system_user)
+    cache.set(cache_key, summary, CACHE_MAX_TIME)
+
+
+@shared_task
+def test_system_user_connectability_util(system_user, task_name):
    """
    Test system cant connect his assets or not.
    :param system_user:
-    :param force
+    :param task_name:
    :return:
    """
-    from ops.utils import create_or_update_ansible_task
-    lock_key = const.TEST_SYSTEM_USER_CONN_LOCK_KEY.format(system_user.name)
-    task_name = const.TEST_SYSTEM_USER_CONN_TASK_NAME.format(system_user.name)
-    if cache.get(lock_key, 0) == 1 and not force:
-        logger.debug("Task {} is running or before long, passed this time".format(task_name))
-        return {}
+    from ops.utils import update_or_create_ansible_task
    assets = system_user.get_clusters_assets()
    hosts = [asset.hostname for asset in assets]
    tasks = const.TEST_SYSTEM_USER_CONN_TASKS
-    task = create_or_update_ansible_task(
+    task, created = update_or_create_ansible_task(
        task_name, hosts=hosts, tasks=tasks, pattern='all',
        options=const.TASK_OPTIONS,
        run_as=system_user.name, created_by="System",
    )
-    cache.set(lock_key, 1, CACHE_MAX_TIME)
    result = task.run()
-    cache_key = const.SYSTEM_USER_CONN_CACHE_KEY.format(system_user.name)
-    print("Set cache: {} {}".format(cache_key, result.results_summary))
-    cache.set(cache_key, result.results_summary, CACHE_MAX_TIME)
-    return result.results_summary
+    update_system_user_connectablity_info(result, system_user=system_user.name)
+    return result

@shared_task
+def test_system_user_connectability_manual(system_user):
+    task_name = const.TEST_SYSTEM_USER_CONN_MANUAL_TASK_NAME.format(system_user.name)
+    return test_system_user_connectability_util(system_user, task_name)
+
+
+@shared_task
+@register_as_period_task(interval=3600)
+@after_app_ready_start
+@after_app_shutdown_clean
def test_system_user_connectability_period():
-    lock_key = const.TEST_SYSTEM_USER_CONN_LOCK_KEY
-    if cache.get(lock_key) == 1:
-        logger.debug("{} task is running, passed this time".format(
-            const.TEST_SYSTEM_USER_CONN_PERIOD_TASK_NAME
-        ))
-        return
-    logger.debug("Task {} start".format(const.TEST_SYSTEM_USER_CONN_PERIOD_TASK_NAME))
-    cache.set(lock_key, 1, CACHE_MAX_TIME)
-    for system_user in SystemUser.objects.all():
-        test_system_user_connectability(system_user)
+    from ops.utils import update_or_create_ansible_task
+    system_users = SystemUser.objects.all()
+    for system_user in system_users:
+        task_name = const.TEST_SYSTEM_USER_CONN_PERIOD_TASK_NAME.format(
+            system_user.name
+        )
+        assets = system_user.get_clusters_assets()
+        hosts = [asset.hostname for asset in assets]
+        tasks = const.TEST_SYSTEM_USER_CONN_TASKS
+        _ = update_or_create_ansible_task(
+            task_name=task_name, hosts=hosts, tasks=tasks, pattern='all',
+            options=const.TASK_OPTIONS, run_as_admin=False, run_as=system_user.name,
+            created_by='System', interval=3600, is_periodic=True,
+            callback=update_admin_user_connectability_info.name,
+        )
+
+
+#### Push system user tasks ####

def get_push_system_user_tasks(system_user):
    tasks = [
@@ -271,75 +351,48 @@ def get_push_system_user_tasks(system_user):

@shared_task
-def push_system_user(system_user, assets, task_name=None):
-    from ops.utils import create_or_update_ansible_task
-    if system_user.auto_push and assets:
-        if task_name is None:
-            task_name = 'PUSH-SYSTEM-USER-{}'.format(system_user.name)
+def push_system_user_util(system_user, task_name):
+    from ops.utils import update_or_create_ansible_task
+    tasks = get_push_system_user_tasks(system_user)
+    assets = system_user.get_clusters_assets()
+    hosts = [asset.hostname for asset in assets]
+    task, _ = update_or_create_ansible_task(
+        task_name=task_name, hosts=hosts, tasks=tasks, pattern='all',
+        options=const.TASK_OPTIONS, run_as_admin=True, created_by='System'
+    )
+    return task.run()
+
+
+@shared_task
+def push_system_user_to_cluster_assets_manual(system_user):
+    task_name = const.PUSH_SYSTEM_USER_MANUAL_TASK_NAME.format(system_user.name)
+    return push_system_user_util(system_user, task_name)
+
+
+@shared_task
+@register_as_period_task(interval=3600)
+@after_app_ready_start
+@after_app_shutdown_clean
+def push_system_user_period():
+    from ops.utils import update_or_create_ansible_task
+    for system_user in SystemUser.objects.filter(auto_push=True):
+        assets = system_user.get_clusters_assets()
+        task_name = const.PUSH_SYSTEM_USER_PERIOD_TASK_NAME.format(system_user.name)
        hosts = [asset.hostname for asset in assets]
        tasks = get_push_system_user_tasks(system_user)
-        task = create_or_update_ansible_task(
+        _ = update_or_create_ansible_task(
            task_name=task_name, hosts=hosts, tasks=tasks, pattern='all',
-            options=const.TASK_OPTIONS, run_as_admin=True, created_by='System'
+            options=const.TASK_OPTIONS, run_as_admin=True, created_by='System',
+            interval=60*60*24, is_periodic=True,
        )
-        result = task.run()
-        for i in result.results_summary.get('contacted'):
-            logger.debug("Push system user {} to {} [OK]".format(
-                system_user.name, i
-            ))
-        for i in result.results_summary.get('dark'):
-            logger.error("Push system user {} to {} [FAILED]".format(
-                system_user.name, i
-            ))
-        return result.results_summary
-    else:
-        msg = "Task {} does'nt execute, because auto_push " \
-              "is not True, or not assets".format(task_name)
-        logger.debug(msg)
-        return {}

@shared_task
-def push_system_user_to_cluster_assets(system_user, force=False):
-    lock_key = const.PUSH_SYSTEM_USER_LOCK_KEY
-    task_name = const.PUSH_SYSTEM_USER_TASK_NAME.format(system_user.name)
-    if cache.get(lock_key, 0) == 1 and not force:
-        msg = "Task {} is running or before long, passed this time".format(
-            task_name
-        )
-        logger.debug(msg)
-        return {}
-    logger.debug("Task {} start".format(task_name))
-    assets = system_user.get_clusters_assets()
-    summary = push_system_user(system_user, assets, task_name)
-    return summary
-
-
-@shared_task
-def push_system_user_period():
-    task_name = const.PUSH_SYSTEM_USER_PERIOD_TASK_NAME
-    if cache.get(const.PUSH_SYSTEM_USER_PERIOD_LOCK_KEY) == 1:
-        msg = "Task {} is running or before long, passed this time".format(
-            task_name
-        )
-        logger.debug(msg)
-        return
-    logger.debug("Task {} start".format(task_name))
-    cache.set(const.PUSH_SYSTEM_USER_PERIOD_LOCK_KEY, 1, timeout=CACHE_MAX_TIME)
-    for system_user in SystemUser.objects.filter(auto_push=True):
-        push_system_user_to_cluster_assets(system_user)
-
-
-@shared_task
-def push_asset_system_users(asset, system_users=None, task_name=None):
-    from ops.utils import create_or_update_ansible_task
-    if task_name is None:
-        task_name = "PUSH-ASSET-SYSTEM-USER-{}".format(asset.hostname)
+def push_asset_system_users_util(asset, task_name, system_users=None):
+    from ops.utils import update_or_create_ansible_task
    if system_users is None:
        system_users = asset.cluster.systemuser_set.all()
@@ -350,81 +403,38 @@ def push_asset_system_users(asset, system_users=None, task_name=None):
        tasks.extend(get_push_system_user_tasks(system_user))

    hosts = [asset.hostname]
-    task = create_or_update_ansible_task(
+    task, _ = update_or_create_ansible_task(
        task_name=task_name, hosts=hosts, tasks=tasks, pattern='all',
        options=const.TASK_OPTIONS, run_as_admin=True, created_by='System'
    )
-    result = task.run()
-    return result.results_summary
-
-
-@receiver(post_save, sender=Asset, dispatch_uid="my_unique_identifier")
-def update_asset_info_when_created(sender, instance=None, created=False, **kwargs):
-    if instance and created:
-        msg = "Receive asset {} create signal, update asset hardware info".format(
-            instance
-        )
-        logger.debug(msg)
-        task_name = "UPDATE-ASSET-HARDWARE-INFO-WHEN-CREATED"
-        update_assets_hardware_info.delay([instance], task_name)
-
-
-@receiver(post_save, sender=Asset, dispatch_uid="my_unique_identifier")
-def update_asset_conn_info_on_created(sender, instance=None, created=False, **kwargs):
-    if instance and created:
-        task_name = 'TEST-ASSET-CONN-WHEN-CREATED-{}'.format(instance)
-        msg = "Receive asset {} create signal, test asset connectability".format(
-            instance
-        )
-        logger.debug(msg)
-        test_admin_user_connectability_manual.delay(instance, task_name)
+    return task.run()

@receiver(post_save, sender=Asset, dispatch_uid="my_unique_identifier")
def push_system_user_on_created(sender, instance=None, created=False, **kwargs):
    if instance and created:
-        task_name = 'PUSH-SYSTEM-USER-WHEN-ASSET-CREATED-{}'.format(instance)
+        task_name = const.PUSH_SYSTEM_USERS_ON_ASSET_CREATE_TASK_NAME
        system_users = instance.cluster.systemuser_set.all()
        msg = "Receive asset {} create signal, push system users".format(
            instance
        )
        logger.debug(msg)
-        push_asset_system_users.delay(instance, system_users, task_name=task_name)
+        push_asset_system_users_util.delay(instance, system_users, task_name=task_name)

@receiver(post_save, sender=SystemUser)
-def push_system_user_on_auth_change(sender, instance=None, update_fields=None, **kwargs):
-    fields_check = {'_password', '_private_key', '_public_key'}
-    auth_changed = update_fields & fields_check if update_fields else None
-    if instance and instance.auto_push and auth_changed:
-        logger.debug("System user `{}` auth changed, push it".format(instance.name))
-        task_name = "PUSH-SYSTEM-USER-ON-CREATED-{}".format(instance.name)
-        push_system_user_to_cluster_assets.delay(instance, task_name)
-
-
-periodic_tasks = (
-    {
-        'update_assets_hardware_period': {
-            'task': 'assets.tasks.update_assets_hardware_period',
-            'schedule': 60*60*60*24,
-            'args': (),
-        },
-        'test-admin-user-connectability_period': {
-            'task': 'assets.tasks.test_admin_user_connectability_period',
-            'schedule': 60*60*60,
-            'args': (),
-        },
-        'push_system_user_period': {
-            'task': 'assets.tasks.push_system_user_period',
-            'schedule': 60*60*60*24,
-            'args': (),
-        }
-    }
-)
-
-
-def initial_periodic_tasks():
-    from ops.utils import create_periodic_tasks
-    create_periodic_tasks(periodic_tasks)
+def push_system_user_on_change(sender, instance=None, update_fields=None, **kwargs):
+    if instance and instance.auto_push:
+        logger.debug("System user `{}` changed, push it".format(instance.name))
+        task_name = "PUSH SYSTEM USER ON CREATED: {}".format(instance.name)
+        push_system_user_util.delay(instance, task_name)

View File

@@ -49,9 +49,6 @@ $(document).ready(function(){
        "aaSorting": [[2, "asc"]],
        "aoColumnDefs": [ { "bSortable": false, "aTargets": [ 0 ] }],
        "bAutoWidth": false,
-        "language": {
-            "url": "/static/js/plugins/dataTables/i18n/zh-hans.json"
-        },
        columns: [
            {data: "checkbox"},
            {data: "id"},

View File

@@ -28,7 +28,7 @@ from common.utils import get_object_or_none, get_logger, is_uuid
from .. import forms
from ..models import Asset, AssetGroup, AdminUser, Cluster, SystemUser
from ..hands import AdminUserRequiredMixin
-from ..tasks import update_assets_hardware_info
+from ..tasks import update_assets_hardware_info_util

__all__ = [

@@ -314,10 +314,6 @@ class BulkImportAssetView(AdminUserRequiredMixin, JSONResponseMixin, FormView):
            except Exception as e:
                failed.append('%s: %s' % (asset_dict['hostname'], str(e)))

-        if assets:
-            update_assets_hardware_info.delay([asset._to_secret_json() for asset in assets])
-
        data = {
            'created': created,
            'created_info': 'Created {}'.format(len(created)),

View File

@@ -1,8 +1,15 @@
# ~*~ coding: utf-8 ~*~
import os
-from celery import Celery
+import json
+from functools import wraps
+
+from celery import Celery, subtask
+from celery.signals import worker_ready, worker_shutdown
+
+from .utils import get_logger
+
+logger = get_logger(__file__)

# set the default Django settings module for the 'celery' program.
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'jumpserver.settings')

@@ -15,3 +22,167 @@ app = Celery('jumpserver')
# pickle the object when using Windows.
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks(lambda: [app_config.split('.')[0] for app_config in settings.INSTALLED_APPS])
def create_or_update_celery_periodic_tasks(tasks):
from django_celery_beat.models import PeriodicTask, IntervalSchedule, CrontabSchedule
"""
:param tasks: {
'add-every-monday-morning': {
'task': 'tasks.add' # A registered celery task,
'interval': 30,
'crontab': "30 7 * * *",
'args': (16, 16),
'kwargs': {},
'enabled': False,
},
}
:return:
"""
# Todo: check task valid, task and callback must be a celery task
for name, detail in tasks.items():
interval = None
crontab = None
if isinstance(detail.get("interval"), int):
intervals = IntervalSchedule.objects.filter(
every=detail["interval"], period=IntervalSchedule.SECONDS
)
if intervals:
interval = intervals[0]
else:
interval = IntervalSchedule.objects.create(
every=detail['interval'],
period=IntervalSchedule.SECONDS,
)
elif isinstance(detail.get("crontab"), str):
try:
minute, hour, day, month, week = detail["crontab"].split()
except ValueError:
raise SyntaxError("crontab is not valid")
kwargs = dict(
minute=minute, hour=hour, day_of_week=week,
day_of_month=day, month_of_year=month,
)
contabs = CrontabSchedule.objects.filter(
**kwargs
)
if contabs:
crontab = contabs[0]
else:
crontab = CrontabSchedule.objects.create(**kwargs)
else:
raise SyntaxError("Schedule is not valid")
defaults = dict(
interval=interval,
crontab=crontab,
name=name,
task=detail['task'],
args=json.dumps(detail.get('args', [])),
kwargs=json.dumps(detail.get('kwargs', {})),
enabled=detail.get('enabled', True),
)
task = PeriodicTask.objects.update_or_create(
defaults=defaults, name=name,
)
return task
def disable_celery_periodic_task(task_name):
from django_celery_beat.models import PeriodicTask
PeriodicTask.objects.filter(name=task_name).update(enabled=False)
def delete_celery_periodic_task(task_name):
from django_celery_beat.models import PeriodicTask
PeriodicTask.objects.filter(name=task_name).delete()
__REGISTER_PERIODIC_TASKS = []
__AFTER_APP_SHUTDOWN_CLEAN_TASKS = []
__AFTER_APP_READY_RUN_TASKS = []
def register_as_period_task(crontab=None, interval=None):
"""
Warning: the task must not take any args or kwargs
:param crontab: "* * * * *"
:param interval: 60*60*60
:return:
"""
if crontab is None and interval is None:
raise SyntaxError("Must set crontab or interval one")
def decorate(func):
if crontab is None and interval is None:
raise SyntaxError("Interval and crontab must set one")
# Because when this decorator run, the task was not created,
# So we can't use func.name
name = '{func.__module__}.{func.__name__}'.format(func=func)
if name not in __REGISTER_PERIODIC_TASKS:
create_or_update_celery_periodic_tasks({
name: {
'task': name,
'interval': interval,
'crontab': crontab,
'args': (),
'enabled': True,
}
})
__REGISTER_PERIODIC_TASKS.append(name)
@wraps(func)
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
return wrapper
return decorate
def after_app_ready_start(func):
# Because when this decorator run, the task was not created,
# So we can't use func.name
name = '{func.__module__}.{func.__name__}'.format(func=func)
if name not in __AFTER_APP_READY_RUN_TASKS:
__AFTER_APP_READY_RUN_TASKS.append(name)
@wraps(func)
def decorate(*args, **kwargs):
return func(*args, **kwargs)
return decorate
def after_app_shutdown_clean(func):
# Because when this decorator run, the task was not created,
# So we can't use func.name
name = '{func.__module__}.{func.__name__}'.format(func=func)
if name not in __AFTER_APP_READY_RUN_TASKS:
__AFTER_APP_SHUTDOWN_CLEAN_TASKS.append(name)
@wraps(func)
def decorate(*args, **kwargs):
return func(*args, **kwargs)
return decorate
@worker_ready.connect
def on_app_ready(sender=None, headers=None, body=None, **kwargs):
logger.debug("App ready signal recv")
logger.debug("Start need start task: [{}]".format(
", ".join(__AFTER_APP_READY_RUN_TASKS))
)
for task in __AFTER_APP_READY_RUN_TASKS:
subtask(task).delay()
@worker_shutdown.connect
def after_app_shutdown(sender=None, headers=None, body=None, **kwargs):
from django_celery_beat.models import PeriodicTask
logger.debug("App shutdown signal recv")
logger.debug("Clean need cleaned period tasks: [{}]".format(
', '.join(__AFTER_APP_SHUTDOWN_CLEAN_TASKS))
)
PeriodicTask.objects.filter(name__in=__AFTER_APP_SHUTDOWN_CLEAN_TASKS).delete()
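Taken together, the three new decorators let a plain celery task double as a django-celery-beat periodic task that is also queued once on worker start and cleaned up on shutdown. A sketch of how they are meant to be stacked, mirroring the usage in assets/tasks.py from this commit (the task name and body here are hypothetical):

    from celery import shared_task
    from common.celery import (register_as_period_task, after_app_ready_start,
                               after_app_shutdown_clean)

    @shared_task
    @register_as_period_task(interval=3600)   # ensure a PeriodicTask row exists for beat
    @after_app_ready_start                    # also queue it once when the worker is ready
    @after_app_shutdown_clean                 # drop the PeriodicTask row on worker shutdown
    def refresh_something_period():
        pass                                  # takes no args/kwargs, per the warning above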

View File

@@ -1,6 +1,6 @@
from django.core.mail import send_mail
from django.conf import settings
-from common import celery_app as app
+from .celery import app
from .utils import get_logger
View File

@@ -1,13 +1,11 @@
# -*- coding: utf-8 -*-
#
-import json
import re
from collections import OrderedDict
from six import string_types
import base64
import os
from itertools import chain
-import string
import logging
import datetime
import time

@@ -27,9 +25,6 @@ from django.conf import settings
from django.utils import timezone

-from .compat import to_bytes, to_string
-
-SECRET_KEY = settings.SECRET_KEY

UUID_PATTERN = re.compile(r'[0-9a-zA-Z\-]{36}')

@@ -51,9 +46,22 @@ def get_object_or_none(model, **kwargs):
    return obj

-class Signer(object):
+class Singleton(type):
+    def __init__(cls, *args, **kwargs):
+        cls.__instance = None
+        super().__init__(*args, **kwargs)
+
+    def __call__(cls, *args, **kwargs):
+        if cls.__instance is None:
+            cls.__instance = super().__call__(*args, **kwargs)
+            return cls.__instance
+        else:
+            return cls.__instance
+
+
+class Signer(metaclass=Singleton):
    """用来加密,解密,和基于时间戳的方式验证token"""
-    def __init__(self, secret_key=SECRET_KEY):
+    def __init__(self, secret_key=None):
        self.secret_key = secret_key

    def sign(self, value):

@@ -100,58 +108,10 @@ def combine_seq(s1, s2, callback=None):
    return seq

-def search_object_attr(obj, value='', attr_list=None, ignore_case=False):
-    """It's provide a method to search a object attribute equal some value
-    If object some attribute equal :param: value, return True else return False
-
-    class A():
-        name = 'admin'
-        age = 7
-
-    :param obj: A object
-    :param value: A string match object attribute
-    :param attr_list: Only match attribute in attr_list
-    :param ignore_case: Ignore case
-    :return: Boolean
-    """
-    if value == '':
-        return True
-
-    try:
-        object_attr = obj.__dict__
-    except AttributeError:
-        return False
-
-    if attr_list is not None:
-        new_object_attr = {}
-        for attr in attr_list:
-            new_object_attr[attr] = object_attr.pop(attr)
-        object_attr = new_object_attr
-
-    if ignore_case:
-        if not isinstance(value, string_types):
-            return False
-
-        if value.lower() in map(string.lower, map(str, object_attr.values())):
-            return True
-    else:
-        if value in object_attr.values():
-            return True
-    return False
-
-
def get_logger(name=None):
    return logging.getLogger('jumpserver.%s' % name)

-def int_seq(seq):
-    try:
-        return map(int, seq)
-    except ValueError:
-        return seq
-
-
def timesince(dt, since='', default="just now"):
    """
    Returns string representing "time since" e.g.

@@ -391,4 +351,6 @@ def is_uuid(s):
    return False

-signer = Signer()
+def get_signer():
+    signer = Signer(settings.SECRET_KEY)
+    return signer
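With get_signer() plus the Singleton metaclass, each module now builds its signer lazily from settings.SECRET_KEY instead of relying on the module-level signer = Signer() that the old code constructed at import time. A small sketch of the intended usage inside a configured Django environment (the signed value is arbitrary):

    from common.utils import get_signer

    signer = get_signer()              # Signer(settings.SECRET_KEY)
    assert signer is get_signer()      # the Singleton metaclass returns the same instance
    token = signer.sign("some-value")  # as used in ops/models.py, where the result is .decode()d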

View File

@@ -337,8 +337,9 @@ CELERY_ACCEPT_CONTENT = ['json', 'pickle']
CELERY_RESULT_EXPIRES = 3600
CELERY_WORKER_LOG_FORMAT = '%(asctime)s [%(module)s %(levelname)s] %(message)s'
CELERY_WORKER_TASK_LOG_FORMAT = '%(asctime)s [%(module)s %(levelname)s] %(message)s'
+CELERY_TASK_EAGER_PROPAGATES = True
CELERY_TIMEZONE = TIME_ZONE
-# TERMINAL_HEATBEAT_INTERVAL = CONFIG.TERMINAL_HEATBEAT_INTERVAL or 30
+# CELERY_ENABLE_UTC = True

# Cache use redis

View File

@@ -1,38 +0,0 @@
# -*- coding: utf-8 -*-
#
from functools import wraps
TASK_PREFIX = "TOOT"
CALLBACK_PREFIX = "COC"
def register_as_period_task(crontab=None, interval=None):
"""
:param crontab: "* * * * *"
:param interval: 60*60*60
:return:
"""
from .utils import create_or_update_celery_periodic_tasks
if crontab is None and interval is None:
raise SyntaxError("Must set crontab or interval one")
def decorate(func):
@wraps(func)
def wrapper(*args, **kwargs):
tasks = {
func.__name__: {
'task': func.__name__,
'args': args,
'kwargs': kwargs,
'interval': interval,
'crontab': crontab,
'enabled': True,
}
}
create_or_update_celery_periodic_tasks(tasks)
return func(*args, **kwargs)
return wrapper
return decorate

View File

@@ -9,7 +9,9 @@ from django.utils import timezone
from django.utils.translation import ugettext_lazy as _
from django_celery_beat.models import CrontabSchedule, IntervalSchedule, PeriodicTask

-from common.utils import signer, get_logger
+from common.utils import get_signer, get_logger
+from common.celery import delete_celery_periodic_task, create_or_update_celery_periodic_tasks, \
+    disable_celery_periodic_task
from .ansible import AdHocRunner, AnsibleError
from .inventory import JMSInventory

@@ -17,6 +19,7 @@ __all__ = ["Task", "AdHoc", "AdHocRunHistory"]
logger = get_logger(__file__)
+signer = get_signer()


class Task(models.Model):

@@ -82,8 +85,6 @@ class Task(models.Model):
    def save(self, force_insert=False, force_update=False, using=None,
             update_fields=None):
-        from .utils import create_or_update_celery_periodic_tasks, \
-            disable_celery_periodic_task
        from .tasks import run_ansible_task
        super().save(
            force_insert=force_insert, force_update=force_update,

@@ -114,7 +115,6 @@ class Task(models.Model):
        disable_celery_periodic_task(self.name)

    def delete(self, using=None, keep_parents=False):
-        from .utils import delete_celery_periodic_task
        super().delete(using=using, keep_parents=keep_parents)
        delete_celery_periodic_task(self.name)

@@ -246,7 +246,7 @@ class AdHoc(models.Model):
        }
        :return:
        """
-        self._become = signer.sign(json.dumps(item))
+        self._become = signer.sign(json.dumps(item)).decode('utf-8')

    @property
    def options(self):

@@ -271,6 +271,11 @@ class AdHoc(models.Model):
        except AdHocRunHistory.DoesNotExist:
            return None

+    def save(self, force_insert=False, force_update=False, using=None,
+             update_fields=None):
+        super().save(force_insert=force_insert, force_update=force_update,
+                     using=using, update_fields=update_fields)
+
    def __str__(self):
        return "{} of {}".format(self.task.name, self.short_id)

View File

@@ -21,9 +21,9 @@ def run_ansible_task(task_id, callback=None, **kwargs):
    task = get_object_or_none(Task, id=task_id)
    if task:
-        result = task.object.run()
+        result = task.run()
        if callback is not None:
-            subtask(callback).delay(result)
+            subtask(callback).delay(result, task_name=task.name)
        return result
    else:
        logger.error("No task found")
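run_ansible_task() now forwards the run result to the registered callback as a celery subtask and passes task_name along, which is what lets the new *_info callbacks in assets/tasks.py run out of band instead of being called inline. A sketch of that contract, assuming (as those callbacks do) that task.run() returns a (results_raw, results_summary) pair; the callback name here is made up:

    from celery import shared_task, subtask

    @shared_task
    def demo_callback(result, **kwargs):
        results_raw, results_summary = result        # shape assumed by this commit's callbacks
        task_name = kwargs.get("task_name")
        print(task_name, results_summary.get('dark', {}))

    # run_ansible_task(task_id, callback=demo_callback.name) then ends up doing roughly:
    #     subtask(demo_callback.name).delay(result, task_name=task.name)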

View File

@@ -57,14 +57,20 @@
<td class="text-center">{{ object.adhoc.all | length}}</td>
<td class="text-center">{{ object.latest_adhoc.hosts | length}}</td>
<td class="text-center">
-    {% if object.latest_history.is_success %}
-        <i class="fa fa-check text-navy"></i>
-    {% else %}
-        <i class="fa fa-times text-danger"></i>
+    {% if object.latest_history %}
+        {% if object.latest_history.is_success %}
+            <i class="fa fa-check text-navy"></i>
+        {% else %}
+            <i class="fa fa-times text-danger"></i>
+        {% endif %}
    {% endif %}
</td>
<td class="text-center">{{ object.latest_history.date_start }}</td>
-<td class="text-center">{{ object.latest_history.timedelta|floatformat }} s</td>
+<td class="text-center">
+    {% if object.latest_history %}
+        {{ object.latest_history.timedelta|floatformat }} s
+    {% endif %}
+</td>
<td class="text-center">
    <a href="{% url 'ops:task-run' pk=object.id %}" class="btn btn-xs btn-info">{% trans "Run" %}</a>
    <a data-uid="{{ object.id }}" class="btn btn-xs btn-danger btn-del">{% trans "Delete" %}</a>

View File

@@ -1,8 +1,4 @@
# ~*~ coding: utf-8 ~*~
-import json
-
-from django_celery_beat.models import PeriodicTask, IntervalSchedule, CrontabSchedule

from common.utils import get_logger, get_object_or_none
from .models import Task, AdHoc

@@ -13,22 +9,27 @@ def get_task_by_id(task_id):
    return get_object_or_none(Task, id=task_id)

-def create_or_update_ansible_task(
-        task_name, hosts, tasks, pattern='all', options=None,
+def update_or_create_ansible_task(
+        task_name, hosts, tasks,
+        interval=None, crontab=None, is_periodic=False,
+        callback=None, pattern='all', options=None,
        run_as_admin=False, run_as="", become_info=None,
-        created_by=None, interval=None, crontab=None,
-        is_periodic=False, callback=None,
+        created_by=None,
):
-    task = get_object_or_none(Task, name=task_name)
-    if task is None:
-        task = Task(
-            name=task_name, interval=interval,
-            crontab=crontab, is_periodic=is_periodic,
-            callback=callback, created_by=created_by
-        )
-        task.save()
+    defaults = {
+        'name': task_name,
+        'interval': interval,
+        'crontab': crontab,
+        'is_periodic': is_periodic,
+        'callback': callback,
+        'created_by': created_by,
+    }
+    created = False
+    task, _ = Task.objects.update_or_create(
+        defaults=defaults, name=task_name,
+    )

    adhoc = task.latest_adhoc
    new_adhoc = AdHoc(task=task, pattern=pattern,

@@ -38,70 +39,13 @@ def create_or_update_ansible_task(
    new_adhoc.tasks = tasks
    new_adhoc.options = options
    new_adhoc.become = become_info
    if not adhoc or adhoc != new_adhoc:
-        logger.debug("Task create new adhoc: {}".format(task_name))
        new_adhoc.save()
        task.latest_adhoc = new_adhoc
-    return task
+        created = True
+    return task, created

-
-def create_or_update_celery_periodic_tasks(tasks):
-    """
-    :param tasks: {
-        'add-every-monday-morning': {
-            'task': 'tasks.add' # A registered celery task,
-            'interval': 30,
-            'crontab': "30 7 * * *",
-            'args': (16, 16),
-            'kwargs': {},
-            'enabled': False,
-        },
-    }
-    :return:
-    """
-    # Todo: check task valid, task and callback must be a celery task
-    for name, detail in tasks.items():
-        interval = None
-        crontab = None
-        if isinstance(detail.get("interval"), int):
-            interval, _ = IntervalSchedule.objects.get_or_create(
-                every=detail['interval'],
-                period=IntervalSchedule.SECONDS,
-            )
-        elif isinstance(detail.get("crontab"), str):
-            try:
-                minute, hour, day, month, week = detail["crontab"].split()
-            except ValueError:
-                raise SyntaxError("crontab is not valid")
-            crontab, _ = CrontabSchedule.objects.get_or_create(
-                minute=minute, hour=hour, day_of_week=week,
-                day_of_month=day, month_of_year=month,
-            )
-        else:
-            raise SyntaxError("Schedule is not valid")
-
-        defaults = dict(
-            interval=interval,
-            crontab=crontab,
-            name=name,
-            task=detail['task'],
-            args=json.dumps(detail.get('args', [])),
-            kwargs=json.dumps(detail.get('kwargs', {})),
-            enabled=detail['enabled']
-        )
-        task = PeriodicTask.objects.update_or_create(
-            defaults=defaults, name=name,
-        )
-        logger.info("Create periodic task: {}".format(task))
-        return task
-
-
-def disable_celery_periodic_task(task_name):
-    PeriodicTask.objects.filter(name=task_name).update(enabled=False)
-
-
-def delete_celery_periodic_task(task_name):
-    PeriodicTask.objects.filter(name=task_name).delete()

View File

@@ -262,9 +262,6 @@ jumpserver.initDataTable = function (options) {
    var table = ele.DataTable({
        pageLength: options.pageLength || 15,
        dom: options.dom || '<"#uc.pull-left">flt<"row m-t"<"col-md-8"<"#op.col-md-6"><"col-md-6 text-center"i>><"col-md-4"p>>',
-        language: {
-            url: options.i18n_url || "/static/js/plugins/dataTables/i18n/zh-hans.json"
-        },
        order: options.order || [],
        select: options.select || 'multi',
        buttons: [],

View File

@@ -2,7 +2,7 @@
<div class="sidebar-collapse">
    <ul class="nav" id="side-menu">
        {% include '_user_profile.html' %}
-        {% if request.user.is_superuser and request.COOKIES.admin == "Yes" %}
+        {% if request.user.is_superuser and request.COOKIES.IN_ADMIN_PAGE != "No" %}
            {% include '_nav.html' %}
        {% else %}
            {% include '_nav_user.html' %}

View File

@@ -20,7 +20,7 @@
    <li><a href="{% url 'users:user-profile-update' %}">{% trans 'Profile settings' %}</a></li>
    <li class="divider"></li>
    {% if request.user.is_superuser %}
-        {% if request.COOKIES.admin == 'No' %}
+        {% if request.COOKIES.IN_ADMIN_PAGE == 'No' %}
            <li><a id="switch_admin">{% trans 'Admin page' %}</a></li>
        {% else %}
            <li><a id="switch_user">{% trans 'User page' %}</a></li>

@@ -37,11 +37,11 @@
    $(document).ready(function () {
    })
    .on('click', '#switch_admin', function () {
-        setCookie("admin", "Yes");
+        setCookie("IN_ADMIN_PAGE", "Yes");
        window.location = "/"
    })
    .on('click', '#switch_user', function () {
-        setCookie("admin", "No");
+        setCookie("IN_ADMIN_PAGE", "No");
        window.location = "/"
    })
</script>

View File

@@ -1,16 +1,9 @@
-#!/usr/bin/env python
# -*- coding: utf-8 -*-
-#
-from __future__ import unicode_literals

import uuid
from django.db import models, IntegrityError
-from django.contrib.auth.models import Group
from django.utils.translation import ugettext_lazy as _

-from common.utils import signer, date_expired_default
from common.mixins import NoDeleteModelMixin

__all__ = ['UserGroup']

View File

@@ -1,7 +1,6 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
-import os
import uuid

from collections import OrderedDict

@@ -15,10 +14,11 @@ from django.utils import timezone
from django.shortcuts import reverse

from .group import UserGroup
-from common.utils import signer, date_expired_default
+from common.utils import get_signer, date_expired_default

__all__ = ['User']

+signer = get_signer()


class User(AbstractUser):

View File

@@ -5,10 +5,12 @@ from django.utils.translation import ugettext_lazy as _
from rest_framework import serializers
from rest_framework_bulk import BulkListSerializer

-from common.utils import signer, validate_ssh_public_key
+from common.utils import get_signer, validate_ssh_public_key
from common.mixins import BulkSerializerMixin
from .models import User, UserGroup

+signer = get_signer()


class UserSerializer(BulkSerializerMixin, serializers.ModelSerializer):
    groups_display = serializers.SerializerMethodField()

View File

@@ -45,7 +45,7 @@ def start_beat():
    os.chdir(APPS_DIR)
    os.environ.setdefault('PYTHONOPTIMIZE', '1')
    scheduler = "django_celery_beat.schedulers:DatabaseScheduler"
-    cmd = 'celery -A common beat -l {} --scheduler {} --max-interval 30 '.format(LOG_LEVEL, scheduler)
+    cmd = 'celery -A common beat -l {} --scheduler {} --max-interval 5 '.format(LOG_LEVEL, scheduler)
    subprocess.call(cmd, shell=True)