From 26d8e4568331b78a24eb36f8c879a4a1a72d3378 Mon Sep 17 00:00:00 2001 From: Chris Caron Date: Sun, 3 Mar 2024 14:22:17 -0500 Subject: [PATCH] Asynchronous Dynamic Module Loading Support (#1071) --- apprise/manager.py | 371 ++++++++++++++++-------------- test/test_notification_manager.py | 52 +++++ 2 files changed, 254 insertions(+), 169 deletions(-) diff --git a/apprise/manager.py b/apprise/manager.py index 3d964af2..d649afab 100644 --- a/apprise/manager.py +++ b/apprise/manager.py @@ -32,6 +32,7 @@ import sys import time import hashlib import inspect +import threading from .utils import import_module from .utils import Singleton from .utils import parse_list @@ -60,6 +61,9 @@ class PluginManager(metaclass=Singleton): # The module path to scan module_path = join(abspath(dirname(__file__)), _id) + # thread safe loading + _lock = threading.Lock() + def __init__(self, *args, **kwargs): """ Over-ride our class instantiation to provide a singleton @@ -103,40 +107,49 @@ class PluginManager(metaclass=Singleton): # effort/overhead doing it again self._paths_previously_scanned = set() + # Track loaded module paths to prevent from loading them again + self._loaded = set() + def unload_modules(self, disable_native=False): """ Reset our object and unload all modules """ - if self._custom_module_map: - # Handle Custom Module Assignments - for meta in self._custom_module_map.values(): - if meta['name'] not in self._module_map: - # Nothing to remove - continue + with self._lock: + if self._custom_module_map: + # Handle Custom Module Assignments + for meta in self._custom_module_map.values(): + if meta['name'] not in self._module_map: + # Nothing to remove + continue - # For the purpose of tidying up un-used modules in memory - loaded = [m for m in sys.modules.keys() - if m.startswith( - self._module_map[meta['name']]['path'])] + # For the purpose of tidying up un-used modules in memory + loaded = [m for m in sys.modules.keys() + if m.startswith( + self._module_map[meta['name']]['path'])] - for module_path in loaded: - del sys.modules[module_path] + for module_path in loaded: + del sys.modules[module_path] - # Reset disabled plugins (if any) - for schema in self._disabled: - self._schema_map[schema].enabled = True - self._disabled.clear() + # Reset disabled plugins (if any) + for schema in self._disabled: + self._schema_map[schema].enabled = True + self._disabled.clear() - # Reset our variables - self._module_map = None if not disable_native else {} - self._schema_map = {} - self._custom_module_map = {} + # Reset our variables + self._schema_map = {} + self._custom_module_map = {} + if disable_native: + self._module_map = {} - # Reset our path cache - self._paths_previously_scanned = set() + else: + self._module_map = None + self._loaded = set() - def load_modules(self, path=None, name=None): + # Reset our path cache + self._paths_previously_scanned = set() + + def load_modules(self, path=None, name=None, force=False): """ Load our modules into memory """ @@ -145,102 +158,120 @@ class PluginManager(metaclass=Singleton): module_name_prefix = self.module_name_prefix if name is None else name module_path = self.module_path if path is None else path - if not self: - # Initialize our maps - self._module_map = {} - self._schema_map = {} - self._custom_module_map = {} + with self._lock: + if not force and module_path in self._loaded: + # We're done + return - # Used for the detection of additional Notify Services objects - # The .py extension is optional as we support loading directories too - module_re = re.compile( - r'^(?P' + self.fname_prefix + r'[a-z0-9]+)(\.py)?$', re.I) + # Our base reference + module_count = len(self._module_map) if self._module_map else 0 + schema_count = len(self._schema_map) if self._schema_map else 0 - t_start = time.time() - for f in os.listdir(module_path): - tl_start = time.time() - match = module_re.match(f) - if not match: - # keep going - continue + if not self: + # Initialize our maps + self._module_map = {} + self._schema_map = {} + self._custom_module_map = {} - elif match.group('name') == f'{self.fname_prefix}Base': - # keep going - continue + # Used for the detection of additional Notify Services objects + # The .py extension is optional as we support loading directories + # too + module_re = re.compile( + r'^(?P' + self.fname_prefix + r'[a-z0-9]+)(\.py)?$', + re.I) - # Store our notification/plugin name: - module_name = match.group('name') - module_pyname = '{}.{}'.format(module_name_prefix, module_name) - - if module_name in self._module_map: - logger.warning( - "%s(s) (%s) already loaded; ignoring %s", - self.name, module_name, os.path.join(module_path, f)) - continue - - try: - module = __import__( - module_pyname, - globals(), locals(), - fromlist=[module_name]) - - except ImportError: - # No problem, we can try again another way... - module = import_module( - os.path.join(module_path, f), module_pyname) - if not module: - # logging found in import_module and not needed here + t_start = time.time() + for f in os.listdir(module_path): + tl_start = time.time() + match = module_re.match(f) + if not match: + # keep going continue - if not hasattr(module, module_name): - # Not a library we can load as it doesn't follow the simple - # rule that the class must bear the same name as the - # notification file itself. - logger.trace( - "%s (%s) import failed; no filename/Class " - "match found in %s", - self.name, module_name, os.path.join(module_path, f)) - continue - - # Get our plugin - plugin = getattr(module, module_name) - if not hasattr(plugin, 'app_id'): - # Filter out non-notification modules - logger.trace( - "(%s) import failed; no app_id defined in %s", - self.name, module_name, os.path.join(module_path, f)) - continue - - # Add our plugin name to our module map - self._module_map[module_name] = { - 'plugin': set([plugin]), - 'module': module, - 'path': '{}.{}'.format(module_name_prefix, module_name), - 'native': True, - } - - fn = getattr(plugin, 'schemas', None) - schemas = set([]) if not callable(fn) else fn(plugin) - - # map our schema to our plugin - for schema in schemas: - if schema in self._schema_map: - logger.error( - "{} schema ({}) mismatch detected - {} to {}" - .format(self.name, schema, self._schema_map, plugin)) + elif match.group('name') == f'{self.fname_prefix}Base': + # keep going continue - # Assign plugin - self._schema_map[schema] = plugin + # Store our notification/plugin name: + module_name = match.group('name') + module_pyname = '{}.{}'.format(module_name_prefix, module_name) - logger.trace( - '{} {} loaded in {:.6f}s'.format( - self.name, module_name, (time.time() - tl_start))) - logger.debug( - '{} {}(s) and {} Schema(s) loaded in {:.4f}s' - .format( - self.name, len(self._module_map), len(self._schema_map), - (time.time() - t_start))) + if module_name in self._module_map: + logger.warning( + "%s(s) (%s) already loaded; ignoring %s", + self.name, module_name, os.path.join(module_path, f)) + continue + + try: + module = __import__( + module_pyname, + globals(), locals(), + fromlist=[module_name]) + + except ImportError: + # No problem, we can try again another way... + module = import_module( + os.path.join(module_path, f), module_pyname) + if not module: + # logging found in import_module and not needed here + continue + + if not hasattr(module, module_name): + # Not a library we can load as it doesn't follow the simple + # rule that the class must bear the same name as the + # notification file itself. + logger.trace( + "%s (%s) import failed; no filename/Class " + "match found in %s", + self.name, module_name, os.path.join(module_path, f)) + continue + + # Get our plugin + plugin = getattr(module, module_name) + if not hasattr(plugin, 'app_id'): + # Filter out non-notification modules + logger.trace( + "(%s) import failed; no app_id defined in %s", + self.name, module_name, os.path.join(module_path, f)) + continue + + # Add our plugin name to our module map + self._module_map[module_name] = { + 'plugin': set([plugin]), + 'module': module, + 'path': '{}.{}'.format(module_name_prefix, module_name), + 'native': True, + } + + fn = getattr(plugin, 'schemas', None) + schemas = set([]) if not callable(fn) else fn(plugin) + + # map our schema to our plugin + for schema in schemas: + if schema in self._schema_map: + logger.error( + "{} schema ({}) mismatch detected - {} to {}" + .format(self.name, schema, self._schema_map, + plugin)) + continue + + # Assign plugin + self._schema_map[schema] = plugin + + logger.trace( + '{} {} loaded in {:.6f}s'.format( + self.name, module_name, (time.time() - tl_start))) + + # Track the directory loaded so we never load it again + self._loaded.add(module_path) + + logger.debug( + '{} {}(s) and {} Schema(s) loaded in {:.4f}s' + .format( + self.name, + len(self._module_map) - module_count, + len(self._schema_map) - schema_count, + (time.time() - t_start))) def module_detection(self, paths, cache=True): """ @@ -334,67 +365,69 @@ class PluginManager(metaclass=Singleton): # end of _import_module() return - for _path in paths: - path = os.path.abspath(os.path.expanduser(_path)) - if (cache and path in self._paths_previously_scanned) \ - or not os.path.exists(path): - # We're done as we've already scanned this - continue - - # Store our path as a way of hashing it has been handled - self._paths_previously_scanned.add(path) - - if os.path.isdir(path) and not \ - os.path.isfile(os.path.join(path, '__init__.py')): - - logger.debug('Scanning for custom plugins in: %s', path) - for entry in os.listdir(path): - re_match = module_re.match(entry) - if not re_match: - # keep going - logger.trace('Plugin Scan: Ignoring %s', entry) - continue - - new_path = os.path.join(path, entry) - if os.path.isdir(new_path): - # Update our path - new_path = os.path.join(path, entry, '__init__.py') - if not os.path.isfile(new_path): - logger.trace( - 'Plugin Scan: Ignoring %s', - os.path.join(path, entry)) - continue - - if not cache or \ - (cache and - new_path not in self._paths_previously_scanned): - # Load our module - _import_module(new_path) - - # Add our subdir path - self._paths_previously_scanned.add(new_path) - else: - if os.path.isdir(path): - # This logic is safe to apply because we already validated - # the directories state above; update our path - path = os.path.join(path, '__init__.py') - if cache and path in self._paths_previously_scanned: - continue - - self._paths_previously_scanned.add(path) - - # directly load as is - re_match = module_re.match(os.path.basename(path)) - # must be a match and must have a .py extension - if not re_match or not re_match.group(1): - # keep going - logger.trace('Plugin Scan: Ignoring %s', path) + with self._lock: + for _path in paths: + path = os.path.abspath(os.path.expanduser(_path)) + if (cache and path in self._paths_previously_scanned) \ + or not os.path.exists(path): + # We're done as we've already scanned this continue - # Load our module - _import_module(path) + # Store our path as a way of hashing it has been handled + self._paths_previously_scanned.add(path) - return None + if os.path.isdir(path) and not \ + os.path.isfile(os.path.join(path, '__init__.py')): + + logger.debug('Scanning for custom plugins in: %s', path) + for entry in os.listdir(path): + re_match = module_re.match(entry) + if not re_match: + # keep going + logger.trace('Plugin Scan: Ignoring %s', entry) + continue + + new_path = os.path.join(path, entry) + if os.path.isdir(new_path): + # Update our path + new_path = os.path.join(path, entry, '__init__.py') + if not os.path.isfile(new_path): + logger.trace( + 'Plugin Scan: Ignoring %s', + os.path.join(path, entry)) + continue + + if not cache or \ + (cache and new_path not in + self._paths_previously_scanned): + # Load our module + _import_module(new_path) + + # Add our subdir path + self._paths_previously_scanned.add(new_path) + else: + if os.path.isdir(path): + # This logic is safe to apply because we already + # validated the directories state above; update our + # path + path = os.path.join(path, '__init__.py') + if cache and path in self._paths_previously_scanned: + continue + + self._paths_previously_scanned.add(path) + + # directly load as is + re_match = module_re.match(os.path.basename(path)) + # must be a match and must have a .py extension + if not re_match or not re_match.group(1): + # keep going + logger.trace('Plugin Scan: Ignoring %s', path) + continue + + # Load our module + _import_module(path) + + return None def add(self, plugin, schemas=None, url=None, send_func=None): """ @@ -714,4 +747,4 @@ class PluginManager(metaclass=Singleton): """ Determines if object has loaded or not """ - return True if self._module_map is not None else False + return True if self._loaded and self._module_map is not None else False diff --git a/test/test_notification_manager.py b/test/test_notification_manager.py index 9274889a..e4f47db5 100644 --- a/test/test_notification_manager.py +++ b/test/test_notification_manager.py @@ -29,8 +29,10 @@ import re import pytest import types +import threading from inspect import cleandoc +from apprise import Apprise from apprise.NotificationManager import NotificationManager from apprise.plugins.NotifyBase import NotifyBase @@ -248,6 +250,48 @@ def test_notification_manager_module_loading(tmpdir): N_MGR.load_modules() N_MGR.load_modules() + # + # Thread Testing + # + + # This tests against a racing condition when the modules have not been + # loaded. When multiple instances of Apprise are all instantiated, + # the loading of the modules will occur for each instance if detected + # having not been previously done, this tests that we can dynamically + # support the loading of modules once whe multiple instances to apprise + # are instantiated. + thread_count = 10 + + def thread_test(result, no): + """ + Load our apprise object with valid URLs and store our result + """ + apobj = Apprise() + result[no] = apobj.add('json://localhost') and \ + apobj.add('form://localhost') and \ + apobj.add('xml://localhost') + + # Unload our modules + N_MGR.unload_modules() + + # Prepare threads to load + results = [None] * thread_count + threads = [ + threading.Thread(target=thread_test, args=(results, no)) + for no in range(thread_count) + ] + + # Verify we can safely load our modules in a thread safe environment + for t in threads: + t.start() + + for t in threads: + t.join() + + # Verify we loaded our urls in all threads successfully + for result in results: + assert result is True + def test_notification_manager_decorators(tmpdir): """ @@ -376,6 +420,10 @@ def test_notification_manager_decorators(tmpdir): """)) assert 'mytest' not in N_MGR N_MGR.load_modules(path=str(notify_base)) + + # It's still not loaded because the path has already been scanned + assert 'mytest' not in N_MGR + N_MGR.load_modules(path=str(notify_base), force=True) assert 'mytest' in N_MGR # Could not be loaded because the filename did not align with the class @@ -387,3 +435,7 @@ def test_notification_manager_decorators(tmpdir): N_MGR.load_modules(path=str(notify_base)) # Our item is still loaded as expected assert 'mytest' in N_MGR + + # Simple test to make sure we can handle duplicate entries loaded + N_MGR.load_modules(path=str(notify_base), force=True) + N_MGR.load_modules(path=str(notify_base), force=True)