From b20c34bd6c46678927f265ee68875bc83f0b8c47 Mon Sep 17 00:00:00 2001 From: Chris Caron Date: Sat, 24 May 2025 17:31:06 -0400 Subject: [PATCH] test coverage and bulletproofing --- apprise/apprise_attachment.py | 9 + apprise/plugins/vapid/__init__.py | 43 +++-- apprise/plugins/vapid/subscription.py | 12 +- apprise/utils/pem.py | 31 +++- test/test_plugin_vapid.py | 253 +++++++++++++++++++++++++- 5 files changed, 307 insertions(+), 41 deletions(-) diff --git a/apprise/apprise_attachment.py b/apprise/apprise_attachment.py index 855b42d4..0883ed6e 100644 --- a/apprise/apprise_attachment.py +++ b/apprise/apprise_attachment.py @@ -273,6 +273,15 @@ class AppriseAttachment: return attach_plugin + def sync(self, abort_on_error=True): + """ + Itereates over all of the attachments and retrieves them + if possible. + """ + # TODO: Change this to async for future + return next((False for a in self.attachments if not a), True) \ + if abort_on_error else next((True for a in self.attachments), True) + def clear(self): """ Empties our attachment list diff --git a/apprise/plugins/vapid/__init__.py b/apprise/plugins/vapid/__init__.py index d22920af..6360801b 100644 --- a/apprise/plugins/vapid/__init__.py +++ b/apprise/plugins/vapid/__init__.py @@ -115,8 +115,14 @@ class NotifyVapid(NotifyBase): # If it is more than this, then it is not accepted. max_vapid_subfile_size = 5242880 - # The maximum length of the body - body_maxlen = 1024 + # The maximum length of the messge can be 4096 + # just choosing a safe number below this to allow for padding and + # encryption + body_maxlen = 4000 + + # A title can not be used for SMS Messages. Setting this to zero will + # cause any title (if defined) to get placed into the message body. + title_maxlen = 0 # Our default is to no not use persistent storage beyond in-memory # reference; this allows us to auto-generate our config if needed @@ -218,12 +224,11 @@ class NotifyVapid(NotifyBase): # default subscriptions self.subscriptions = {} self.subscriptions_loaded = False + self.private_key_loaded = False # Set our Time to Live Flag - if ttl is None: - self.ttl = self.template_args['ttl']['default'] - - else: + self.ttl = self.template_args['ttl']['default'] + if ttl is not None: try: self.ttl = int(ttl) @@ -290,9 +295,8 @@ class NotifyVapid(NotifyBase): self.subscriptions.write(self.subfile): self.logger.info( 'Vapid auto-generated %s/%s', - os.path.basename( - self.store.path, - self.vapid_subscription_file)) + os.path.basename(self.store.path), + self.vapid_subscription_file) # Acquire our targets for parsing self.targets = parse_list(targets) @@ -306,12 +310,18 @@ class NotifyVapid(NotifyBase): """ Perform Vapid Notification """ - - if not self.pem and not self.pem.load_private_key(self.keyfile): + if not self.private_key_loaded and (( + self.keyfile and not self.pem.private_key( + autogen=False, autodetect=False) + and not self.pem.load_private_key(self.keyfile)) + or (not self.keyfile and not self.pem)): self.logger.warning( 'Provided Vapid/WebPush (PEM) Private Key file could ' 'not be loaded.') + self.private_key_loaded = True return False + else: + self.private_key_loaded = True if not self.targets: # There is no one to notify; we're done @@ -332,26 +342,19 @@ class NotifyVapid(NotifyBase): self.logger.warning('Vapid could not load subscriptions') return False - if not self.pem: + if not self.pem.private_key(autogen=False, autodetect=False): self.logger.warning( 'No Vapid/WebPush (PEM) Private Key file could be loaded.') return False # Prepare our notify URL (based on our mode) notify_url = VAPID_API_LOOKUP[self.mode] - - jwt_token = self.jwt_token - if not jwt_token: - self.logger.warning( - 'A Vapid JWT Token could not be generated') - return False - headers = { 'User-Agent': self.app_id, "TTL": str(self.ttl), "Content-Encoding": "aes128gcm", "Content-Type": "application/octet-stream", - "Authorization": f"vapid t={jwt_token}, k={self.public_key}", + "Authorization": f"vapid t={self.jwt_token}, k={self.public_key}", } has_error = False diff --git a/apprise/plugins/vapid/subscription.py b/apprise/plugins/vapid/subscription.py index 50fe49ff..1c757740 100644 --- a/apprise/plugins/vapid/subscription.py +++ b/apprise/plugins/vapid/subscription.py @@ -79,7 +79,7 @@ class WebPushSubscription: try: content = json.loads(content) - except json.decoder.JSONDecodeError: + except (json.decoder.JSONDecodeError, TypeError, OSError): # Bad data return False @@ -263,7 +263,7 @@ class WebPushSubscriptionManager: subscription = WebPushSubscription(subscription) except AppriseInvalidData: - return True + return False if name is None: name = str(subscription) @@ -337,7 +337,7 @@ class WebPushSubscriptionManager: # Enforce maximum file size attach[0].max_file_size = byte_limit - if not path: + if not attach.sync(): return False try: @@ -345,10 +345,14 @@ class WebPushSubscriptionManager: with open(attach[0].path, 'r', encoding='utf-8') as f: content = json.load(f) - except (TypeError, OSError): + except (json.decoder.JSONDecodeError, TypeError, OSError): # Could not read return False + if not isinstance(content, dict): + # Not a list of dictionaries + return False + # Verify if we're dealing with a single element: # { # "endpoint": "https://fcm.googleapis.com/fcm/send/abc123...", diff --git a/apprise/utils/pem.py b/apprise/utils/pem.py index 2c74f4e2..f9871221 100644 --- a/apprise/utils/pem.py +++ b/apprise/utils/pem.py @@ -154,7 +154,7 @@ class ApprisePEMController: self.__private_key = None self.__public_key = None - if not self._prv_keyfile: + if not self._prv_keyfile and self._prv_keyfile.sync(): # Early exit logger.error( 'Could not access PEM Private Key {}.'.format(path)) @@ -168,8 +168,13 @@ class ApprisePEMController: backend=default_backend() ) + except (ValueError, TypeError): + logger.debug( + 'PEM Private Key file specified is not supported (%s)', + type(path)) + return False + except FileNotFoundError: - # Generate keys logger.debug('PEM Private Key file not found: %s', path) return False @@ -220,7 +225,7 @@ class ApprisePEMController: self.__private_key = None self.__public_key = None - if not self._pub_keyfile: + if not self._pub_keyfile and self._pub_keyfile.sync(): # Early exit logger.error( 'Could not access PEM Public Key {}.'.format(path)) @@ -233,6 +238,12 @@ class ApprisePEMController: backend=default_backend() ) + except (ValueError, TypeError): + logger.debug( + 'PEM Public Key file specified is not supported (%s)', + type(path)) + return False + except FileNotFoundError: # Generate keys logger.debug('PEM Public Key file not found: %s', path) @@ -433,12 +444,12 @@ class ApprisePEMController: if os.path.isfile(os.path.join(self.path, fname))), None) - def public_key(self, *names, autogen=None): + def public_key(self, *names, autogen=None, autodetect=True): """ Opens a spcified pem public file and returns the key from it which is used to decrypt the message """ - if self.__public_key: + if self.__public_key or not autodetect: return self.__public_key path = self.public_keyfile(*names) @@ -448,7 +459,7 @@ class ApprisePEMController: path = self.public_keyfile(*names) if path: # We should get a hit now - return self.public_key(*names) + return self.public_key(autogen=False) logger.warning('No PEM Public Key could be loaded') return None @@ -459,12 +470,12 @@ class ApprisePEMController: # public from) self.private_key(names=names, autogen=autogen)) else None - def private_key(self, *names, autogen=None): + def private_key(self, *names, autogen=None, autodetect=True): """ Opens a spcified pem private file and returns the key from it which is used to encrypt the message """ - if self.__private_key: + if self.__private_key or not autodetect: return self.__private_key path = self.private_keyfile(*names) @@ -474,7 +485,7 @@ class ApprisePEMController: path = self.private_keyfile(*names) if path: # We should get a hit now - return self.private_key(*names) + return self.private_key(autogen=False) logger.warning('No PEM Private Key could be loaded') return None @@ -737,4 +748,4 @@ class ApprisePEMController: """ Returns True if at least 1 key was loaded """ - return True if (self.public_key() or self.private_key()) else False + return True if (self.private_key() or self.public_key()) else False diff --git a/test/test_plugin_vapid.py b/test/test_plugin_vapid.py index f5eca671..4fec96e9 100644 --- a/test/test_plugin_vapid.py +++ b/test/test_plugin_vapid.py @@ -72,6 +72,13 @@ apprise_url_tests = ( # configuration to load 'notify_response': False, }), + ('vapid://user@example.com?keyfile=invalid&subfile=invalid', { + # Test passing keyfile and subfile on our path (even if invalid) + 'instance': NotifyVapid, + # We'll fail to respond because we would not have found any + # configuration to load + 'notify_response': False, + }), ('vapid://user@example.com/newuser@example.com', { # we don't have our subscription file or private key 'instance': NotifyVapid, @@ -187,19 +194,59 @@ def test_plugin_vapid_urls_with_required_assets( # We'll succesfully notify 2 users 'instance': NotifyVapid, }), - ('vapid://user@example.com/default', { + ('vapid://user1?to=user2&from=user@example.com', { + # We'll succesfully notify 2 users + 'instance': NotifyVapid, + }), + ('vapid://?to=user2&from=user@example.com', { + # No host provided + 'instance': NotifyVapid, + }), + ('vapid://user@example.com?to=user2&from=user@example.com', { + # We'll succesfully notify 2 users + 'instance': NotifyVapid, + }), + ('vapid://user@example.com/user1?to=user2&ttl=15', { + # test ttl + 'instance': NotifyVapid, + }), + ('vapid://user@example.com/user1?to=user2&ttl=', { + # test ttl + 'instance': NotifyVapid, + }), + ('vapid://user@example.com/user1?to=user2&ttl=invalid', { + # test ttl + 'instance': NotifyVapid, + }), + ('vapid://user@example.com/user1?to=user2&ttl=-4000', { + # bad ttl + 'instance': TypeError, + }), + ('vapid://user@example.com/user1?to=user2&mode=edge', { + # test mode + 'instance': NotifyVapid, + }), + ('vapid://user@example.com/user1?to=user2&mode=', { + # test mode + 'instance': TypeError, + }), + ('vapid://user@example.com/user1?to=user2&mode=invalid', { + # test mode more + 'instance': TypeError, + }), + ('vapid://user@example.com/user1', { 'instance': NotifyVapid, # force a failure 'response': False, 'requests_response_code': requests.codes.internal_server_error, }), - ('vapid://user@example.com/newuser@example.uk', { + ('vapid://user@example.com/user1', { 'instance': NotifyVapid, # throw a bizzare code forcing us to fail to look it up 'response': False, 'requests_response_code': 999, }), - ('vapid://user@example.com/newuser@example.au', { + ('vapid://user@example.com/user1', { 'instance': NotifyVapid, # Throws a series of connection and transfer exceptions # when this flag is set and tests that we gracfully handle them @@ -329,6 +376,16 @@ def test_plugin_vapid_subscription_manager(tmpdir): # Temporary directory tmpdir0 = tmpdir.mkdir('tmp00') + with pytest.raises(exception.AppriseInvalidData): + # An invalid object + smgr = WebPushSubscriptionManager() + smgr['abc'] = 'invalid' + + with pytest.raises(exception.AppriseInvalidData): + # An invalid object + smgr = WebPushSubscriptionManager() + smgr += 'invalid' + smgr = WebPushSubscriptionManager() assert bool(smgr) is False @@ -352,10 +409,10 @@ def test_plugin_vapid_subscription_manager(tmpdir): assert bool(smgr) is True assert len(smgr) == 1 - # indexed by value added - smgr['abc123'] = sub + # This makes a copy + smgr['abc'] = smgr['abc123'] assert bool(smgr) is True - assert len(smgr) == 1 + assert len(smgr) == 2 assert isinstance(smgr['abc123'], WebPushSubscription) @@ -377,7 +434,7 @@ def test_plugin_vapid_subscription_manager(tmpdir): assert smgr.load( os.path.join(str(tmpdir0), 'subscriptions.json')) is True assert bool(smgr) is True - assert len(smgr) == 1 + assert len(smgr) == 2 # Write over our file using the standard Subscription format assert smgr['abc123'].write( @@ -388,3 +445,185 @@ def test_plugin_vapid_subscription_manager(tmpdir): os.path.join(str(tmpdir0), 'subscriptions.json')) is True assert bool(smgr) is True assert len(smgr) == 1 + + smgr.clear() + bad_entry = { + "endpoint": 'https://fcm.googleapis.com/fcm/send/abc123', + "keys": { + "p256dh": 'invalid', + "auth": 'garbage', + }, + } + + subscriptions = os.path.join(str(tmpdir0), 'subscriptions.json') + with open(subscriptions, 'w', encoding='utf-8') as f: + # A bad JSON file + f.write('{') + assert smgr.load(subscriptions) is False + + with open(subscriptions, 'w', encoding='utf-8') as f: + # not expected dictionary + f.write('null') + assert smgr.load(subscriptions) is False + + subscriptions = os.path.join(str(tmpdir0), 'subscriptions.json') + with open(subscriptions, 'w', encoding='utf-8') as f: + json.dump(bad_entry, f) + assert smgr.load(subscriptions) is False + + # Create bad data + bad_data = { + 'bad1': bad_entry, + 'bad2': bad_entry, + 'bad3': bad_entry, + 'bad4': bad_entry, + } + subscriptions = os.path.join(str(tmpdir0), 'subscriptions.json') + with open(subscriptions, 'w', encoding='utf-8') as f: + json.dump(bad_data, f) + assert smgr.load(subscriptions) is False + assert smgr.load('invalid-file') is False + + +@mock.patch('requests.post') +def test_plugin_vapid_initializations(mock_post, tmpdir): + """ + NotifyVapid() Initializations + + """ + + # Assign our mock object our return value + okay_response = requests.Request() + okay_response.status_code = requests.codes.ok + okay_response.content = "" + mock_post.return_value = okay_response + + # Temporary directory + tmpdir0 = tmpdir.mkdir('tmp00') + + # Write our subfile + smgr = WebPushSubscriptionManager() + sub = { + "endpoint": 'https://fcm.googleapis.com/fcm/send/abc123', + "keys": { + "p256dh": 'BI2RNIK2PkeCVoEfgVQNjievBi4gWvZxMiuCpOx6K6qCO' + '5caru5QCPuc-nEaLplbbFkHxTrR9YzE8ZkTjie5Fq0', + "auth": 'k9Xzm43nBGo=', + }, + } + subfile = os.path.join(str(tmpdir0), 'subscriptions.json') + assert smgr.add(sub) is True + assert smgr.add(smgr['abc123']) is True + assert os.listdir(str(tmpdir0)) == [] + + with mock.patch('json.dump', side_effect=OSError): + # We will fial to write + assert smgr.write(subfile) is False + + assert smgr.write(subfile) is True + assert os.listdir(str(tmpdir0)) == ['subscriptions.json'] + assert isinstance(smgr.json(), str) + + _asset = asset.AppriseAsset( + storage_mode=PersistentStoreMode.FLUSH, + storage_path=str(tmpdir0), + # Auto-gen our private/public key pair + pem_autogen=True, + ) + + # Auto-Key Generation + obj = NotifyVapid( + 'user@example.ca', targets=['abc123', ], subfile=subfile, + asset=_asset) + assert isinstance(obj, NotifyVapid) + # Our subscription directory + our + # persistent store where our keys were generated + assert len(os.listdir(str(tmpdir0))) == 2 + + # Second call re-references keys previously generated + obj = NotifyVapid( + 'user@example.ca', targets=['abc123', ], subfile=subfile, + asset=_asset) + assert isinstance(obj, NotifyVapid) + assert isinstance(obj.url(), str) + assert obj.send('test') is True + # A second message makes no difference; what is loaded into memory is used + assert obj.send('test') is True + + obj = NotifyVapid( + 'user@example.ca', targets=['abc123', ], subfile='/a/bad/path', + asset=_asset) + assert isinstance(obj, NotifyVapid) + assert isinstance(obj.url(), str) + assert obj.send('test') is False + # A second message makes no difference; what is loaded into memory is used + assert obj.send('test') is False + + # Detect our keyfile + cache_dir = [x for x in os.listdir(str(tmpdir0)) + if not x.endswith('subscriptions.json')][0] + + # Test fixed assignment to our keyfile + keyfile = os.path.join(str(tmpdir0), cache_dir, 'private_key.pem') + assert os.path.exists(keyfile) + obj = NotifyVapid( + 'user@example.ca', targets=['abc123', ], keyfile=keyfile, + subfile=subfile, asset=_asset) + assert isinstance(obj, NotifyVapid) + assert isinstance(obj.url(), str) + assert obj.send('test') is True + # A second message makes no difference; what is loaded into memory is used + assert obj.send('test') is True + + # Invalid Keyfile + obj = NotifyVapid( + 'user@example.ca', targets=['abc123', ], keyfile=subfile, + subfile=subfile, asset=_asset) + assert isinstance(obj, NotifyVapid) + assert isinstance(obj.url(), str) + assert obj.send('test') is False + # A second message makes no difference; what is loaded into memory is used + assert obj.send('test') is False + + # AutoGen Temporary directory + tmpdir1 = tmpdir.mkdir('tmp01') + _asset2 = asset.AppriseAsset( + storage_mode=PersistentStoreMode.FLUSH, + storage_path=str(tmpdir1), + # Auto-gen our private/public key pair + pem_autogen=True, + ) + + assert os.listdir(str(tmpdir1)) == [] + obj = NotifyVapid( + 'user@example.ca', targets=['abc123', ], keyfile=keyfile, + asset=_asset2) + assert isinstance(obj, NotifyVapid) + assert isinstance(obj.url(), str) + # We have a temporary subscription file we can use + assert os.listdir(str(tmpdir1)) == ['00088ad3'] + # We will have a dud configuration file, but at least it's something + # to help the user with + assert obj.send('test') is False + # Second instance fails as well + assert obj.send('test') is False + + # AutoGen Temporary directory + tmpdir2 = tmpdir.mkdir('tmp02') + _asset3 = asset.AppriseAsset( + storage_mode=PersistentStoreMode.FLUSH, + storage_path=str(tmpdir2), + # Auto-gen our private/public key pair + pem_autogen=True, + ) + + # Test invalid keyfile + assert os.path.exists(keyfile) + obj = NotifyVapid( + 'user@example.ca', targets=['abc123', ], keyfile='invalid-file', + subfile=subfile, asset=_asset3) + assert isinstance(obj, NotifyVapid) + assert isinstance(obj.url(), str) + assert obj.send('test') is False + # A second message makes no difference; what is loaded into memory is used + assert obj.send('test') is False