diff --git a/apprise/apprise_attachment.py b/apprise/apprise_attachment.py index 0883ed6e..e5b5d262 100644 --- a/apprise/apprise_attachment.py +++ b/apprise/apprise_attachment.py @@ -273,14 +273,16 @@ class AppriseAttachment: return attach_plugin - def sync(self, abort_on_error=True): + def sync(self, abort_on_error=True, abort_if_empty=True): """ Itereates over all of the attachments and retrieves them - if possible. """ # TODO: Change this to async for future - return next((False for a in self.attachments if not a), True) \ - if abort_on_error else next((True for a in self.attachments), True) + + return False if abort_if_empty and not self.attachments else ( + next((False for a in self.attachments if not a), True) + if abort_on_error + else next((True for a in self.attachments), True)) def clear(self): """ diff --git a/apprise/utils/pem.py b/apprise/utils/pem.py index 2fea0faf..aaff8627 100644 --- a/apprise/utils/pem.py +++ b/apprise/utils/pem.py @@ -29,6 +29,7 @@ import os import json import base64 +import binascii import struct from typing import Union, Optional from ..utils.base64 import base64_urlencode, base64_urldecode @@ -43,6 +44,7 @@ try: from cryptography.hazmat.primitives import serialization, hashes from cryptography.hazmat.primitives.ciphers import ( Cipher, algorithms, modes) + from cryptography.exceptions import InvalidTag from cryptography.hazmat.primitives.ciphers.aead import AESGCM from cryptography.hazmat.primitives.asymmetric import ec from cryptography.hazmat.primitives.serialization import ( @@ -160,7 +162,7 @@ class ApprisePEMController: self.__private_key = None self.__public_key = None - if not self._prv_keyfile and self._prv_keyfile.sync(): + if not self._prv_keyfile.sync(): # Early exit logger.error( 'Could not access PEM Private Key {}.'.format(path)) @@ -231,7 +233,7 @@ class ApprisePEMController: self.__private_key = None self.__public_key = None - if not self._pub_keyfile and self._pub_keyfile.sync(): + if not self._pub_keyfile.sync(): # Early exit logger.error( 'Could not access PEM Public Key {}.'.format(path)) @@ -263,7 +265,7 @@ class ApprisePEMController: # Load our private key return True if self.__public_key else False - def keygen(self, name=None, force=False): + def keygen(self, name: 'Optional[str]' = None, force: bool = False): """ Generates a set of keys based on name configured. """ @@ -273,10 +275,14 @@ class ApprisePEMController: logger.warning(msg) raise ApprisePEMException(msg) - if self._pub_keyfile or not self.path: + # Detect if a key has been loaded or not + has_key = True if self.private_key(autogen=False) \ + or self.public_key(autogen=False) else False + + if (has_key and not name) or not self.path: logger.trace( 'PEM keygen disabled, reason=%s', - 'keyfile-defined' if not self._pub_keyfile + 'keyfile-defined' if not has_key else 'no-write-path') return False @@ -306,11 +312,18 @@ class ApprisePEMController: pub_path = os.path.join(self.path, f'{file_prefix}public_key.pem') prv_path = os.path.join(self.path, f'{file_prefix}private_key.pem') - if os.path.isfile(pub_path) and not force: - logger.debug( - 'PEM generation skipped; Public Key already exists: %s', - pub_path) - return True + if not force: + if os.path.isfile(pub_path): + logger.debug( + 'PEM generation skipped; Public Key already exists: %s/%s', + os.path.dirname(pub_path), os.path.basename(pub_path)) + return False + + if os.path.isfile(prv_path): + logger.debug( + 'PEM generation skipped; Private Key already exists: %s%s', + os.path.dirname(prv_path), os.path.basename(prv_path)) + return False try: # Write our keys to disk @@ -353,6 +366,13 @@ class ApprisePEMController: return False + # Update our local file references + self._prv_keyfile = AppriseAttachment(asset=self.asset) + self._prv_keyfile.add(prv_path) + + self._pub_keyfile = AppriseAttachment(asset=self.asset) + self._pub_keyfile.add(pub_path) + logger.info( 'Wrote Public/Private PEM key pair for %s/%s', os.path.dirname(pub_path), @@ -656,7 +676,7 @@ class ApprisePEMController: ).decode('utf-8') def decrypt(self, - encrypted_payload: str, + encrypted_payload: Union[str, bytes], private_key: 'Optional[ec.EllipticCurvePrivateKey]' = None, salt: Optional[bytes] = None) -> Optional[str]: """ @@ -672,12 +692,25 @@ class ApprisePEMController: raise ApprisePEMException(msg) # 1. Parse input - if isinstance(encrypted_payload, str): - payload_bytes = base64.b64decode(encrypted_payload.encode('utf-8')) - else: - payload_bytes = base64.b64decode(encrypted_payload) + try: + if isinstance(encrypted_payload, str): + payload_bytes = base64.b64decode( + encrypted_payload.encode('utf-8')) - payload = json.loads(payload_bytes.decode('utf-8')) + else: + payload_bytes = base64.b64decode(encrypted_payload) + + except binascii.Error: + # Bad Padding + logger.debug("Unparseable encrypted content provided") + return None + + try: + payload = json.loads(payload_bytes.decode('utf-8')) + + except UnicodeDecodeError: + logger.debug("Unparseable encrypted content provided") + return None ephemeral_pubkey_bytes = base64_urldecode(payload["ephemeral_pubkey"]) iv = base64_urldecode(payload["iv"]) @@ -714,7 +747,22 @@ class ApprisePEMController: modes.GCM(iv, tag), ).decryptor() - plaintext = decryptor.update(ciphertext) + decryptor.finalize() + try: + plaintext = decryptor.update(ciphertext) + decryptor.finalize() + + except InvalidTag: + logger.debug("Decryption failed - Authentication Mismatch") + # Reason for Error: + # - Mismatched or missing salt + # - Mismatched iv, tag, or ciphertext + # - Incorrect or corrupted ephemeral_pubkey + # - Wrong or incomplete key derivation + # - Data being altered between encryption and decryption + # (truncated/corrupted) + + # Basically if we get here, we tried to decrypt encrypted content + # using the wrong key. + return None # 7. Return decoded message return plaintext.decode('utf-8') diff --git a/test/test_attach_file.py b/test/test_attach_file.py index d1205671..370969d2 100644 --- a/test/test_attach_file.py +++ b/test/test_attach_file.py @@ -248,6 +248,12 @@ def test_attach_file(): # Test hosted configuration and that we can't add a valid file aa = AppriseAttachment(location=ContentLocation.HOSTED) + # No entries defined yet + assert bool(aa) is False + assert aa.sync() is False + + # Entry count does not impact sync if told to act that way + assert aa.sync(abort_if_empty=False) is True assert aa.add(path) is False response = AppriseAttachment.instantiate(path) diff --git a/test/test_utils_pem.py b/test/test_utils_pem.py index 788ffae2..c769d6a2 100644 --- a/test/test_utils_pem.py +++ b/test/test_utils_pem.py @@ -30,6 +30,7 @@ import logging import os import sys import pytest +from unittest import mock from apprise import AppriseAsset from apprise import PersistentStoreMode @@ -50,6 +51,9 @@ def test_utils_pem_general(tmpdir): """ + # string to manipulate/work with + unencrypted_str = "message" + tmpdir0 = tmpdir.mkdir('tmp00') # Currently no files here @@ -68,9 +72,11 @@ def test_utils_pem_general(tmpdir): assert pem_c.public_keyfile() is None assert pem_c.public_key() is None assert pem_c.x962_str == '' - assert pem_c.encrypt("message") is None + assert pem_c.decrypt(b'data') is None + assert pem_c.encrypt(unencrypted_str) is None # Keys can not be generated in memory mode assert pem_c.keygen() is False + assert pem_c.sign(b'data') is None asset = AppriseAsset( storage_mode=PersistentStoreMode.FLUSH, @@ -87,20 +93,175 @@ def test_utils_pem_general(tmpdir): assert pem_c.public_keyfile() is None assert pem_c.public_key() is None assert pem_c.x962_str == '' - assert pem_c.encrypt("message") is None + assert pem_c.encrypt(unencrypted_str) is None - # Keys can not be generated in memory mode + # Generate our keys + assert bool(pem_c) is False assert pem_c.keygen() is True + assert bool(pem_c) is True # We have 2 new key files generated - assert 'public_key.pem' in os.listdir(str(tmpdir0)) - assert 'private_key.pem' in os.listdir(str(tmpdir0)) + pub_keyfile = os.path.join(str(tmpdir0), 'public_key.pem') + prv_keyfile = os.path.join(str(tmpdir0), 'private_key.pem') + assert os.path.isfile(pub_keyfile) + assert os.path.isfile(prv_keyfile) assert pem_c.public_keyfile() is not None + assert pem_c.decrypt("garbage") is None assert pem_c.public_key() is not None + assert isinstance(pem_c.x962_str, str) assert len(pem_c.x962_str) > 20 - content = pem_c.encrypt("message") + content = pem_c.encrypt(unencrypted_str) + assert pem_c.decrypt(pem_c.encrypt(unencrypted_str.encode('utf-8'))) \ + == pem_c.decrypt(pem_c.encrypt(unencrypted_str)) + assert pem_c.decrypt(content) == unencrypted_str assert isinstance(content, str) - assert pem_c.decrypt(content) == "message" + assert pem_c.decrypt(content) == unencrypted_str + # support str as well + assert pem_c.decrypt(content) == unencrypted_str + assert pem_c.decrypt(content.encode('utf-8')) == unencrypted_str + # Sign test + assert isinstance(pem_c.sign(content.encode('utf-8')), bytes) + + # Web Push handling + webpush_content = pem_c.encrypt_webpush( + unencrypted_str, + public_key = pem_c.public_key(), + auth_secret = b'secret') + assert isinstance(webpush_content, bytes) + + # Non Bytes (garbage basically) + with pytest.raises(TypeError): + assert pem_c.decrypt(None) is None + + with pytest.raises(TypeError): + assert pem_c.decrypt(5) is None + + with pytest.raises(TypeError): + assert pem_c.decrypt(False) is None + + with pytest.raises(TypeError): + assert pem_c.decrypt(object) is None + + # Test our initialization + pem_c = utils.pem.ApprisePEMController( + path=None, + prv_keyfile='invalid', + asset=asset) + assert pem_c.private_keyfile() is False + assert pem_c.public_keyfile() is None + assert pem_c.prv_keyfile is False + assert pem_c.pub_keyfile is None + assert pem_c.private_key() is None + assert pem_c.public_key() is None + assert pem_c.decrypt(content) is None + + pem_c = utils.pem.ApprisePEMController( + path=None, + pub_keyfile='invalid', + asset=asset) + assert pem_c.private_keyfile() is None + assert pem_c.public_keyfile() is False + assert pem_c.prv_keyfile is None + assert pem_c.pub_keyfile is False + assert pem_c.private_key() is None + assert pem_c.public_key() is None + assert pem_c.decrypt(content) is None + + pem_c = utils.pem.ApprisePEMController( + path=None, + prv_keyfile=prv_keyfile, + asset=asset) + assert pem_c.private_keyfile() == prv_keyfile + assert pem_c.public_keyfile() is None + assert pem_c.private_key() is not None + assert pem_c.prv_keyfile == prv_keyfile + assert pem_c.pub_keyfile is None + assert pem_c.public_key() is not None + assert pem_c.decrypt(content) == unencrypted_str + + pem_c = utils.pem.ApprisePEMController( + path=None, + pub_keyfile=pub_keyfile, + asset=asset) + assert pem_c.private_keyfile() is None + assert pem_c.public_keyfile() == pub_keyfile + assert pem_c.prv_keyfile is None + assert pem_c.pub_keyfile == pub_keyfile + assert pem_c.private_key() is None + assert pem_c.public_key() is not None + assert pem_c.decrypt(content) is None + + # Test our path references + pem_c = utils.pem.ApprisePEMController(path=str(tmpdir0), asset=asset) + assert pem_c.load_private_key(path=None) is True + assert pem_c.private_keyfile() == prv_keyfile + assert pem_c.prv_keyfile is None + assert pem_c.pub_keyfile is None + assert pem_c.decrypt(content) == unencrypted_str + + # Generate a new key referencing another location + pem_c = utils.pem.ApprisePEMController( + name='keygen-tests', + path=str(tmpdir0), asset=asset) + + # generate ourselves some keys + assert pem_c.keygen() is True + keygen_prv_file = pem_c.prv_keyfile + keygen_pub_file = pem_c.pub_keyfile + + # Remove 1 (but not both) + os.unlink(keygen_pub_file) + + pem_c = utils.pem.ApprisePEMController( + name='keygen-tests', + path=str(tmpdir0), asset=asset) + # Private key was found, so this does not work + assert pem_c.keygen() is False + os.unlink(keygen_prv_file) + + pem_c = utils.pem.ApprisePEMController( + name='keygen-tests', + path=str(tmpdir0), asset=asset) + # It works now + assert pem_c.keygen() is True + + with mock.patch('builtins.open', side_effect=OSError()): + assert pem_c.keygen(force=True) is False + with mock.patch('os.unlink', side_effect=OSError()): + assert pem_c.keygen(force=True) is False + + # Generate a new key referencing another location + pem_c = utils.pem.ApprisePEMController(path=str(tmpdir0), asset=asset) + # We can't re-generate keys if ones already exist + assert pem_c.keygen() is False + # the keygen is the big difference here + assert pem_c.keygen(name='test') is True + # under the hood, a key is not regenerated (as one already exists) + assert pem_c.keygen(name='test') is False + # Generate it a second time by force + assert pem_c.keygen(name='test', force=True) is True + + assert pem_c.private_keyfile() == os.path.join( + str(tmpdir0), 'test-private_key.pem') + assert pem_c.public_keyfile() == os.path.join( + str(tmpdir0), 'test-public_key.pem') + assert pem_c.private_key() is not None + assert pem_c.public_key() is not None + assert pem_c.prv_keyfile == os.path.join( + str(tmpdir0), 'test-private_key.pem') + assert pem_c.pub_keyfile == os.path.join( + str(tmpdir0), 'test-public_key.pem') + # 'content' was generated using a different key and can not be + # decrypted + assert pem_c.decrypt(content) is None + + # Test Decryption files + pem_c = utils.pem.ApprisePEMController(path=str(tmpdir0), asset=asset) + # Calling decrypt triggers underlining code to auto-load + assert pem_c.decrypt(content) == unencrypted_str + # Using a private key by path + assert pem_c.decrypt( + content, private_key=pem_c.private_key()) == unencrypted_str @pytest.mark.skipif(