improved test cases

pull/1323/head
Chris Caron 2025-06-01 18:54:27 -04:00
parent b4d8356565
commit 18ce62287c
4 changed files with 245 additions and 28 deletions

View File

@ -273,14 +273,16 @@ class AppriseAttachment:
return attach_plugin return attach_plugin
def sync(self, abort_on_error=True): def sync(self, abort_on_error=True, abort_if_empty=True):
""" """
Itereates over all of the attachments and retrieves them Itereates over all of the attachments and retrieves them
if possible.
""" """
# TODO: Change this to async for future # TODO: Change this to async for future
return next((False for a in self.attachments if not a), True) \
if abort_on_error else next((True for a in self.attachments), True) return False if abort_if_empty and not self.attachments else (
next((False for a in self.attachments if not a), True)
if abort_on_error
else next((True for a in self.attachments), True))
def clear(self): def clear(self):
""" """

View File

@ -29,6 +29,7 @@
import os import os
import json import json
import base64 import base64
import binascii
import struct import struct
from typing import Union, Optional from typing import Union, Optional
from ..utils.base64 import base64_urlencode, base64_urldecode from ..utils.base64 import base64_urlencode, base64_urldecode
@ -43,6 +44,7 @@ try:
from cryptography.hazmat.primitives import serialization, hashes from cryptography.hazmat.primitives import serialization, hashes
from cryptography.hazmat.primitives.ciphers import ( from cryptography.hazmat.primitives.ciphers import (
Cipher, algorithms, modes) Cipher, algorithms, modes)
from cryptography.exceptions import InvalidTag
from cryptography.hazmat.primitives.ciphers.aead import AESGCM from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives.asymmetric import ec from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.serialization import ( from cryptography.hazmat.primitives.serialization import (
@ -160,7 +162,7 @@ class ApprisePEMController:
self.__private_key = None self.__private_key = None
self.__public_key = None self.__public_key = None
if not self._prv_keyfile and self._prv_keyfile.sync(): if not self._prv_keyfile.sync():
# Early exit # Early exit
logger.error( logger.error(
'Could not access PEM Private Key {}.'.format(path)) 'Could not access PEM Private Key {}.'.format(path))
@ -231,7 +233,7 @@ class ApprisePEMController:
self.__private_key = None self.__private_key = None
self.__public_key = None self.__public_key = None
if not self._pub_keyfile and self._pub_keyfile.sync(): if not self._pub_keyfile.sync():
# Early exit # Early exit
logger.error( logger.error(
'Could not access PEM Public Key {}.'.format(path)) 'Could not access PEM Public Key {}.'.format(path))
@ -263,7 +265,7 @@ class ApprisePEMController:
# Load our private key # Load our private key
return True if self.__public_key else False return True if self.__public_key else False
def keygen(self, name=None, force=False): def keygen(self, name: 'Optional[str]' = None, force: bool = False):
""" """
Generates a set of keys based on name configured. Generates a set of keys based on name configured.
""" """
@ -273,10 +275,14 @@ class ApprisePEMController:
logger.warning(msg) logger.warning(msg)
raise ApprisePEMException(msg) raise ApprisePEMException(msg)
if self._pub_keyfile or not self.path: # Detect if a key has been loaded or not
has_key = True if self.private_key(autogen=False) \
or self.public_key(autogen=False) else False
if (has_key and not name) or not self.path:
logger.trace( logger.trace(
'PEM keygen disabled, reason=%s', 'PEM keygen disabled, reason=%s',
'keyfile-defined' if not self._pub_keyfile 'keyfile-defined' if not has_key
else 'no-write-path') else 'no-write-path')
return False return False
@ -306,11 +312,18 @@ class ApprisePEMController:
pub_path = os.path.join(self.path, f'{file_prefix}public_key.pem') pub_path = os.path.join(self.path, f'{file_prefix}public_key.pem')
prv_path = os.path.join(self.path, f'{file_prefix}private_key.pem') prv_path = os.path.join(self.path, f'{file_prefix}private_key.pem')
if os.path.isfile(pub_path) and not force: if not force:
logger.debug( if os.path.isfile(pub_path):
'PEM generation skipped; Public Key already exists: %s', logger.debug(
pub_path) 'PEM generation skipped; Public Key already exists: %s/%s',
return True os.path.dirname(pub_path), os.path.basename(pub_path))
return False
if os.path.isfile(prv_path):
logger.debug(
'PEM generation skipped; Private Key already exists: %s%s',
os.path.dirname(prv_path), os.path.basename(prv_path))
return False
try: try:
# Write our keys to disk # Write our keys to disk
@ -353,6 +366,13 @@ class ApprisePEMController:
return False return False
# Update our local file references
self._prv_keyfile = AppriseAttachment(asset=self.asset)
self._prv_keyfile.add(prv_path)
self._pub_keyfile = AppriseAttachment(asset=self.asset)
self._pub_keyfile.add(pub_path)
logger.info( logger.info(
'Wrote Public/Private PEM key pair for %s/%s', 'Wrote Public/Private PEM key pair for %s/%s',
os.path.dirname(pub_path), os.path.dirname(pub_path),
@ -656,7 +676,7 @@ class ApprisePEMController:
).decode('utf-8') ).decode('utf-8')
def decrypt(self, def decrypt(self,
encrypted_payload: str, encrypted_payload: Union[str, bytes],
private_key: 'Optional[ec.EllipticCurvePrivateKey]' = None, private_key: 'Optional[ec.EllipticCurvePrivateKey]' = None,
salt: Optional[bytes] = None) -> Optional[str]: salt: Optional[bytes] = None) -> Optional[str]:
""" """
@ -672,12 +692,25 @@ class ApprisePEMController:
raise ApprisePEMException(msg) raise ApprisePEMException(msg)
# 1. Parse input # 1. Parse input
if isinstance(encrypted_payload, str): try:
payload_bytes = base64.b64decode(encrypted_payload.encode('utf-8')) if isinstance(encrypted_payload, str):
else: payload_bytes = base64.b64decode(
payload_bytes = base64.b64decode(encrypted_payload) encrypted_payload.encode('utf-8'))
payload = json.loads(payload_bytes.decode('utf-8')) else:
payload_bytes = base64.b64decode(encrypted_payload)
except binascii.Error:
# Bad Padding
logger.debug("Unparseable encrypted content provided")
return None
try:
payload = json.loads(payload_bytes.decode('utf-8'))
except UnicodeDecodeError:
logger.debug("Unparseable encrypted content provided")
return None
ephemeral_pubkey_bytes = base64_urldecode(payload["ephemeral_pubkey"]) ephemeral_pubkey_bytes = base64_urldecode(payload["ephemeral_pubkey"])
iv = base64_urldecode(payload["iv"]) iv = base64_urldecode(payload["iv"])
@ -714,7 +747,22 @@ class ApprisePEMController:
modes.GCM(iv, tag), modes.GCM(iv, tag),
).decryptor() ).decryptor()
plaintext = decryptor.update(ciphertext) + decryptor.finalize() try:
plaintext = decryptor.update(ciphertext) + decryptor.finalize()
except InvalidTag:
logger.debug("Decryption failed - Authentication Mismatch")
# Reason for Error:
# - Mismatched or missing salt
# - Mismatched iv, tag, or ciphertext
# - Incorrect or corrupted ephemeral_pubkey
# - Wrong or incomplete key derivation
# - Data being altered between encryption and decryption
# (truncated/corrupted)
# Basically if we get here, we tried to decrypt encrypted content
# using the wrong key.
return None
# 7. Return decoded message # 7. Return decoded message
return plaintext.decode('utf-8') return plaintext.decode('utf-8')

