1. settings 添加ops 的logger, 关于Ansible的log单独记录
2. models 增加Tasker模型, 添加AnsibleHostReuslt 对于数据处理的方法
3. ansible_api, callback 类,增加保存Tasker的逻辑,i其他兼容
4. taskers, 实现获取硬件信息和ping的 tasker接口。
pull/530/head
Administrator 2016-10-31 17:41:26 +08:00
parent ccf3851d81
commit 97b8bcd5ca
5 changed files with 250 additions and 40 deletions

View File

@ -182,6 +182,12 @@ LOGGING = {
'formatter': 'main', 'formatter': 'main',
'filename': os.path.join(PROJECT_DIR, 'logs', 'jumpserver.log') 'filename': os.path.join(PROJECT_DIR, 'logs', 'jumpserver.log')
}, },
'ansible_logs': {
'level': 'DEBUG',
'class': 'logging.FileHandler',
'formatter': 'main',
'filename': os.path.join(PROJECT_DIR, 'logs', 'ansible.log')
},
}, },
'loggers': { 'loggers': {
'django': { 'django': {
@ -211,8 +217,8 @@ LOGGING = {
'handlers': ['console', 'file'], 'handlers': ['console', 'file'],
'level': LOG_LEVEL, 'level': LOG_LEVEL,
}, },
'jumpserver.ops.ansible_api': { 'ops.ansible_api': {
'handlers': ['console', 'file'], 'handlers': ['console', 'ansible_logs'],
'level': LOG_LEVEL, 'level': LOG_LEVEL,
} }
} }

View File

@ -4,9 +4,11 @@ from __future__ import unicode_literals
import os import os
import json import json
import logging import logging
import traceback
import ansible.constants as default_config import ansible.constants as default_config
from uuid import uuid4
from django.utils import timezone
from ansible.executor.task_queue_manager import TaskQueueManager from ansible.executor.task_queue_manager import TaskQueueManager
from ansible.inventory import Inventory, Host, Group from ansible.inventory import Inventory, Host, Group
from ansible.vars import VariableManager from ansible.vars import VariableManager
@ -16,7 +18,8 @@ from ansible.utils.display import Display
from ansible.playbook.play import Play from ansible.playbook.play import Play
from ansible.plugins.callback import CallbackBase from ansible.plugins.callback import CallbackBase
from models import AnsiblePlay, AnsibleTask, AnsibleHostResult from models import Tasker, AnsiblePlay, AnsibleTask, AnsibleHostResult
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -177,24 +180,28 @@ class CallbackModule(CallbackBase):
CALLBACK_TYPE = 'stdout' CALLBACK_TYPE = 'stdout'
CALLBACK_NAME = 'json' CALLBACK_NAME = 'json'
def __init__(self, display=None): def __init__(self, tasker_id, display=None):
super(CallbackModule, self).__init__(display) super(CallbackModule, self).__init__(display)
self.results = [] self.results = []
self.output = {} self.output = {}
self.tasker_id = tasker_id
def _new_play(self, play): def _new_play(self, play):
"""将Play保持到数据里面 """将Play保持到数据里面
""" """
ret = { ret = {
'tasker': self.tasker_id,
'name': play.name, 'name': play.name,
'uuid': str(play._uuid), 'uuid': str(play._uuid),
'tasks': [] 'tasks': []
} }
try: try:
play = AnsiblePlay(name=ret['name'], uuid=ret['uuid'], completed=False) tasker = Tasker.objects.get(uuid=self.tasker_id)
play = AnsiblePlay(tasker, name=ret['name'], uuid=ret['uuid'])
play.save() play.save()
except Exception as e: except Exception as e:
traceback.print_exc()
logger.error("Save ansible play uuid to database error!, %s" % e.message) logger.error("Save ansible play uuid to database error!, %s" % e.message)
return ret return ret
@ -418,24 +425,37 @@ class ADHocRunner(InventoryMixin):
self.groups = [] self.groups = []
self.inventory = self.gen_inventory() self.inventory = self.gen_inventory()
# 初始化callback插件
self.results_callback = CallbackModule()
self.play = Play().load(play_data, variable_manager=self.variable_manager, loader=self.loader) self.play = Play().load(play_data, variable_manager=self.variable_manager, loader=self.loader)
@staticmethod @staticmethod
def update_db_play(result, ext_code): def update_db_tasker(tasker_id, ext_code):
try: try:
play = AnsiblePlay.objects.get(uuid=result[0]['uuid']) tasker = Tasker.objects.get(uuid=tasker_id)
play.completed = True tasker.end = timezone.now()
play.status_code = ext_code tasker.completed = True
play.save() tasker.exit_code = ext_code
tasker.save()
except Exception as e: except Exception as e:
logger.error("Update Ansible Play Status into database error!, %s" % e.message) logger.error("Update Tasker Status into database error!, %s" % e.message)
def run(self): def create_db_tasker(self, name, uuid):
try:
hosts = [host.get('name') for host in self.hosts]
tasker = Tasker(name=name, uuid=uuid, hosts=','.join(hosts), start=timezone.now())
tasker.save()
except Exception as e:
logger.error("Save Tasker to database error!, %s" % e.message)
def run(self, tasker_name, tasker_uuid):
"""执行ADHoc, 执行完后, 修改AnsiblePlay的状态为完成状态. """执行ADHoc, 执行完后, 修改AnsiblePlay的状态为完成状态.
:param tasker_uuid <str> 用于标示此次task
""" """
# 初始化callback插件,以及Tasker
self.create_db_tasker(tasker_name, tasker_uuid)
self.results_callback = CallbackModule(tasker_uuid)
tqm = None tqm = None
# TODO:日志和结果分析 # TODO:日志和结果分析
try: try:
@ -450,7 +470,8 @@ class ADHocRunner(InventoryMixin):
ext_code = tqm.run(self.play) ext_code = tqm.run(self.play)
result = self.results_callback.results result = self.results_callback.results
self.update_db_play(result, ext_code) # 任务运行结束, 标示任务完成
self.update_db_tasker(tasker_uuid, ext_code)
ret = json.dumps(result) ret = json.dumps(result)
return ext_code, ret return ext_code, ret
@ -468,7 +489,7 @@ def test_run():
"ip": "192.168.1.119", "ip": "192.168.1.119",
"port": "22", "port": "22",
"username": "root", "username": "root",
"password": "xxx", "password": "tongfang_test",
"key": "asset_private_key", "key": "asset_private_key",
}, },
{ {
@ -487,12 +508,12 @@ def test_run():
"hosts": "default", "hosts": "default",
"gather_facts": "no", "gather_facts": "no",
"tasks": [ "tasks": [
dict(action=dict(module='setup')), dict(action=dict(module='ping')),
dict(action=dict(module='command', args='lsss'))
] ]
} }
hoc = ADHocRunner(conf, play_source, *assets) hoc = ADHocRunner(conf, play_source, *assets)
ext_code, result = hoc.run() uuid = "tasker-" + uuid4().hex
ext_code, result = hoc.run("test_task", uuid)
print ext_code print ext_code
print result print result

