mirror of https://github.com/caronc/apprise
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
942 lines
31 KiB
942 lines
31 KiB
# -*- coding: utf-8 -*- |
|
# BSD 2-Clause License |
|
# |
|
# Apprise - Push Notification Library. |
|
# Copyright (c) 2023, Chris Caron <lead2gold@gmail.com> |
|
# |
|
# Redistribution and use in source and binary forms, with or without |
|
# modification, are permitted provided that the following conditions are met: |
|
# |
|
# 1. Redistributions of source code must retain the above copyright notice, |
|
# this list of conditions and the following disclaimer. |
|
# |
|
# 2. Redistributions in binary form must reproduce the above copyright notice, |
|
# this list of conditions and the following disclaimer in the documentation |
|
# and/or other materials provided with the distribution. |
|
# |
|
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" |
|
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE |
|
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE |
|
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE |
|
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR |
|
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF |
|
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS |
|
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN |
|
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) |
|
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE |
|
# POSSIBILITY OF SUCH DAMAGE. |
|
# |
|
# Great Resources: |
|
# - Dev/Legacy API: |
|
# https://firebase.google.com/docs/cloud-messaging/http-server-ref |
|
# - Legacy API (v1) -> OAuth |
|
# - https://firebase.google.com/docs/cloud-messaging/migrate-v1 |
|
|
|
import os |
|
import sys |
|
from unittest import mock |
|
|
|
import pytest |
|
import requests |
|
import json |
|
from apprise import Apprise |
|
from apprise.plugins.NotifyFCM import NotifyFCM |
|
from helpers import AppriseURLTester |
|
|
|
try: |
|
from apprise.plugins.NotifyFCM.oauth import GoogleOAuth |
|
from apprise.plugins.NotifyFCM.common import FCM_MODES |
|
from apprise.plugins.NotifyFCM.priority import ( |
|
FCMPriorityManager, FCM_PRIORITIES) |
|
from apprise.plugins.NotifyFCM.color import FCMColorManager |
|
from cryptography.exceptions import UnsupportedAlgorithm |
|
|
|
except ImportError: |
|
# No problem; there is no cryptography support |
|
pass |
|
|
|
|
|
# Disable logging for a cleaner testing output |
|
import logging |
|
logging.disable(logging.CRITICAL) |
|
|
|
# Test files for KeyFile Directory |
|
PRIVATE_KEYFILE_DIR = os.path.join(os.path.dirname(__file__), 'var', 'fcm') |
|
FCM_KEYFILE = os.path.join(PRIVATE_KEYFILE_DIR, 'service_account.json') |
|
|
|
|
|
# Our Testing URLs |
|
apprise_url_tests = ( |
|
('fcm://', { |
|
# We failed to identify any valid authentication |
|
'instance': TypeError, |
|
}), |
|
('fcm://:@/', { |
|
# We failed to identify any valid authentication |
|
'instance': TypeError, |
|
}), |
|
('fcm://project@%20%20/', { |
|
# invalid apikey |
|
'instance': TypeError, |
|
}), |
|
('fcm://apikey/', { |
|
# no project id specified so we operate in legacy mode |
|
'instance': NotifyFCM, |
|
# but there are no targets specified so we return False |
|
'notify_response': False, |
|
}), |
|
('fcm://apikey/device', { |
|
# Valid device |
|
'instance': NotifyFCM, |
|
'privacy_url': 'fcm://a...y/device', |
|
}), |
|
('fcm://apikey/#topic', { |
|
# Valid topic |
|
'instance': NotifyFCM, |
|
'privacy_url': 'fcm://a...y/%23topic', |
|
}), |
|
('fcm://apikey/device?mode=invalid', { |
|
# Valid device, invalid mode |
|
'instance': TypeError, |
|
}), |
|
('fcm://apikey/#topic1/device/%20/', { |
|
# Valid topic, valid device, and invalid entry |
|
'instance': NotifyFCM, |
|
}), |
|
('fcm://apikey?to=#topic1,device', { |
|
# Test to= |
|
'instance': NotifyFCM, |
|
}), |
|
('fcm://?apikey=abc123&to=device', { |
|
# Test apikey= to= |
|
'instance': NotifyFCM, |
|
}), |
|
('fcm://?apikey=abc123&to=device&image=yes', { |
|
# Test image boolean |
|
'instance': NotifyFCM, |
|
}), |
|
('fcm://?apikey=abc123&to=device&color=no', { |
|
# Disable colors |
|
'instance': NotifyFCM, |
|
}), |
|
('fcm://?apikey=abc123&to=device&color=aabbcc', { |
|
# custom colors |
|
'instance': NotifyFCM, |
|
}), |
|
('fcm://?apikey=abc123&to=device' |
|
'&image_url=http://example.com/interesting.jpg', { |
|
# Test image_url |
|
'instance': NotifyFCM}), |
|
('fcm://?apikey=abc123&to=device' |
|
'&image_url=http://example.com/interesting.jpg&image=no', { |
|
# Test image_url but set to no |
|
'instance': NotifyFCM}), |
|
('fcm://?apikey=abc123&to=device&+key=value&+key2=value2', { |
|
# Test apikey= to= and data arguments |
|
'instance': NotifyFCM, |
|
}), |
|
('fcm://%20?to=device&keyfile=/invalid/path', { |
|
# invalid Project ID |
|
'instance': TypeError, |
|
}), |
|
('fcm://project_id?to=device&keyfile=/invalid/path', { |
|
# Test to= and auto-detection of OAuth mode |
|
'instance': NotifyFCM, |
|
# we'll fail to send our notification as a result |
|
'response': False, |
|
}), |
|
('fcm://?to=device&project=project_id&keyfile=/invalid/path', { |
|
# Test project= & to= and auto detection of OAuth mode |
|
'instance': NotifyFCM, |
|
# we'll fail to send our notification as a result |
|
'response': False, |
|
}), |
|
('fcm://project_id?to=device&mode=oauth2', { |
|
# no keyfile was specified |
|
'instance': TypeError, |
|
}), |
|
('fcm://project_id?to=device&mode=oauth2&keyfile=/invalid/path', { |
|
# Same test as above except we explicitly set our oauth2 mode |
|
# Test to= and auto-detection of OAuth mode |
|
'instance': NotifyFCM, |
|
# we'll fail to send our notification as a result |
|
'response': False, |
|
}), |
|
('fcm://apikey/#topic1/device/?mode=legacy', { |
|
'instance': NotifyFCM, |
|
# throw a bizarre code forcing us to fail to look it up |
|
'response': False, |
|
'requests_response_code': 999, |
|
}), |
|
('fcm://apikey/#topic1/device/?mode=legacy', { |
|
'instance': NotifyFCM, |
|
# Throws a series of connection and transfer exceptions when this flag |
|
# is set and tests that we gracefully handle them |
|
'test_requests_exceptions': True, |
|
}), |
|
('fcm://project/#topic1/device/?mode=oauth2&keyfile=file://{}'.format( |
|
os.path.join( |
|
os.path.dirname(__file__), 'var', 'fcm', |
|
'service_account.json')), { |
|
'instance': NotifyFCM, |
|
# throw a bizarre code forcing us to fail to look it up |
|
'response': False, |
|
'requests_response_code': 999, |
|
}), |
|
('fcm://projectid/#topic1/device/?mode=oauth2&keyfile=file://{}'.format( |
|
os.path.join( |
|
os.path.dirname(__file__), 'var', 'fcm', |
|
'service_account.json')), { |
|
'instance': NotifyFCM, |
|
# Throws a series of connection and transfer exceptions when |
|
# this flag is set and tests that we gracefully handle them |
|
'test_requests_exceptions': True, |
|
}), |
|
) |
|
|
|
|
|
@pytest.fixture |
|
def mock_post(mocker): |
|
""" |
|
Prepare a good OAuth mock response. |
|
""" |
|
|
|
mock_thing = mocker.patch("requests.post") |
|
|
|
response = mock.Mock() |
|
response.content = json.dumps({ |
|
"access_token": "ya29.c.abcd", |
|
"expires_in": 3599, |
|
"token_type": "Bearer", |
|
}) |
|
response.status_code = requests.codes.ok |
|
mock_thing.return_value = response |
|
|
|
return mock_thing |
|
|
|
|
|
@pytest.fixture |
|
def mock_post_legacy(mocker): |
|
""" |
|
Prepare a good legacy mock response. |
|
""" |
|
|
|
mock_thing = mocker.patch("requests.post") |
|
|
|
response = mock.Mock() |
|
response.status_code = requests.codes.ok |
|
mock_thing.return_value = response |
|
|
|
return mock_thing |
|
|
|
|
|
@pytest.mark.skipif( |
|
'cryptography' not in sys.modules, reason="Requires cryptography") |
|
def test_plugin_fcm_urls(): |
|
""" |
|
NotifyFCM() Apprise URLs |
|
|
|
""" |
|
|
|
# Run our general tests |
|
AppriseURLTester(tests=apprise_url_tests).run_all() |
|
|
|
|
|
@pytest.mark.skipif( |
|
'cryptography' not in sys.modules, reason="Requires cryptography") |
|
def test_plugin_fcm_legacy_default(mock_post_legacy): |
|
""" |
|
NotifyFCM() Legacy/APIKey default checks. |
|
""" |
|
|
|
# A valid Legacy URL |
|
obj = Apprise.instantiate( |
|
'fcm://abc123/device/' |
|
'?+key=value&+key2=value2' |
|
'&image_url=https://example.com/interesting.png') |
|
|
|
# Send our notification |
|
assert obj.notify("test") is True |
|
|
|
# Test our call count |
|
assert mock_post_legacy.call_count == 1 |
|
assert mock_post_legacy.call_args_list[0][0][0] == \ |
|
'https://fcm.googleapis.com/fcm/send' |
|
|
|
payload = mock_post_legacy.mock_calls[0][2] |
|
data = json.loads(payload['data']) |
|
assert 'data' in data |
|
assert isinstance(data, dict) |
|
assert 'key' in data['data'] |
|
assert data['data']['key'] == 'value' |
|
assert 'key2' in data['data'] |
|
assert data['data']['key2'] == 'value2' |
|
|
|
assert 'notification' in data |
|
assert isinstance(data['notification'], dict) |
|
assert 'notification' in data['notification'] |
|
assert isinstance(data['notification']['notification'], dict) |
|
assert 'image' in data['notification']['notification'] |
|
assert data['notification']['notification']['image'] == \ |
|
'https://example.com/interesting.png' |
|
|
|
|
|
@pytest.mark.skipif( |
|
'cryptography' not in sys.modules, reason="Requires cryptography") |
|
def test_plugin_fcm_legacy_priorities(mock_post_legacy): |
|
""" |
|
NotifyFCM() Legacy/APIKey priorities checks. |
|
""" |
|
|
|
obj = Apprise.instantiate( |
|
'fcm://abc123/device/?priority=low') |
|
assert mock_post_legacy.call_count == 0 |
|
|
|
# Send our notification |
|
assert obj.notify(title="title", body="body") is True |
|
|
|
# Test our call count |
|
assert mock_post_legacy.call_count == 1 |
|
assert mock_post_legacy.call_args_list[0][0][0] == \ |
|
'https://fcm.googleapis.com/fcm/send' |
|
|
|
payload = mock_post_legacy.mock_calls[0][2] |
|
data = json.loads(payload['data']) |
|
assert 'data' not in data |
|
assert 'notification' in data |
|
assert isinstance(data['notification'], dict) |
|
assert 'notification' in data['notification'] |
|
assert isinstance(data['notification']['notification'], dict) |
|
assert 'image' not in data['notification']['notification'] |
|
assert 'priority' in data |
|
|
|
# legacy can only switch between high/low |
|
assert data['priority'] == "normal" |
|
|
|
|
|
@pytest.mark.skipif( |
|
'cryptography' not in sys.modules, reason="Requires cryptography") |
|
def test_plugin_fcm_legacy_no_colors(mock_post_legacy): |
|
""" |
|
NotifyFCM() Legacy/APIKey `color=no` checks. |
|
""" |
|
|
|
obj = Apprise.instantiate( |
|
'fcm://abc123/device/?color=no') |
|
assert mock_post_legacy.call_count == 0 |
|
|
|
# Send our notification |
|
assert obj.notify(title="title", body="body") is True |
|
|
|
# Test our call count |
|
assert mock_post_legacy.call_count == 1 |
|
assert mock_post_legacy.call_args_list[0][0][0] == \ |
|
'https://fcm.googleapis.com/fcm/send' |
|
|
|
payload = mock_post_legacy.mock_calls[0][2] |
|
data = json.loads(payload['data']) |
|
assert 'data' not in data |
|
assert 'notification' in data |
|
assert isinstance(data['notification'], dict) |
|
assert 'notification' in data['notification'] |
|
assert isinstance(data['notification']['notification'], dict) |
|
assert 'image' not in data['notification']['notification'] |
|
assert 'color' not in data['notification']['notification'] |
|
|
|
|
|
@pytest.mark.skipif( |
|
'cryptography' not in sys.modules, reason="Requires cryptography") |
|
def test_plugin_fcm_legacy_colors(mock_post_legacy): |
|
""" |
|
NotifyFCM() Legacy/APIKey colors checks. |
|
""" |
|
|
|
obj = Apprise.instantiate( |
|
'fcm://abc123/device/?color=AA001b') |
|
assert mock_post_legacy.call_count == 0 |
|
|
|
# Send our notification |
|
assert obj.notify(title="title", body="body") is True |
|
|
|
# Test our call count |
|
assert mock_post_legacy.call_count == 1 |
|
assert mock_post_legacy.call_args_list[0][0][0] == \ |
|
'https://fcm.googleapis.com/fcm/send' |
|
|
|
payload = mock_post_legacy.mock_calls[0][2] |
|
data = json.loads(payload['data']) |
|
assert 'data' not in data |
|
assert 'notification' in data |
|
assert isinstance(data['notification'], dict) |
|
assert 'notification' in data['notification'] |
|
assert isinstance(data['notification']['notification'], dict) |
|
assert 'image' not in data['notification']['notification'] |
|
assert 'color' in data['notification']['notification'] |
|
assert data['notification']['notification']['color'] == '#aa001b' |
|
|
|
|
|
@pytest.mark.skipif( |
|
'cryptography' not in sys.modules, reason="Requires cryptography") |
|
def test_plugin_fcm_oauth_default(mock_post): |
|
""" |
|
NotifyFCM() general OAuth checks - success. |
|
Test using a valid Project ID and key file. |
|
""" |
|
|
|
obj = Apprise.instantiate( |
|
f'fcm://mock-project-id/device/#topic/?keyfile={FCM_KEYFILE}') |
|
|
|
# send our notification |
|
assert obj.notify("test") is True |
|
|
|
# Test our call count |
|
assert mock_post.call_count == 3 |
|
assert mock_post.call_args_list[0][0][0] == \ |
|
'https://accounts.google.com/o/oauth2/token' |
|
assert mock_post.call_args_list[1][0][0] == \ |
|
'https://fcm.googleapis.com/v1/projects/mock-project-id/messages:send' |
|
assert mock_post.call_args_list[2][0][0] == \ |
|
'https://fcm.googleapis.com/v1/projects/mock-project-id/messages:send' |
|
|
|
|
|
@pytest.mark.skipif( |
|
'cryptography' not in sys.modules, reason="Requires cryptography") |
|
def test_plugin_fcm_oauth_invalid_project_id(mock_post): |
|
""" |
|
NotifyFCM() OAuth checks, with invalid project id. |
|
""" |
|
|
|
# Test having a valid keyfile, but not a valid project id match. |
|
obj = Apprise.instantiate( |
|
f'fcm://invalid_project_id/device/?keyfile={FCM_KEYFILE}') |
|
|
|
# we'll fail as a result |
|
assert obj.notify("test") is False |
|
|
|
# Test our call count |
|
assert mock_post.call_count == 0 |
|
|
|
|
|
@pytest.mark.skipif( |
|
'cryptography' not in sys.modules, reason="Requires cryptography") |
|
def test_plugin_fcm_oauth_keyfile_error(mock_post): |
|
""" |
|
NotifyFCM() OAuth checks, while unable to read key file. |
|
""" |
|
|
|
# Now we test using a valid Project ID but we can't open our file |
|
obj = Apprise.instantiate( |
|
f'fcm://mock-project-id/device/?keyfile={FCM_KEYFILE}') |
|
|
|
with mock.patch('builtins.open', side_effect=OSError): |
|
# we'll fail as a result |
|
assert obj.notify("test") is False |
|
|
|
# Test our call count |
|
assert mock_post.call_count == 0 |
|
|
|
|
|
@pytest.mark.skipif( |
|
'cryptography' not in sys.modules, reason="Requires cryptography") |
|
def test_plugin_fcm_oauth_data_parameters(mock_post): |
|
""" |
|
NotifyFCM() OAuth checks, success. |
|
Test using a valid Project ID and data parameters. |
|
""" |
|
|
|
obj = Apprise.instantiate( |
|
f'fcm://mock-project-id/device/#topic/?keyfile={FCM_KEYFILE}' |
|
'&+key=value&+key2=value2' |
|
'&image_url=https://example.com/interesting.png') |
|
assert mock_post.call_count == 0 |
|
|
|
# send our notification |
|
assert obj.notify("test") is True |
|
|
|
# Test our call count |
|
assert mock_post.call_count == 3 |
|
assert mock_post.call_args_list[0][0][0] == \ |
|
'https://accounts.google.com/o/oauth2/token' |
|
|
|
assert mock_post.call_args_list[1][0][0] == \ |
|
'https://fcm.googleapis.com/v1/projects/mock-project-id/messages:send' |
|
payload = mock_post.mock_calls[1][2] |
|
data = json.loads(payload['data']) |
|
assert 'message' in data |
|
assert isinstance(data['message'], dict) |
|
assert 'data' in data['message'] |
|
assert isinstance(data['message']['data'], dict) |
|
assert 'key' in data['message']['data'] |
|
assert data['message']['data']['key'] == 'value' |
|
assert 'key2' in data['message']['data'] |
|
assert data['message']['data']['key2'] == 'value2' |
|
|
|
assert 'notification' in data['message'] |
|
assert isinstance(data['message']['notification'], dict) |
|
assert 'image' in data['message']['notification'] |
|
assert data['message']['notification']['image'] == \ |
|
'https://example.com/interesting.png' |
|
|
|
assert mock_post.call_args_list[2][0][0] == \ |
|
'https://fcm.googleapis.com/v1/projects/mock-project-id/messages:send' |
|
|
|
payload = mock_post.mock_calls[2][2] |
|
data = json.loads(payload['data']) |
|
assert 'message' in data |
|
assert isinstance(data['message'], dict) |
|
assert 'data' in data['message'] |
|
assert isinstance(data['message']['data'], dict) |
|
assert 'key' in data['message']['data'] |
|
assert data['message']['data']['key'] == 'value' |
|
assert 'key2' in data['message']['data'] |
|
assert data['message']['data']['key2'] == 'value2' |
|
|
|
assert 'notification' in data['message'] |
|
assert isinstance(data['message']['notification'], dict) |
|
assert 'image' in data['message']['notification'] |
|
assert data['message']['notification']['image'] == \ |
|
'https://example.com/interesting.png' |
|
|
|
|
|
@pytest.mark.skipif( |
|
'cryptography' not in sys.modules, reason="Requires cryptography") |
|
def test_plugin_fcm_oauth_priorities(mock_post): |
|
""" |
|
Verify priorities work as intended. |
|
""" |
|
|
|
obj = Apprise.instantiate( |
|
f'fcm://mock-project-id/device/?keyfile={FCM_KEYFILE}' |
|
'&priority=high') |
|
assert mock_post.call_count == 0 |
|
|
|
# Send our notification |
|
assert obj.notify(title="title", body="body") is True |
|
|
|
# Test our call count |
|
assert mock_post.call_count == 2 |
|
assert mock_post.call_args_list[0][0][0] == \ |
|
'https://accounts.google.com/o/oauth2/token' |
|
|
|
assert mock_post.call_args_list[1][0][0] == \ |
|
'https://fcm.googleapis.com/v1/projects/mock-project-id/messages:send' |
|
payload = mock_post.mock_calls[1][2] |
|
data = json.loads(payload['data']) |
|
assert 'message' in data |
|
assert isinstance(data['message'], dict) |
|
assert 'data' not in data['message'] |
|
assert 'notification' in data['message'] |
|
assert isinstance(data['message']['notification'], dict) |
|
assert 'image' not in data['message']['notification'] |
|
assert data['message']['apns']['headers']['apns-priority'] == "10" |
|
assert data['message']['webpush']['headers']['Urgency'] == "high" |
|
assert data['message']['android']['priority'] == "HIGH" |
|
|
|
|
|
@pytest.mark.skipif( |
|
'cryptography' not in sys.modules, reason="Requires cryptography") |
|
def test_plugin_fcm_oauth_no_colors(mock_post): |
|
""" |
|
Verify `color=no` work as intended. |
|
""" |
|
|
|
obj = Apprise.instantiate( |
|
f'fcm://mock-project-id/device/?keyfile={FCM_KEYFILE}' |
|
'&color=no') |
|
assert mock_post.call_count == 0 |
|
|
|
# Send our notification |
|
assert obj.notify(title="title", body="body") is True |
|
|
|
# Test our call count |
|
assert mock_post.call_count == 2 |
|
assert mock_post.call_args_list[0][0][0] == \ |
|
'https://accounts.google.com/o/oauth2/token' |
|
|
|
assert mock_post.call_args_list[1][0][0] == \ |
|
'https://fcm.googleapis.com/v1/projects/mock-project-id/messages:send' |
|
payload = mock_post.mock_calls[1][2] |
|
data = json.loads(payload['data']) |
|
assert 'message' in data |
|
assert isinstance(data['message'], dict) |
|
assert 'data' not in data['message'] |
|
assert 'notification' in data['message'] |
|
assert isinstance(data['message']['notification'], dict) |
|
assert 'color' not in data['message']['notification'] |
|
|
|
|
|
@pytest.mark.skipif( |
|
'cryptography' not in sys.modules, reason="Requires cryptography") |
|
def test_plugin_fcm_oauth_colors(mock_post): |
|
""" |
|
Verify colors work as intended. |
|
""" |
|
|
|
obj = Apprise.instantiate( |
|
f'fcm://mock-project-id/device/?keyfile={FCM_KEYFILE}' |
|
'&color=#12AAbb') |
|
assert mock_post.call_count == 0 |
|
|
|
# Send our notification |
|
assert obj.notify(title="title", body="body") is True |
|
|
|
# Test our call count |
|
assert mock_post.call_count == 2 |
|
assert mock_post.call_args_list[0][0][0] == \ |
|
'https://accounts.google.com/o/oauth2/token' |
|
|
|
assert mock_post.call_args_list[1][0][0] == \ |
|
'https://fcm.googleapis.com/v1/projects/mock-project-id/messages:send' |
|
payload = mock_post.mock_calls[1][2] |
|
data = json.loads(payload['data']) |
|
assert 'message' in data |
|
assert isinstance(data['message'], dict) |
|
assert 'data' not in data['message'] |
|
assert 'notification' in data['message'] |
|
assert isinstance(data['message']['notification'], dict) |
|
assert 'color' in data['message']['android']['notification'] |
|
assert data['message']['android']['notification']['color'] == '#12aabb' |
|
|
|
|
|
@pytest.mark.skipif( |
|
'cryptography' not in sys.modules, reason="Requires cryptography") |
|
def test_plugin_fcm_keyfile_parse_default(mock_post): |
|
""" |
|
NotifyFCM() KeyFile Tests |
|
""" |
|
|
|
oauth = GoogleOAuth() |
|
# We can not get an Access Token without content loaded |
|
assert oauth.access_token is None |
|
|
|
# Load our content |
|
assert oauth.load(FCM_KEYFILE) is True |
|
assert oauth.access_token is not None |
|
|
|
# Test our call count |
|
assert mock_post.call_count == 1 |
|
assert mock_post.call_args_list[0][0][0] == \ |
|
'https://accounts.google.com/o/oauth2/token' |
|
|
|
mock_post.reset_mock() |
|
|
|
# a second call uses cache since our token hasn't expired yet |
|
assert oauth.access_token is not None |
|
assert mock_post.call_count == 0 |
|
|
|
|
|
@pytest.mark.skipif( |
|
'cryptography' not in sys.modules, reason="Requires cryptography") |
|
def test_plugin_fcm_keyfile_parse_no_expiry(mock_post): |
|
""" |
|
Test case without `expires_in` entry. |
|
""" |
|
|
|
mock_post.return_value.content = json.dumps({ |
|
"access_token": "ya29.c.abcd", |
|
"token_type": "Bearer", |
|
}) |
|
|
|
oauth = GoogleOAuth() |
|
assert oauth.load(FCM_KEYFILE) is True |
|
assert oauth.access_token is not None |
|
|
|
# Test our call count |
|
assert mock_post.call_count == 1 |
|
assert mock_post.call_args_list[0][0][0] == \ |
|
'https://accounts.google.com/o/oauth2/token' |
|
|
|
|
|
@pytest.mark.skipif( |
|
'cryptography' not in sys.modules, reason="Requires cryptography") |
|
def test_plugin_fcm_keyfile_parse_user_agent(mock_post): |
|
""" |
|
Test case with `user-agent` override. |
|
""" |
|
|
|
oauth = GoogleOAuth(user_agent="test-agent-override") |
|
assert oauth.load(FCM_KEYFILE) is True |
|
assert oauth.access_token is not None |
|
assert mock_post.call_count == 1 |
|
assert mock_post.call_args_list[0][0][0] == \ |
|
'https://accounts.google.com/o/oauth2/token' |
|
|
|
|
|
@pytest.mark.skipif( |
|
'cryptography' not in sys.modules, reason="Requires cryptography") |
|
def test_plugin_fcm_keyfile_parse_keyfile_failures(mock_post: mock.Mock): |
|
""" |
|
Test some errors that can get thrown when trying to handle |
|
the `service_account.json` file. |
|
""" |
|
|
|
# Now we test a case where we can't access the file we've been pointed to: |
|
oauth = GoogleOAuth() |
|
with mock.patch('builtins.open', side_effect=OSError): |
|
# We will fail to retrieve our Access Token |
|
assert oauth.load(FCM_KEYFILE) is False |
|
assert oauth.access_token is None |
|
|
|
oauth = GoogleOAuth() |
|
with mock.patch('json.loads', side_effect=([], )): |
|
# We will fail to retrieve our Access Token since we did not parse |
|
# a dictionary |
|
assert oauth.load(FCM_KEYFILE) is False |
|
assert oauth.access_token is None |
|
|
|
# Case where we can't load the PEM key: |
|
oauth = GoogleOAuth() |
|
with mock.patch( |
|
'cryptography.hazmat.primitives.serialization' |
|
'.load_pem_private_key', |
|
side_effect=ValueError("")): |
|
assert oauth.load(FCM_KEYFILE) is False |
|
assert oauth.access_token is None |
|
|
|
# Case where we can't load the PEM key: |
|
oauth = GoogleOAuth() |
|
with mock.patch( |
|
'cryptography.hazmat.primitives.serialization' |
|
'.load_pem_private_key', |
|
side_effect=TypeError("")): |
|
assert oauth.load(FCM_KEYFILE) is False |
|
assert oauth.access_token is None |
|
|
|
# Case where we can't load the PEM key: |
|
oauth = GoogleOAuth() |
|
with mock.patch( |
|
'cryptography.hazmat.primitives.serialization' |
|
'.load_pem_private_key', |
|
side_effect=UnsupportedAlgorithm("")): |
|
# Note: This test should be te |
|
assert oauth.load(FCM_KEYFILE) is False |
|
assert oauth.access_token is None |
|
|
|
# Verify that not a single call to the web escaped the test harness. |
|
assert mock_post.mock_calls == [] |
|
|
|
|
|
@pytest.mark.skipif( |
|
'cryptography' not in sys.modules, reason="Requires cryptography") |
|
def test_plugin_fcm_keyfile_parse_token_failures(mock_post): |
|
""" |
|
Test some web errors that can occur when speaking upstream |
|
with Google to get our token generated. |
|
""" |
|
|
|
mock_post.return_value.status_code = requests.codes.internal_server_error |
|
|
|
oauth = GoogleOAuth() |
|
assert oauth.load(FCM_KEYFILE) is True |
|
|
|
# We'll fail due to an bad web response |
|
assert oauth.access_token is None |
|
|
|
# Return our status code to how it was |
|
mock_post.return_value.status_code = requests.codes.ok |
|
|
|
# No access token |
|
bad_response_1 = mock.Mock() |
|
bad_response_1.content = json.dumps({ |
|
"expires_in": 3599, |
|
"token_type": "Bearer", |
|
}) |
|
|
|
# Invalid JSON |
|
bad_response_2 = mock.Mock() |
|
bad_response_2.content = '{' |
|
|
|
mock_post.return_value = None |
|
# Throw an exception on the first call to requests.post() |
|
for side_effect in ( |
|
requests.RequestException(), bad_response_1, bad_response_2): |
|
mock_post.side_effect = side_effect |
|
|
|
# Test all of our bad side effects |
|
oauth = GoogleOAuth() |
|
assert oauth.load(FCM_KEYFILE) is True |
|
|
|
# We'll fail due to an bad web response |
|
assert oauth.access_token is None |
|
|
|
|
|
@pytest.mark.skipif( |
|
'cryptography' not in sys.modules, reason="Requires cryptography") |
|
def test_plugin_fcm_bad_keyfile_parse(): |
|
""" |
|
NotifyFCM() KeyFile Bad Service Account Type Tests |
|
""" |
|
|
|
path = os.path.join(PRIVATE_KEYFILE_DIR, 'service_account-bad-type.json') |
|
oauth = GoogleOAuth() |
|
assert oauth.load(path) is False |
|
|
|
|
|
@pytest.mark.skipif( |
|
'cryptography' not in sys.modules, reason="Requires cryptography") |
|
def test_plugin_fcm_keyfile_missing_entries_parse(tmpdir): |
|
""" |
|
NotifyFCM() KeyFile Missing Entries Test |
|
""" |
|
|
|
# Prepare a base keyfile reference to use |
|
path = os.path.join(PRIVATE_KEYFILE_DIR, 'service_account.json') |
|
with open(path, mode="r", encoding='utf-8') as fp: |
|
content = json.loads(fp.read()) |
|
|
|
path = tmpdir.join('fcm_keyfile.json') |
|
|
|
# Test that we fail to load if the following keys are missing: |
|
for entry in ( |
|
'client_email', 'private_key_id', 'private_key', 'type', |
|
'project_id'): |
|
|
|
# Ensure the key actually exists in our file |
|
assert entry in content |
|
|
|
# Create a copy of our content |
|
content_copy = content.copy() |
|
|
|
# Remove our entry we expect to validate against |
|
del content_copy[entry] |
|
assert entry not in content_copy |
|
|
|
path.write(json.dumps(content_copy)) |
|
|
|
oauth = GoogleOAuth() |
|
assert oauth.load(str(path)) is False |
|
|
|
# Now write ourselves a bad JSON file |
|
path.write('{') |
|
oauth = GoogleOAuth() |
|
assert oauth.load(str(path)) is False |
|
|
|
|
|
@pytest.mark.skipif( |
|
'cryptography' not in sys.modules, reason="Requires cryptography") |
|
def test_plugin_fcm_priority_manager(): |
|
""" |
|
NotifyFCM() FCMPriorityManager() Testing |
|
""" |
|
|
|
for mode in FCM_MODES: |
|
for priority in FCM_PRIORITIES: |
|
instance = FCMPriorityManager(mode, priority) |
|
assert isinstance(instance.payload(), dict) |
|
# Verify it's not empty |
|
assert bool(instance) |
|
assert instance.payload() |
|
assert str(instance) == priority |
|
|
|
# We do not have to set a priority |
|
instance = FCMPriorityManager(mode) |
|
assert isinstance(instance.payload(), dict) |
|
|
|
# Dictionary is empty though |
|
assert not bool(instance) |
|
assert not instance.payload() |
|
assert str(instance) == '' |
|
|
|
with pytest.raises(TypeError): |
|
instance = FCMPriorityManager(mode, 'invalid') |
|
|
|
with pytest.raises(TypeError): |
|
instance = FCMPriorityManager('invald', 'high') |
|
|
|
# mode validation is done at the higher NotifyFCM() level so |
|
# it is not tested here (not required) |
|
|
|
|
|
@pytest.mark.skipif( |
|
'cryptography' not in sys.modules, reason="Requires cryptography") |
|
def test_plugin_fcm_color_manager(): |
|
""" |
|
NotifyFCM() FCMColorManager() Testing |
|
""" |
|
|
|
# No colors |
|
instance = FCMColorManager('no') |
|
assert bool(instance) is False |
|
assert instance.get() is None |
|
# We'll return that we are not defined |
|
assert str(instance) == 'no' |
|
|
|
# Asset colors |
|
instance = FCMColorManager('yes') |
|
assert isinstance(instance.get(), str) |
|
# Output: #rrggbb |
|
assert len(instance.get()) == 7 |
|
# Starts with has symbol |
|
assert instance.get()[0] == '#' |
|
# We'll return that we are defined but using default configuration |
|
assert str(instance) == 'yes' |
|
|
|
# We will be `true` because we can acquire a color based on what was |
|
# passed in |
|
assert bool(instance) is True |
|
|
|
# Custom color |
|
instance = FCMColorManager('#A2B3A4') |
|
assert isinstance(instance.get(), str) |
|
assert instance.get() == '#a2b3a4' |
|
assert bool(instance) is True |
|
# str() response does not include hashtag |
|
assert str(instance) == 'a2b3a4' |
|
|
|
# Custom color (no hashtag) |
|
instance = FCMColorManager('A2B3A4') |
|
assert isinstance(instance.get(), str) |
|
# Hashtag is always part of output |
|
assert instance.get() == '#a2b3a4' |
|
assert bool(instance) is True |
|
# str() response does not include hashtag |
|
assert str(instance) == 'a2b3a4' |
|
|
|
# Custom color (no hashtag) but only using 3 letter rgb values |
|
instance = FCMColorManager('AC4') |
|
assert isinstance(instance.get(), str) |
|
# Hashtag is always part of output |
|
assert instance.get() == '#aacc44' |
|
assert bool(instance) is True |
|
# str() response does not include hashtag |
|
assert str(instance) == 'aacc44' |
|
|
|
|
|
@pytest.mark.skipif( |
|
'cryptography' in sys.modules, |
|
reason="Requires that cryptography NOT be installed") |
|
def test_plugin_fcm_cryptography_import_error(): |
|
""" |
|
NotifyFCM Cryptography loading failure |
|
""" |
|
|
|
# Prepare a base keyfile reference to use |
|
path = os.path.join(PRIVATE_KEYFILE_DIR, 'service_account.json') |
|
|
|
# Attempt to instantiate our object |
|
obj = Apprise.instantiate( |
|
'fcm://mock-project-id/device/#topic/?keyfile={}'.format(str(path))) |
|
|
|
# It's not possible because our cryptography depedancy is missing |
|
assert obj is None |
|
|
|
|
|
@pytest.mark.skipif( |
|
'cryptography' not in sys.modules, reason="Requires cryptography") |
|
@mock.patch('requests.post') |
|
def test_plugin_fcm_edge_cases(mock_post): |
|
""" |
|
NotifyFCM() Edge Cases |
|
|
|
""" |
|
|
|
# Prepare a good response |
|
response = mock.Mock() |
|
response.status_code = requests.codes.ok |
|
mock_post.return_value = response |
|
|
|
# this tests an edge case where verify if the data_kwargs is a dictionary |
|
# or not. Below, we don't even define it, so it will be None (causing |
|
# the check to go). We'll still correctly instantiate a plugin: |
|
obj = NotifyFCM("project", "api:123", targets='device') |
|
assert obj is not None
|
|
|