mirror of https://github.com/caronc/apprise
1056 lines
32 KiB
Python
1056 lines
32 KiB
Python
# BSD 2-Clause License
|
|
#
|
|
# Apprise - Push Notification Library.
|
|
# Copyright (c) 2025, Chris Caron <lead2gold@gmail.com>
|
|
#
|
|
# Redistribution and use in source and binary forms, with or without
|
|
# modification, are permitted provided that the following conditions are met:
|
|
#
|
|
# 1. Redistributions of source code must retain the above copyright notice,
|
|
# this list of conditions and the following disclaimer.
|
|
#
|
|
# 2. Redistributions in binary form must reproduce the above copyright notice,
|
|
# this list of conditions and the following disclaimer in the documentation
|
|
# and/or other materials provided with the distribution.
|
|
#
|
|
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
|
|
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
|
|
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
|
|
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
|
|
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
|
|
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
|
|
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
|
|
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
|
|
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
|
|
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
|
# POSSIBILITY OF SUCH DAMAGE.
|
|
#
|
|
# Great Resources:
|
|
# - Dev/Legacy API:
|
|
# https://firebase.google.com/docs/cloud-messaging/http-server-ref
|
|
# - Legacy API (v1) -> OAuth
|
|
# - https://firebase.google.com/docs/cloud-messaging/migrate-v1
|
|
|
|
import json
|
|
import os
|
|
import sys
|
|
from unittest import mock
|
|
|
|
from helpers import AppriseURLTester
|
|
import pytest
|
|
import requests
|
|
|
|
from apprise import Apprise
|
|
from apprise.plugins.fcm import NotifyFCM
|
|
|
|
try:
|
|
from cryptography.exceptions import UnsupportedAlgorithm
|
|
|
|
from apprise.plugins.fcm.color import FCMColorManager
|
|
from apprise.plugins.fcm.common import FCM_MODES
|
|
from apprise.plugins.fcm.oauth import GoogleOAuth
|
|
from apprise.plugins.fcm.priority import FCM_PRIORITIES, FCMPriorityManager
|
|
|
|
except ImportError:
|
|
# No problem; there is no cryptography support
|
|
pass
|
|
|
|
|
|
# Disable logging for a cleaner testing output
|
|
import logging
|
|
|
|
logging.disable(logging.CRITICAL)
|
|
|
|
# Test files for KeyFile Directory
|
|
PRIVATE_KEYFILE_DIR = os.path.join(os.path.dirname(__file__), "var", "fcm")
|
|
FCM_KEYFILE = os.path.join(PRIVATE_KEYFILE_DIR, "service_account.json")
|
|
|
|
|
|
# Our Testing URLs
|
|
apprise_url_tests = (
|
|
(
|
|
"fcm://",
|
|
{
|
|
# We failed to identify any valid authentication
|
|
"instance": TypeError,
|
|
},
|
|
),
|
|
(
|
|
"fcm://:@/",
|
|
{
|
|
# We failed to identify any valid authentication
|
|
"instance": TypeError,
|
|
},
|
|
),
|
|
(
|
|
"fcm://project@%20%20/",
|
|
{
|
|
# invalid apikey
|
|
"instance": TypeError,
|
|
},
|
|
),
|
|
(
|
|
"fcm://apikey/",
|
|
{
|
|
# no project id specified so we operate in legacy mode
|
|
"instance": NotifyFCM,
|
|
# but there are no targets specified so we return False
|
|
"notify_response": False,
|
|
},
|
|
),
|
|
(
|
|
"fcm://apikey/device",
|
|
{
|
|
# Valid device
|
|
"instance": NotifyFCM,
|
|
"privacy_url": "fcm://a...y/device",
|
|
},
|
|
),
|
|
(
|
|
"fcm://apikey/#topic",
|
|
{
|
|
# Valid topic
|
|
"instance": NotifyFCM,
|
|
"privacy_url": "fcm://a...y/%23topic",
|
|
},
|
|
),
|
|
(
|
|
"fcm://apikey/device?mode=invalid",
|
|
{
|
|
# Valid device, invalid mode
|
|
"instance": TypeError,
|
|
},
|
|
),
|
|
(
|
|
"fcm://apikey/#topic1/device/%20/",
|
|
{
|
|
# Valid topic, valid device, and invalid entry
|
|
"instance": NotifyFCM,
|
|
},
|
|
),
|
|
(
|
|
"fcm://apikey?to=#topic1,device",
|
|
{
|
|
# Test to=
|
|
"instance": NotifyFCM,
|
|
},
|
|
),
|
|
(
|
|
"fcm://?apikey=abc123&to=device",
|
|
{
|
|
# Test apikey= to=
|
|
"instance": NotifyFCM,
|
|
},
|
|
),
|
|
(
|
|
"fcm://?apikey=abc123&to=device&image=yes",
|
|
{
|
|
# Test image boolean
|
|
"instance": NotifyFCM,
|
|
},
|
|
),
|
|
(
|
|
"fcm://?apikey=abc123&to=device&color=no",
|
|
{
|
|
# Disable colors
|
|
"instance": NotifyFCM,
|
|
},
|
|
),
|
|
(
|
|
"fcm://?apikey=abc123&to=device&color=aabbcc",
|
|
{
|
|
# custom colors
|
|
"instance": NotifyFCM,
|
|
},
|
|
),
|
|
(
|
|
(
|
|
"fcm://?apikey=abc123&to=device"
|
|
"&image_url=http://example.com/interesting.jpg"
|
|
),
|
|
{
|
|
# Test image_url
|
|
"instance": NotifyFCM
|
|
},
|
|
),
|
|
(
|
|
(
|
|
"fcm://?apikey=abc123&to=device"
|
|
"&image_url=http://example.com/interesting.jpg&image=no"
|
|
),
|
|
{
|
|
# Test image_url but set to no
|
|
"instance": NotifyFCM
|
|
},
|
|
),
|
|
(
|
|
"fcm://?apikey=abc123&to=device&+key=value&+key2=value2",
|
|
{
|
|
# Test apikey= to= and data arguments
|
|
"instance": NotifyFCM,
|
|
},
|
|
),
|
|
(
|
|
"fcm://%20?to=device&keyfile=/invalid/path",
|
|
{
|
|
# invalid Project ID
|
|
"instance": TypeError,
|
|
},
|
|
),
|
|
(
|
|
"fcm://project_id?to=device&keyfile=/invalid/path",
|
|
{
|
|
# Test to= and auto-detection of OAuth mode
|
|
"instance": NotifyFCM,
|
|
# we'll fail to send our notification as a result
|
|
"response": False,
|
|
},
|
|
),
|
|
(
|
|
"fcm://?to=device&project=project_id&keyfile=/invalid/path",
|
|
{
|
|
# Test project= & to= and auto detection of OAuth mode
|
|
"instance": NotifyFCM,
|
|
# we'll fail to send our notification as a result
|
|
"response": False,
|
|
},
|
|
),
|
|
(
|
|
"fcm://project_id?to=device&mode=oauth2",
|
|
{
|
|
# no keyfile was specified
|
|
"instance": TypeError,
|
|
},
|
|
),
|
|
(
|
|
"fcm://project_id?to=device&mode=oauth2&keyfile=/invalid/path",
|
|
{
|
|
# Same test as above except we explicitly set our oauth2 mode
|
|
# Test to= and auto-detection of OAuth mode
|
|
"instance": NotifyFCM,
|
|
# we'll fail to send our notification as a result
|
|
"response": False,
|
|
},
|
|
),
|
|
(
|
|
"fcm://apikey/#topic1/device/?mode=legacy",
|
|
{
|
|
"instance": NotifyFCM,
|
|
# throw a bizarre code forcing us to fail to look it up
|
|
"response": False,
|
|
"requests_response_code": 999,
|
|
},
|
|
),
|
|
(
|
|
"fcm://apikey/#topic1/device/?mode=legacy",
|
|
{
|
|
"instance": NotifyFCM,
|
|
# Throws a series of i/o exceptions with this flag
|
|
# is set and tests that we gracefully handle them
|
|
"test_requests_exceptions": True,
|
|
},
|
|
),
|
|
(
|
|
"fcm://project/#topic1/device/?mode=oauth2&keyfile=file://{}".format(
|
|
os.path.join(
|
|
os.path.dirname(__file__), "var", "fcm", "service_account.json"
|
|
)
|
|
),
|
|
{
|
|
"instance": NotifyFCM,
|
|
# throw a bizarre code forcing us to fail to look it up
|
|
"response": False,
|
|
"requests_response_code": 999,
|
|
},
|
|
),
|
|
(
|
|
"fcm://projectid/#topic1/device/?mode=oauth2&keyfile=file://{}".format(
|
|
os.path.join(
|
|
os.path.dirname(__file__), "var", "fcm", "service_account.json"
|
|
)
|
|
),
|
|
{
|
|
"instance": NotifyFCM,
|
|
# Throws a series of connection and transfer exceptions when
|
|
# this flag is set and tests that we gracefully handle them
|
|
"test_requests_exceptions": True,
|
|
},
|
|
),
|
|
)
|
|
|
|
|
|
@pytest.fixture
|
|
def mock_post(mocker):
|
|
"""Prepare a good OAuth mock response."""
|
|
|
|
mock_thing = mocker.patch("requests.post")
|
|
|
|
response = mock.Mock()
|
|
response.content = json.dumps({
|
|
"access_token": "ya29.c.abcd",
|
|
"expires_in": 3599,
|
|
"token_type": "Bearer",
|
|
})
|
|
response.status_code = requests.codes.ok
|
|
mock_thing.return_value = response
|
|
|
|
return mock_thing
|
|
|
|
|
|
@pytest.fixture
|
|
def mock_post_legacy(mocker):
|
|
"""Prepare a good legacy mock response."""
|
|
|
|
mock_thing = mocker.patch("requests.post")
|
|
|
|
response = mock.Mock()
|
|
response.status_code = requests.codes.ok
|
|
mock_thing.return_value = response
|
|
|
|
return mock_thing
|
|
|
|
|
|
@pytest.mark.skipif(
|
|
"cryptography" not in sys.modules, reason="Requires cryptography"
|
|
)
|
|
def test_plugin_fcm_urls():
|
|
"""NotifyFCM() Apprise URLs."""
|
|
|
|
# Run our general tests
|
|
AppriseURLTester(tests=apprise_url_tests).run_all()
|
|
|
|
|
|
@pytest.mark.skipif(
|
|
"cryptography" not in sys.modules, reason="Requires cryptography"
|
|
)
|
|
def test_plugin_fcm_legacy_default(mock_post_legacy):
|
|
"""NotifyFCM() Legacy/APIKey default checks."""
|
|
|
|
# A valid Legacy URL
|
|
obj = Apprise.instantiate(
|
|
"fcm://abc123/device/"
|
|
"?+key=value&+key2=value2"
|
|
"&image_url=https://example.com/interesting.png"
|
|
)
|
|
|
|
# Send our notification
|
|
assert obj.notify("test") is True
|
|
|
|
# Test our call count
|
|
assert mock_post_legacy.call_count == 1
|
|
assert (
|
|
mock_post_legacy.call_args_list[0][0][0]
|
|
== "https://fcm.googleapis.com/fcm/send"
|
|
)
|
|
|
|
payload = mock_post_legacy.mock_calls[0][2]
|
|
data = json.loads(payload["data"])
|
|
assert "data" in data
|
|
assert isinstance(data, dict)
|
|
assert "key" in data["data"]
|
|
assert data["data"]["key"] == "value"
|
|
assert "key2" in data["data"]
|
|
assert data["data"]["key2"] == "value2"
|
|
|
|
assert "notification" in data
|
|
assert isinstance(data["notification"], dict)
|
|
assert "notification" in data["notification"]
|
|
assert isinstance(data["notification"]["notification"], dict)
|
|
assert "image" in data["notification"]["notification"]
|
|
assert (
|
|
data["notification"]["notification"]["image"]
|
|
== "https://example.com/interesting.png"
|
|
)
|
|
|
|
|
|
@pytest.mark.skipif(
|
|
"cryptography" not in sys.modules, reason="Requires cryptography"
|
|
)
|
|
def test_plugin_fcm_legacy_priorities(mock_post_legacy):
|
|
"""NotifyFCM() Legacy/APIKey priorities checks."""
|
|
|
|
obj = Apprise.instantiate("fcm://abc123/device/?priority=low")
|
|
assert mock_post_legacy.call_count == 0
|
|
|
|
# Send our notification
|
|
assert obj.notify(title="title", body="body") is True
|
|
|
|
# Test our call count
|
|
assert mock_post_legacy.call_count == 1
|
|
assert (
|
|
mock_post_legacy.call_args_list[0][0][0]
|
|
== "https://fcm.googleapis.com/fcm/send"
|
|
)
|
|
|
|
payload = mock_post_legacy.mock_calls[0][2]
|
|
data = json.loads(payload["data"])
|
|
assert "data" not in data
|
|
assert "notification" in data
|
|
assert isinstance(data["notification"], dict)
|
|
assert "notification" in data["notification"]
|
|
assert isinstance(data["notification"]["notification"], dict)
|
|
assert "image" not in data["notification"]["notification"]
|
|
assert "priority" in data
|
|
|
|
# legacy can only switch between high/low
|
|
assert data["priority"] == "normal"
|
|
|
|
|
|
@pytest.mark.skipif(
|
|
"cryptography" not in sys.modules, reason="Requires cryptography"
|
|
)
|
|
def test_plugin_fcm_legacy_no_colors(mock_post_legacy):
|
|
"""NotifyFCM() Legacy/APIKey `color=no` checks."""
|
|
|
|
obj = Apprise.instantiate("fcm://abc123/device/?color=no")
|
|
assert mock_post_legacy.call_count == 0
|
|
|
|
# Send our notification
|
|
assert obj.notify(title="title", body="body") is True
|
|
|
|
# Test our call count
|
|
assert mock_post_legacy.call_count == 1
|
|
assert (
|
|
mock_post_legacy.call_args_list[0][0][0]
|
|
== "https://fcm.googleapis.com/fcm/send"
|
|
)
|
|
|
|
payload = mock_post_legacy.mock_calls[0][2]
|
|
data = json.loads(payload["data"])
|
|
assert "data" not in data
|
|
assert "notification" in data
|
|
assert isinstance(data["notification"], dict)
|
|
assert "notification" in data["notification"]
|
|
assert isinstance(data["notification"]["notification"], dict)
|
|
assert "image" not in data["notification"]["notification"]
|
|
assert "color" not in data["notification"]["notification"]
|
|
|
|
|
|
@pytest.mark.skipif(
|
|
"cryptography" not in sys.modules, reason="Requires cryptography"
|
|
)
|
|
def test_plugin_fcm_legacy_colors(mock_post_legacy):
|
|
"""NotifyFCM() Legacy/APIKey colors checks."""
|
|
|
|
obj = Apprise.instantiate("fcm://abc123/device/?color=AA001b")
|
|
assert mock_post_legacy.call_count == 0
|
|
|
|
# Send our notification
|
|
assert obj.notify(title="title", body="body") is True
|
|
|
|
# Test our call count
|
|
assert mock_post_legacy.call_count == 1
|
|
assert (
|
|
mock_post_legacy.call_args_list[0][0][0]
|
|
== "https://fcm.googleapis.com/fcm/send"
|
|
)
|
|
|
|
payload = mock_post_legacy.mock_calls[0][2]
|
|
data = json.loads(payload["data"])
|
|
assert "data" not in data
|
|
assert "notification" in data
|
|
assert isinstance(data["notification"], dict)
|
|
assert "notification" in data["notification"]
|
|
assert isinstance(data["notification"]["notification"], dict)
|
|
assert "image" not in data["notification"]["notification"]
|
|
assert "color" in data["notification"]["notification"]
|
|
assert data["notification"]["notification"]["color"] == "#aa001b"
|
|
|
|
|
|
@pytest.mark.skipif(
|
|
"cryptography" not in sys.modules, reason="Requires cryptography"
|
|
)
|
|
def test_plugin_fcm_oauth_default(mock_post):
|
|
"""
|
|
NotifyFCM() general OAuth checks - success.
|
|
Test using a valid Project ID and key file.
|
|
"""
|
|
|
|
obj = Apprise.instantiate(
|
|
f"fcm://mock-project-id/device/#topic/?keyfile={FCM_KEYFILE}"
|
|
)
|
|
|
|
# send our notification
|
|
assert obj.notify("test") is True
|
|
|
|
# Test our call count
|
|
assert mock_post.call_count == 3
|
|
assert (
|
|
mock_post.call_args_list[0][0][0]
|
|
== "https://accounts.google.com/o/oauth2/token"
|
|
)
|
|
assert (
|
|
mock_post.call_args_list[1][0][0]
|
|
== "https://fcm.googleapis.com/v1/projects/mock-project-id/messages:send"
|
|
)
|
|
assert (
|
|
mock_post.call_args_list[2][0][0]
|
|
== "https://fcm.googleapis.com/v1/projects/mock-project-id/messages:send"
|
|
)
|
|
|
|
|
|
@pytest.mark.skipif(
|
|
"cryptography" not in sys.modules, reason="Requires cryptography"
|
|
)
|
|
def test_plugin_fcm_oauth_invalid_project_id(mock_post):
|
|
"""NotifyFCM() OAuth checks, with invalid project id."""
|
|
|
|
# Test having a valid keyfile, but not a valid project id match.
|
|
obj = Apprise.instantiate(
|
|
f"fcm://invalid_project_id/device/?keyfile={FCM_KEYFILE}"
|
|
)
|
|
|
|
# we'll fail as a result
|
|
assert obj.notify("test") is False
|
|
|
|
# Test our call count
|
|
assert mock_post.call_count == 0
|
|
|
|
|
|
@pytest.mark.skipif(
|
|
"cryptography" not in sys.modules, reason="Requires cryptography"
|
|
)
|
|
def test_plugin_fcm_oauth_keyfile_error(mock_post):
|
|
"""NotifyFCM() OAuth checks, while unable to read key file."""
|
|
|
|
# Now we test using a valid Project ID but we can't open our file
|
|
obj = Apprise.instantiate(
|
|
f"fcm://mock-project-id/device/?keyfile={FCM_KEYFILE}"
|
|
)
|
|
|
|
with mock.patch("builtins.open", side_effect=OSError):
|
|
# we'll fail as a result
|
|
assert obj.notify("test") is False
|
|
|
|
# Test our call count
|
|
assert mock_post.call_count == 0
|
|
|
|
|
|
@pytest.mark.skipif(
|
|
"cryptography" not in sys.modules, reason="Requires cryptography"
|
|
)
|
|
def test_plugin_fcm_oauth_data_parameters(mock_post):
|
|
"""NotifyFCM() OAuth checks, success.
|
|
|
|
Test using a valid Project ID and data parameters.
|
|
"""
|
|
|
|
obj = Apprise.instantiate(
|
|
f"fcm://mock-project-id/device/#topic/?keyfile={FCM_KEYFILE}"
|
|
"&+key=value&+key2=value2"
|
|
"&image_url=https://example.com/interesting.png"
|
|
)
|
|
assert mock_post.call_count == 0
|
|
|
|
# send our notification
|
|
assert obj.notify("test") is True
|
|
|
|
# Test our call count
|
|
assert mock_post.call_count == 3
|
|
assert (
|
|
mock_post.call_args_list[0][0][0]
|
|
== "https://accounts.google.com/o/oauth2/token"
|
|
)
|
|
|
|
assert (
|
|
mock_post.call_args_list[1][0][0]
|
|
== "https://fcm.googleapis.com/v1/projects/mock-project-id/messages:send"
|
|
)
|
|
payload = mock_post.mock_calls[1][2]
|
|
data = json.loads(payload["data"])
|
|
assert "message" in data
|
|
assert isinstance(data["message"], dict)
|
|
assert "data" in data["message"]
|
|
assert isinstance(data["message"]["data"], dict)
|
|
assert "key" in data["message"]["data"]
|
|
assert data["message"]["data"]["key"] == "value"
|
|
assert "key2" in data["message"]["data"]
|
|
assert data["message"]["data"]["key2"] == "value2"
|
|
|
|
assert "notification" in data["message"]
|
|
assert isinstance(data["message"]["notification"], dict)
|
|
assert "image" in data["message"]["notification"]
|
|
assert (
|
|
data["message"]["notification"]["image"]
|
|
== "https://example.com/interesting.png"
|
|
)
|
|
|
|
assert (
|
|
mock_post.call_args_list[2][0][0]
|
|
== "https://fcm.googleapis.com/v1/projects/mock-project-id/messages:send"
|
|
)
|
|
|
|
payload = mock_post.mock_calls[2][2]
|
|
data = json.loads(payload["data"])
|
|
assert "message" in data
|
|
assert isinstance(data["message"], dict)
|
|
assert "data" in data["message"]
|
|
assert isinstance(data["message"]["data"], dict)
|
|
assert "key" in data["message"]["data"]
|
|
assert data["message"]["data"]["key"] == "value"
|
|
assert "key2" in data["message"]["data"]
|
|
assert data["message"]["data"]["key2"] == "value2"
|
|
|
|
assert "notification" in data["message"]
|
|
assert isinstance(data["message"]["notification"], dict)
|
|
assert "image" in data["message"]["notification"]
|
|
assert (
|
|
data["message"]["notification"]["image"]
|
|
== "https://example.com/interesting.png"
|
|
)
|
|
|
|
|
|
@pytest.mark.skipif(
|
|
"cryptography" not in sys.modules, reason="Requires cryptography"
|
|
)
|
|
def test_plugin_fcm_oauth_priorities(mock_post):
|
|
"""Verify priorities work as intended."""
|
|
|
|
obj = Apprise.instantiate(
|
|
f"fcm://mock-project-id/device/?keyfile={FCM_KEYFILE}&priority=high"
|
|
)
|
|
assert mock_post.call_count == 0
|
|
|
|
# Send our notification
|
|
assert obj.notify(title="title", body="body") is True
|
|
|
|
# Test our call count
|
|
assert mock_post.call_count == 2
|
|
assert (
|
|
mock_post.call_args_list[0][0][0]
|
|
== "https://accounts.google.com/o/oauth2/token"
|
|
)
|
|
|
|
assert (
|
|
mock_post.call_args_list[1][0][0]
|
|
== "https://fcm.googleapis.com/v1/projects/mock-project-id/messages:send"
|
|
)
|
|
payload = mock_post.mock_calls[1][2]
|
|
data = json.loads(payload["data"])
|
|
assert "message" in data
|
|
assert isinstance(data["message"], dict)
|
|
assert "data" not in data["message"]
|
|
assert "notification" in data["message"]
|
|
assert isinstance(data["message"]["notification"], dict)
|
|
assert "image" not in data["message"]["notification"]
|
|
assert data["message"]["apns"]["headers"]["apns-priority"] == "10"
|
|
assert data["message"]["webpush"]["headers"]["Urgency"] == "high"
|
|
assert data["message"]["android"]["priority"] == "HIGH"
|
|
|
|
|
|
@pytest.mark.skipif(
|
|
"cryptography" not in sys.modules, reason="Requires cryptography"
|
|
)
|
|
def test_plugin_fcm_oauth_no_colors(mock_post):
|
|
"""Verify `color=no` work as intended."""
|
|
|
|
obj = Apprise.instantiate(
|
|
f"fcm://mock-project-id/device/?keyfile={FCM_KEYFILE}&color=no"
|
|
)
|
|
assert mock_post.call_count == 0
|
|
|
|
# Send our notification
|
|
assert obj.notify(title="title", body="body") is True
|
|
|
|
# Test our call count
|
|
assert mock_post.call_count == 2
|
|
assert (
|
|
mock_post.call_args_list[0][0][0]
|
|
== "https://accounts.google.com/o/oauth2/token"
|
|
)
|
|
|
|
assert (
|
|
mock_post.call_args_list[1][0][0]
|
|
== "https://fcm.googleapis.com/v1/projects/mock-project-id/messages:send"
|
|
)
|
|
payload = mock_post.mock_calls[1][2]
|
|
data = json.loads(payload["data"])
|
|
assert "message" in data
|
|
assert isinstance(data["message"], dict)
|
|
assert "data" not in data["message"]
|
|
assert "notification" in data["message"]
|
|
assert isinstance(data["message"]["notification"], dict)
|
|
assert "color" not in data["message"]["notification"]
|
|
|
|
|
|
@pytest.mark.skipif(
|
|
"cryptography" not in sys.modules, reason="Requires cryptography"
|
|
)
|
|
def test_plugin_fcm_oauth_colors(mock_post):
|
|
"""Verify colors work as intended."""
|
|
|
|
obj = Apprise.instantiate(
|
|
f"fcm://mock-project-id/device/?keyfile={FCM_KEYFILE}&color=#12AAbb"
|
|
)
|
|
assert mock_post.call_count == 0
|
|
|
|
# Send our notification
|
|
assert obj.notify(title="title", body="body") is True
|
|
|
|
# Test our call count
|
|
assert mock_post.call_count == 2
|
|
assert (
|
|
mock_post.call_args_list[0][0][0]
|
|
== "https://accounts.google.com/o/oauth2/token"
|
|
)
|
|
|
|
assert (
|
|
mock_post.call_args_list[1][0][0]
|
|
== "https://fcm.googleapis.com/v1/projects/mock-project-id/messages:send"
|
|
)
|
|
payload = mock_post.mock_calls[1][2]
|
|
data = json.loads(payload["data"])
|
|
assert "message" in data
|
|
assert isinstance(data["message"], dict)
|
|
assert "data" not in data["message"]
|
|
assert "notification" in data["message"]
|
|
assert isinstance(data["message"]["notification"], dict)
|
|
assert "color" in data["message"]["android"]["notification"]
|
|
assert data["message"]["android"]["notification"]["color"] == "#12aabb"
|
|
|
|
|
|
@pytest.mark.skipif(
|
|
"cryptography" not in sys.modules, reason="Requires cryptography"
|
|
)
|
|
def test_plugin_fcm_keyfile_parse_default(mock_post):
|
|
"""NotifyFCM() KeyFile Tests."""
|
|
|
|
oauth = GoogleOAuth()
|
|
# We can not get an Access Token without content loaded
|
|
assert oauth.access_token is None
|
|
|
|
# Load our content
|
|
assert oauth.load(FCM_KEYFILE) is True
|
|
assert oauth.access_token is not None
|
|
|
|
# Test our call count
|
|
assert mock_post.call_count == 1
|
|
assert (
|
|
mock_post.call_args_list[0][0][0]
|
|
== "https://accounts.google.com/o/oauth2/token"
|
|
)
|
|
|
|
mock_post.reset_mock()
|
|
|
|
# a second call uses cache since our token hasn't expired yet
|
|
assert oauth.access_token is not None
|
|
assert mock_post.call_count == 0
|
|
|
|
|
|
@pytest.mark.skipif(
|
|
"cryptography" not in sys.modules, reason="Requires cryptography"
|
|
)
|
|
def test_plugin_fcm_keyfile_parse_no_expiry(mock_post):
|
|
"""Test case without `expires_in` entry."""
|
|
|
|
mock_post.return_value.content = json.dumps({
|
|
"access_token": "ya29.c.abcd",
|
|
"token_type": "Bearer",
|
|
})
|
|
|
|
oauth = GoogleOAuth()
|
|
assert oauth.load(FCM_KEYFILE) is True
|
|
assert oauth.access_token is not None
|
|
|
|
# Test our call count
|
|
assert mock_post.call_count == 1
|
|
assert (
|
|
mock_post.call_args_list[0][0][0]
|
|
== "https://accounts.google.com/o/oauth2/token"
|
|
)
|
|
|
|
|
|
@pytest.mark.skipif(
|
|
"cryptography" not in sys.modules, reason="Requires cryptography"
|
|
)
|
|
def test_plugin_fcm_keyfile_parse_user_agent(mock_post):
|
|
"""Test case with `user-agent` override."""
|
|
|
|
oauth = GoogleOAuth(user_agent="test-agent-override")
|
|
assert oauth.load(FCM_KEYFILE) is True
|
|
assert oauth.access_token is not None
|
|
assert mock_post.call_count == 1
|
|
assert (
|
|
mock_post.call_args_list[0][0][0]
|
|
== "https://accounts.google.com/o/oauth2/token"
|
|
)
|
|
|
|
|
|
@pytest.mark.skipif(
|
|
"cryptography" not in sys.modules, reason="Requires cryptography"
|
|
)
|
|
def test_plugin_fcm_keyfile_parse_keyfile_failures(mock_post: mock.Mock):
|
|
"""Test some errors that can get thrown when trying to handle the
|
|
`service_account.json` file."""
|
|
|
|
# Now we test a case where we can't access the file we've been pointed to:
|
|
oauth = GoogleOAuth()
|
|
with mock.patch("builtins.open", side_effect=OSError):
|
|
# We will fail to retrieve our Access Token
|
|
assert oauth.load(FCM_KEYFILE) is False
|
|
assert oauth.access_token is None
|
|
|
|
oauth = GoogleOAuth()
|
|
with mock.patch("json.loads", side_effect=([],)):
|
|
# We will fail to retrieve our Access Token since we did not parse
|
|
# a dictionary
|
|
assert oauth.load(FCM_KEYFILE) is False
|
|
assert oauth.access_token is None
|
|
|
|
# Case where we can't load the PEM key:
|
|
oauth = GoogleOAuth()
|
|
with mock.patch(
|
|
"cryptography.hazmat.primitives.serialization.load_pem_private_key",
|
|
side_effect=ValueError(""),
|
|
):
|
|
assert oauth.load(FCM_KEYFILE) is False
|
|
assert oauth.access_token is None
|
|
|
|
# Case where we can't load the PEM key:
|
|
oauth = GoogleOAuth()
|
|
with mock.patch(
|
|
"cryptography.hazmat.primitives.serialization.load_pem_private_key",
|
|
side_effect=TypeError(""),
|
|
):
|
|
assert oauth.load(FCM_KEYFILE) is False
|
|
assert oauth.access_token is None
|
|
|
|
# Case where we can't load the PEM key:
|
|
oauth = GoogleOAuth()
|
|
with mock.patch(
|
|
"cryptography.hazmat.primitives.serialization.load_pem_private_key",
|
|
side_effect=UnsupportedAlgorithm(""),
|
|
):
|
|
# Note: This test should be te
|
|
assert oauth.load(FCM_KEYFILE) is False
|
|
assert oauth.access_token is None
|
|
|
|
# Verify that not a single call to the web escaped the test harness.
|
|
assert mock_post.mock_calls == []
|
|
|
|
|
|
@pytest.mark.skipif(
|
|
"cryptography" not in sys.modules, reason="Requires cryptography"
|
|
)
|
|
def test_plugin_fcm_keyfile_parse_token_failures(mock_post):
|
|
"""Test some web errors that can occur when speaking upstream with Google
|
|
to get our token generated."""
|
|
|
|
mock_post.return_value.status_code = requests.codes.internal_server_error
|
|
|
|
oauth = GoogleOAuth()
|
|
assert oauth.load(FCM_KEYFILE) is True
|
|
|
|
# We'll fail due to an bad web response
|
|
assert oauth.access_token is None
|
|
|
|
# Return our status code to how it was
|
|
mock_post.return_value.status_code = requests.codes.ok
|
|
|
|
# No access token
|
|
bad_response_1 = mock.Mock()
|
|
bad_response_1.content = json.dumps({
|
|
"expires_in": 3599,
|
|
"token_type": "Bearer",
|
|
})
|
|
|
|
# Invalid JSON
|
|
bad_response_2 = mock.Mock()
|
|
bad_response_2.content = "{"
|
|
|
|
mock_post.return_value = None
|
|
# Throw an exception on the first call to requests.post()
|
|
for side_effect in (
|
|
requests.RequestException(),
|
|
bad_response_1,
|
|
bad_response_2,
|
|
):
|
|
mock_post.side_effect = side_effect
|
|
|
|
# Test all of our bad side effects
|
|
oauth = GoogleOAuth()
|
|
assert oauth.load(FCM_KEYFILE) is True
|
|
|
|
# We'll fail due to an bad web response
|
|
assert oauth.access_token is None
|
|
|
|
|
|
@pytest.mark.skipif(
|
|
"cryptography" not in sys.modules, reason="Requires cryptography"
|
|
)
|
|
def test_plugin_fcm_bad_keyfile_parse():
|
|
"""NotifyFCM() KeyFile Bad Service Account Type Tests."""
|
|
|
|
path = os.path.join(PRIVATE_KEYFILE_DIR, "service_account-bad-type.json")
|
|
oauth = GoogleOAuth()
|
|
assert oauth.load(path) is False
|
|
|
|
|
|
@pytest.mark.skipif(
|
|
"cryptography" not in sys.modules, reason="Requires cryptography"
|
|
)
|
|
def test_plugin_fcm_keyfile_missing_entries_parse(tmpdir):
|
|
"""NotifyFCM() KeyFile Missing Entries Test."""
|
|
|
|
# Prepare a base keyfile reference to use
|
|
path = os.path.join(PRIVATE_KEYFILE_DIR, "service_account.json")
|
|
with open(path, encoding="utf-8") as fp:
|
|
content = json.loads(fp.read())
|
|
|
|
path = tmpdir.join("fcm_keyfile.json")
|
|
|
|
# Test that we fail to load if the following keys are missing:
|
|
for entry in (
|
|
"client_email",
|
|
"private_key_id",
|
|
"private_key",
|
|
"type",
|
|
"project_id",
|
|
):
|
|
|
|
# Ensure the key actually exists in our file
|
|
assert entry in content
|
|
|
|
# Create a copy of our content
|
|
content_copy = content.copy()
|
|
|
|
# Remove our entry we expect to validate against
|
|
del content_copy[entry]
|
|
assert entry not in content_copy
|
|
|
|
path.write(json.dumps(content_copy))
|
|
|
|
oauth = GoogleOAuth()
|
|
assert oauth.load(str(path)) is False
|
|
|
|
# Now write ourselves a bad JSON file
|
|
path.write("{")
|
|
oauth = GoogleOAuth()
|
|
assert oauth.load(str(path)) is False
|
|
|
|
|
|
@pytest.mark.skipif(
|
|
"cryptography" not in sys.modules, reason="Requires cryptography"
|
|
)
|
|
def test_plugin_fcm_priority_manager():
|
|
"""NotifyFCM() FCMPriorityManager() Testing."""
|
|
|
|
for mode in FCM_MODES:
|
|
for priority in FCM_PRIORITIES:
|
|
instance = FCMPriorityManager(mode, priority)
|
|
assert isinstance(instance.payload(), dict)
|
|
# Verify it's not empty
|
|
assert bool(instance)
|
|
assert instance.payload()
|
|
assert str(instance) == priority
|
|
|
|
# We do not have to set a priority
|
|
instance = FCMPriorityManager(mode)
|
|
assert isinstance(instance.payload(), dict)
|
|
|
|
# Dictionary is empty though
|
|
assert not bool(instance)
|
|
assert not instance.payload()
|
|
assert str(instance) == ""
|
|
|
|
with pytest.raises(TypeError):
|
|
instance = FCMPriorityManager(mode, "invalid")
|
|
|
|
with pytest.raises(TypeError):
|
|
instance = FCMPriorityManager("invald", "high")
|
|
|
|
# mode validation is done at the higher NotifyFCM() level so
|
|
# it is not tested here (not required)
|
|
|
|
|
|
@pytest.mark.skipif(
|
|
"cryptography" not in sys.modules, reason="Requires cryptography"
|
|
)
|
|
def test_plugin_fcm_color_manager():
|
|
"""NotifyFCM() FCMColorManager() Testing."""
|
|
|
|
# No colors
|
|
instance = FCMColorManager("no")
|
|
assert bool(instance) is False
|
|
assert instance.get() is None
|
|
# We'll return that we are not defined
|
|
assert str(instance) == "no"
|
|
|
|
# Asset colors
|
|
instance = FCMColorManager("yes")
|
|
assert isinstance(instance.get(), str)
|
|
# Output: #rrggbb
|
|
assert len(instance.get()) == 7
|
|
# Starts with has symbol
|
|
assert instance.get()[0] == "#"
|
|
# We'll return that we are defined but using default configuration
|
|
assert str(instance) == "yes"
|
|
|
|
# We will be `true` because we can acquire a color based on what was
|
|
# passed in
|
|
assert bool(instance)
|
|
|
|
# Custom color
|
|
instance = FCMColorManager("#A2B3A4")
|
|
assert isinstance(instance.get(), str)
|
|
assert instance.get() == "#a2b3a4"
|
|
assert bool(instance)
|
|
# str() response does not include hashtag
|
|
assert str(instance) == "a2b3a4"
|
|
|
|
# Custom color (no hashtag)
|
|
instance = FCMColorManager("A2B3A4")
|
|
assert isinstance(instance.get(), str)
|
|
# Hashtag is always part of output
|
|
assert instance.get() == "#a2b3a4"
|
|
assert bool(instance)
|
|
# str() response does not include hashtag
|
|
assert str(instance) == "a2b3a4"
|
|
|
|
# Custom color (no hashtag) but only using 3 letter rgb values
|
|
instance = FCMColorManager("AC4")
|
|
assert isinstance(instance.get(), str)
|
|
# Hashtag is always part of output
|
|
assert instance.get() == "#aacc44"
|
|
assert bool(instance)
|
|
# str() response does not include hashtag
|
|
assert str(instance) == "aacc44"
|
|
|
|
|
|
@pytest.mark.skipif(
|
|
"cryptography" in sys.modules,
|
|
reason="Requires that cryptography NOT be installed",
|
|
)
|
|
def test_plugin_fcm_cryptography_import_error():
|
|
"""NotifyFCM Cryptography loading failure."""
|
|
|
|
# Prepare a base keyfile reference to use
|
|
path = os.path.join(PRIVATE_KEYFILE_DIR, "service_account.json")
|
|
|
|
# Attempt to instantiate our object
|
|
obj = Apprise.instantiate(
|
|
f"fcm://mock-project-id/device/#topic/?keyfile={path!s}"
|
|
)
|
|
|
|
# It's not possible because our cryptography depedancy is missing
|
|
assert obj is None
|
|
|
|
|
|
@pytest.mark.skipif(
|
|
"cryptography" not in sys.modules, reason="Requires cryptography"
|
|
)
|
|
@mock.patch("requests.post")
|
|
def test_plugin_fcm_edge_cases(mock_post):
|
|
"""NotifyFCM() Edge Cases."""
|
|
|
|
# Prepare a good response
|
|
response = mock.Mock()
|
|
response.status_code = requests.codes.ok
|
|
mock_post.return_value = response
|
|
|
|
# this tests an edge case where verify if the data_kwargs is a dictionary
|
|
# or not. Below, we don't even define it, so it will be None (causing
|
|
# the check to go). We'll still correctly instantiate a plugin:
|
|
obj = NotifyFCM("project", "api:123", targets="device")
|
|
assert obj is not None
|