Unit test to prove async_mode flag is behaving as expected (#297)

pull/300/head
Chris Caron 2020-09-17 19:33:34 -04:00 committed by GitHub
parent 784e073eea
commit 7187af5991
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 90 additions and 0 deletions

View File

@ -1400,6 +1400,96 @@ def test_apprise_details_plugin_verification():
assert arg in defined_tokens
@pytest.mark.skipif(sys.version_info.major <= 2, reason="Requires Python 3.x+")
@mock.patch('requests.post')
@mock.patch('apprise.py3compat.asyncio.notify')
def test_apprise_async_mode(mock_async_notify, mock_post, tmpdir):
"""
API: Apprise() async_mode tests
"""
mock_post.return_value.status_code = requests.codes.ok
# Define some servers
servers = [
'xml://localhost',
'json://localhost',
]
# Default Async Mode is to be enabled
asset = AppriseAsset()
assert asset.async_mode is True
# Load our asset
a = Apprise(asset=asset)
# add our servers
a.add(servers=servers)
# 2 servers loaded
assert len(a) == 2
# Our servers should carry this flag
for server in a:
assert server.asset.async_mode is True
# Send Notifications Asyncronously
assert a.notify("async") is True
# Verify our async code got executed
assert mock_async_notify.call_count == 1
mock_async_notify.reset_mock()
# Provide an over-ride now
asset = AppriseAsset(async_mode=False)
assert asset.async_mode is False
# Load our asset
a = Apprise(asset=asset)
# Verify our configuration kept
assert a.asset.async_mode is False
# add our servers
a.add(servers=servers)
# 2 servers loaded
assert len(a) == 2
# Our servers should carry this flag
for server in a:
assert server.asset.async_mode is False
# Send Notifications Syncronously
assert a.notify("sync") is True
# Verify our async code never got called
assert mock_async_notify.call_count == 0
mock_async_notify.reset_mock()
# another way of looking a our false set asset configuration
assert a[0].asset.async_mode is False
assert a[1].asset.async_mode is False
# Adjust 1 of the servers async_mode settings
a[0].asset.async_mode = True
assert a[0].asset.async_mode is True
# They all share the same object, so this gets toggled too
assert a[1].asset.async_mode is True
# We'll just change this one
a[1].asset = AppriseAsset(async_mode=False)
assert a[0].asset.async_mode is True
assert a[1].asset.async_mode is False
# Send 1 Notification Syncronously, the other Asyncronously
assert a.notify("a mixed batch") is True
# Verify our async code got called
assert mock_async_notify.call_count == 1
mock_async_notify.reset_mock()
def test_notify_matrix_dynamic_importing(tmpdir):
"""
API: Apprise() Notify Matrix Importing