Added Matrix Attachment support (#921)

This commit is contained in:
Chris Caron
2023-08-13 20:00:34 -04:00
committed by GitHub
parent c56d4c500d
commit 207db69e1a
2 changed files with 376 additions and 18 deletions

View File

@@ -32,9 +32,10 @@
from unittest import mock
import os
import requests
import pytest
from apprise import AppriseAsset
from apprise import Apprise, AppriseAsset, AppriseAttachment, NotifyType
from json import dumps
from apprise.plugins.NotifyMatrix import NotifyMatrix
@@ -44,6 +45,17 @@ from helpers import AppriseURLTester
import logging
logging.disable(logging.CRITICAL)
MATRIX_GOOD_RESPONSE = dumps({
'room_id': '!abc123:localhost',
'room_alias': '#abc123:localhost',
'joined_rooms': ['!abc123:localhost', '!def456:localhost'],
'access_token': 'abcd1234',
'home_server': 'localhost',
})
# Attachment Directory
TEST_VAR_DIR = os.path.join(os.path.dirname(__file__), 'var')
# Our Testing URLs
apprise_url_tests = (
##################################
@@ -97,6 +109,24 @@ apprise_url_tests = (
# user and token correctly specified with webhook
'instance': NotifyMatrix,
}),
('matrix://user:token@localhost:123/#general/?version=3', {
# Provide version over-ride (using version=)
'instance': NotifyMatrix,
# Our response expected server response
'requests_response_text': MATRIX_GOOD_RESPONSE,
'privacy_url': 'matrix://user:****@localhost:123',
}),
('matrixs://user:token@localhost/#general?v=2', {
# Provide version over-ride (using v=)
'instance': NotifyMatrix,
# Our response expected server response
'requests_response_text': MATRIX_GOOD_RESPONSE,
'privacy_url': 'matrixs://user:****@localhost',
}),
('matrix://user:token@localhost:123/#general/?v=invalid', {
# Invalid version specified
'instance': TypeError
}),
('matrix://user:token@localhost?mode=slack&format=text', {
# user and token correctly specified with webhook
'instance': NotifyMatrix,
@@ -842,3 +872,205 @@ def test_plugin_matrix_image_errors(mock_post, mock_get):
assert obj.access_token is None
assert obj.notify('test', 'test') is True
@mock.patch('requests.get')
@mock.patch('requests.post')
def test_plugin_matrix_attachments_api_v3(mock_post, mock_get):
"""
NotifyMatrix() Attachment Checks (v3)
"""
# Prepare a good response
response = mock.Mock()
response.status_code = requests.codes.ok
response.content = MATRIX_GOOD_RESPONSE.encode('utf-8')
# Prepare a bad response
bad_response = mock.Mock()
bad_response.status_code = requests.codes.internal_server_error
# Prepare Mock return object
mock_post.return_value = response
mock_get.return_value = response
# Instantiate our object
obj = Apprise.instantiate('matrix://user:pass@localhost/#general?v=3')
# attach our content
attach = AppriseAttachment(os.path.join(TEST_VAR_DIR, 'apprise-test.gif'))
assert obj.notify(
body='body', title='title', notify_type=NotifyType.INFO,
attach=attach) is True
attach = AppriseAttachment(os.path.join(TEST_VAR_DIR, 'apprise-test.gif'))
# Test our call count
assert mock_post.call_count == 5
assert mock_post.call_args_list[0][0][0] == \
'http://localhost/_matrix/client/v3/login'
assert mock_post.call_args_list[1][0][0] == \
'http://localhost/_matrix/media/v3/upload'
assert mock_post.call_args_list[2][0][0] == \
'http://localhost/_matrix/client/v3/join/%23general%3Alocalhost'
assert mock_post.call_args_list[3][0][0] == \
'http://localhost/_matrix/client/v3/rooms/%21abc123%3Alocalhost/' \
'send/m.room.message'
assert mock_post.call_args_list[4][0][0] == \
'http://localhost/_matrix/client/v3/rooms/%21abc123%3Alocalhost/' \
'send/m.room.message'
# Attach an unsupported file type
attach = AppriseAttachment(
os.path.join(TEST_VAR_DIR, 'apprise-archive.zip'))
assert obj.notify(
body='body', title='title', notify_type=NotifyType.INFO,
attach=attach) is False
# An invalid attachment will cause a failure
path = os.path.join(TEST_VAR_DIR, '/invalid/path/to/an/invalid/file.jpg')
attach = AppriseAttachment(path)
assert obj.notify(
body='body', title='title', notify_type=NotifyType.INFO,
attach=path) is False
# update our attachment to be valid
attach = AppriseAttachment(os.path.join(TEST_VAR_DIR, 'apprise-test.gif'))
mock_post.return_value = None
# Throw an exception on the first call to requests.post()
for side_effect in (requests.RequestException(), OSError(), bad_response):
mock_post.side_effect = [side_effect]
# We'll fail now because of our error handling
assert obj.send(body="test", attach=attach) is False
# Throw an exception on the second call to requests.post()
for side_effect in (requests.RequestException(), OSError(), bad_response):
mock_post.side_effect = [response, side_effect]
# We'll fail now because of our error handling
assert obj.send(body="test", attach=attach) is False
# handle a bad response
bad_response = mock.Mock()
bad_response.status_code = requests.codes.internal_server_error
mock_post.side_effect = [response, bad_response]
# We'll fail now because of an internal exception
assert obj.send(body="test", attach=attach) is False
@mock.patch('requests.get')
@mock.patch('requests.post')
def test_plugin_matrix_attachments_api_v2(mock_post, mock_get):
"""
NotifyMatrix() Attachment Checks (v2)
"""
# Prepare a good response
response = mock.Mock()
response.status_code = requests.codes.ok
response.content = MATRIX_GOOD_RESPONSE.encode('utf-8')
# Prepare a bad response
bad_response = mock.Mock()
bad_response.status_code = requests.codes.internal_server_error
# Prepare Mock return object
mock_post.return_value = response
mock_get.return_value = response
# Instantiate our object
obj = Apprise.instantiate('matrix://user:pass@localhost/#general?v=3')
# attach our content
attach = AppriseAttachment(os.path.join(TEST_VAR_DIR, 'apprise-test.gif'))
assert obj.notify(
body='body', title='title', notify_type=NotifyType.INFO,
attach=attach) is True
attach = AppriseAttachment(os.path.join(TEST_VAR_DIR, 'apprise-test.gif'))
# Attach an unsupported file
mock_post.return_value = response
mock_get.return_value = response
mock_post.side_effect = None
mock_get.side_effect = None
# Force a object removal (thus a logout call)
del obj
# Reset our object
mock_post.reset_mock()
mock_get.reset_mock()
# Instantiate our object
obj = Apprise.instantiate('matrixs://user:pass@localhost/#general?v=2')
assert obj.notify(
body='body', title='title', notify_type=NotifyType.INFO,
attach=attach) is True
# Test our call count
assert mock_post.call_count == 5
assert mock_post.call_args_list[0][0][0] == \
'https://localhost/_matrix/client/r0/login'
assert mock_post.call_args_list[1][0][0] == \
'https://localhost/_matrix/media/r0/upload'
assert mock_post.call_args_list[2][0][0] == \
'https://localhost/_matrix/client/r0/join/%23general%3Alocalhost'
assert mock_post.call_args_list[3][0][0] == \
'https://localhost/_matrix/client/r0/rooms/%21abc123%3Alocalhost/' \
'send/m.room.message'
assert mock_post.call_args_list[4][0][0] == \
'https://localhost/_matrix/client/r0/rooms/%21abc123%3Alocalhost/' \
'send/m.room.message'
# Attach an unsupported file type
attach = AppriseAttachment(
os.path.join(TEST_VAR_DIR, 'apprise-archive.zip'))
assert obj.notify(
body='body', title='title', notify_type=NotifyType.INFO,
attach=attach) is False
# An invalid attachment will cause a failure
path = os.path.join(TEST_VAR_DIR, '/invalid/path/to/an/invalid/file.jpg')
attach = AppriseAttachment(path)
assert obj.notify(
body='body', title='title', notify_type=NotifyType.INFO,
attach=path) is False
# update our attachment to be valid
attach = AppriseAttachment(os.path.join(TEST_VAR_DIR, 'apprise-test.gif'))
mock_post.return_value = None
mock_get.return_value = None
# Throw an exception on the first call to requests.post()
for side_effect in (requests.RequestException(), OSError(), bad_response):
mock_post.side_effect = [side_effect]
mock_get.side_effect = [side_effect]
assert obj.send(body="test", attach=attach) is False
# Throw an exception on the second call to requests.post()
for side_effect in (requests.RequestException(), OSError(), bad_response):
mock_post.side_effect = [response, side_effect]
mock_get.side_effect = [side_effect]
# We'll fail now because of our error handling
assert obj.send(body="test", attach=attach) is False
# handle a bad response
bad_response = mock.Mock()
bad_response.status_code = requests.codes.internal_server_error
mock_post.side_effect = [response, bad_response]
mock_get.side_effect = [response, bad_response]
# We'll fail now because of an internal exception
assert obj.send(body="test", attach=attach) is False