View File

@ -11,24 +11,46 @@ from django.utils.translation import ugettext_lazy as _
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class Tasker(models.Model):
uuid = models.CharField(max_length=128, verbose_name=_('UUID'), primary_key=True)
name = models.CharField(max_length=128, blank=True, verbose_name=_('Name'))
start = models.DateTimeField(auto_now_add=True, verbose_name=_('Start Time'))
end = models.DateTimeField(blank=True, null=True, verbose_name=_('End Time'))
exit_code = models.IntegerField(default=0, verbose_name=_('Exit Code'))
completed = models.BooleanField(default=False, verbose_name=_('Is Completed'))
hosts = models.TextField(blank=True, null=True, verbose_name=_('Hosts'))
def __unicode__(self):
return "%s" % self.uuid
@property
def total_hosts(self):
return self.hosts.split(',')
class AnsiblePlay(models.Model): class AnsiblePlay(models.Model):
tasker = models.ForeignKey(Tasker, related_name='plays', blank=True, null=True)
uuid = models.CharField(max_length=128, verbose_name=_('UUID'), primary_key=True) uuid = models.CharField(max_length=128, verbose_name=_('UUID'), primary_key=True)
name = models.CharField(max_length=128, verbose_name=_('Name')) name = models.CharField(max_length=128, verbose_name=_('Name'))
completed = models.BooleanField(default=False, verbose_name=_('IsCompleted'))
status_code = models.IntegerField(default=0, verbose_name=_('StatusCode'))
def __unicode__(self): def __unicode__(self):
return "%s<%s>" % (self.name, self.uuid) return "%s<%s>" % (self.name, self.uuid)
def to_dict(self):
return {"uuid": self.uuid, "name": self.name}
class AnsibleTask(models.Model): class AnsibleTask(models.Model):
play = models.ForeignKey(AnsiblePlay, related_name='tasks', blank=True, null=True)
uuid = models.CharField(max_length=128, verbose_name=_('UUID'), primary_key=True) uuid = models.CharField(max_length=128, verbose_name=_('UUID'), primary_key=True)
play = models.ForeignKey(AnsiblePlay, related_name='tasks', blank=True)
name = models.CharField(max_length=128, blank=True, verbose_name=_('Name')) name = models.CharField(max_length=128, blank=True, verbose_name=_('Name'))
def __unicode__(self): def __unicode__(self):
return "%s<%s>" % (self.name, self.uuid) return "%s<%s>" % (self.name, self.uuid)
def to_dict(self):
return {"uuid": self.uuid, "name": self.name}
def failed(self): def failed(self):
pass pass
@ -37,9 +59,8 @@ class AnsibleTask(models.Model):
class AnsibleHostResult(models.Model): class AnsibleHostResult(models.Model):
task = models.ForeignKey(AnsibleTask, related_name='host_results', blank=True) task = models.ForeignKey(AnsibleTask, related_name='host_results', blank=True, null=True)
name = models.CharField(max_length=128, blank=True, verbose_name=_('Name')) name = models.CharField(max_length=128, blank=True, verbose_name=_('Name'))
status = models.BooleanField(blank=True, default=False, verbose_name=_('Status'))
success = models.TextField(blank=True, verbose_name=_('Success')) success = models.TextField(blank=True, verbose_name=_('Success'))
skipped = models.TextField(blank=True, verbose_name=_('Skipped')) skipped = models.TextField(blank=True, verbose_name=_('Skipped'))
failed = models.TextField(blank=True, verbose_name=_('Failed')) failed = models.TextField(blank=True, verbose_name=_('Failed'))
@ -47,7 +68,7 @@ class AnsibleHostResult(models.Model):
no_host = models.TextField(blank=True, verbose_name=_('NoHost')) no_host = models.TextField(blank=True, verbose_name=_('NoHost'))
def __unicode__(self): def __unicode__(self):
return "%s<%s>" % (self.name, str(self.status)) return "%s %s<%s>" % (self.name, str(self.is_success), self.task.uuid)
@property @property
def is_failed(self): def is_failed(self):
@ -56,14 +77,18 @@ class AnsibleHostResult(models.Model):
return False return False
@property @property
def success_data(self): def is_success(self):
return not self.is_failed
@property
def _success_data(self):
if self.success: if self.success:
return json.loads(self.success) return json.loads(self.success)
elif self.skipped: elif self.skipped:
return json.loads(self.skipped) return json.loads(self.skipped)
@property @property
def failed_data(self): def _failed_data(self):
if self.failed: if self.failed:
return json.loads(self.failed) return json.loads(self.failed)
elif self.unreachable: elif self.unreachable:
@ -71,3 +96,117 @@ class AnsibleHostResult(models.Model):
elif self.no_host: elif self.no_host:
return {"msg": self.no_host} return {"msg": self.no_host}
@property
def failed_msg(self):
return self._failed_data.get("msg")
@staticmethod
def __filter_disk(ansible_devices, exclude_devices):
"""
过滤磁盘设备丢弃掉不需要的设备
:param ansible_devices: 对应的facts字段
:param exclude_devices: <list> 一个需要被丢弃的设备,匹配规则是startwith, 比如需要丢弃sr0子类的 ['sr']
:return: <dict> 过滤获取的结果
"""
for start_str in exclude_devices:
for key in ansible_devices.keys():
if key.startswith(start_str):
ansible_devices.pop(key)
return ansible_devices
@staticmethod
def __filter_interface(ansible_interfaces, exclude_interface):
"""
过滤网卡设备丢弃掉不需要的网卡 比如lo
:param ansible_interface: 对应的facts字段
:param exclude_interface: <list> 一个需要被丢弃的设备,匹配规则是startwith, 比如需要丢弃lo子类的 ['lo']
:return: <dict> 过滤获取的结果
"""
for interface in ansible_interfaces:
for start_str in exclude_interface:
if interface.startswith(start_str):
i = ansible_interfaces.index(interface)
ansible_interfaces.pop(i)
return ansible_interfaces
@staticmethod
def __gather_interface(facts, interfaces):
"""
收集所有interface的具体信息
:param facts: ansible faces
:param interfaces: 需要收集的intreface列表
:return: <dict> interface的详情
"""
result = {}
for key in interfaces:
if "ansible_" + key in facts.keys():
result[key] = facts.get(key)
return result
def __deal_setup(self):
"""
处理ansible setup模块收集到的数据提取资产需要的部分
:return: <dict> {"msg": <str>, "data": <dict>}, 注意msg是异常信息, 有msg时 data为None
"""
result = self._success_data
module_name = result['invocation'].get('module_name') if result.get('invocation') else None
if module_name is not None:
if module_name != "setup":
return {"msg": "the property only for ansible setup module result!, can't support other module", "data":None}
else:
data = {}
facts =result.get('ansible_facts')
interfaces = self.__filter_interface(facts.get('ansible_interfaces'), ['lo'])
cpu_describe = "%s %s" % (facts.get('ansible_processor')[0], facts.get('ansible_processor')[1]) if len(facts.get('ansible_processor')) >= 2 else ""
data['sn'] = facts.get('ansible_product_serial')
data['env'] = facts.get('ansible_env')
data['os'] = "%s %s(%s)" % (facts.get('ansible_distribution'),
facts.get('ansible_distribution_version'),
facts.get('ansible_distribution_release'))
data['mem'] = facts.get('ansible_memtotal_mb')
data['cpu'] = "%s %d" % (cpu_describe, facts.get('ansible_processor_count'))
data['disk'] = self.__filter_disk(facts.get('ansible_devices'), ['sr'])
data['interface'] = self.__gather_interface(facts, interfaces)
return {"msg": None, "data": data}
else:
return {"msg": "there isn't module_name field! can't process this data format", "data": None}
@property
def deal_setup(self):
try:
return self.__deal_setup()
except Exception as e:
return {"msg": "deal with setup data failed, %s" % e.message, "data": None}
def __deal_ping(self):
"""
处理ansible ping模块收集到的数据
:return: <dict> {"msg": <str>, "data": {"success": <bool>}}, 注意msg是异常信息, 有msg时 data为None
"""
result = self._success_data
module_name = result['invocation'].get('module_name') if result.get('invocation') else None
if module_name is not None:
if module_name != "ping":
return {"msg": "the property only for ansible setup module result!, can't support other module", "data":None}
else:
ping = True if result.get('ping') == "pong" else False
return {"msg": None, "data": {"success": ping}}
else:
return {"msg": "there isn't module_name field! can't process this data format", "data": None}
@property
def deal_ping(self):
try:
return self.__deal_ping()
except Exception as e:
return {"msg": "deal with ping data failed, %s" % e.message, "data": None}

