diff --git a/apps/accounts/signal_handlers.py b/apps/accounts/signal_handlers.py index 311c8d83d..d2ac73cc4 100644 --- a/apps/accounts/signal_handlers.py +++ b/apps/accounts/signal_handlers.py @@ -2,9 +2,8 @@ from django.db.models.signals import pre_save, post_save from django.dispatch import receiver from assets.models import Asset -from common.decorator import on_transaction_commit +from common.decorators import on_transaction_commit from common.utils import get_logger -from .automations.push_account.manager import PushAccountManager from .models import Account logger = get_logger(__name__) diff --git a/apps/assets/migrations/0093_auto_20220403_1627.py b/apps/assets/migrations/0093_auto_20220403_1627.py index 45b8e92ed..e90bab983 100644 --- a/apps/assets/migrations/0093_auto_20220403_1627.py +++ b/apps/assets/migrations/0093_auto_20220403_1627.py @@ -90,6 +90,11 @@ class Migration(migrations.Migration): old_name='ip', new_name='address', ), + migrations.AlterField( + model_name='asset', + name='address', + field=models.CharField(db_index=True, max_length=1024, verbose_name='Address'), + ), migrations.AddField( model_name='asset', name='date_updated', diff --git a/apps/assets/models/asset/common.py b/apps/assets/models/asset/common.py index dfbf2ecf0..a745829b7 100644 --- a/apps/assets/models/asset/common.py +++ b/apps/assets/models/asset/common.py @@ -105,7 +105,7 @@ class Asset(NodesRelationMixin, AbsConnectivity, JMSOrgBaseModel): Type = const.AllTypes name = models.CharField(max_length=128, verbose_name=_('Name')) - address = models.CharField(max_length=128, verbose_name=_('IP'), db_index=True) + address = models.CharField(max_length=1024, verbose_name=_('Address'), db_index=True) platform = models.ForeignKey(Platform, on_delete=models.PROTECT, verbose_name=_("Platform"), related_name='assets') domain = models.ForeignKey("assets.Domain", null=True, blank=True, related_name='assets', verbose_name=_("Domain"), on_delete=models.SET_NULL) diff --git a/apps/assets/signal_handlers/asset.py b/apps/assets/signal_handlers/asset.py index 55cd0ede2..f7ea06be4 100644 --- a/apps/assets/signal_handlers/asset.py +++ b/apps/assets/signal_handlers/asset.py @@ -1,11 +1,14 @@ # -*- coding: utf-8 -*- # -from django.db.models.signals import post_save, m2m_changed, pre_delete, post_delete, pre_save +from django.db.models.signals import ( + post_save, m2m_changed, pre_delete, post_delete, pre_save +) from django.dispatch import receiver from assets.models import Asset, Node, Cloud, Device, Host, Web, Database +from assets.tasks.ping import test_assets_connectivity_util from common.const.signals import POST_ADD, POST_REMOVE, PRE_REMOVE -from common.decorator import on_transaction_commit +from common.decorators import on_transaction_commit from common.utils import get_logger logger = get_logger(__file__) @@ -28,8 +31,7 @@ def on_asset_create(sender, instance=None, created=False, **kwargs): logger.info("Asset create signal recv: {}".format(instance)) # 获取资产硬件信息 - # update_assets_fact_util.delay([instance]) - # test_asset_connectivity_util.delay([instance]) + test_assets_connectivity_util([instance]) # 确保资产存在一个节点 has_node = instance.nodes.all().exists() @@ -60,34 +62,6 @@ def on_asset_nodes_add(instance, action, reverse, pk_set, **kwargs): for node in nodes: nodes_ancestors_keys.update(Node.get_node_ancestor_keys(node, with_self=True)) - # 查询所有祖先节点关联的系统用户,都是要跟资产建立关系的 - # system_user_ids = SystemUser.objects.filter( - # nodes__key__in=nodes_ancestors_keys - # ).distinct().values_list('id', flat=True) - - # 查询所有已存在的关系 - # m2m_model = SystemUser.assets.through - # exist = set(m2m_model.objects.filter( - # systemuser_id__in=system_user_ids, asset_id__in=asset_ids - # ).values_list('systemuser_id', 'asset_id')) - # TODO 优化 - # to_create = [] - # for system_user_id in system_user_ids: - # asset_ids_to_push = [] - # for asset_id in asset_ids: - # if (system_user_id, asset_id) in exist: - # continue - # asset_ids_to_push.append(asset_id) - # to_create.append(m2m_model( - # systemuser_id=system_user_id, - # asset_id=asset_id, - # org_id=instance.org_id - # )) - # if asset_ids_to_push: - # push_system_user_to_assets.delay(system_user_id, asset_ids_to_push) - # m2m_model.objects.bulk_create(to_create) - # - RELATED_NODE_IDS = '_related_node_ids' diff --git a/apps/assets/tasks/ping.py b/apps/assets/tasks/ping.py index 7117c8bc7..252390870 100644 --- a/apps/assets/tasks/ping.py +++ b/apps/assets/tasks/ping.py @@ -1,23 +1,22 @@ # ~*~ coding: utf-8 ~*~ from celery import shared_task -from django.utils.translation import gettext_noop, gettext_lazy as _ +from django.utils.translation import gettext_noop -from common.utils import get_logger from assets.const import AutomationTypes, GATEWAY_NAME +from common.utils import get_logger from orgs.utils import org_aware_func - from .common import automation_execute_start logger = get_logger(__file__) __all__ = [ - 'test_asset_connectivity_util', + 'test_assets_connectivity_task', 'test_assets_connectivity_manual', 'test_node_assets_connectivity_manual', ] -def test_connectivity_util(assets, tp, task_name, local_port=None): +def _test_connectivity_util(assets, tp, task_name, local_port=None): if not assets: return @@ -30,8 +29,9 @@ def test_connectivity_util(assets, tp, task_name, local_port=None): automation_execute_start(task_name, tp, child_snapshot) +@shared_task @org_aware_func('assets') -def test_asset_connectivity_util(assets, task_name=None, local_port=None): +def test_assets_connectivity_task(assets, task_name=None, local_port=None): from assets.models import PingAutomation if task_name is None: task_name = gettext_noop("Test assets connectivity ") @@ -39,26 +39,24 @@ def test_asset_connectivity_util(assets, task_name=None, local_port=None): task_name = PingAutomation.generate_unique_name(task_name) gateway_assets = assets.filter(platform__name=GATEWAY_NAME) - test_connectivity_util( + _test_connectivity_util( gateway_assets, AutomationTypes.ping_gateway, task_name, local_port ) non_gateway_assets = assets.exclude(platform__name=GATEWAY_NAME) - test_connectivity_util(non_gateway_assets, AutomationTypes.ping, task_name) + _test_connectivity_util(non_gateway_assets, AutomationTypes.ping, task_name) -@shared_task(queue="ansible", verbose_name=_('Manually test the connectivity of a asset')) def test_assets_connectivity_manual(asset_ids, local_port=None): from assets.models import Asset assets = Asset.objects.filter(id__in=asset_ids) task_name = gettext_noop("Test assets connectivity ") - test_asset_connectivity_util(assets, task_name, local_port) + test_assets_connectivity_task.delay(assets, task_name, local_port) -@shared_task(queue="ansible", verbose_name=_('Manually test the connectivity of assets under a node')) def test_node_assets_connectivity_manual(node_id, local_port=None): from assets.models import Node node = Node.objects.get(id=node_id) task_name = gettext_noop("Test if the assets under the node are connectable ") assets = node.get_all_assets() - test_asset_connectivity_util(assets, task_name, local_port) + test_assets_connectivity_task.delay(*assets, task_name, local_port) diff --git a/apps/common/decorator.py b/apps/common/decorator.py deleted file mode 100644 index bb7b3e866..000000000 --- a/apps/common/decorator.py +++ /dev/null @@ -1,68 +0,0 @@ -# -*- coding: utf-8 -*- -# -import functools -import threading -import time -import uuid - -from django.core.cache import cache -from django.db import transaction - - -def on_transaction_commit(func): - """ - 如果不调用on_commit, 对象创建时添加多对多字段值失败 - """ - - def inner(*args, **kwargs): - transaction.on_commit(lambda: func(*args, **kwargs)) - - return inner - - -class Singleton(object): - """ 单例类 """ - - def __init__(self, cls): - self._cls = cls - self._instance = {} - - def __call__(self): - if self._cls not in self._instance: - self._instance[self._cls] = self._cls() - return self._instance[self._cls] - - -def _run_func_if_is_last(ttl, func, *args, **kwargs): - ix = uuid.uuid4().__str__() - key = f'DELAY_RUN_{func.__name__}' - cache.set(key, ix, ttl) - st = (ttl - 2 > 1) and ttl - 2 or 1 - time.sleep(st) - got = cache.get(key, None) - - if ix == got: - func(*args, **kwargs) - cache.delete(key) - - -def delay_run(ttl=5): - def inner(func): - @functools.wraps(func) - def wrapper(*args, **kwargs): - t = threading.Thread(target=_run_func_if_is_last, args=(ttl, func, *args), kwargs=kwargs) - t.start() - - return wrapper - - return inner - - -@delay_run(ttl=10) -def run_it_many(username, year=2000): - print("Hello, %s, now is %s" % (username, year)) - - -if __name__ == '__main__': - for i in range(20): - run_it_many('test', 2000) diff --git a/apps/common/decorators.py b/apps/common/decorators.py new file mode 100644 index 000000000..91a6ca623 --- /dev/null +++ b/apps/common/decorators.py @@ -0,0 +1,134 @@ +# -*- coding: utf-8 -*- +# +import functools +import inspect +import time +import uuid +from concurrent.futures import ThreadPoolExecutor + +from django.core.cache import cache +from django.db import transaction + + +def on_transaction_commit(func): + """ + 如果不调用on_commit, 对象创建时添加多对多字段值失败 + """ + + def inner(*args, **kwargs): + transaction.on_commit(lambda: func(*args, **kwargs)) + + return inner + + +class Singleton(object): + """ 单例类 """ + + def __init__(self, cls): + self._cls = cls + self._instance = {} + + def __call__(self): + if self._cls not in self._instance: + self._instance[self._cls] = self._cls() + return self._instance[self._cls] + + +def _run_func_if_is_last(ttl, func, *args, **kwargs): + ix = uuid.uuid4().__str__() + key = f'DELAY_RUN_{func.__name__}' + cache.set(key, ix, ttl) + st = (ttl - 2 > 1) and ttl - 2 or 2 + time.sleep(st) + got = cache.get(key, None) + + if ix == got: + func(*args, **kwargs) + + +executor = ThreadPoolExecutor(10) + + +def delay_run(ttl=5): + def inner(func): + @functools.wraps(func) + def wrapper(*args, **kwargs): + executor.submit(_run_func_if_is_last, ttl, func, *args, **kwargs) + + return wrapper + + return inner + + +def _merge_run(ttl, func, *args, **kwargs): + if not args or not isinstance(args[0], (list, tuple)): + raise ValueError('args[0] must be list or tuple') + + key = f'DELAY_MERGE_RUN_{func.__name__}' + ix = uuid.uuid4().__str__() + value = cache.get(key, []) + value.extend(args[0]) + + st = (ttl - 2 > 1) and ttl - 2 or 2 + time.sleep(st) + got = cache.get(key, None) + + if ix == got: + func(*args, **kwargs) + + +def merge_delay_run(ttl): + """ + 合并 func 参数,延迟执行, 在 ttl 秒内, 只执行最后一次 + func 参数必须是 *args + :param ttl: + :return: + """ + + def inner(func): + sigs = inspect.signature(func) + if len(sigs.parameters) != 1: + raise ValueError('func must have one arguments: %s' % func.__name__) + param = list(sigs.parameters.values())[0] + if not str(param).startswith('*'): + raise ValueError('func args must be startswith *: %s' % func.__name__) + + @functools.wraps(func) + def wrapper(*args): + key = f'DELAY_MERGE_RUN_{func.__name__}' + values = cache.get(key, []) + new_arg = [*values, *args] + cache.set(key, new_arg, ttl) + return delay_run(ttl)(func)(*new_arg) + + return wrapper + + return inner + + +def delay_run(ttl=5): + """ + 延迟执行函数, 在 ttl 秒内, 只执行最后一次 + :param ttl: + :return: + """ + + def inner(func): + @functools.wraps(func) + def wrapper(*args, **kwargs): + executor.submit(_run_func_if_is_last, ttl, func, *args, **kwargs) + + return wrapper + + return inner + + +@delay_run(ttl=10) +def test_delay_run(username, year=2000): + print("Hello, %s, now is %s" % (username, year)) + + +@merge_delay_run(ttl=10) +def test_merge_delay_run(*users): + name = ','.join(users) + print("Hello, %s, now is %s" % (name, time.time())) diff --git a/apps/notifications/signal_handlers.py b/apps/notifications/signal_handlers.py index 74b700643..5b6b75a7e 100644 --- a/apps/notifications/signal_handlers.py +++ b/apps/notifications/signal_handlers.py @@ -7,7 +7,7 @@ from django.db.models.signals import post_save from django.dispatch import receiver from django.utils.functional import LazyObject -from common.decorator import on_transaction_commit +from common.decorators import on_transaction_commit from common.utils import get_logger from common.utils.connection import RedisPubSub from notifications.backends import BACKEND diff --git a/apps/orgs/signal_handlers/common.py b/apps/orgs/signal_handlers/common.py index 802e9c299..dd4168c5c 100644 --- a/apps/orgs/signal_handlers/common.py +++ b/apps/orgs/signal_handlers/common.py @@ -10,7 +10,7 @@ from django.dispatch import receiver from django.utils.functional import LazyObject from common.const.signals import PRE_REMOVE, POST_REMOVE -from common.decorator import on_transaction_commit +from common.decorators import on_transaction_commit from common.signals import django_ready from common.utils import get_logger from common.utils.connection import RedisPubSub diff --git a/apps/perms/utils/user_perm_tree.py b/apps/perms/utils/user_perm_tree.py index 8888a67e9..b88b6db26 100644 --- a/apps/perms/utils/user_perm_tree.py +++ b/apps/perms/utils/user_perm_tree.py @@ -4,27 +4,25 @@ from collections import defaultdict from django.conf import settings from django.core.cache import cache -from users.models import User from assets.models import Asset from assets.utils import NodeAssetsUtil +from common.db.models import output_as_string +from common.decorators import on_transaction_commit +from common.utils import get_logger +from common.utils.common import lazyproperty, timeit from orgs.models import Organization from orgs.utils import ( current_org, tmp_to_org, tmp_to_root_org ) -from common.decorator import on_transaction_commit -from common.utils import get_logger -from common.utils.common import lazyproperty, timeit -from common.db.models import output_as_string - from perms.locks import UserGrantedTreeRebuildLock from perms.models import ( AssetPermission, UserAssetGrantedTreeNodeRelation, PermNode ) - +from users.models import User from .permission import AssetPermissionUtil logger = get_logger(__name__) @@ -78,7 +76,7 @@ class UserPermTreeRefreshUtil(_UserPermTreeCacheMixin): end = time.time() logger.info( 'Refresh user [{user}] org [{org}] perm tree, user {use_time:.2f}s' - ''.format(user=self.user, org=org, use_time=end-start) + ''.format(user=self.user, org=org, use_time=end - start) ) def _clean_user_perm_tree_for_legacy_org(self): @@ -315,9 +313,9 @@ class UserPermTreeBuildUtil(object): asset_node_pairs = Asset.nodes.through.objects \ .filter(asset_id__in=self.direct_asset_ids) \ .annotate( - str_asset_id=output_as_string('asset_id'), - str_node_id=output_as_string('node_id') - ).values_list('str_asset_id', 'str_node_id') + str_asset_id=output_as_string('asset_id'), + str_node_id=output_as_string('node_id') + ).values_list('str_asset_id', 'str_node_id') asset_node_pairs = list(asset_node_pairs) return asset_node_pairs diff --git a/apps/settings/signal_handlers.py b/apps/settings/signal_handlers.py index f4f4c74ce..999a01acd 100644 --- a/apps/settings/signal_handlers.py +++ b/apps/settings/signal_handlers.py @@ -8,7 +8,7 @@ from django.db.utils import ProgrammingError, OperationalError from django.dispatch import receiver from django.utils.functional import LazyObject -from common.decorator import on_transaction_commit +from common.decorators import on_transaction_commit from common.signals import django_ready from common.utils import get_logger, ssh_key_gen from common.utils.connection import RedisPubSub diff --git a/apps/terminal/signal_handlers/db_port.py b/apps/terminal/signal_handlers/db_port.py index 9ee60257c..bacbc1f0c 100644 --- a/apps/terminal/signal_handlers/db_port.py +++ b/apps/terminal/signal_handlers/db_port.py @@ -2,7 +2,7 @@ from django.db.models.signals import post_save, post_delete from django.dispatch import receiver from assets.models import Asset -from common.decorator import on_transaction_commit +from common.decorators import on_transaction_commit from common.signals import django_ready from common.utils import get_logger from ..utils import db_port_manager diff --git a/apps/terminal/signal_handlers/terminal.py b/apps/terminal/signal_handlers/terminal.py index 10d87b989..9a1536e02 100644 --- a/apps/terminal/signal_handlers/terminal.py +++ b/apps/terminal/signal_handlers/terminal.py @@ -2,7 +2,7 @@ from django.db.models.signals import post_save from django.dispatch import receiver from django.utils.functional import LazyObject -from common.decorator import on_transaction_commit +from common.decorators import on_transaction_commit from common.utils import get_logger from common.utils.connection import RedisPubSub from ..models import Task diff --git a/apps/terminal/startup.py b/apps/terminal/startup.py index 36a740cb6..a6f656565 100644 --- a/apps/terminal/startup.py +++ b/apps/terminal/startup.py @@ -1,16 +1,16 @@ import os -import time import socket import threading +import time + from django.conf import settings from common.db.utils import close_old_connections -from common.decorator import Singleton -from common.utils import get_disk_usage, get_cpu_load, get_memory_usage, get_logger - -from .serializers.terminal import TerminalRegistrationSerializer, StatSerializer +from common.decorators import Singleton +from common.utils import get_disk_usage, get_cpu_load, get_memory_usage from .const import TerminalType from .models import Terminal +from .serializers.terminal import TerminalRegistrationSerializer, StatSerializer __all__ = ['CoreTerminal', 'CeleryTerminal'] diff --git a/apps/terminal/utils/db_port_mapper.py b/apps/terminal/utils/db_port_mapper.py index 6f8457d22..f5405f97e 100644 --- a/apps/terminal/utils/db_port_mapper.py +++ b/apps/terminal/utils/db_port_mapper.py @@ -4,7 +4,7 @@ from django.utils.translation import ugettext_lazy as _ from assets.const import DatabaseTypes from assets.models import Database -from common.decorator import Singleton +from common.decorators import Singleton from common.exceptions import JMSException from common.utils import get_logger, get_object_or_none from orgs.utils import tmp_to_root_org diff --git a/apps/tickets/signal_handlers/ticket.py b/apps/tickets/signal_handlers/ticket.py index 8d9a9ef32..954aee0fa 100644 --- a/apps/tickets/signal_handlers/ticket.py +++ b/apps/tickets/signal_handlers/ticket.py @@ -1,9 +1,8 @@ # -*- coding: utf-8 -*- # -from django.dispatch import receiver from django.db.models.signals import post_save, m2m_changed -from common.decorator import on_transaction_commit +from common.decorators import on_transaction_commit from common.utils import get_logger from tickets.models import Ticket diff --git a/apps/users/signal_handlers.py b/apps/users/signal_handlers.py index bcf50173a..487b3c917 100644 --- a/apps/users/signal_handlers.py +++ b/apps/users/signal_handlers.py @@ -1,20 +1,19 @@ # -*- coding: utf-8 -*- # -from django.dispatch import receiver -from django_auth_ldap.backend import populate_user from django.conf import settings from django.core.exceptions import PermissionDenied -from django_cas_ng.signals import cas_user_authenticated from django.db.models.signals import post_save +from django.dispatch import receiver +from django_auth_ldap.backend import populate_user +from django_cas_ng.signals import cas_user_authenticated +from authentication.backends.oauth2.signals import oauth2_create_or_update_user from authentication.backends.oidc.signals import openid_create_or_update_user from authentication.backends.saml2.signals import saml2_create_or_update_user -from authentication.backends.oauth2.signals import oauth2_create_or_update_user +from common.decorators import on_transaction_commit from common.utils import get_logger -from common.decorator import on_transaction_commit -from .signals import post_user_create from .models import User, UserPasswordHistory - +from .signals import post_user_create logger = get_logger(__file__) @@ -47,9 +46,9 @@ def user_authenticated_handle(user, created, source, attrs=None, **kwargs): @receiver(post_save, sender=User) def save_passwd_change(sender, instance: User, **kwargs): - passwords = UserPasswordHistory.objects\ + passwords = UserPasswordHistory.objects \ .filter(user=instance) \ - .order_by('-date_created')\ + .order_by('-date_created') \ .values_list('password', flat=True) passwords = passwords[:int(settings.OLD_PASSWORD_HISTORY_LIMIT_COUNT)]