refactor: handle parallel synchronous notifications with a thread pool (#839)

pull/846/head
Ryan Young 2023-02-28 07:11:30 -08:00 committed by GitHub
parent 6458ab0506
commit 3a2af45e4d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 86 additions and 76 deletions

View File

@ -31,8 +31,8 @@
# POSSIBILITY OF SUCH DAMAGE.
import asyncio
import concurrent.futures as cf
import os
from functools import partial
from itertools import chain
from . import common
from .conversion import convert_between
@ -376,7 +376,7 @@ class Apprise:
try:
# Process arguments and build synchronous and asynchronous calls
# (this step can throw internal errors).
sync_partials, async_cors = self._create_notify_calls(
sequential_calls, parallel_calls = self._create_notify_calls(
body, title,
notify_type=notify_type, body_format=body_format,
tag=tag, match_always=match_always, attach=attach,
@ -387,49 +387,13 @@ class Apprise:
# No notifications sent, and there was an internal error.
return False
if not sync_partials and not async_cors:
if not sequential_calls and not parallel_calls:
# Nothing to send
return None
sync_result = Apprise._notify_all(*sync_partials)
if async_cors:
# A single coroutine sends all asynchronous notifications in
# parallel.
all_cor = Apprise._async_notify_all(*async_cors)
try:
# Python <3.7 automatically starts an event loop if there isn't
# already one for the main thread.
loop = asyncio.get_event_loop()
except RuntimeError:
# Python >=3.7 raises this exception if there isn't already an
# event loop. So, we can spin up our own.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.set_debug(self.debug)
# Run the coroutine and wait for the result.
async_result = loop.run_until_complete(all_cor)
# Clean up the loop.
loop.close()
asyncio.set_event_loop(None)
else:
old_debug = loop.get_debug()
loop.set_debug(self.debug)
# Run the coroutine and wait for the result.
async_result = loop.run_until_complete(all_cor)
loop.set_debug(old_debug)
else:
async_result = True
return sync_result and async_result
sequential_result = Apprise._notify_sequential(*sequential_calls)
parallel_result = Apprise._notify_parallel_threadpool(*parallel_calls)
return sequential_result and parallel_result
async def async_notify(self, *args, **kwargs):
"""
@ -442,41 +406,42 @@ class Apprise:
try:
# Process arguments and build synchronous and asynchronous calls
# (this step can throw internal errors).
sync_partials, async_cors = self._create_notify_calls(
sequential_calls, parallel_calls = self._create_notify_calls(
*args, **kwargs)
except TypeError:
# No notifications sent, and there was an internal error.
return False
if not sync_partials and not async_cors:
if not sequential_calls and not parallel_calls:
# Nothing to send
return None
sync_result = Apprise._notify_all(*sync_partials)
async_result = await Apprise._async_notify_all(*async_cors)
return sync_result and async_result
sequential_result = Apprise._notify_sequential(*sequential_calls)
parallel_result = \
await Apprise._notify_parallel_asyncio(*parallel_calls)
return sequential_result and parallel_result
def _create_notify_calls(self, *args, **kwargs):
"""
Creates notifications for all the plugins loaded.
Returns a list of synchronous calls (partial functions with no
arguments required) for plugins with async disabled and a list of
asynchronous calls (coroutines) for plugins with async enabled.
Returns a list of (server, notify() kwargs) tuples for plugins with
parallelism disabled and another list for plugins with parallelism
enabled.
"""
all_calls = list(self._create_notify_gen(*args, **kwargs))
# Split into synchronous partials and asynchronous coroutines.
sync_partials, async_cors = [], []
for notify in all_calls:
if asyncio.iscoroutine(notify):
async_cors.append(notify)
# Split into sequential and parallel notify() calls.
sequential, parallel = [], []
for (server, notify_kwargs) in all_calls:
if server.asset.async_mode:
parallel.append((server, notify_kwargs))
else:
sync_partials.append(notify)
sequential.append((server, notify_kwargs))
return sync_partials, async_cors
return sequential, parallel
def _create_notify_gen(self, body, title='',
notify_type=common.NotifyType.INFO,
@ -584,23 +549,20 @@ class Apprise:
attach=attach,
body_format=body_format
)
if server.asset.async_mode:
yield server.async_notify(**kwargs)
else:
yield partial(server.notify, **kwargs)
yield (server, kwargs)
@staticmethod
def _notify_all(*partials):
def _notify_sequential(*servers_kwargs):
"""
Process a list of synchronous notify() calls.
Process a list of notify() calls sequentially and synchronously.
"""
success = True
for notify in partials:
for (server, kwargs) in servers_kwargs:
try:
# Send notification
result = notify()
result = server.notify(**kwargs)
success = success and result
except TypeError:
@ -616,14 +578,59 @@ class Apprise:
return success
@staticmethod
async def _async_notify_all(*cors):
def _notify_parallel_threadpool(*servers_kwargs):
"""
Process a list of asynchronous async_notify() calls.
Process a list of notify() calls in parallel and synchronously.
"""
# 0-length case
if not servers_kwargs:
return True
# Create log entry
logger.info('Notifying %d service(s) asynchronously.', len(cors))
logger.info(
'Notifying %d service(s) with threads.', len(servers_kwargs))
with cf.ThreadPoolExecutor() as executor:
success = True
futures = [executor.submit(server.notify, **kwargs)
for (server, kwargs) in servers_kwargs]
for future in cf.as_completed(futures):
try:
result = future.result()
success = success and result
except TypeError:
# These are our internally thrown notifications.
success = False
except Exception:
# A catch all so we don't have to abort early
# just because one of our plugins has a bug in it.
logger.exception("Unhandled Notification Exception")
success = False
return success
@staticmethod
async def _notify_parallel_asyncio(*servers_kwargs):
"""
Process a list of async_notify() calls in parallel and asynchronously.
"""
# 0-length case
if not servers_kwargs:
return True
# Create log entry
logger.info(
'Notifying %d service(s) asynchronously.', len(servers_kwargs))
async def do_call(server, kwargs):
return await server.async_notify(**kwargs)
cors = (do_call(server, kwargs) for (server, kwargs) in servers_kwargs)
results = await asyncio.gather(*cors, return_exceptions=True)
if any(isinstance(status, Exception)

View File

@ -32,6 +32,7 @@
from __future__ import print_function
import asyncio
import concurrent.futures
import re
import sys
import pytest
@ -1781,7 +1782,9 @@ def test_apprise_details_plugin_verification():
@mock.patch('requests.post')
@mock.patch('asyncio.gather', wraps=asyncio.gather)
def test_apprise_async_mode(mock_gather, mock_post, tmpdir):
@mock.patch('concurrent.futures.ThreadPoolExecutor',
wraps=concurrent.futures.ThreadPoolExecutor)
def test_apprise_async_mode(mock_threadpool, mock_gather, mock_post, tmpdir):
"""
API: Apprise() async_mode tests
@ -1814,9 +1817,9 @@ def test_apprise_async_mode(mock_gather, mock_post, tmpdir):
# Send Notifications Asyncronously
assert a.notify("async") is True
# Verify our async code got executed
assert mock_gather.call_count > 0
mock_gather.reset_mock()
# Verify our thread pool was created
assert mock_threadpool.call_count == 1
mock_threadpool.reset_mock()
# Provide an over-ride now
asset = AppriseAsset(async_mode=False)
@ -1863,9 +1866,9 @@ def test_apprise_async_mode(mock_gather, mock_post, tmpdir):
# Send 1 Notification Syncronously, the other Asyncronously
assert a.notify("a mixed batch") is True
# Verify our async code got called
assert mock_gather.call_count > 0
mock_gather.reset_mock()
# Verify our thread pool was created
assert mock_threadpool.call_count == 1
mock_threadpool.reset_mock()
def test_notify_matrix_dynamic_importing(tmpdir):

View File

@ -561,7 +561,7 @@ def test_apprise_config_with_apprise_obj(tmpdir):
super().__init__(
notify_format=NotifyFormat.HTML, **kwargs)
async def async_notify(self, **kwargs):
def notify(self, **kwargs):
# Pretend everything is okay
return True