You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
apprise/test/test_plugin_mqtt.py

446 lines
14 KiB

# -*- coding: utf-8 -*-
# BSD 2-Clause License
#
# Apprise - Push Notification Library.
# Copyright (c) 2024, Chris Caron <lead2gold@gmail.com>
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import logging
import re
import sys
import ssl
from unittest.mock import call, Mock, ANY
import pytest
import apprise
from apprise.plugins.mqtt import NotifyMQTT
# Disable logging for a cleaner testing output
logging.disable(logging.CRITICAL)
@pytest.fixture
def mqtt_client_mock(mocker):
"""
Mocks an MQTT client and response and returns the mocked client.
"""
if "paho" not in sys.modules:
raise pytest.skip("Requires that `paho-mqtt` is installed")
# Establish mock of the `publish()` response object.
publish_result = Mock(**{
"rc": 0,
"is_published.return_value": True,
})
# Establish mock of the `Client()` object.
mock_client = Mock(**{
"connect.return_value": 0,
"reconnect.return_value": 0,
"is_connected.return_value": True,
"publish.return_value": publish_result,
})
mocker.patch(
"paho.mqtt.client.Client", return_value=mock_client)
return mock_client
@pytest.mark.skipif(
'paho' in sys.modules,
reason="Requires that `paho-mqtt` is NOT installed")
def test_plugin_mqtt_paho_import_error():
"""
Verify `NotifyMQTT` is disabled when `paho.mqtt.client` fails loading.
"""
# without the library, the object can't be instantiated
obj = apprise.Apprise.instantiate(
'mqtt://user:pass@localhost/my/topic')
assert obj is None
def test_plugin_mqtt_default_success(mqtt_client_mock):
"""
Verify `NotifyMQTT` succeeds and has appropriate default settings.
"""
# Instantiate the notifier.
obj = apprise.Apprise.instantiate(
'mqtt://localhost:1234/my/topic', suppress_exceptions=False)
assert isinstance(obj, NotifyMQTT)
# We only loaded 1 topic
assert len(obj) == 1
assert obj.url().startswith('mqtt://localhost:1234/my/topic')
# Genrate the URL Identifier
assert isinstance(obj.url_id(), str)
# Verify default settings.
assert re.search(r'qos=0', obj.url())
assert re.search(r'version=v3.1.1', obj.url())
assert re.search(r'session=no', obj.url())
assert re.search(r'client_id=', obj.url()) is None
# Verify notification succeeds.
assert obj.notify(body="test=test") is True
# Send another notification (a new connection isn't attempted to be
# established as one already exists)
assert obj.notify(body="foo=bar") is True
# Verify the right calls have been made to the MQTT client object.
assert mqtt_client_mock.mock_calls == [
call.max_inflight_messages_set(200),
call.connect('localhost', port=1234, keepalive=30),
call.loop_start(),
call.is_connected(),
call.publish('my/topic', payload='test=test', qos=0, retain=False),
call.publish().is_published(),
call.is_connected(),
call.publish('my/topic', payload='foo=bar', qos=0, retain=False),
call.publish().is_published(),
]
def test_plugin_mqtt_multiple_topics_success(mqtt_client_mock):
"""
Verify submission to multiple MQTT topics.
"""
# Designate multiple topic targets.
obj = apprise.Apprise.instantiate(
'mqtt://localhost/my/topic,my/other/topic',
suppress_exceptions=False)
# Verify we have loaded 2 topics
assert len(obj) == 2
assert isinstance(obj, NotifyMQTT)
assert obj.url().startswith('mqtt://localhost')
assert re.search(r'my/topic', obj.url())
assert re.search(r'my/other/topic', obj.url())
assert obj.notify(body="test=test") is True
# Verify the right calls have been made to the MQTT client object.
assert mqtt_client_mock.mock_calls == [
call.max_inflight_messages_set(200),
call.connect('localhost', port=1883, keepalive=30),
call.loop_start(),
call.is_connected(),
call.publish('my/topic', payload='test=test', qos=0, retain=False),
call.publish().is_published(),
call.is_connected(),
call.publish('my/other/topic', payload='test=test', qos=0,
retain=False),
call.publish().is_published(),
]
def test_plugin_mqtt_to_success(mqtt_client_mock):
"""
Verify `NotifyMQTT` succeeds with the `to=` parameter.
"""
# Leverage the `to=` argument to identify the topic.
obj = apprise.Apprise.instantiate(
'mqtt://localhost?to=my/topic', suppress_exceptions=False)
assert isinstance(obj, NotifyMQTT)
assert obj.url().startswith('mqtt://localhost/my/topic')
# Verify default settings.
assert re.search(r'qos=0', obj.url())
assert re.search(r'version=v3.1.1', obj.url())
# Verify notification succeeds.
assert obj.notify(body="test=test") is True
def test_plugin_mqtt_valid_settings_success(mqtt_client_mock):
"""
Verify settings as URL parameters will be accepted.
"""
# Instantiate the notifier.
obj = apprise.Apprise.instantiate(
'mqtt://localhost/my/topic?qos=1&version=v3.1',
suppress_exceptions=False)
assert isinstance(obj, NotifyMQTT)
assert obj.url().startswith('mqtt://localhost')
assert re.search(r'qos=1', obj.url())
assert re.search(r'version=v3.1', obj.url())
def test_plugin_mqtt_invalid_settings_failure(mqtt_client_mock):
"""
Verify notifier instantiation croaks on invalid settings.
"""
# Test case for invalid/unsupported MQTT version.
with pytest.raises(TypeError):
apprise.Apprise.instantiate(
'mqtt://localhost?version=v1.0.0.0', suppress_exceptions=False)
# Test case for invalid/unsupported `qos`.
with pytest.raises(TypeError):
apprise.Apprise.instantiate(
'mqtt://localhost?qos=123', suppress_exceptions=False)
with pytest.raises(TypeError):
apprise.Apprise.instantiate(
'mqtt://localhost?qos=invalid', suppress_exceptions=False)
def test_plugin_mqtt_bad_url_failure(mqtt_client_mock):
"""
Verify notifier is disabled when using an invalid URL.
"""
obj = apprise.Apprise.instantiate('mqtt://', suppress_exceptions=False)
assert obj is None
def test_plugin_mqtt_no_topic_failure(mqtt_client_mock):
"""
Verify notification fails when no topic is given.
"""
obj = apprise.Apprise.instantiate(
'mqtt://localhost', suppress_exceptions=False)
assert isinstance(obj, NotifyMQTT)
assert obj.notify(body="test=test") is False
def test_plugin_mqtt_tls_connect_success(mqtt_client_mock):
"""
Verify TLS encrypted connections work.
"""
obj = apprise.Apprise.instantiate(
'mqtts://user:pass@localhost/my/topic', suppress_exceptions=False)
assert isinstance(obj, NotifyMQTT)
assert obj.url().startswith('mqtts://user:pass@localhost/my/topic')
assert obj.notify(body="test=test") is True
# Verify the right calls have been made to the MQTT client object.
assert mqtt_client_mock.mock_calls == [
call.max_inflight_messages_set(200),
call.username_pw_set('user', password='pass'),
call.tls_set(
ca_certs=ANY,
certfile=None,
keyfile=None,
cert_reqs=ssl.VerifyMode.CERT_REQUIRED,
tls_version=ssl.PROTOCOL_TLS,
ciphers=None,
),
call.tls_insecure_set(False),
call.connect('localhost', port=8883, keepalive=30),
call.loop_start(),
call.is_connected(),
call.publish('my/topic', payload='test=test', qos=0, retain=False),
call.publish().is_published(),
]
def test_plugin_mqtt_tls_no_certificates_failure(mqtt_client_mock, mocker):
"""
Verify TLS does not work without access to CA root certificates.
"""
# Clear CA certificates.
mocker.patch.object(NotifyMQTT, "CA_CERTIFICATE_FILE_LOCATIONS", [])
obj = apprise.Apprise.instantiate(
'mqtts://user:pass@localhost/my/topic', suppress_exceptions=False)
assert isinstance(obj, NotifyMQTT)
logger: Mock = mocker.spy(obj, "logger")
# Verify notification fails w/o CA certificates.
assert obj.notify(body="test=test") is False
assert logger.mock_calls == [
call.error("MQTT secure communication can not be verified, "
"CA certificates file missing")
]
def test_plugin_mqtt_tls_no_verify_success(mqtt_client_mock):
"""
Verify TLS encrypted connections work with `verify=False`.
"""
# A single user (not password) + no verifying of host
obj = apprise.Apprise.instantiate(
'mqtts://user:pass@localhost/my/topic?verify=False',
suppress_exceptions=False)
assert isinstance(obj, NotifyMQTT)
assert obj.notify(body="test=test") is True
# Verify the right calls have been made to the MQTT client object.
# Let's only validate the single call of interest is present.
# Everything else is identical with `test_plugin_mqtt_tls_connect_success`.
assert call.tls_insecure_set(True) in mqtt_client_mock.mock_calls
def test_plugin_mqtt_session_client_id_success(mqtt_client_mock):
"""
Verify handling `session=yes` and `client_id=` works.
"""
obj = apprise.Apprise.instantiate(
'mqtt://user@localhost/my/topic?session=yes&client_id=apprise',
suppress_exceptions=False)
assert isinstance(obj, NotifyMQTT)
assert obj.url().startswith('mqtt://user@localhost')
assert re.search(r'my/topic', obj.url())
assert re.search(r'client_id=apprise', obj.url())
assert re.search(r'session=yes', obj.url())
assert re.search(r'retain=no', obj.url())
assert obj.notify(body="test=test") is True
def test_plugin_mqtt_retain(mqtt_client_mock):
"""
Verify handling of Retain Message Flag
"""
obj = apprise.Apprise.instantiate(
'mqtt://user@localhost/my/topic?retain=yes',
suppress_exceptions=False)
assert isinstance(obj, NotifyMQTT)
assert obj.url().startswith('mqtt://user@localhost')
assert re.search(r'my/topic', obj.url())
assert re.search(r'session=no', obj.url())
assert re.search(r'retain=yes', obj.url())
assert obj.notify(body="test=test") is True
def test_plugin_mqtt_connect_failure(mqtt_client_mock):
"""
Verify `NotifyMQTT` fails when MQTT `connect()` fails.
"""
# Emulate a situation where the `connect()` method fails.
mqtt_client_mock.connect.return_value = 2
obj = apprise.Apprise.instantiate(
'mqtt://localhost/my/topic', suppress_exceptions=False)
# Verify notification fails.
assert obj.notify(body="test=test") is False
def test_plugin_mqtt_reconnect_failure(mqtt_client_mock):
"""
Verify `NotifyMQTT` fails when MQTT `reconnect()` fails.
"""
# Emulate a situation where MQTT reconnect fails.
mqtt_client_mock.reconnect.return_value = 2
mqtt_client_mock.is_connected.return_value = False
obj = apprise.Apprise.instantiate(
'mqtt://localhost/my/topic', suppress_exceptions=False)
# Verify notification fails.
assert obj.notify(body="test=test") is False
def test_plugin_mqtt_publish_failure(mqtt_client_mock):
"""
Verify `NotifyMQTT` fails when MQTT `publish()` fails.
"""
# Emulate a situation where the `publish()` method fails.
mqtt_response = mqtt_client_mock.publish.return_value
mqtt_response.rc = 2
obj = apprise.Apprise.instantiate(
'mqtt://localhost/my/topic', suppress_exceptions=False)
# Verify notification fails.
assert obj.notify(body="test=test") is False
def test_plugin_mqtt_exception_failure(mqtt_client_mock):
"""
Verify `NotifyMQTT` fails when an exception happens.
"""
obj = apprise.Apprise.instantiate(
'mqtt://localhost/my/topic', suppress_exceptions=False)
# Emulate a situation where `connect()` raises an exception.
mqtt_client_mock.connect.return_value = None
# Verify notification fails.
for side_effect in (
ValueError,
ConnectionError,
ssl.CertificateError):
mqtt_client_mock.connect.side_effect = side_effect
assert obj.notify(body="test=test") is False
def test_plugin_mqtt_not_published_failure(mqtt_client_mock, mocker):
"""
Verify `NotifyMQTT` fails there if the message has not been published.
"""
# Speed up testing by making `NotifyMQTT` not block anywhere.
mocker.patch.object(NotifyMQTT, "socket_read_timeout", 0.00025)
mocker.patch.object(NotifyMQTT, "mqtt_block_time_sec", 0)
# Emulate a situation where `is_published()` returns `False`.
mqtt_response = mqtt_client_mock.publish.return_value
mqtt_response.is_published.return_value = False
obj = apprise.Apprise.instantiate(
'mqtt://localhost/my/topic', suppress_exceptions=False)
# Verify notification fails.
assert obj.notify(body="test=test") is False
def test_plugin_mqtt_not_published_recovery_success(mqtt_client_mock):
"""
Verify `NotifyMQTT` success after recovering from is_published==False.
"""
# Emulate a situation where `is_published()` returns `False`.
mqtt_response = mqtt_client_mock.publish.return_value
mqtt_response.is_published.return_value = None
mqtt_response.is_published.side_effect = (False, True)
obj = apprise.Apprise.instantiate(
'mqtt://localhost/my/topic', suppress_exceptions=False)
# Verify notification fails.
assert obj.notify(body="test=test") is True