View File

@ -248,6 +248,12 @@ def test_attach_file():
# Test hosted configuration and that we can't add a valid file # Test hosted configuration and that we can't add a valid file
aa = AppriseAttachment(location=ContentLocation.HOSTED) aa = AppriseAttachment(location=ContentLocation.HOSTED)
# No entries defined yet
assert bool(aa) is False
assert aa.sync() is False
# Entry count does not impact sync if told to act that way
assert aa.sync(abort_if_empty=False) is True
assert aa.add(path) is False assert aa.add(path) is False
response = AppriseAttachment.instantiate(path) response = AppriseAttachment.instantiate(path)

View File

@ -30,6 +30,7 @@ import logging
import os import os
import sys import sys
import pytest import pytest
from unittest import mock
from apprise import AppriseAsset from apprise import AppriseAsset
from apprise import PersistentStoreMode from apprise import PersistentStoreMode
@ -50,6 +51,9 @@ def test_utils_pem_general(tmpdir):
""" """
# string to manipulate/work with
unencrypted_str = "message"
tmpdir0 = tmpdir.mkdir('tmp00') tmpdir0 = tmpdir.mkdir('tmp00')
# Currently no files here # Currently no files here
@ -68,9 +72,11 @@ def test_utils_pem_general(tmpdir):
assert pem_c.public_keyfile() is None assert pem_c.public_keyfile() is None
assert pem_c.public_key() is None assert pem_c.public_key() is None
assert pem_c.x962_str == '' assert pem_c.x962_str == ''
assert pem_c.encrypt("message") is None assert pem_c.decrypt(b'data') is None
assert pem_c.encrypt(unencrypted_str) is None
# Keys can not be generated in memory mode # Keys can not be generated in memory mode
assert pem_c.keygen() is False assert pem_c.keygen() is False
assert pem_c.sign(b'data') is None
asset = AppriseAsset( asset = AppriseAsset(
storage_mode=PersistentStoreMode.FLUSH, storage_mode=PersistentStoreMode.FLUSH,
@ -87,20 +93,175 @@ def test_utils_pem_general(tmpdir):
assert pem_c.public_keyfile() is None assert pem_c.public_keyfile() is None
assert pem_c.public_key() is None assert pem_c.public_key() is None
assert pem_c.x962_str == '' assert pem_c.x962_str == ''
assert pem_c.encrypt("message") is None assert pem_c.encrypt(unencrypted_str) is None
# Keys can not be generated in memory mode # Generate our keys
assert bool(pem_c) is False
assert pem_c.keygen() is True assert pem_c.keygen() is True
assert bool(pem_c) is True
# We have 2 new key files generated # We have 2 new key files generated
assert 'public_key.pem' in os.listdir(str(tmpdir0)) pub_keyfile = os.path.join(str(tmpdir0), 'public_key.pem')
assert 'private_key.pem' in os.listdir(str(tmpdir0)) prv_keyfile = os.path.join(str(tmpdir0), 'private_key.pem')
assert os.path.isfile(pub_keyfile)
assert os.path.isfile(prv_keyfile)
assert pem_c.public_keyfile() is not None assert pem_c.public_keyfile() is not None
assert pem_c.decrypt("garbage") is None
assert pem_c.public_key() is not None assert pem_c.public_key() is not None
assert isinstance(pem_c.x962_str, str)
assert len(pem_c.x962_str) > 20 assert len(pem_c.x962_str) > 20
content = pem_c.encrypt("message") content = pem_c.encrypt(unencrypted_str)
assert pem_c.decrypt(pem_c.encrypt(unencrypted_str.encode('utf-8'))) \
== pem_c.decrypt(pem_c.encrypt(unencrypted_str))
assert pem_c.decrypt(content) == unencrypted_str
assert isinstance(content, str) assert isinstance(content, str)
assert pem_c.decrypt(content) == "message" assert pem_c.decrypt(content) == unencrypted_str
# support str as well
assert pem_c.decrypt(content) == unencrypted_str
assert pem_c.decrypt(content.encode('utf-8')) == unencrypted_str
# Sign test
assert isinstance(pem_c.sign(content.encode('utf-8')), bytes)
# Web Push handling
webpush_content = pem_c.encrypt_webpush(
unencrypted_str,
public_key = pem_c.public_key(),
auth_secret = b'secret')
assert isinstance(webpush_content, bytes)
# Non Bytes (garbage basically)
with pytest.raises(TypeError):
assert pem_c.decrypt(None) is None
with pytest.raises(TypeError):
assert pem_c.decrypt(5) is None
with pytest.raises(TypeError):
assert pem_c.decrypt(False) is None
with pytest.raises(TypeError):
assert pem_c.decrypt(object) is None
# Test our initialization
pem_c = utils.pem.ApprisePEMController(
path=None,
prv_keyfile='invalid',
asset=asset)
assert pem_c.private_keyfile() is False
assert pem_c.public_keyfile() is None
assert pem_c.prv_keyfile is False
assert pem_c.pub_keyfile is None
assert pem_c.private_key() is None
assert pem_c.public_key() is None
assert pem_c.decrypt(content) is None
pem_c = utils.pem.ApprisePEMController(
path=None,
pub_keyfile='invalid',
asset=asset)
assert pem_c.private_keyfile() is None
assert pem_c.public_keyfile() is False
assert pem_c.prv_keyfile is None
assert pem_c.pub_keyfile is False
assert pem_c.private_key() is None
assert pem_c.public_key() is None
assert pem_c.decrypt(content) is None
pem_c = utils.pem.ApprisePEMController(
path=None,
prv_keyfile=prv_keyfile,
asset=asset)
assert pem_c.private_keyfile() == prv_keyfile
assert pem_c.public_keyfile() is None
assert pem_c.private_key() is not None
assert pem_c.prv_keyfile == prv_keyfile
assert pem_c.pub_keyfile is None
assert pem_c.public_key() is not None
assert pem_c.decrypt(content) == unencrypted_str
pem_c = utils.pem.ApprisePEMController(
path=None,
pub_keyfile=pub_keyfile,
asset=asset)
assert pem_c.private_keyfile() is None
assert pem_c.public_keyfile() == pub_keyfile
assert pem_c.prv_keyfile is None
assert pem_c.pub_keyfile == pub_keyfile
assert pem_c.private_key() is None
assert pem_c.public_key() is not None
assert pem_c.decrypt(content) is None
# Test our path references
pem_c = utils.pem.ApprisePEMController(path=str(tmpdir0), asset=asset)
assert pem_c.load_private_key(path=None) is True
assert pem_c.private_keyfile() == prv_keyfile
assert pem_c.prv_keyfile is None
assert pem_c.pub_keyfile is None
assert pem_c.decrypt(content) == unencrypted_str
# Generate a new key referencing another location
pem_c = utils.pem.ApprisePEMController(
name='keygen-tests',
path=str(tmpdir0), asset=asset)
# generate ourselves some keys
assert pem_c.keygen() is True
keygen_prv_file = pem_c.prv_keyfile
keygen_pub_file = pem_c.pub_keyfile
# Remove 1 (but not both)
os.unlink(keygen_pub_file)
pem_c = utils.pem.ApprisePEMController(
name='keygen-tests',
path=str(tmpdir0), asset=asset)
# Private key was found, so this does not work
assert pem_c.keygen() is False
os.unlink(keygen_prv_file)
pem_c = utils.pem.ApprisePEMController(
name='keygen-tests',
path=str(tmpdir0), asset=asset)
# It works now
assert pem_c.keygen() is True
with mock.patch('builtins.open', side_effect=OSError()):
assert pem_c.keygen(force=True) is False
with mock.patch('os.unlink', side_effect=OSError()):
assert pem_c.keygen(force=True) is False
# Generate a new key referencing another location
pem_c = utils.pem.ApprisePEMController(path=str(tmpdir0), asset=asset)
# We can't re-generate keys if ones already exist
assert pem_c.keygen() is False
# the keygen is the big difference here
assert pem_c.keygen(name='test') is True
# under the hood, a key is not regenerated (as one already exists)
assert pem_c.keygen(name='test') is False
# Generate it a second time by force
assert pem_c.keygen(name='test', force=True) is True
assert pem_c.private_keyfile() == os.path.join(
str(tmpdir0), 'test-private_key.pem')
assert pem_c.public_keyfile() == os.path.join(
str(tmpdir0), 'test-public_key.pem')
assert pem_c.private_key() is not None
assert pem_c.public_key() is not None
assert pem_c.prv_keyfile == os.path.join(
str(tmpdir0), 'test-private_key.pem')
assert pem_c.pub_keyfile == os.path.join(
str(tmpdir0), 'test-public_key.pem')
# 'content' was generated using a different key and can not be
# decrypted
assert pem_c.decrypt(content) is None
# Test Decryption files
pem_c = utils.pem.ApprisePEMController(path=str(tmpdir0), asset=asset)
# Calling decrypt triggers underlining code to auto-load
assert pem_c.decrypt(content) == unencrypted_str
# Using a private key by path
assert pem_c.decrypt(
content, private_key=pem_c.private_key()) == unencrypted_str
@pytest.mark.skipif( @pytest.mark.skipif(