Fix notify() failing when being called from an existing asynchronous event loop (#624)

pull/631/head
Ryan Young 2022-07-13 15:46:47 -07:00 committed by GitHub
parent 0be8f90244
commit b34051ccaf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 81 additions and 5 deletions

View File

@ -424,7 +424,7 @@ class Apprise(object):
except TypeError:
# No notifications sent, and there was an internal error.
return py3compat.asyncio.toasyncwrap(False)
return py3compat.asyncio.toasyncwrapvalue(False)
else:
if len(coroutines) > 0:
@ -433,7 +433,7 @@ class Apprise(object):
else:
# No notifications sent.
return py3compat.asyncio.toasyncwrap(None)
return py3compat.asyncio.toasyncwrapvalue(None)
@staticmethod
def _notifyhandler(server, **kwargs):
@ -470,7 +470,7 @@ class Apprise(object):
# Send the notification immediately, and wrap the result in a
# coroutine.
status = Apprise._notifyhandler(server, **kwargs)
return py3compat.asyncio.toasyncwrap(status)
return py3compat.asyncio.toasyncwrapvalue(status)
def _notifyall(self, handler, body, title='', notify_type=NotifyType.INFO,
body_format=None, tag=MATCH_ALL_TAG, match_always=True,

View File

@ -63,7 +63,20 @@ def tosync(cor, debug=False):
"""
if ASYNCIO_RUN_SUPPORT:
return asyncio.run(cor, debug=debug)
try:
loop = asyncio.get_running_loop()
except RuntimeError:
# There is no existing event loop, so we can start our own.
return asyncio.run(cor, debug=debug)
else:
# Enable debug mode
loop.set_debug(debug)
# Run the coroutine and wait for the result.
task = loop.create_task(cor)
return asyncio.ensure_future(task, loop=loop)
else:
# The Deprecated Way (<= Python v3.6)
@ -85,7 +98,7 @@ def tosync(cor, debug=False):
return loop.run_until_complete(cor)
async def toasyncwrap(v): # noqa: E999
async def toasyncwrapvalue(v): # noqa: E999
"""
Create a coroutine that, when run, returns the provided value.
"""
@ -93,6 +106,14 @@ async def toasyncwrap(v): # noqa: E999
return v
async def toasyncwrap(fn): # noqa: E999
"""
Create a coroutine that, when run, executes the provided function.
"""
return fn()
class AsyncNotifyBase(URLBase):
"""
asyncio wrapper for the NotifyBase object

View File

@ -24,6 +24,7 @@
# THE SOFTWARE.
from __future__ import print_function
import six
import sys
import pytest
from apprise import Apprise
@ -32,6 +33,9 @@ from apprise import NotifyFormat
from apprise.plugins import SCHEMA_MAP
if not six.PY2:
import apprise.py3compat.asyncio as py3aio
# Disable logging for a cleaner testing output
import logging
logging.disable(logging.CRITICAL)
@ -106,3 +110,54 @@ def test_apprise_asyncio_runtime_error():
finally:
# Restore our event loop (in the event the above test failed)
asyncio.set_event_loop(loop)
@pytest.mark.skipif(sys.version_info.major <= 2 or sys.version_info < (3, 7),
reason="Requires Python 3.7+")
def test_apprise_works_in_async_loop():
"""
API: Apprise() can execute synchronously in an existing event loop
"""
class GoodNotification(NotifyBase):
def __init__(self, **kwargs):
super(GoodNotification, self).__init__(
notify_format=NotifyFormat.HTML, **kwargs)
def url(self, **kwargs):
# Support URL
return ''
def send(self, **kwargs):
# Pretend everything is okay
return True
@staticmethod
def parse_url(url, *args, **kwargs):
# always parseable
return NotifyBase.parse_url(url, verify_host=False)
# Store our good notification in our schema map
SCHEMA_MAP['good'] = GoodNotification
# Create ourselves an Apprise object
a = Apprise()
# Add a few entries
for _ in range(25):
a.add('good://')
# To ensure backwards compatibility, it should be possible to call
# asynchronous Apprise methods from code that already uses an event loop,
# even when using the synchronous notify() method.
# see https://github.com/caronc/apprise/issues/610
import asyncio
def try_notify():
a.notify(title="title", body="body")
# Convert to a coroutine to run asynchronously.
cor = py3aio.toasyncwrap(try_notify)
# Should execute successfully.
asyncio.run(cor)