|
|
|
@ -23,29 +23,55 @@
|
|
|
|
|
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN |
|
|
|
|
# THE SOFTWARE. |
|
|
|
|
|
|
|
|
|
import logging |
|
|
|
|
import re |
|
|
|
|
import sys |
|
|
|
|
import ssl |
|
|
|
|
from unittest.mock import call, Mock, ANY |
|
|
|
|
|
|
|
|
|
import pytest |
|
|
|
|
from unittest import mock |
|
|
|
|
|
|
|
|
|
import apprise |
|
|
|
|
from apprise.plugins.NotifyMQTT import NotifyMQTT |
|
|
|
|
|
|
|
|
|
# Disable logging for a cleaner testing output |
|
|
|
|
import logging |
|
|
|
|
logging.disable(logging.CRITICAL) |
|
|
|
|
|
|
|
|
|
from apprise.plugins.NotifyMQTT import NotifyMQTT |
|
|
|
|
|
|
|
|
|
logging.disable(logging.CRITICAL) |
|
|
|
|
@pytest.fixture |
|
|
|
|
def mqtt_client_mock(mocker): |
|
|
|
|
""" |
|
|
|
|
Mocks an MQTT client and response and returns the mocked client. |
|
|
|
|
""" |
|
|
|
|
|
|
|
|
|
if "paho" not in sys.modules: |
|
|
|
|
raise pytest.skip(reason="Requires that `paho-mqtt` is installed") |
|
|
|
|
|
|
|
|
|
# Establish mock of the `publish()` response object. |
|
|
|
|
publish_result = Mock(**{ |
|
|
|
|
"rc": 0, |
|
|
|
|
"is_published.return_value": True, |
|
|
|
|
}) |
|
|
|
|
|
|
|
|
|
# Establish mock of the `Client()` object. |
|
|
|
|
mock_client = Mock(**{ |
|
|
|
|
"connect.return_value": 0, |
|
|
|
|
"reconnect.return_value": 0, |
|
|
|
|
"is_connected.return_value": True, |
|
|
|
|
"publish.return_value": publish_result, |
|
|
|
|
}) |
|
|
|
|
mocker.patch( |
|
|
|
|
"paho.mqtt.client.Client", return_value=mock_client) |
|
|
|
|
|
|
|
|
|
return mock_client |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.skipif( |
|
|
|
|
'paho' in sys.modules, |
|
|
|
|
reason="Requires that cryptography NOT be installed") |
|
|
|
|
@mock.patch('requests.post') |
|
|
|
|
def test_plugin_mqtt_paho_import_error(mock_post): |
|
|
|
|
reason="Requires that `paho-mqtt` is NOT installed") |
|
|
|
|
def test_plugin_mqtt_paho_import_error(): |
|
|
|
|
""" |
|
|
|
|
NotifyFCM Cryptography loading failure |
|
|
|
|
Verify `NotifyMQTT` is disabled when `paho.mqtt.client` fails loading. |
|
|
|
|
""" |
|
|
|
|
|
|
|
|
|
# without the library, the object can't be instantiated |
|
|
|
@ -54,185 +80,337 @@ def test_plugin_mqtt_paho_import_error(mock_post):
|
|
|
|
|
assert obj is None |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.skipif( |
|
|
|
|
'paho' not in sys.modules, reason="Requires paho-mqtt") |
|
|
|
|
@mock.patch('paho.mqtt.client.Client') |
|
|
|
|
def test_plugin_mqtt_general(mock_client): |
|
|
|
|
def test_plugin_mqtt_default_success(mqtt_client_mock): |
|
|
|
|
""" |
|
|
|
|
NotifyMQTT() General Checks |
|
|
|
|
|
|
|
|
|
Verify `NotifyMQTT` succeeds and has appropriate default settings. |
|
|
|
|
""" |
|
|
|
|
|
|
|
|
|
# our call to publish() response object |
|
|
|
|
publish_result = mock.Mock() |
|
|
|
|
publish_result.rc = 0 |
|
|
|
|
publish_result.is_published.return_value = True |
|
|
|
|
|
|
|
|
|
# Our mqtt.Client() object |
|
|
|
|
_mock_client = mock.Mock() |
|
|
|
|
_mock_client.connect.return_value = 0 |
|
|
|
|
_mock_client.reconnect.return_value = 0 |
|
|
|
|
_mock_client.is_connected.return_value = True |
|
|
|
|
_mock_client.publish.return_value = publish_result |
|
|
|
|
mock_client.return_value = _mock_client |
|
|
|
|
|
|
|
|
|
# Instantiate our object |
|
|
|
|
# Instantiate the notifier. |
|
|
|
|
obj = apprise.Apprise.instantiate( |
|
|
|
|
'mqtt://localhost:1234/my/topic', suppress_exceptions=False) |
|
|
|
|
assert isinstance(obj, NotifyMQTT) |
|
|
|
|
assert obj.url().startswith('mqtt://localhost:1234/my/topic') |
|
|
|
|
# Detect our defaults |
|
|
|
|
|
|
|
|
|
# Verify default settings. |
|
|
|
|
assert re.search(r'qos=0', obj.url()) |
|
|
|
|
assert re.search(r'version=v3.1.1', obj.url()) |
|
|
|
|
# Send a good notification |
|
|
|
|
assert re.search(r'session=no', obj.url()) |
|
|
|
|
assert re.search(r'client_id=', obj.url()) is None |
|
|
|
|
|
|
|
|
|
# Verify notification succeeds. |
|
|
|
|
assert obj.notify(body="test=test") is True |
|
|
|
|
|
|
|
|
|
# leverage the to= argument to identify our topic |
|
|
|
|
# Send another notification (a new connection isn't attempted to be |
|
|
|
|
# established as one already exists) |
|
|
|
|
assert obj.notify(body="foo=bar") is True |
|
|
|
|
|
|
|
|
|
# Verify the right calls have been made to the MQTT client object. |
|
|
|
|
assert mqtt_client_mock.mock_calls == [ |
|
|
|
|
call.max_inflight_messages_set(200), |
|
|
|
|
call.connect('localhost', port=1234, keepalive=30), |
|
|
|
|
call.loop_start(), |
|
|
|
|
call.is_connected(), |
|
|
|
|
call.publish('my/topic', payload='test=test', qos=0, retain=False), |
|
|
|
|
call.publish().is_published(), |
|
|
|
|
call.is_connected(), |
|
|
|
|
call.publish('my/topic', payload='foo=bar', qos=0, retain=False), |
|
|
|
|
call.publish().is_published(), |
|
|
|
|
] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_plugin_mqtt_multiple_topics_success(mqtt_client_mock): |
|
|
|
|
""" |
|
|
|
|
Verify submission to multiple MQTT topics. |
|
|
|
|
""" |
|
|
|
|
|
|
|
|
|
# Designate multiple topic targets. |
|
|
|
|
obj = apprise.Apprise.instantiate( |
|
|
|
|
'mqtt://localhost/my/topic,my/other/topic', |
|
|
|
|
suppress_exceptions=False) |
|
|
|
|
|
|
|
|
|
assert isinstance(obj, NotifyMQTT) |
|
|
|
|
assert obj.url().startswith('mqtt://localhost') |
|
|
|
|
assert re.search(r'my/topic', obj.url()) |
|
|
|
|
assert re.search(r'my/other/topic', obj.url()) |
|
|
|
|
assert obj.notify(body="test=test") is True |
|
|
|
|
|
|
|
|
|
# Verify the right calls have been made to the MQTT client object. |
|
|
|
|
assert mqtt_client_mock.mock_calls == [ |
|
|
|
|
call.max_inflight_messages_set(200), |
|
|
|
|
call.connect('localhost', port=1883, keepalive=30), |
|
|
|
|
call.loop_start(), |
|
|
|
|
call.is_connected(), |
|
|
|
|
call.publish('my/topic', payload='test=test', qos=0, retain=False), |
|
|
|
|
call.publish().is_published(), |
|
|
|
|
call.is_connected(), |
|
|
|
|
call.publish('my/other/topic', payload='test=test', qos=0, |
|
|
|
|
retain=False), |
|
|
|
|
call.publish().is_published(), |
|
|
|
|
] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_plugin_mqtt_to_success(mqtt_client_mock): |
|
|
|
|
""" |
|
|
|
|
Verify `NotifyMQTT` succeeds with the `to=` parameter. |
|
|
|
|
""" |
|
|
|
|
|
|
|
|
|
# Leverage the `to=` argument to identify the topic. |
|
|
|
|
obj = apprise.Apprise.instantiate( |
|
|
|
|
'mqtt://localhost?to=my/topic', suppress_exceptions=False) |
|
|
|
|
assert isinstance(obj, NotifyMQTT) |
|
|
|
|
assert obj.url().startswith('mqtt://localhost/my/topic') |
|
|
|
|
# Detect our defaults |
|
|
|
|
|
|
|
|
|
# Verify default settings. |
|
|
|
|
assert re.search(r'qos=0', obj.url()) |
|
|
|
|
assert re.search(r'version=v3.1.1', obj.url()) |
|
|
|
|
# Send a good notification |
|
|
|
|
|
|
|
|
|
# Verify notification succeeds. |
|
|
|
|
assert obj.notify(body="test=test") is True |
|
|
|
|
|
|
|
|
|
# Send a notification in a situation where our publish failed |
|
|
|
|
publish_result.rc = 2 |
|
|
|
|
assert obj.notify(body="test=test") is False |
|
|
|
|
# Toggle our response object back to what it should be |
|
|
|
|
publish_result.rc = 0 |
|
|
|
|
|
|
|
|
|
# Test case where we provide an invalid/unsupported mqtt version |
|
|
|
|
def test_plugin_mqtt_valid_settings_success(mqtt_client_mock): |
|
|
|
|
""" |
|
|
|
|
Verify settings as URL parameters will be accepted. |
|
|
|
|
""" |
|
|
|
|
|
|
|
|
|
# Instantiate the notifier. |
|
|
|
|
obj = apprise.Apprise.instantiate( |
|
|
|
|
'mqtt://localhost/my/topic?qos=1&version=v3.1', |
|
|
|
|
suppress_exceptions=False) |
|
|
|
|
|
|
|
|
|
assert isinstance(obj, NotifyMQTT) |
|
|
|
|
assert obj.url().startswith('mqtt://localhost') |
|
|
|
|
assert re.search(r'qos=1', obj.url()) |
|
|
|
|
assert re.search(r'version=v3.1', obj.url()) |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_plugin_mqtt_invalid_settings_failure(mqtt_client_mock): |
|
|
|
|
""" |
|
|
|
|
Verify notifier instantiation croaks on invalid settings. |
|
|
|
|
""" |
|
|
|
|
|
|
|
|
|
# Test case for invalid/unsupported MQTT version. |
|
|
|
|
with pytest.raises(TypeError): |
|
|
|
|
obj = apprise.Apprise.instantiate( |
|
|
|
|
apprise.Apprise.instantiate( |
|
|
|
|
'mqtt://localhost?version=v1.0.0.0', suppress_exceptions=False) |
|
|
|
|
|
|
|
|
|
# Test case where we provide an invalid/unsupported qos |
|
|
|
|
# Test case for invalid/unsupported `qos`. |
|
|
|
|
with pytest.raises(TypeError): |
|
|
|
|
obj = apprise.Apprise.instantiate( |
|
|
|
|
apprise.Apprise.instantiate( |
|
|
|
|
'mqtt://localhost?qos=123', suppress_exceptions=False) |
|
|
|
|
|
|
|
|
|
with pytest.raises(TypeError): |
|
|
|
|
obj = apprise.Apprise.instantiate( |
|
|
|
|
apprise.Apprise.instantiate( |
|
|
|
|
'mqtt://localhost?qos=invalid', suppress_exceptions=False) |
|
|
|
|
|
|
|
|
|
# Test a bad URL |
|
|
|
|
|
|
|
|
|
def test_plugin_mqtt_bad_url_failure(mqtt_client_mock): |
|
|
|
|
""" |
|
|
|
|
Verify notifier is disabled when using an invalid URL. |
|
|
|
|
""" |
|
|
|
|
obj = apprise.Apprise.instantiate('mqtt://', suppress_exceptions=False) |
|
|
|
|
assert obj is None |
|
|
|
|
|
|
|
|
|
# Instantiate our object without any topics |
|
|
|
|
# we also test that we can set our qos and version if we want from |
|
|
|
|
# the URL |
|
|
|
|
|
|
|
|
|
def test_plugin_mqtt_no_topic_failure(mqtt_client_mock): |
|
|
|
|
""" |
|
|
|
|
Verify notification fails when no topic is given. |
|
|
|
|
""" |
|
|
|
|
obj = apprise.Apprise.instantiate( |
|
|
|
|
'mqtt://localhost?qos=1&version=v3.1', suppress_exceptions=False) |
|
|
|
|
'mqtt://localhost', suppress_exceptions=False) |
|
|
|
|
assert isinstance(obj, NotifyMQTT) |
|
|
|
|
assert obj.url().startswith('mqtt://localhost') |
|
|
|
|
assert re.search(r'qos=1', obj.url()) |
|
|
|
|
assert re.search(r'version=v3.1', obj.url()) |
|
|
|
|
assert re.search(r'session=no', obj.url()) |
|
|
|
|
assert re.search(r'client_id=', obj.url()) is None |
|
|
|
|
|
|
|
|
|
# Our notification will fail because we have no topics to notify |
|
|
|
|
assert obj.notify(body="test=test") is False |
|
|
|
|
|
|
|
|
|
# A Secure URL |
|
|
|
|
|
|
|
|
|
def test_plugin_mqtt_tls_connect_success(mqtt_client_mock): |
|
|
|
|
""" |
|
|
|
|
Verify TLS encrypted connections work. |
|
|
|
|
""" |
|
|
|
|
|
|
|
|
|
obj = apprise.Apprise.instantiate( |
|
|
|
|
'mqtts://user:pass@localhost/my/topic', suppress_exceptions=False) |
|
|
|
|
assert isinstance(obj, NotifyMQTT) |
|
|
|
|
assert obj.url().startswith('mqtts://user:pass@localhost/my/topic') |
|
|
|
|
assert obj.notify(body="test=test") is True |
|
|
|
|
|
|
|
|
|
# Clear CA Certificates |
|
|
|
|
ca_certs_backup = \ |
|
|
|
|
list(NotifyMQTT.CA_CERTIFICATE_FILE_LOCATIONS) |
|
|
|
|
NotifyMQTT.CA_CERTIFICATE_FILE_LOCATIONS = [] |
|
|
|
|
# Verify the right calls have been made to the MQTT client object. |
|
|
|
|
assert mqtt_client_mock.mock_calls == [ |
|
|
|
|
call.max_inflight_messages_set(200), |
|
|
|
|
call.username_pw_set('user', password='pass'), |
|
|
|
|
call.tls_set( |
|
|
|
|
ca_certs=ANY, |
|
|
|
|
certfile=None, |
|
|
|
|
keyfile=None, |
|
|
|
|
cert_reqs=ssl.VerifyMode.CERT_REQUIRED, |
|
|
|
|
tls_version=ssl.PROTOCOL_TLS, |
|
|
|
|
ciphers=None, |
|
|
|
|
), |
|
|
|
|
call.tls_insecure_set(True), |
|
|
|
|
call.connect('localhost', port=8883, keepalive=30), |
|
|
|
|
call.loop_start(), |
|
|
|
|
call.is_connected(), |
|
|
|
|
call.publish('my/topic', payload='test=test', qos=0, retain=False), |
|
|
|
|
call.publish().is_published(), |
|
|
|
|
] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_plugin_mqtt_tls_no_certificates_failure(mqtt_client_mock, mocker): |
|
|
|
|
""" |
|
|
|
|
Verify TLS does not work without access to CA root certificates. |
|
|
|
|
""" |
|
|
|
|
|
|
|
|
|
# Clear CA certificates. |
|
|
|
|
mocker.patch.object(NotifyMQTT, "CA_CERTIFICATE_FILE_LOCATIONS", []) |
|
|
|
|
|
|
|
|
|
obj = apprise.Apprise.instantiate( |
|
|
|
|
'mqtts://user:pass@localhost/my/topic', suppress_exceptions=False) |
|
|
|
|
assert isinstance(obj, NotifyMQTT) |
|
|
|
|
assert obj.url().startswith('mqtts://user:pass@localhost/my/topic') |
|
|
|
|
|
|
|
|
|
# A notification is not possible now (without ca_certs) |
|
|
|
|
logger: Mock = mocker.spy(obj, "logger") |
|
|
|
|
|
|
|
|
|
# Verify notification fails w/o CA certificates. |
|
|
|
|
assert obj.notify(body="test=test") is False |
|
|
|
|
|
|
|
|
|
# Restore our certificates (for future tests) |
|
|
|
|
NotifyMQTT.CA_CERTIFICATE_FILE_LOCATIONS = ca_certs_backup |
|
|
|
|
assert logger.mock_calls == [ |
|
|
|
|
call.error("MQTT secure communication can not be verified, " |
|
|
|
|
"CA certificates file missing") |
|
|
|
|
] |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_plugin_mqtt_tls_no_verify_success(mqtt_client_mock): |
|
|
|
|
""" |
|
|
|
|
Verify TLS encrypted connections work with `verify=False`. |
|
|
|
|
""" |
|
|
|
|
|
|
|
|
|
# A single user (not password) + no verifying of host |
|
|
|
|
obj = apprise.Apprise.instantiate( |
|
|
|
|
'mqtts://user@localhost/my/topic,my/other/topic?verify=False', |
|
|
|
|
'mqtts://user:pass@localhost/my/topic?verify=False', |
|
|
|
|
suppress_exceptions=False) |
|
|
|
|
assert isinstance(obj, NotifyMQTT) |
|
|
|
|
assert obj.url().startswith('mqtts://user@localhost') |
|
|
|
|
assert re.search(r'my/other/topic', obj.url()) |
|
|
|
|
assert re.search(r'my/topic', obj.url()) |
|
|
|
|
assert obj.notify(body="test=test") is True |
|
|
|
|
|
|
|
|
|
# Session and client_id handling |
|
|
|
|
# Verify the right calls have been made to the MQTT client object. |
|
|
|
|
# Let's only validate the single call of interest is present. |
|
|
|
|
# Everything else is identical with `test_plugin_mqtt_tls_connect_success`. |
|
|
|
|
assert call.tls_insecure_set(False) in mqtt_client_mock.mock_calls |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_plugin_mqtt_session_client_id_success(mqtt_client_mock): |
|
|
|
|
""" |
|
|
|
|
Verify handling `session=yes` and `client_id=` works. |
|
|
|
|
""" |
|
|
|
|
|
|
|
|
|
obj = apprise.Apprise.instantiate( |
|
|
|
|
'mqtts://user@localhost/my/topic?session=yes&client_id=apprise', |
|
|
|
|
'mqtt://user@localhost/my/topic?session=yes&client_id=apprise', |
|
|
|
|
suppress_exceptions=False) |
|
|
|
|
|
|
|
|
|
assert isinstance(obj, NotifyMQTT) |
|
|
|
|
assert obj.url().startswith('mqtts://user@localhost') |
|
|
|
|
assert obj.url().startswith('mqtt://user@localhost') |
|
|
|
|
assert re.search(r'my/topic', obj.url()) |
|
|
|
|
assert re.search(r'client_id=apprise', obj.url()) |
|
|
|
|
assert re.search(r'session=yes', obj.url()) |
|
|
|
|
assert obj.notify(body="test=test") is True |
|
|
|
|
|
|
|
|
|
# handle case where we fail to connect |
|
|
|
|
_mock_client.connect.return_value = 2 |
|
|
|
|
|
|
|
|
|
def test_plugin_mqtt_connect_failure(mqtt_client_mock): |
|
|
|
|
""" |
|
|
|
|
Verify `NotifyMQTT` fails when MQTT `connect()` fails. |
|
|
|
|
""" |
|
|
|
|
|
|
|
|
|
# Emulate a situation where the `connect()` method fails. |
|
|
|
|
mqtt_client_mock.connect.return_value = 2 |
|
|
|
|
|
|
|
|
|
obj = apprise.Apprise.instantiate( |
|
|
|
|
'mqtt://localhost/my/topic', suppress_exceptions=False) |
|
|
|
|
assert isinstance(obj, NotifyMQTT) |
|
|
|
|
|
|
|
|
|
# Verify notification fails. |
|
|
|
|
assert obj.notify(body="test=test") is False |
|
|
|
|
# Restore our values |
|
|
|
|
_mock_client.connect.return_value = 0 |
|
|
|
|
|
|
|
|
|
# handle case where we fail to reconnect |
|
|
|
|
_mock_client.reconnect.return_value = 2 |
|
|
|
|
_mock_client.is_connected.return_value = False |
|
|
|
|
|
|
|
|
|
def test_plugin_mqtt_reconnect_failure(mqtt_client_mock): |
|
|
|
|
""" |
|
|
|
|
Verify `NotifyMQTT` fails when MQTT `reconnect()` fails. |
|
|
|
|
""" |
|
|
|
|
|
|
|
|
|
# Emulate a situation where MQTT reconnect fails. |
|
|
|
|
mqtt_client_mock.reconnect.return_value = 2 |
|
|
|
|
mqtt_client_mock.is_connected.return_value = False |
|
|
|
|
|
|
|
|
|
obj = apprise.Apprise.instantiate( |
|
|
|
|
'mqtt://localhost/my/topic', suppress_exceptions=False) |
|
|
|
|
assert isinstance(obj, NotifyMQTT) |
|
|
|
|
|
|
|
|
|
# Verify notification fails. |
|
|
|
|
assert obj.notify(body="test=test") is False |
|
|
|
|
# Restore our values |
|
|
|
|
_mock_client.reconnect.return_value = 0 |
|
|
|
|
_mock_client.is_connected.return_value = True |
|
|
|
|
|
|
|
|
|
# handle case where we fail to publish() |
|
|
|
|
publish_result.rc = 2 |
|
|
|
|
|
|
|
|
|
def test_plugin_mqtt_publish_failure(mqtt_client_mock): |
|
|
|
|
""" |
|
|
|
|
Verify `NotifyMQTT` fails when MQTT `publish()` fails. |
|
|
|
|
""" |
|
|
|
|
|
|
|
|
|
# Emulate a situation where the `publish()` method fails. |
|
|
|
|
mqtt_response = mqtt_client_mock.publish.return_value |
|
|
|
|
mqtt_response.rc = 2 |
|
|
|
|
|
|
|
|
|
obj = apprise.Apprise.instantiate( |
|
|
|
|
'mqtt://localhost/my/topic', suppress_exceptions=False) |
|
|
|
|
assert isinstance(obj, NotifyMQTT) |
|
|
|
|
assert obj.notify(body="test=test") is False |
|
|
|
|
# Restore our values |
|
|
|
|
publish_result.rc = 0 |
|
|
|
|
# Set another means of failing publish() |
|
|
|
|
publish_result.is_published.return_value = False |
|
|
|
|
|
|
|
|
|
# Verify notification fails. |
|
|
|
|
assert obj.notify(body="test=test") is False |
|
|
|
|
# Restore our values |
|
|
|
|
publish_result.is_published.return_value = True |
|
|
|
|
# Verify that was all we had to do |
|
|
|
|
assert obj.notify(body="test=test") is True |
|
|
|
|
# A slight variation on the same failure (but with recovery) |
|
|
|
|
publish_result.is_published.return_value = None |
|
|
|
|
publish_result.is_published.side_effect = (False, True) |
|
|
|
|
# Our notification is still sent okay |
|
|
|
|
assert obj.notify(body="test=test") is True |
|
|
|
|
|
|
|
|
|
# Exception handling |
|
|
|
|
|
|
|
|
|
def test_plugin_mqtt_exception_failure(mqtt_client_mock): |
|
|
|
|
""" |
|
|
|
|
Verify `NotifyMQTT` fails when an exception happens. |
|
|
|
|
""" |
|
|
|
|
|
|
|
|
|
obj = apprise.Apprise.instantiate( |
|
|
|
|
'mqtt://localhost/my/topic', suppress_exceptions=False) |
|
|
|
|
assert isinstance(obj, NotifyMQTT) |
|
|
|
|
_mock_client.connect.return_value = None |
|
|
|
|
|
|
|
|
|
# Emulate a situation where `connect()` raises an exception. |
|
|
|
|
mqtt_client_mock.connect.return_value = None |
|
|
|
|
|
|
|
|
|
# Verify notification fails. |
|
|
|
|
for side_effect in ( |
|
|
|
|
ValueError, |
|
|
|
|
ConnectionError, |
|
|
|
|
ssl.CertificateError): |
|
|
|
|
_mock_client.connect.side_effect = side_effect |
|
|
|
|
mqtt_client_mock.connect.side_effect = side_effect |
|
|
|
|
assert obj.notify(body="test=test") is False |
|
|
|
|
|
|
|
|
|
# Restore our values |
|
|
|
|
_mock_client.connect.side_effect = None |
|
|
|
|
_mock_client.connect.return_value = 0 |
|
|
|
|
|
|
|
|
|
def test_plugin_mqtt_not_published_failure(mqtt_client_mock, mocker): |
|
|
|
|
""" |
|
|
|
|
Verify `NotifyMQTT` fails there if the message has not been published. |
|
|
|
|
""" |
|
|
|
|
|
|
|
|
|
# Speed up testing by making `NotifyMQTT` not block anywhere. |
|
|
|
|
mocker.patch.object(NotifyMQTT, "socket_read_timeout", 0.00025) |
|
|
|
|
mocker.patch.object(NotifyMQTT, "mqtt_block_time_sec", 0) |
|
|
|
|
|
|
|
|
|
# Emulate a situation where `is_published()` returns `False`. |
|
|
|
|
mqtt_response = mqtt_client_mock.publish.return_value |
|
|
|
|
mqtt_response.is_published.return_value = False |
|
|
|
|
|
|
|
|
|
obj = apprise.Apprise.instantiate( |
|
|
|
|
'mqtt://localhost/my/topic', suppress_exceptions=False) |
|
|
|
|
|
|
|
|
|
# Verify notification fails. |
|
|
|
|
assert obj.notify(body="test=test") is False |
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_plugin_mqtt_not_published_recovery_success(mqtt_client_mock): |
|
|
|
|
""" |
|
|
|
|
Verify `NotifyMQTT` success after recovering from is_published==False. |
|
|
|
|
""" |
|
|
|
|
|
|
|
|
|
# Emulate a situation where `is_published()` returns `False`. |
|
|
|
|
mqtt_response = mqtt_client_mock.publish.return_value |
|
|
|
|
mqtt_response.is_published.return_value = None |
|
|
|
|
mqtt_response.is_published.side_effect = (False, True) |
|
|
|
|
|
|
|
|
|
obj = apprise.Apprise.instantiate( |
|
|
|
|
'mqtt://localhost/my/topic', suppress_exceptions=False) |
|
|
|
|
|
|
|
|
|
# Verify notification fails. |
|
|
|
|
assert obj.notify(body="test=test") is True |
|
|
|
|