View File

@ -1,14 +1,12 @@
from __future__ import unicode_literals from __future__ import unicode_literals
from .tasks import * from .tasks import *
from .models import Tasker, AnsiblePlay, AnsibleTask, AnsibleHostResult
from uuid import uuid1
from celery.result import AsyncResult from celery.result import AsyncResult
def start_get_hardware_info(*assets):
result = get_asset_hardware_info.delay(*assets)
return result.id
def get_result(task_id): def get_result(task_id):
result = AsyncResult(task_id) result = AsyncResult(task_id)
if result.ready(): if result.ready():
@ -17,8 +15,54 @@ def get_result(task_id):
return {"Completed": False, "data": None} return {"Completed": False, "data": None}
def start_get_hardware_info(*assets):
name = "Get host hardware information"
uuid = "tasker-" + uuid1().hex
get_asset_hardware_info.delay(name, uuid, *assets)
return uuid
def __get_hardware_info(tasker_uuid):
tasker = Tasker.objects.get(uuid=tasker_uuid)
host_results = []
for play in tasker.plays.all():
for t in play.tasks.all():
for h in t.host_results.all():
host_results.append(h)
return host_results
def get_hardware_info(tasker_uuid):
try:
return {"msg": None, "data": __get_hardware_info(tasker_uuid)}
except Exception as e:
return {"msg": "query data failed!, %s" % e.message, "data": None}
def start_ping_test(*assets): def start_ping_test(*assets):
result = asset_test_ping_check.delay(*assets) name = "Test host connection"
return result.id uuid = "tasker-" + uuid1().hex
asset_test_ping_check.delay(name, uuid, *assets)
return uuid
def __get_ping_test(tasker_uuid):
tasker = Tasker.objects.get(uuid=tasker_uuid)
host_results = []
for play in tasker.plays.all():
for t in play.tasks.all():
for h in t.host_results.all():
host_results.append(h)
return host_results
def get_ping_test(tasker_uuid):
try:
return {"msg": None, "data": __get_ping_test(tasker_uuid)}
except Exception as e:
return {"msg": "query data failed!, %s" % e.message, "data": None}

