apprise/test/test_plugin_vapid.py

698 lines
23 KiB
Python

# -*- coding: utf-8 -*-
# BSD 2-Clause License
#
# Apprise - Push Notification Library.
# Copyright (c) 2025, Chris Caron <lead2gold@gmail.com>
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# 1. Redistributions of source code must retain the above copyright notice,
# this list of conditions and the following disclaimer.
#
# 2. Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
import os
import sys
import json
import requests
import pytest
from unittest import mock
from apprise.plugins.vapid.subscription import (
WebPushSubscription, WebPushSubscriptionManager)
from apprise.plugins.vapid import NotifyVapid
from apprise import exception, asset, url
from apprise.common import PersistentStoreMode
from apprise.utils.pem import ApprisePEMController
from helpers import AppriseURLTester
# Disable logging for a cleaner testing output
import logging
logging.disable(logging.CRITICAL)
# Attachment Directory
TEST_VAR_DIR = os.path.join(os.path.dirname(__file__), 'var')
# a test UUID we can use
SUBSCRIBER = 'user@example.com'
PLUGIN_ID = 'vapid'
# Our Testing URLs
apprise_url_tests = (
('vapid://', {
'instance': TypeError,
}),
('vapid://:@/', {
'instance': TypeError,
}),
('vapid://invalid-subscriber', {
# An invalid Subscriber
'instance': TypeError,
}),
('vapid://user@example.com', {
# bare bone requirements met, but we don't have our subscription file
# or our private key (pem)
'instance': NotifyVapid,
# We'll fail to respond because we would not have found any
# configuration to load
'notify_response': False,
}),
('vapid://user@example.com?keyfile=invalid&subfile=invalid', {
# Test passing keyfile and subfile on our path (even if invalid)
'instance': NotifyVapid,
# We'll fail to respond because we would not have found any
# configuration to load
'notify_response': False,
}),
('vapid://user@example.com/newuser@example.com', {
# we don't have our subscription file or private key
'instance': NotifyVapid,
'notify_response': False,
}),
('vapid://user@example.ca/newuser@example.ca', {
'instance': NotifyVapid,
# force a failure
'response': False,
'requests_response_code': requests.codes.internal_server_error,
}),
('vapid://user@example.uk/newuser@example.uk', {
'instance': NotifyVapid,
# throw a bizzare code forcing us to fail to look it up
'response': False,
'requests_response_code': 999,
}),
('vapid://user@example.au/newuser@example.au', {
'instance': NotifyVapid,
# Throws a series of connection and transfer exceptions when this flag
# is set and tests that we gracfully handle them
'test_requests_exceptions': True,
}),
)
@pytest.fixture
def patch_persistent_store_namespace(tmpdir):
"""
Force an easy to test environment
"""
with mock.patch.object(url.URLBase, 'url_id', return_value=PLUGIN_ID), \
mock.patch.object(
asset.AppriseAsset, 'storage_mode',
PersistentStoreMode.AUTO), \
mock.patch.object(
asset.AppriseAsset, 'storage_path', str(tmpdir)):
tmp_dir = tmpdir.mkdir(PLUGIN_ID)
# Return the directory name
yield str(tmp_dir)
@pytest.fixture
def subscription_reference():
return {
"user@example.com": {
"endpoint": 'https://fcm.googleapis.com/fcm/send/default',
"keys": {
"p256dh": 'BI2RNIK2PkeCVoEfgVQNjievBi4gWvZxMiuCpOx6K6qCO'
'5caru5QCPuc-nEaLplbbFkHxTrR9YzE8ZkTjie5Fq0',
"auth": 'k9Xzm43nBGo=',
},
},
"user1": {
"endpoint": 'https://fcm.googleapis.com/fcm/send/abc123',
"keys": {
"p256dh": 'BI2RNIK2PkeCVoEfgVQNjievBi4gWvZxMiuCpOx6K6qCO'
'5caru5QCPuc-nEaLplbbFkHxTrR9YzE8ZkTjie5Fq0',
"auth": 'k9Xzm43nBGo=',
},
},
"user2": {
"endpoint": 'https://fcm.googleapis.com/fcm/send/def456',
"keys": {
"p256dh": 'BI2RNIK2PkeCVoEfgVQNjievBi4gWvZxMiuCpOx6K6qCO'
'5caru5QCPuc-nEaLplbbFkHxTrR9YzE8ZkTjie5Fq0',
"auth": 'k9Xzm43nBGo=',
},
},
}
@pytest.mark.skipif(
'cryptography' not in sys.modules, reason="Requires cryptography")
def test_plugin_vapid_urls():
"""
NotifyVapid() Apprise URLs - No Config
"""
# Run our general tests
AppriseURLTester(tests=apprise_url_tests).run_all()
@pytest.mark.skipif(
'cryptography' not in sys.modules, reason="Requires cryptography")
def test_plugin_vapid_urls_with_required_assets(
patch_persistent_store_namespace, subscription_reference):
"""
NotifyVapid() Apprise URLs With Config
"""
# Determine our store
pc = ApprisePEMController(path=patch_persistent_store_namespace)
assert pc.keygen() is True
# Write our subscriptions file to disk
subscription_file = os.path.join(
patch_persistent_store_namespace,
NotifyVapid.vapid_subscription_file)
with open(subscription_file, 'w') as f:
f.write(json.dumps(subscription_reference))
tests = (
('vapid://user@example.com', {
# user@example.com loaded (also used as subscriber id)
'instance': NotifyVapid,
}),
('vapid://user@example.com/newuser@example.com', {
# no newuser@example.com key entry
'instance': NotifyVapid,
'notify_response': False,
}),
('vapid://user@example.com/user1?to=user2', {
# We'll succesfully notify 2 users
'instance': NotifyVapid,
}),
('vapid://user1?to=user2&from=user@example.com', {
# We'll succesfully notify 2 users
'instance': NotifyVapid,
}),
('vapid://?to=user2&from=user@example.com', {
# No host provided
'instance': NotifyVapid,
}),
('vapid://user@example.com?to=user2&from=user@example.com', {
# We'll succesfully notify 2 users
'instance': NotifyVapid,
}),
('vapid://user@example.com/user1?to=user2&ttl=15', {
# test ttl
'instance': NotifyVapid,
}),
('vapid://user@example.com/user1?to=user2&ttl=', {
# test ttl
'instance': NotifyVapid,
}),
('vapid://user@example.com/user1?to=user2&ttl=invalid', {
# test ttl
'instance': NotifyVapid,
}),
('vapid://user@example.com/user1?to=user2&ttl=-4000', {
# bad ttl
'instance': TypeError,
}),
('vapid://user@example.com/user1?to=user2&mode=edge', {
# test mode
'instance': NotifyVapid,
}),
('vapid://user@example.com/user1?to=user2&mode=', {
# test mode
'instance': TypeError,
}),
('vapid://user@example.com/user1?to=user2&mode=invalid', {
# test mode more
'instance': TypeError,
}),
('vapid://user@example.com/user1', {
'instance': NotifyVapid,
# force a failure
'response': False,
'requests_response_code': requests.codes.internal_server_error,
}),
('vapid://user@example.com/user1', {
'instance': NotifyVapid,
# throw a bizzare code forcing us to fail to look it up
'response': False,
'requests_response_code': 999,
}),
('vapid://user@example.com/user1', {
'instance': NotifyVapid,
# Throws a series of connection and transfer exceptions
# when this flag is set and tests that we gracfully handle them
'test_requests_exceptions': True,
}),
)
AppriseURLTester(tests=tests).run_all()
@pytest.mark.skipif(
'cryptography' not in sys.modules, reason="Requires cryptography")
def test_plugin_vapid_subscriptions(tmpdir):
"""
NotifyVapid() Subscriptions
"""
# Temporary directory
tmpdir0 = tmpdir.mkdir('tmp00')
with pytest.raises(exception.AppriseInvalidData):
# Integer not supported
WebPushSubscription(42)
with pytest.raises(exception.AppriseInvalidData):
# Not the correct format
WebPushSubscription('bad-content')
with pytest.raises(exception.AppriseInvalidData):
# Invalid JSON
WebPushSubscription('{')
with pytest.raises(exception.AppriseInvalidData):
# Empty Dictionary
WebPushSubscription({})
with pytest.raises(exception.AppriseInvalidData):
WebPushSubscription({
"endpoint": 'https://fcm.googleapis.com/fcm/send/abc123',
"keys": {
"p256dh": 'BNcW4oA7zq5H9TKIrA3XfKclN2fX9P_7NR=',
"auth": 42,
},
})
with pytest.raises(exception.AppriseInvalidData):
WebPushSubscription({
"endpoint": 'https://fcm.googleapis.com/fcm/send/abc123',
"keys": {
"p256dh": 42,
"auth": 'k9Xzm43nBGo=',
},
})
with pytest.raises(exception.AppriseInvalidData):
WebPushSubscription({
"endpoint": 'https://fcm.googleapis.com/fcm/send/abc123',
})
with pytest.raises(exception.AppriseInvalidData):
WebPushSubscription({
"endpoint": 'https://fcm.googleapis.com/fcm/send/abc123',
"keys": {},
})
with pytest.raises(exception.AppriseInvalidData):
# Invalid p256dh public key provided
wps = WebPushSubscription({
"endpoint": 'https://fcm.googleapis.com/fcm/send/abc123',
"keys": {
"p256dh": 'BNcW4oA7zq5H9TKIrA3XfKclN2fX9P_7NR=',
"auth": 'k9Xzm43nBGo=',
},
})
# An empty object
wps = WebPushSubscription()
assert bool(wps) is False
assert isinstance(wps.json(), str)
assert json.loads(wps.json())
assert str(wps) == ''
assert wps.auth is None
assert wps.endpoint is None
assert wps.p256dh is None
assert wps.public_key is None
# We can't write anything as there is nothing loaded
assert wps.write(os.path.join(str(tmpdir0), 'subscriptions.json')) is False
# A valid key
wps = WebPushSubscription({
"endpoint": 'https://fcm.googleapis.com/fcm/send/abc123',
"keys": {
"p256dh": 'BI2RNIK2PkeCVoEfgVQNjievBi4gWvZxMiuCpOx6K6qCO'
'5caru5QCPuc-nEaLplbbFkHxTrR9YzE8ZkTjie5Fq0',
"auth": 'k9Xzm43nBGo=',
},
})
assert bool(wps) is True
assert isinstance(wps.json(), str)
assert json.loads(wps.json())
assert str(wps) == 'abc123'
assert wps.auth == 'k9Xzm43nBGo='
assert wps.endpoint == 'https://fcm.googleapis.com/fcm/send/abc123'
assert wps.p256dh == 'BI2RNIK2PkeCVoEfgVQNjievBi4gWvZxMiuCpOx6K6qCO' \
'5caru5QCPuc-nEaLplbbFkHxTrR9YzE8ZkTjie5Fq0'
assert wps.public_key is not None
# Currently no files here
assert os.listdir(str(tmpdir0)) == []
# Bad content
assert wps.write(object) is False
assert wps.write(None) is False
# Can't write to a name already taken by as a directory
assert wps.write(str(tmpdir0)) is False
# Can't write to a name already taken by as a directory
assert wps.write(os.path.join(str(tmpdir0), 'subscriptions.json')) is True
assert os.listdir(str(tmpdir0)) == ['subscriptions.json']
@pytest.mark.skipif(
'cryptography' in sys.modules,
reason="Requires that cryptography NOT be installed")
def test_plugin_vapid_subscriptions_without_c():
"""
NotifyVapid() Subscriptions (no Cryptography)
"""
with pytest.raises(exception.AppriseInvalidData):
# A valid key that can't be loaded because crytography is missing
WebPushSubscription({
"endpoint": 'https://fcm.googleapis.com/fcm/send/abc123',
"keys": {
"p256dh": 'BI2RNIK2PkeCVoEfgVQNjievBi4gWvZxMiuCpOx6K6qCO'
'5caru5QCPuc-nEaLplbbFkHxTrR9YzE8ZkTjie5Fq0',
"auth": 'k9Xzm43nBGo=',
},
})
@pytest.mark.skipif(
'cryptography' not in sys.modules, reason="Requires cryptography")
def test_plugin_vapid_subscription_manager(tmpdir):
"""
NotifyVapid() Subscription Manager
"""
# Temporary directory
tmpdir0 = tmpdir.mkdir('tmp00')
with pytest.raises(exception.AppriseInvalidData):
# An invalid object
smgr = WebPushSubscriptionManager()
smgr['abc'] = 'invalid'
with pytest.raises(exception.AppriseInvalidData):
# An invalid object
smgr = WebPushSubscriptionManager()
smgr += 'invalid'
smgr = WebPushSubscriptionManager()
assert bool(smgr) is False
assert len(smgr) == 0
sub = {
"endpoint": 'https://fcm.googleapis.com/fcm/send/abc123',
"keys": {
"p256dh": 'BI2RNIK2PkeCVoEfgVQNjievBi4gWvZxMiuCpOx6K6qCO'
'5caru5QCPuc-nEaLplbbFkHxTrR9YzE8ZkTjie5Fq0',
"auth": 'k9Xzm43nBGo=',
},
}
assert smgr.add(sub) is True
assert bool(smgr) is True
assert len(smgr) == 1
# Same sub (overwrites same slot)
smgr += sub
assert bool(smgr) is True
assert len(smgr) == 1
# This makes a copy
smgr['abc'] = smgr['abc123']
assert bool(smgr) is True
assert len(smgr) == 2
assert isinstance(smgr['abc123'], WebPushSubscription)
# Currently no files here
assert os.listdir(str(tmpdir0)) == []
# Write our content
assert smgr.write(
os.path.join(str(tmpdir0), 'subscriptions.json')) is True
assert os.listdir(str(tmpdir0)) == ['subscriptions.json']
# Reset our object
smgr.clear()
assert bool(smgr) is False
assert len(smgr) == 0
# Load our content back
assert smgr.load(
os.path.join(str(tmpdir0), 'subscriptions.json')) is True
assert bool(smgr) is True
assert len(smgr) == 2
# Write over our file using the standard Subscription format
assert smgr['abc123'].write(
os.path.join(str(tmpdir0), 'subscriptions.json')) is True
# We can still open this type as well
assert smgr.load(
os.path.join(str(tmpdir0), 'subscriptions.json')) is True
assert bool(smgr) is True
assert len(smgr) == 1
smgr.clear()
bad_entry = {
"endpoint": 'https://fcm.googleapis.com/fcm/send/abc123',
"keys": {
"p256dh": 'invalid',
"auth": 'garbage',
},
}
subscriptions = os.path.join(str(tmpdir0), 'subscriptions.json')
with open(subscriptions, 'w', encoding='utf-8') as f:
# A bad JSON file
f.write('{')
assert smgr.load(subscriptions) is False
with open(subscriptions, 'w', encoding='utf-8') as f:
# not expected dictionary
f.write('null')
assert smgr.load(subscriptions) is False
subscriptions = os.path.join(str(tmpdir0), 'subscriptions.json')
with open(subscriptions, 'w', encoding='utf-8') as f:
json.dump(bad_entry, f)
assert smgr.load(subscriptions) is False
# Create bad data
bad_data = {
'bad1': bad_entry,
'bad2': bad_entry,
'bad3': bad_entry,
'bad4': bad_entry,
}
subscriptions = os.path.join(str(tmpdir0), 'subscriptions.json')
with open(subscriptions, 'w', encoding='utf-8') as f:
json.dump(bad_data, f)
assert smgr.load(subscriptions) is False
assert smgr.load('invalid-file') is False
@pytest.mark.skipif(
'cryptography' not in sys.modules, reason="Requires cryptography")
@mock.patch('requests.post')
def test_plugin_vapid_initializations(mock_post, tmpdir):
"""
NotifyVapid() Initializations
"""
# Assign our mock object our return value
okay_response = requests.Request()
okay_response.status_code = requests.codes.ok
okay_response.content = ""
mock_post.return_value = okay_response
# Temporary directory
tmpdir0 = tmpdir.mkdir('tmp00')
# Write our subfile
smgr = WebPushSubscriptionManager()
sub = {
"endpoint": 'https://fcm.googleapis.com/fcm/send/abc123',
"keys": {
"p256dh": 'BI2RNIK2PkeCVoEfgVQNjievBi4gWvZxMiuCpOx6K6qCO'
'5caru5QCPuc-nEaLplbbFkHxTrR9YzE8ZkTjie5Fq0',
"auth": 'k9Xzm43nBGo=',
},
}
subfile = os.path.join(str(tmpdir0), 'subscriptions.json')
assert smgr.add(sub) is True
assert smgr.add(smgr['abc123']) is True
assert os.listdir(str(tmpdir0)) == []
with mock.patch('json.dump', side_effect=OSError):
# We will fial to write
assert smgr.write(subfile) is False
assert smgr.write(subfile) is True
assert os.listdir(str(tmpdir0)) == ['subscriptions.json']
assert isinstance(smgr.json(), str)
_asset = asset.AppriseAsset(
storage_mode=PersistentStoreMode.FLUSH,
storage_path=str(tmpdir0),
# Auto-gen our private/public key pair
pem_autogen=True,
)
# Auto-Key Generation
obj = NotifyVapid(
'user@example.ca', targets=['abc123', ], subfile=subfile,
asset=_asset)
assert isinstance(obj, NotifyVapid)
# Our subscription directory + our
# persistent store where our keys were generated
assert len(os.listdir(str(tmpdir0))) == 2
# Second call re-references keys previously generated
obj = NotifyVapid(
'user@example.ca', targets=['abc123', ], subfile=subfile,
asset=_asset)
assert isinstance(obj, NotifyVapid)
assert isinstance(obj.url(), str)
assert obj.send('test') is True
# A second message makes no difference; what is loaded into memory is used
assert obj.send('test') is True
obj = NotifyVapid(
'user@example.ca', targets=['abc123', ], subfile='/a/bad/path',
asset=_asset)
assert isinstance(obj, NotifyVapid)
assert isinstance(obj.url(), str)
assert obj.send('test') is False
# A second message makes no difference; what is loaded into memory is used
assert obj.send('test') is False
# Detect our keyfile
cache_dir = [x for x in os.listdir(str(tmpdir0))
if not x.endswith('subscriptions.json')][0]
# Test fixed assignment to our keyfile
keyfile = os.path.join(str(tmpdir0), cache_dir, 'private_key.pem')
assert os.path.exists(keyfile)
obj = NotifyVapid(
'user@example.ca', targets=['abc123', ], keyfile=keyfile,
subfile=subfile, asset=_asset)
assert isinstance(obj, NotifyVapid)
assert isinstance(obj.url(), str)
assert obj.send('test') is True
# A second message makes no difference; what is loaded into memory is used
assert obj.send('test') is True
# Invalid Keyfile
obj = NotifyVapid(
'user@example.ca', targets=['abc123', ], keyfile=subfile,
subfile=subfile, asset=_asset)
assert isinstance(obj, NotifyVapid)
assert isinstance(obj.url(), str)
assert obj.send('test') is False
# A second message makes no difference; what is loaded into memory is used
assert obj.send('test') is False
# AutoGen Temporary directory
tmpdir1 = tmpdir.mkdir('tmp01')
_asset2 = asset.AppriseAsset(
storage_mode=PersistentStoreMode.FLUSH,
storage_path=str(tmpdir1),
# Auto-gen our private/public key pair
pem_autogen=True,
)
assert os.listdir(str(tmpdir1)) == []
obj = NotifyVapid(
'user@example.ca', targets=['abc123', ], keyfile=keyfile,
asset=_asset2)
assert isinstance(obj, NotifyVapid)
assert isinstance(obj.url(), str)
# We have a temporary subscription file we can use
assert os.listdir(str(tmpdir1)) == ['00088ad3']
# We will have a dud configuration file, but at least it's something
# to help the user with
assert obj.send('test') is False
# Second instance fails as well
assert obj.send('test') is False
# AutoGen Temporary directory
tmpdir2 = tmpdir.mkdir('tmp02')
_asset3 = asset.AppriseAsset(
storage_mode=PersistentStoreMode.FLUSH,
storage_path=str(tmpdir2),
# Auto-gen our private/public key pair
pem_autogen=True,
)
# Test invalid keyfile
assert os.path.exists(keyfile)
obj = NotifyVapid(
'user@example.ca', targets=['abc123', ], keyfile='invalid-file',
subfile=subfile, asset=_asset3)
assert isinstance(obj, NotifyVapid)
assert isinstance(obj.url(), str)
assert obj.send('test') is False
# A second message makes no difference; what is loaded into memory is used
assert obj.send('test') is False
@pytest.mark.skipif(
'cryptography' in sys.modules,
reason="Requires that cryptography NOT be installed")
def test_plugin_vapid_initializations_without_c(tmpdir):
"""
NotifyVapid() Initializations without cryptography
"""
# Temporary directory
tmpdir0 = tmpdir.mkdir('tmp00')
# Write our subfile
smgr = WebPushSubscriptionManager()
sub = {
"endpoint": 'https://fcm.googleapis.com/fcm/send/abc123',
"keys": {
"p256dh": 'BI2RNIK2PkeCVoEfgVQNjievBi4gWvZxMiuCpOx6K6qCO'
'5caru5QCPuc-nEaLplbbFkHxTrR9YzE8ZkTjie5Fq0',
"auth": 'k9Xzm43nBGo=',
},
}
subfile = os.path.join(str(tmpdir0), 'subscriptions.json')
assert smgr.add(sub) is False
_asset = asset.AppriseAsset(
storage_mode=PersistentStoreMode.FLUSH,
storage_path=str(tmpdir0),
# Auto-gen our private/public key pair
pem_autogen=True,
)
# Auto-Key Generation
obj = NotifyVapid(
'user@example.ca', targets=['abc123', ], subfile=subfile,
asset=_asset)
assert isinstance(obj, NotifyVapid)