Implemented a new async_notify() method (#397)

pull/402/head
Ryan Young 3 years ago committed by GitHub
parent 037c5881bb
commit 3cfbdea101
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -319,17 +319,126 @@ class Apprise(object):
such as turning a \n into an actual new line, etc.
"""
if len(self) == 0:
# Nothing to notify
if ASYNCIO_SUPPORT:
return py3compat.asyncio.tosync(
self.async_notify(
body, title,
notify_type=notify_type, body_format=body_format,
tag=tag, attach=attach,
interpret_escapes=interpret_escapes,
),
debug=self.debug
)
else:
try:
results = list(
self._notifyall(
Apprise._notifyhandler,
body, title,
notify_type=notify_type, body_format=body_format,
tag=tag, attach=attach,
interpret_escapes=interpret_escapes,
)
)
except TypeError:
# No notifications sent, and there was an internal error.
return False
# Initialize our return result which only turns to True if we send
# at least one valid notification
status = None
else:
if len(results) > 0:
# All notifications sent, return False if any failed.
return all(results)
if not (title or body):
else:
# No notifications sent.
return None
def async_notify(self, *args, **kwargs):
"""
Send a notification to all of the plugins previously loaded, for
asynchronous callers. This method is an async method that should be
awaited on, even if it is missing the async keyword in its signature.
(This is omitted to preserve syntax compatibility with Python 2.)
The arguments are identical to those of Apprise.notify(). This method
is not available in Python 2.
"""
try:
coroutines = list(
self._notifyall(
Apprise._notifyhandlerasync, *args, **kwargs))
except TypeError:
# No notifications sent, and there was an internal error.
return py3compat.asyncio.toasyncwrap(False)
else:
if len(coroutines) > 0:
# All notifications sent, return False if any failed.
return py3compat.asyncio.notify(coroutines)
else:
# No notifications sent.
return py3compat.asyncio.toasyncwrap(None)
@staticmethod
def _notifyhandler(server, **kwargs):
"""
The synchronous notification sender. Returns True if the notification
sent successfully.
"""
try:
# Send notification
return server.notify(**kwargs)
except TypeError:
# These our our internally thrown notifications
return False
except Exception:
# A catch all so we don't have to abort early
# just because one of our plugins has a bug in it.
logger.exception("Notification Exception")
return False
@staticmethod
def _notifyhandlerasync(server, **kwargs):
"""
The asynchronous notification sender. Returns a coroutine that yields
True if the notification sent successfully.
"""
if server.asset.async_mode:
return server.async_notify(**kwargs)
else:
# Send the notification immediately, and wrap the result in a
# coroutine.
status = Apprise._notifyhandler(server, **kwargs)
return py3compat.asyncio.toasyncwrap(status)
def _notifyall(self, handler, body, title='', notify_type=NotifyType.INFO,
body_format=None, tag=MATCH_ALL_TAG, attach=None,
interpret_escapes=None):
"""
Creates notifications for all of the plugins loaded.
Returns a generator that calls handler for each notification. The first
and only argument supplied to handler is the server, and the keyword
arguments are exactly as they would be passed to server.notify().
"""
if len(self) == 0:
# Nothing to notify
raise TypeError
if not (title or body):
raise TypeError
if six.PY2:
# Python 2.7.x Unicode Character Handling
# Ensure we're working with utf-8
@ -344,14 +453,9 @@ class Apprise(object):
# Prepare attachments if required
if attach is not None and not isinstance(attach, AppriseAttachment):
try:
attach = AppriseAttachment(
attach, asset=self.asset, location=self.location)
except TypeError:
# bad attachments
return False
# Allow Asset default value
body_format = self.asset.body_format \
if body_format is None else body_format
@ -360,17 +464,8 @@ class Apprise(object):
interpret_escapes = self.asset.interpret_escapes \
if interpret_escapes is None else interpret_escapes
# for asyncio support; we track a list of our servers to notify
coroutines = []
# Iterate over our loaded plugins
for server in self.find(tag):
if status is None:
# We have at least one server to notify; change status
# to be a default value of True from now (purely an
# initialiation at this point)
status = True
# If our code reaches here, we either did not define a tag (it
# was set to None), or we did define a tag and the logic above
# determined we need to notify the service it's associated with
@ -443,7 +538,7 @@ class Apprise(object):
except AttributeError:
# Must be of string type
logger.error('Failed to escape message body')
return False
raise TypeError
try:
# Added overhead required due to Python 3 Encoding Bug
@ -460,48 +555,15 @@ class Apprise(object):
except AttributeError:
# Must be of string type
logger.error('Failed to escape message title')
return False
raise TypeError
if ASYNCIO_SUPPORT and server.asset.async_mode:
# Build a list of servers requiring notification
# that will be triggered asynchronously afterwards
coroutines.append(server.async_notify(
yield handler(
server,
body=conversion_map[server.notify_format],
title=title,
notify_type=notify_type,
attach=attach))
# We gather at this point and notify at the end
continue
try:
# Send notification
if not server.notify(
body=conversion_map[server.notify_format],
title=title,
notify_type=notify_type,
attach=attach):
# Toggle our return status flag
status = False
except TypeError:
# These our our internally thrown notifications
status = False
except Exception:
# A catch all so we don't have to abort early
# just because one of our plugins has a bug in it.
logger.exception("Notification Exception")
status = False
if coroutines:
# perform our async notification(s)
if not py3compat.asyncio.notify(coroutines, debug=self.debug):
# Toggle our status only if we had a failure
status = False
return status
attach=attach
)
def details(self, lang=None):
"""
@ -658,3 +720,7 @@ class Apprise(object):
"""
return sum([1 if not isinstance(s, (ConfigBase, AppriseConfig))
else len(s.servers()) for s in self.servers])
if six.PY2:
del Apprise.async_notify

@ -25,7 +25,6 @@
import sys
import asyncio
from concurrent.futures import ThreadPoolExecutor
from functools import partial
from ..URLBase import URLBase
from ..logger import logger
@ -37,9 +36,11 @@ ASYNCIO_RUN_SUPPORT = \
(sys.version_info.major == 3 and sys.version_info.minor >= 7)
def notify(coroutines, debug=False):
# async reference produces a SyntaxError (E999) in Python v2.7
# For this reason we turn on the noqa flag
async def notify(coroutines): # noqa: E999
"""
A Wrapper to the AsyncNotifyBase.async_notify() calls allowing us
An async wrapper to the AsyncNotifyBase.async_notify() calls allowing us
to call gather() and collect the responses
"""
@ -47,59 +48,49 @@ def notify(coroutines, debug=False):
logger.info(
'Notifying {} service(s) asynchronously.'.format(len(coroutines)))
if ASYNCIO_RUN_SUPPORT:
# async reference produces a SyntaxError (E999) in Python v2.7
# For this reason we turn on the noqa flag
async def main(results, coroutines): # noqa: E999
"""
Task: Notify all servers specified and return our result set
through a mutable object.
"""
# send our notifications and store our result set into
# our results dictionary
results['response'] = \
await asyncio.gather(*coroutines, return_exceptions=True)
results = await asyncio.gather(*coroutines, return_exceptions=True)
# Returns True if all notifications succeeded, otherwise False is
# returned.
failed = any(not status or isinstance(status, Exception)
for status in results)
return not failed
# Initialize a mutable object we can populate with our notification
# responses
results = {}
# Send our notifications
asyncio.run(main(results, coroutines), debug=debug)
def tosync(cor, debug=False):
"""
Await a coroutine from non-async code.
"""
# Acquire our return status
status = next((s for s in results['response'] if s is False), True)
if ASYNCIO_RUN_SUPPORT:
return asyncio.run(cor, debug=debug)
else:
#
# The Deprecated Way (<= Python v3.6)
#
try:
# acquire access to our event loop
loop = asyncio.get_event_loop()
except RuntimeError:
# This happens if we're inside a thread of another application
# where there is no running event_loop(). Pythong v3.7 and higher
# automatically take care of this case for us. But for the lower
# versions we need to do the following:
# where there is no running event_loop(). Pythong v3.7 and
# higher automatically take care of this case for us. But for
# the lower versions we need to do the following:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
if debug:
# Enable debug mode
loop.set_debug(1)
loop.set_debug(debug)
# Send our notifications and acquire our status
results = loop.run_until_complete(asyncio.gather(*coroutines))
return loop.run_until_complete(cor)
# Acquire our return status
status = next((r for r in results if r is False), True)
# Returns True if all notifications succeeded, otherwise False is
# returned.
return status
async def toasyncwrap(v): # noqa: E999
"""
Create a coroutine that, when run, returns the provided value.
"""
return v
class AsyncNotifyBase(URLBase):
@ -111,13 +102,12 @@ class AsyncNotifyBase(URLBase):
"""
Async Notification Wrapper
"""
try:
loop = asyncio.get_event_loop()
with ThreadPoolExecutor() as executor:
try:
return await loop.run_in_executor(
executor,
partial(self.notify, *args, **kwargs),
)
None, partial(self.notify, *args, **kwargs))
except TypeError:
# These our our internally thrown notifications

@ -54,6 +54,15 @@ import inspect
import logging
logging.disable(logging.CRITICAL)
# Sending notifications requires the coroutines to be awaited, so we need to
# wrap the original function when mocking it. But don't import for Python 2.
if not six.PY2:
import apprise.py3compat.asyncio as py3aio
else:
class py3aio:
def notify():
pass
# Attachment Directory
TEST_VAR_DIR = join(dirname(__file__), 'var')
@ -63,6 +72,25 @@ def test_apprise():
API: Apprise() object
"""
def do_notify(server, *args, **kwargs):
return server.notify(*args, **kwargs)
apprise_test(do_notify)
@pytest.mark.skipif(sys.version_info.major <= 2, reason="Requires Python 3.x+")
def test_apprise_async():
"""
API: Apprise() object asynchronous methods
"""
def do_notify(server, *args, **kwargs):
return py3aio.tosync(server.async_notify(*args, **kwargs))
apprise_test(do_notify)
def apprise_test(do_notify):
# Caling load matix a second time which is an internal function causes it
# to skip over content already loaded into our matrix and thefore accesses
# other if/else parts of the code that aren't otherwise called
@ -153,7 +181,7 @@ def test_apprise():
a.clear()
# No servers to notify
assert a.notify(title="my title", body="my body") is False
assert do_notify(a, title="my title", body="my body") is False
class BadNotification(NotifyBase):
def __init__(self, **kwargs):
@ -202,8 +230,8 @@ def test_apprise():
assert len(a) == 0
# We'll fail because we've got nothing to notify
assert a.notify(
title="my title", body="my body") is False
assert do_notify(
a, title="my title", body="my body") is False
# Clear our server listings again
a.clear()
@ -213,50 +241,50 @@ def test_apprise():
# Bad Notification Type is still allowed as it is presumed the user
# know's what their doing
assert a.notify(
title="my title", body="my body", notify_type='bad') is True
assert do_notify(
a, title="my title", body="my body", notify_type='bad') is True
# No Title/Body combo's
assert a.notify(title=None, body=None) is False
assert a.notify(title='', body=None) is False
assert a.notify(title=None, body='') is False
assert do_notify(a, title=None, body=None) is False
assert do_notify(a, title='', body=None) is False
assert do_notify(a, title=None, body='') is False
# As long as one is present, we're good
assert a.notify(title=None, body='present') is True
assert a.notify(title='present', body=None) is True
assert a.notify(title="present", body="present") is True
assert do_notify(a, title=None, body='present') is True
assert do_notify(a, title='present', body=None) is True
assert do_notify(a, title="present", body="present") is True
# Send Attachment with success
attach = join(TEST_VAR_DIR, 'apprise-test.gif')
assert a.notify(
body='body', title='test', notify_type=NotifyType.INFO,
assert do_notify(
a, body='body', title='test', notify_type=NotifyType.INFO,
attach=attach) is True
# Send the attachment as an AppriseAttachment object
assert a.notify(
body='body', title='test', notify_type=NotifyType.INFO,
assert do_notify(
a, body='body', title='test', notify_type=NotifyType.INFO,
attach=AppriseAttachment(attach)) is True
# test a invalid attachment
assert a.notify(
body='body', title='test', notify_type=NotifyType.INFO,
assert do_notify(
a, body='body', title='test', notify_type=NotifyType.INFO,
attach='invalid://') is False
# Repeat the same tests above...
# however do it by directly accessing the object; this grants the similar
# results:
assert a[0].notify(
body='body', title='test', notify_type=NotifyType.INFO,
assert do_notify(
a[0], body='body', title='test', notify_type=NotifyType.INFO,
attach=attach) is True
# Send the attachment as an AppriseAttachment object
assert a[0].notify(
body='body', title='test', notify_type=NotifyType.INFO,
assert do_notify(
a[0], body='body', title='test', notify_type=NotifyType.INFO,
attach=AppriseAttachment(attach)) is True
# test a invalid attachment
assert a[0].notify(
body='body', title='test', notify_type=NotifyType.INFO,
assert do_notify(
a[0], body='body', title='test', notify_type=NotifyType.INFO,
attach='invalid://') is False
class ThrowNotification(NotifyBase):
@ -310,7 +338,7 @@ def test_apprise():
# Test when our notify both throws an exception and or just
# simply returns False
assert a.notify(title="present", body="present") is False
assert do_notify(a, title="present", body="present") is False
# Create a Notification that throws an unexected exception
class ThrowInstantiateNotification(NotifyBase):
@ -498,7 +526,27 @@ def test_apprise_tagging(mock_post, mock_get):
API: Apprise() object tagging functionality
"""
def do_notify(server, *args, **kwargs):
return server.notify(*args, **kwargs)
apprise_tagging_test(mock_post, mock_get, do_notify)
@mock.patch('requests.get')
@mock.patch('requests.post')
@pytest.mark.skipif(sys.version_info.major <= 2, reason="Requires Python 3.x+")
def test_apprise_tagging_async(mock_post, mock_get):
"""
API: Apprise() object tagging functionality asynchronous methods
"""
def do_notify(server, *args, **kwargs):
return py3aio.tosync(server.async_notify(*args, **kwargs))
apprise_tagging_test(mock_post, mock_get, do_notify)
def apprise_tagging_test(mock_post, mock_get, do_notify):
# A request
robj = mock.Mock()
setattr(robj, 'raw', mock.Mock())
@ -535,18 +583,19 @@ def test_apprise_tagging(mock_post, mock_get):
# notify the awesome tag; this would notify both services behind the
# scenes
assert a.notify(title="my title", body="my body", tag='awesome') is True
assert do_notify(
a, title="my title", body="my body", tag='awesome') is True
# notify all of the tags
assert a.notify(
title="my title", body="my body", tag=['awesome', 'mmost']) is True
assert do_notify(
a, title="my title", body="my body", tag=['awesome', 'mmost']) is True
# When we query against our loaded notifications for a tag that simply
# isn't assigned to anything, we return None. None (different then False)
# tells us that we litterally had nothing to query. We didn't fail...
# but we also didn't do anything...
assert a.notify(
title="my title", body="my body", tag='missing') is None
assert do_notify(
a, title="my title", body="my body", tag='missing') is None
# Now to test the ability to and and/or notifications
a = Apprise()
@ -571,20 +620,20 @@ def test_apprise_tagging(mock_post, mock_get):
# Matches the following only:
# - json://localhost/tagCD/
# - json://localhost/tagCDE/
assert a.notify(
title="my title", body="my body", tag=[('TagC', 'TagD')]) is True
assert do_notify(
a, title="my title", body="my body", tag=[('TagC', 'TagD')]) is True
# Expression: (TagY and TagZ) or TagX
# Matches nothing, None is returned in this case
assert a.notify(
title="my title", body="my body",
assert do_notify(
a, title="my title", body="my body",
tag=[('TagY', 'TagZ'), 'TagX']) is None
# Expression: (TagY and TagZ) or TagA
# Matches the following only:
# - json://localhost/tagAB/
assert a.notify(
title="my title", body="my body",
assert do_notify(
a, title="my title", body="my body",
tag=[('TagY', 'TagZ'), 'TagA']) is True
# Expression: (TagE and TagD) or TagB
@ -592,8 +641,8 @@ def test_apprise_tagging(mock_post, mock_get):
# - json://localhost/tagCDE/
# - json://localhost/tagAB/
# - json://localhost/tagB/
assert a.notify(
title="my title", body="my body",
assert do_notify(
a, title="my title", body="my body",
tag=[('TagE', 'TagD'), 'TagB']) is True
# Garbage Entries in tag field just get stripped out. the below
@ -602,8 +651,8 @@ def test_apprise_tagging(mock_post, mock_get):
# we fail. None is returned as a way of letting us know that we
# had Notifications to notify, but since none of them matched our tag
# none were notified.
assert a.notify(
title="my title", body="my body",
assert do_notify(
a, title="my title", body="my body",
tag=[(object, ), ]) is None
@ -1414,7 +1463,7 @@ def test_apprise_details_plugin_verification():
@pytest.mark.skipif(sys.version_info.major <= 2, reason="Requires Python 3.x+")
@mock.patch('requests.post')
@mock.patch('apprise.py3compat.asyncio.notify')
@mock.patch('apprise.py3compat.asyncio.notify', wraps=py3aio.notify)
def test_apprise_async_mode(mock_async_notify, mock_post, tmpdir):
"""
API: Apprise() async_mode tests
@ -1474,8 +1523,8 @@ def test_apprise_async_mode(mock_async_notify, mock_post, tmpdir):
# Send Notifications Syncronously
assert a.notify("sync") is True
# Verify our async code never got called
assert mock_async_notify.call_count == 0
# Verify our async code got called
assert mock_async_notify.call_count == 1
mock_async_notify.reset_mock()
# another way of looking a our false set asset configuration

Loading…
Cancel
Save