View File

@ -7,7 +7,7 @@ from ops.ansible_api import Config, ADHocRunner
@shared_task(name="get_asset_hardware_info") @shared_task(name="get_asset_hardware_info")
def get_asset_hardware_info(*assets): def get_asset_hardware_info(task_name, task_uuid, *assets):
conf = Config() conf = Config()
play_source = { play_source = {
"name": "Get host hardware information", "name": "Get host hardware information",
@ -18,12 +18,12 @@ def get_asset_hardware_info(*assets):
] ]
} }
hoc = ADHocRunner(conf, play_source, *assets) hoc = ADHocRunner(conf, play_source, *assets)
ext_code, result = hoc.run() ext_code, result = hoc.run(task_name, task_uuid)
return ext_code, result return ext_code, result
@shared_task(name="asset_test_ping_check") @shared_task(name="asset_test_ping_check")
def asset_test_ping_check(*assets): def asset_test_ping_check(task_name, task_uuid, *assets):
conf = Config() conf = Config()
play_source = { play_source = {
"name": "Test host connection use ping", "name": "Test host connection use ping",
@ -34,7 +34,7 @@ def asset_test_ping_check(*assets):
] ]
} }
hoc = ADHocRunner(conf, play_source, *assets) hoc = ADHocRunner(conf, play_source, *assets)
ext_code, result = hoc.run() ext_code, result = hoc.run(task_name, task_uuid)
return ext_code, result return ext_code, result