test coverage and bulletproofing

pull/1323/head
Chris Caron 2025-05-24 17:31:06 -04:00
parent 5c44e48b64
commit b20c34bd6c
5 changed files with 307 additions and 41 deletions

View File

@ -273,6 +273,15 @@ class AppriseAttachment:
return attach_plugin
def sync(self, abort_on_error=True):
"""
Itereates over all of the attachments and retrieves them
if possible.
"""
# TODO: Change this to async for future
return next((False for a in self.attachments if not a), True) \
if abort_on_error else next((True for a in self.attachments), True)
def clear(self):
"""
Empties our attachment list

View File

@ -115,8 +115,14 @@ class NotifyVapid(NotifyBase):
# If it is more than this, then it is not accepted.
max_vapid_subfile_size = 5242880
# The maximum length of the body
body_maxlen = 1024
# The maximum length of the messge can be 4096
# just choosing a safe number below this to allow for padding and
# encryption
body_maxlen = 4000
# A title can not be used for SMS Messages. Setting this to zero will
# cause any title (if defined) to get placed into the message body.
title_maxlen = 0
# Our default is to no not use persistent storage beyond in-memory
# reference; this allows us to auto-generate our config if needed
@ -218,12 +224,11 @@ class NotifyVapid(NotifyBase):
# default subscriptions
self.subscriptions = {}
self.subscriptions_loaded = False
self.private_key_loaded = False
# Set our Time to Live Flag
if ttl is None:
self.ttl = self.template_args['ttl']['default']
else:
self.ttl = self.template_args['ttl']['default']
if ttl is not None:
try:
self.ttl = int(ttl)
@ -290,9 +295,8 @@ class NotifyVapid(NotifyBase):
self.subscriptions.write(self.subfile):
self.logger.info(
'Vapid auto-generated %s/%s',
os.path.basename(
self.store.path,
self.vapid_subscription_file))
os.path.basename(self.store.path),
self.vapid_subscription_file)
# Acquire our targets for parsing
self.targets = parse_list(targets)
@ -306,12 +310,18 @@ class NotifyVapid(NotifyBase):
"""
Perform Vapid Notification
"""
if not self.pem and not self.pem.load_private_key(self.keyfile):
if not self.private_key_loaded and ((
self.keyfile and not self.pem.private_key(
autogen=False, autodetect=False)
and not self.pem.load_private_key(self.keyfile))
or (not self.keyfile and not self.pem)):
self.logger.warning(
'Provided Vapid/WebPush (PEM) Private Key file could '
'not be loaded.')
self.private_key_loaded = True
return False
else:
self.private_key_loaded = True
if not self.targets:
# There is no one to notify; we're done
@ -332,26 +342,19 @@ class NotifyVapid(NotifyBase):
self.logger.warning('Vapid could not load subscriptions')
return False
if not self.pem:
if not self.pem.private_key(autogen=False, autodetect=False):
self.logger.warning(
'No Vapid/WebPush (PEM) Private Key file could be loaded.')
return False
# Prepare our notify URL (based on our mode)
notify_url = VAPID_API_LOOKUP[self.mode]
jwt_token = self.jwt_token
if not jwt_token:
self.logger.warning(
'A Vapid JWT Token could not be generated')
return False
headers = {
'User-Agent': self.app_id,
"TTL": str(self.ttl),
"Content-Encoding": "aes128gcm",
"Content-Type": "application/octet-stream",
"Authorization": f"vapid t={jwt_token}, k={self.public_key}",
"Authorization": f"vapid t={self.jwt_token}, k={self.public_key}",
}
has_error = False

View File

@ -79,7 +79,7 @@ class WebPushSubscription:
try:
content = json.loads(content)
except json.decoder.JSONDecodeError:
except (json.decoder.JSONDecodeError, TypeError, OSError):
# Bad data
return False
@ -263,7 +263,7 @@ class WebPushSubscriptionManager:
subscription = WebPushSubscription(subscription)
except AppriseInvalidData:
return True
return False
if name is None:
name = str(subscription)
@ -337,7 +337,7 @@ class WebPushSubscriptionManager:
# Enforce maximum file size
attach[0].max_file_size = byte_limit
if not path:
if not attach.sync():
return False
try:
@ -345,10 +345,14 @@ class WebPushSubscriptionManager:
with open(attach[0].path, 'r', encoding='utf-8') as f:
content = json.load(f)
except (TypeError, OSError):
except (json.decoder.JSONDecodeError, TypeError, OSError):
# Could not read
return False
if not isinstance(content, dict):
# Not a list of dictionaries
return False
# Verify if we're dealing with a single element:
# {
# "endpoint": "https://fcm.googleapis.com/fcm/send/abc123...",

View File

@ -154,7 +154,7 @@ class ApprisePEMController:
self.__private_key = None
self.__public_key = None
if not self._prv_keyfile:
if not self._prv_keyfile and self._prv_keyfile.sync():
# Early exit
logger.error(
'Could not access PEM Private Key {}.'.format(path))
@ -168,8 +168,13 @@ class ApprisePEMController:
backend=default_backend()
)
except (ValueError, TypeError):
logger.debug(
'PEM Private Key file specified is not supported (%s)',
type(path))
return False
except FileNotFoundError:
# Generate keys
logger.debug('PEM Private Key file not found: %s', path)
return False
@ -220,7 +225,7 @@ class ApprisePEMController:
self.__private_key = None
self.__public_key = None
if not self._pub_keyfile:
if not self._pub_keyfile and self._pub_keyfile.sync():
# Early exit
logger.error(
'Could not access PEM Public Key {}.'.format(path))
@ -233,6 +238,12 @@ class ApprisePEMController:
backend=default_backend()
)
except (ValueError, TypeError):
logger.debug(
'PEM Public Key file specified is not supported (%s)',
type(path))
return False
except FileNotFoundError:
# Generate keys
logger.debug('PEM Public Key file not found: %s', path)
@ -433,12 +444,12 @@ class ApprisePEMController:
if os.path.isfile(os.path.join(self.path, fname))),
None)
def public_key(self, *names, autogen=None):
def public_key(self, *names, autogen=None, autodetect=True):
"""
Opens a spcified pem public file and returns the key from it which
is used to decrypt the message
"""
if self.__public_key:
if self.__public_key or not autodetect:
return self.__public_key
path = self.public_keyfile(*names)
@ -448,7 +459,7 @@ class ApprisePEMController:
path = self.public_keyfile(*names)
if path:
# We should get a hit now
return self.public_key(*names)
return self.public_key(autogen=False)
logger.warning('No PEM Public Key could be loaded')
return None
@ -459,12 +470,12 @@ class ApprisePEMController:
# public from)
self.private_key(names=names, autogen=autogen)) else None
def private_key(self, *names, autogen=None):
def private_key(self, *names, autogen=None, autodetect=True):
"""
Opens a spcified pem private file and returns the key from it which
is used to encrypt the message
"""
if self.__private_key:
if self.__private_key or not autodetect:
return self.__private_key
path = self.private_keyfile(*names)
@ -474,7 +485,7 @@ class ApprisePEMController:
path = self.private_keyfile(*names)
if path:
# We should get a hit now
return self.private_key(*names)
return self.private_key(autogen=False)
logger.warning('No PEM Private Key could be loaded')
return None
@ -737,4 +748,4 @@ class ApprisePEMController:
"""
Returns True if at least 1 key was loaded
"""
return True if (self.public_key() or self.private_key()) else False
return True if (self.private_key() or self.public_key()) else False

View File

@ -72,6 +72,13 @@ apprise_url_tests = (
# configuration to load
'notify_response': False,
}),
('vapid://user@example.com?keyfile=invalid&subfile=invalid', {
# Test passing keyfile and subfile on our path (even if invalid)
'instance': NotifyVapid,
# We'll fail to respond because we would not have found any
# configuration to load
'notify_response': False,
}),
('vapid://user@example.com/newuser@example.com', {
# we don't have our subscription file or private key
'instance': NotifyVapid,
@ -187,19 +194,59 @@ def test_plugin_vapid_urls_with_required_assets(
# We'll succesfully notify 2 users
'instance': NotifyVapid,
}),
('vapid://user@example.com/default', {
('vapid://user1?to=user2&from=user@example.com', {
# We'll succesfully notify 2 users
'instance': NotifyVapid,
}),
('vapid://?to=user2&from=user@example.com', {
# No host provided
'instance': NotifyVapid,
}),
('vapid://user@example.com?to=user2&from=user@example.com', {
# We'll succesfully notify 2 users
'instance': NotifyVapid,
}),
('vapid://user@example.com/user1?to=user2&ttl=15', {
# test ttl
'instance': NotifyVapid,
}),
('vapid://user@example.com/user1?to=user2&ttl=', {
# test ttl
'instance': NotifyVapid,
}),
('vapid://user@example.com/user1?to=user2&ttl=invalid', {
# test ttl
'instance': NotifyVapid,
}),
('vapid://user@example.com/user1?to=user2&ttl=-4000', {
# bad ttl
'instance': TypeError,
}),
('vapid://user@example.com/user1?to=user2&mode=edge', {
# test mode
'instance': NotifyVapid,
}),
('vapid://user@example.com/user1?to=user2&mode=', {
# test mode
'instance': TypeError,
}),
('vapid://user@example.com/user1?to=user2&mode=invalid', {
# test mode more
'instance': TypeError,
}),
('vapid://user@example.com/user1', {
'instance': NotifyVapid,
# force a failure
'response': False,
'requests_response_code': requests.codes.internal_server_error,
}),
('vapid://user@example.com/newuser@example.uk', {
('vapid://user@example.com/user1', {
'instance': NotifyVapid,
# throw a bizzare code forcing us to fail to look it up
'response': False,
'requests_response_code': 999,
}),
('vapid://user@example.com/newuser@example.au', {
('vapid://user@example.com/user1', {
'instance': NotifyVapid,
# Throws a series of connection and transfer exceptions
# when this flag is set and tests that we gracfully handle them
@ -329,6 +376,16 @@ def test_plugin_vapid_subscription_manager(tmpdir):
# Temporary directory
tmpdir0 = tmpdir.mkdir('tmp00')
with pytest.raises(exception.AppriseInvalidData):
# An invalid object
smgr = WebPushSubscriptionManager()
smgr['abc'] = 'invalid'
with pytest.raises(exception.AppriseInvalidData):
# An invalid object
smgr = WebPushSubscriptionManager()
smgr += 'invalid'
smgr = WebPushSubscriptionManager()
assert bool(smgr) is False
@ -352,10 +409,10 @@ def test_plugin_vapid_subscription_manager(tmpdir):
assert bool(smgr) is True
assert len(smgr) == 1
# indexed by value added
smgr['abc123'] = sub
# This makes a copy
smgr['abc'] = smgr['abc123']
assert bool(smgr) is True
assert len(smgr) == 1
assert len(smgr) == 2
assert isinstance(smgr['abc123'], WebPushSubscription)
@ -377,7 +434,7 @@ def test_plugin_vapid_subscription_manager(tmpdir):
assert smgr.load(
os.path.join(str(tmpdir0), 'subscriptions.json')) is True
assert bool(smgr) is True
assert len(smgr) == 1
assert len(smgr) == 2
# Write over our file using the standard Subscription format
assert smgr['abc123'].write(
@ -388,3 +445,185 @@ def test_plugin_vapid_subscription_manager(tmpdir):
os.path.join(str(tmpdir0), 'subscriptions.json')) is True
assert bool(smgr) is True
assert len(smgr) == 1
smgr.clear()
bad_entry = {
"endpoint": 'https://fcm.googleapis.com/fcm/send/abc123',
"keys": {
"p256dh": 'invalid',
"auth": 'garbage',
},
}
subscriptions = os.path.join(str(tmpdir0), 'subscriptions.json')
with open(subscriptions, 'w', encoding='utf-8') as f:
# A bad JSON file
f.write('{')
assert smgr.load(subscriptions) is False
with open(subscriptions, 'w', encoding='utf-8') as f:
# not expected dictionary
f.write('null')
assert smgr.load(subscriptions) is False
subscriptions = os.path.join(str(tmpdir0), 'subscriptions.json')
with open(subscriptions, 'w', encoding='utf-8') as f:
json.dump(bad_entry, f)
assert smgr.load(subscriptions) is False
# Create bad data
bad_data = {
'bad1': bad_entry,
'bad2': bad_entry,
'bad3': bad_entry,
'bad4': bad_entry,
}
subscriptions = os.path.join(str(tmpdir0), 'subscriptions.json')
with open(subscriptions, 'w', encoding='utf-8') as f:
json.dump(bad_data, f)
assert smgr.load(subscriptions) is False
assert smgr.load('invalid-file') is False
@mock.patch('requests.post')
def test_plugin_vapid_initializations(mock_post, tmpdir):
"""
NotifyVapid() Initializations
"""
# Assign our mock object our return value
okay_response = requests.Request()
okay_response.status_code = requests.codes.ok
okay_response.content = ""
mock_post.return_value = okay_response
# Temporary directory
tmpdir0 = tmpdir.mkdir('tmp00')
# Write our subfile
smgr = WebPushSubscriptionManager()
sub = {
"endpoint": 'https://fcm.googleapis.com/fcm/send/abc123',
"keys": {
"p256dh": 'BI2RNIK2PkeCVoEfgVQNjievBi4gWvZxMiuCpOx6K6qCO'
'5caru5QCPuc-nEaLplbbFkHxTrR9YzE8ZkTjie5Fq0',
"auth": 'k9Xzm43nBGo=',
},
}
subfile = os.path.join(str(tmpdir0), 'subscriptions.json')
assert smgr.add(sub) is True
assert smgr.add(smgr['abc123']) is True
assert os.listdir(str(tmpdir0)) == []
with mock.patch('json.dump', side_effect=OSError):
# We will fial to write
assert smgr.write(subfile) is False
assert smgr.write(subfile) is True
assert os.listdir(str(tmpdir0)) == ['subscriptions.json']
assert isinstance(smgr.json(), str)
_asset = asset.AppriseAsset(
storage_mode=PersistentStoreMode.FLUSH,
storage_path=str(tmpdir0),
# Auto-gen our private/public key pair
pem_autogen=True,
)
# Auto-Key Generation
obj = NotifyVapid(
'user@example.ca', targets=['abc123', ], subfile=subfile,
asset=_asset)
assert isinstance(obj, NotifyVapid)
# Our subscription directory + our
# persistent store where our keys were generated
assert len(os.listdir(str(tmpdir0))) == 2
# Second call re-references keys previously generated
obj = NotifyVapid(
'user@example.ca', targets=['abc123', ], subfile=subfile,
asset=_asset)
assert isinstance(obj, NotifyVapid)
assert isinstance(obj.url(), str)
assert obj.send('test') is True
# A second message makes no difference; what is loaded into memory is used
assert obj.send('test') is True
obj = NotifyVapid(
'user@example.ca', targets=['abc123', ], subfile='/a/bad/path',
asset=_asset)
assert isinstance(obj, NotifyVapid)
assert isinstance(obj.url(), str)
assert obj.send('test') is False
# A second message makes no difference; what is loaded into memory is used
assert obj.send('test') is False
# Detect our keyfile
cache_dir = [x for x in os.listdir(str(tmpdir0))
if not x.endswith('subscriptions.json')][0]
# Test fixed assignment to our keyfile
keyfile = os.path.join(str(tmpdir0), cache_dir, 'private_key.pem')
assert os.path.exists(keyfile)
obj = NotifyVapid(
'user@example.ca', targets=['abc123', ], keyfile=keyfile,
subfile=subfile, asset=_asset)
assert isinstance(obj, NotifyVapid)
assert isinstance(obj.url(), str)
assert obj.send('test') is True
# A second message makes no difference; what is loaded into memory is used
assert obj.send('test') is True
# Invalid Keyfile
obj = NotifyVapid(
'user@example.ca', targets=['abc123', ], keyfile=subfile,
subfile=subfile, asset=_asset)
assert isinstance(obj, NotifyVapid)
assert isinstance(obj.url(), str)
assert obj.send('test') is False
# A second message makes no difference; what is loaded into memory is used
assert obj.send('test') is False
# AutoGen Temporary directory
tmpdir1 = tmpdir.mkdir('tmp01')
_asset2 = asset.AppriseAsset(
storage_mode=PersistentStoreMode.FLUSH,
storage_path=str(tmpdir1),
# Auto-gen our private/public key pair
pem_autogen=True,
)
assert os.listdir(str(tmpdir1)) == []
obj = NotifyVapid(
'user@example.ca', targets=['abc123', ], keyfile=keyfile,
asset=_asset2)
assert isinstance(obj, NotifyVapid)
assert isinstance(obj.url(), str)
# We have a temporary subscription file we can use
assert os.listdir(str(tmpdir1)) == ['00088ad3']
# We will have a dud configuration file, but at least it's something
# to help the user with
assert obj.send('test') is False
# Second instance fails as well
assert obj.send('test') is False
# AutoGen Temporary directory
tmpdir2 = tmpdir.mkdir('tmp02')
_asset3 = asset.AppriseAsset(
storage_mode=PersistentStoreMode.FLUSH,
storage_path=str(tmpdir2),
# Auto-gen our private/public key pair
pem_autogen=True,
)
# Test invalid keyfile
assert os.path.exists(keyfile)
obj = NotifyVapid(
'user@example.ca', targets=['abc123', ], keyfile='invalid-file',
subfile=subfile, asset=_asset3)
assert isinstance(obj, NotifyVapid)
assert isinstance(obj.url(), str)
assert obj.send('test') is False
# A second message makes no difference; what is loaded into memory is used
assert obj.send('test') is False