From b2e4b921ea95f2108d8ea7b572bc80e28493c033 Mon Sep 17 00:00:00 2001 From: Chris Caron Date: Mon, 30 Jun 2025 04:00:41 +0200 Subject: [PATCH] Vapid/WebPush Support (#1323) --- .coveragerc | 1 - KEYWORDS | 2 + README.md | 1 + apprise/apprise_attachment.py | 11 + apprise/asset.py | 6 + apprise/exception.py | 8 + apprise/plugins/vapid/__init__.py | 590 ++++++++++++++++++ apprise/plugins/vapid/subscription.py | 431 ++++++++++++++ apprise/utils/base64.py | 27 + apprise/utils/pem.py | 825 ++++++++++++++++++++++++++ packaging/redhat/python-apprise.spec | 6 +- test/test_apprise_utils.py | 24 + test/test_attach_file.py | 6 + test/test_plugin_vapid.py | 697 ++++++++++++++++++++++ test/test_utils_pem.py | 562 ++++++++++++++++++ 15 files changed, 3193 insertions(+), 4 deletions(-) create mode 100644 apprise/plugins/vapid/__init__.py create mode 100644 apprise/plugins/vapid/subscription.py create mode 100644 apprise/utils/pem.py create mode 100644 test/test_plugin_vapid.py create mode 100644 test/test_utils_pem.py diff --git a/.coveragerc b/.coveragerc index a739da1b..ed09dda5 100644 --- a/.coveragerc +++ b/.coveragerc @@ -15,4 +15,3 @@ source = show_missing = True skip_covered = True skip_empty = True -fail_under = 95.0 diff --git a/KEYWORDS b/KEYWORDS index 52ca10a2..69a8502d 100644 --- a/KEYWORDS +++ b/KEYWORDS @@ -109,9 +109,11 @@ Threema Gateway Twilio Twist Twitter +Vapid VictorOps Voipms Vonage +Webpush Webex WeCom Bot WhatsApp diff --git a/README.md b/README.md index 6059039e..3d111c97 100644 --- a/README.md +++ b/README.md @@ -132,6 +132,7 @@ The table below identifies the services this tool supports and some example serv | [Telegram](https://github.com/caronc/apprise/wiki/Notify_telegram) | tgram:// | (TCP) 443 | tgram://bottoken/ChatID
tgram://bottoken/ChatID1/ChatID2/ChatIDN | [Twitter](https://github.com/caronc/apprise/wiki/Notify_twitter) | twitter:// | (TCP) 443 | twitter://CKey/CSecret/AKey/ASecret
twitter://user@CKey/CSecret/AKey/ASecret
twitter://CKey/CSecret/AKey/ASecret/User1/User2/User2
twitter://CKey/CSecret/AKey/ASecret?mode=tweet | [Twist](https://github.com/caronc/apprise/wiki/Notify_twist) | twist:// | (TCP) 443 | twist://pasword:login
twist://password:login/#channel
twist://password:login/#team:channel
twist://password:login/#team:channel1/channel2/#team3:channel +| [Vapid (WebPush)](https://github.com/caronc/apprise/wiki/Notify_vapid) | vapid:// | (TCP) 443 | vapid://subscriber/target
vapid://subscriber/target?subfile=path&keyfile=path | [Webex Teams (Cisco)](https://github.com/caronc/apprise/wiki/Notify_wxteams) | wxteams:// | (TCP) 443 | wxteams://Token | [WeCom Bot](https://github.com/caronc/apprise/wiki/Notify_wecombot) | wecombot:// | (TCP) 443 | wecombot://BotKey | [WhatsApp](https://github.com/caronc/apprise/wiki/Notify_whatsapp) | whatsapp:// | (TCP) 443 | whatsapp://AccessToken@FromPhoneID/ToPhoneNo
whatsapp://Template:AccessToken@FromPhoneID/ToPhoneNo diff --git a/apprise/apprise_attachment.py b/apprise/apprise_attachment.py index 855b42d4..e5b5d262 100644 --- a/apprise/apprise_attachment.py +++ b/apprise/apprise_attachment.py @@ -273,6 +273,17 @@ class AppriseAttachment: return attach_plugin + def sync(self, abort_on_error=True, abort_if_empty=True): + """ + Itereates over all of the attachments and retrieves them + """ + # TODO: Change this to async for future + + return False if abort_if_empty and not self.attachments else ( + next((False for a in self.attachments if not a), True) + if abort_on_error + else next((True for a in self.attachments), True)) + def clear(self): """ Empties our attachment list diff --git a/apprise/asset.py b/apprise/asset.py index aef90f08..f9399a77 100644 --- a/apprise/asset.py +++ b/apprise/asset.py @@ -149,6 +149,12 @@ class AppriseAsset: # if Persistent Storage was set to `memory` pgp_autogen = True + # Automatically generate our Privacy Enhanced Mail (PEM) keys if one isn't + # present and our environment configuration allows for it. + # For example, a case where the environment wouldn't allow for it would be + # if Persistent Storage was set to `memory` + pem_autogen = True + # For more detail see CWE-312 @ # https://cwe.mitre.org/data/definitions/312.html # diff --git a/apprise/exception.py b/apprise/exception.py index 216e5cc7..8d7f86eb 100644 --- a/apprise/exception.py +++ b/apprise/exception.py @@ -53,6 +53,14 @@ class AppriseDiskIOError(AppriseException): super().__init__(message, error_code=error_code) +class AppriseInvalidData(AppriseException): + """ + Thrown when bad data was passed into an internal function + """ + def __init__(self, message, error_code=errno.EINVAL): + super().__init__(message, error_code=error_code) + + class AppriseFileNotFound(AppriseDiskIOError, FileNotFoundError): """ Thrown when a persistent write occured in MEMORY mode diff --git a/apprise/plugins/vapid/__init__.py b/apprise/plugins/vapid/__init__.py new file mode 100644 index 00000000..83d30695 --- /dev/null +++ b/apprise/plugins/vapid/__init__.py @@ -0,0 +1,590 @@ +# -*- coding: utf-8 -*- +# BSD 2-Clause License +# +# Apprise - Push Notification Library. +# Copyright (c) 2025, Chris Caron +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +import os +import requests +from itertools import chain +from json import dumps +from . import subscription +from ..base import NotifyBase +from ...common import NotifyType +from ...common import NotifyImageSize +from ...common import PersistentStoreMode +from ...utils.parse import parse_list, parse_bool, is_email +from ...utils.base64 import base64_urlencode +from ...utils import pem as _pem +from ...locale import gettext_lazy as _ +import time + + +class VapidPushMode: + """ + Supported Vapid Push Services + """ + CHROME = 'chrome' + FIREFOX = 'firefox' + EDGE = 'edge' + OPERA = 'opera' + APPLE = 'apple' + SAMSUNG = 'samsung' + BRAVE = 'brave' + GENERIC = 'generic' + + +VAPID_API_LOOKUP = { + VapidPushMode.CHROME: + 'https://fcm.googleapis.com/fcm/send', + VapidPushMode.FIREFOX: + 'https://updates.push.services.mozilla.com/wpush/v1', + VapidPushMode.EDGE: + 'https://fcm.googleapis.com/fcm/send', # Edge uses FCM too + VapidPushMode.OPERA: + 'https://fcm.googleapis.com/fcm/send', # Opera is Chromium-based + VapidPushMode.APPLE: + 'https://web.push.apple.com', # Apple Web Push base endpoint + VapidPushMode.BRAVE: + 'https://fcm.googleapis.com/fcm/send', + VapidPushMode.SAMSUNG: + 'https://fcm.googleapis.com/fcm/send', + VapidPushMode.GENERIC: + 'https://fcm.googleapis.com/fcm/send', +} + +VAPID_PUSH_MODES = ( + VapidPushMode.CHROME, + VapidPushMode.FIREFOX, + VapidPushMode.EDGE, + VapidPushMode.OPERA, + VapidPushMode.APPLE, +) + + +class NotifyVapid(NotifyBase): + """ + A wrapper for WebPush/Vapid notifications + """ + # Set our global enabled flag + enabled = subscription.CRYPTOGRAPHY_SUPPORT and _pem.PEM_SUPPORT + + requirements = { + # Define our required packaging in order to work + 'packages_required': 'cryptography' + } + + # The default descriptive name associated with the Notification + service_name = 'Vapid Web Push Notifications' + + # The services URL + service_url = \ + 'https://datatracker.ietf.org/doc/html/draft-thomson-webpush-vapid' + + # The default protocol + secure_protocol = 'vapid' + + # A URL that takes you to the setup/help of the specific protocol + setup_url = 'https://github.com/caronc/apprise/wiki/Notify_vapid' + + # There is no reason we should exceed 5KB when reading in a PEM file. + # If it is more than this, then it is not accepted. + max_vapid_keyfile_size = 5000 + + # There is no reason we should exceed 5MB when reading in a JSON file. + # If it is more than this, then it is not accepted. + max_vapid_subfile_size = 5242880 + + # The maximum length of the messge can be 4096 + # just choosing a safe number below this to allow for padding and + # encryption + body_maxlen = 4000 + + # A title can not be used for SMS Messages. Setting this to zero will + # cause any title (if defined) to get placed into the message body. + title_maxlen = 0 + + # Our default is to no not use persistent storage beyond in-memory + # reference; this allows us to auto-generate our config if needed + storage_mode = PersistentStoreMode.AUTO + + # 43200 = 12 hours + vapid_jwt_expiration_sec = 43200 + + # Subscription file + vapid_subscription_file = 'subscriptions.json' + + # Allows the user to specify the NotifyImageSize object + image_size = NotifyImageSize.XY_72 + + # Define object templates + templates = ( + '{schema}://{subscriber}', + '{schema}://{subscriber}/{targets}', + ) + + # Define our template tokens + template_tokens = dict(NotifyBase.template_tokens, **{ + 'subscriber': { + 'name': _('API Key'), + 'type': 'string', + 'private': True, + 'required': True, + }, + 'targets': { + 'name': _('Targets'), + 'type': 'list:string', + }, + }) + + # Define our template args + template_args = dict(NotifyBase.template_tokens, **{ + 'mode': { + 'name': _('Mode'), + 'type': 'choice:string', + 'values': VAPID_PUSH_MODES, + 'default': VAPID_PUSH_MODES[0], + 'map_to': 'mode', + }, + # Default Time To Live (defined in seconds) + # 0 (Zero) - message will be delivered only if the device is reacheable + 'ttl': { + 'name': _('ttl'), + 'type': 'int', + 'default': 0, + 'min': 0, + 'max': 60, + }, + 'to': { + 'alias_of': 'targets', + }, + 'from': { + 'alias_of': 'subscriber', + }, + 'keyfile': { + # A Private Keyfile is required to sign header + 'name': _('PEM Private KeyFile'), + 'type': 'string', + 'private': True, + }, + 'subfile': { + # A Subscripion File is required to sign header + 'name': _('Subscripion File'), + 'type': 'string', + 'private': True, + }, + 'image': { + 'name': _('Include Image'), + 'type': 'bool', + 'default': True, + 'map_to': 'include_image', + }, + }) + + def __init__(self, subscriber, mode=None, targets=None, keyfile=None, + subfile=None, include_image=None, ttl=None, **kwargs): + """ + Initialize Vapid Messaging + + """ + super().__init__(**kwargs) + + # Path to our Private Key file + self.keyfile = None + + # Path to our subscription.json file + self.subfile = None + + # + # Our Targets + # + self.targets = [] + self._invalid_targets = [] + + # default subscriptions + self.subscriptions = {} + self.subscriptions_loaded = False + self.private_key_loaded = False + + # Set our Time to Live Flag + self.ttl = self.template_args['ttl']['default'] + if ttl is not None: + try: + self.ttl = int(ttl) + + except (ValueError, TypeError): + # Do nothing + pass + + if self.ttl < self.template_args['ttl']['min'] or \ + self.ttl > self.template_args['ttl']['max']: + msg = 'The Vapid TTL specified ({}) is out of range.'\ + .format(self.ttl) + self.logger.warning(msg) + raise TypeError(msg) + + # Place a thumbnail image inline with the message body + self.include_image = \ + self.template_args['image']['default'] \ + if include_image is None else include_image + + result = is_email(subscriber) + if not result: + msg = 'An invalid Vapid Subscriber' \ + '({}) was specified.'.format(subscriber) + self.logger.warning(msg) + raise TypeError(msg) + self.subscriber = result['full_email'] + + # Store our Mode/service + try: + self.mode = \ + NotifyVapid.template_args['mode']['default'] \ + if mode is None else mode.lower() + + if self.mode not in VAPID_PUSH_MODES: + # allow the outer except to handle this common response + raise + except: + # Invalid region specified + msg = 'The Vapid mode specified ({}) is invalid.' \ + .format(mode) + self.logger.warning(msg) + raise TypeError(msg) + + # Our Private keyfile + self.keyfile = keyfile + + # Our Subscription file + self.subfile = subfile + + # Prepare our PEM Object + self.pem = _pem.ApprisePEMController(self.store.path, asset=self.asset) + + # Create our subscription object + self.subscriptions = subscription.WebPushSubscriptionManager( + asset=self.asset) + + if self.subfile is None and \ + self.store.mode != PersistentStoreMode.MEMORY and \ + self.asset.pem_autogen: + + self.subfile = os.path.join( + self.store.path, self.vapid_subscription_file) + if not os.path.exists(self.subfile) and \ + self.subscriptions.write(self.subfile): + self.logger.info( + 'Vapid auto-generated %s/%s', + os.path.basename(self.store.path), + self.vapid_subscription_file) + + # Acquire our targets for parsing + self.targets = parse_list(targets) + if not self.targets: + # Add ourselves + self.targets.append(self.subscriber) + + return + + def send(self, body, title='', notify_type=NotifyType.INFO, **kwargs): + """ + Perform Vapid Notification + """ + if not self.private_key_loaded and (( + self.keyfile and not self.pem.private_key( + autogen=False, autodetect=False) + and not self.pem.load_private_key(self.keyfile)) + or (not self.keyfile and not self.pem)): + self.logger.warning( + 'Provided Vapid/WebPush (PEM) Private Key file could ' + 'not be loaded.') + self.private_key_loaded = True + return False + else: + self.private_key_loaded = True + + if not self.targets: + # There is no one to notify; we're done + self.logger.warning('There are no Vapid targets to notify') + return False + + if not self.subscriptions_loaded and self.subfile: + # Toggle our loaded flag to prevent trying again later + self.subscriptions_loaded = True + if not self.subscriptions.load( + self.subfile, byte_limit=self.max_vapid_subfile_size): + self.logger.warning( + 'Provided Vapid/WebPush subscriptions file could not be ' + 'loaded.') + return False + + if not self.subscriptions: + self.logger.warning('Vapid could not load subscriptions') + return False + + if not self.pem.private_key(autogen=False, autodetect=False): + self.logger.warning( + 'No Vapid/WebPush (PEM) Private Key file could be loaded.') + return False + + # Prepare our notify URL (based on our mode) + notify_url = VAPID_API_LOOKUP[self.mode] + headers = { + 'User-Agent': self.app_id, + "TTL": str(self.ttl), + "Content-Encoding": "aes128gcm", + "Content-Type": "application/octet-stream", + "Authorization": f"vapid t={self.jwt_token}, k={self.public_key}", + } + + has_error = False + + # Create a copy of the targets list + targets = list(self.targets) + while len(targets): + target = targets.pop(0) + if target not in self.subscriptions: + self.logger.warning( + 'Dropped Vapid user ' + '(%s) specified - not found in subscriptions.json.' % + target, + ) + # Save ourselves from doing this again + self._invalid_targets.append(target) + self.targets.remove(target) + has_error = True + continue + + # Encrypt our payload + encrypted_payload = self.pem.encrypt_webpush( + body, + public_key=self.subscriptions[target].public_key, + auth_secret=self.subscriptions[target].auth_secret) + + self.logger.debug( + 'Vapid %s POST URL: %s (cert_verify=%r)', + self.mode, notify_url, self.verify_certificate, + ) + self.logger.debug( + 'Vapid %s Encrypted Payload: %d byte(s)', self.mode, len(body)) + + # Always call throttle before any remote server i/o is made + self.throttle() + try: + r = requests.post( + notify_url, + data=encrypted_payload, + headers=headers, + verify=self.verify_certificate, + timeout=self.request_timeout, + ) + if r.status_code not in ( + requests.codes.ok, requests.codes.no_content): + # We had a problem + status_str = \ + NotifyBase.http_response_code_lookup(r.status_code) + + self.logger.warning( + 'Failed to send {} Vapid notification: ' + '{}{}error={}.'.format( + self.mode, + status_str, + ', ' if status_str else '', + r.status_code)) + + self.logger.debug( + 'Response Details:\r\n%s', r.content) + + has_error = True + + else: + self.logger.info('Sent %s Vapid notification.', self.mode) + + except requests.RequestException as e: + self.logger.warning( + 'A Connection error occurred sending Vapid ' + 'notification.' + ) + self.logger.debug('Socket Exception: %s', str(e)) + + has_error = True + + return not has_error + + @property + def url_identifier(self): + """ + Returns all of the identifiers that make this URL unique from + another simliar one. Targets or end points should never be identified + here. + """ + return (self.secure_protocol, self.mode, self.subscriber) + + def url(self, privacy=False, *args, **kwargs): + """ + Returns the URL built dynamically based on specified arguments. + """ + + # Define any URL parameters + params = { + 'mode': self.mode, + 'ttl': str(self.ttl), + } + + if self.keyfile: + # Include our keyfile if specified + params['keyfile'] = self.keyfile + + if self.subfile: + # Include our subfile if specified + params['subfile'] = self.subfile + + # Extend our parameters + params.update(self.url_parameters(privacy=privacy, *args, **kwargs)) + + targets = self.targets if not ( + self.targets == 1 and + self.targets[0].lower() == self.subscriber.lower()) else [] + return '{schema}://{subscriber}/{targets}?{params}'.format( + schema=self.secure_protocol, + subscriber=NotifyVapid.quote(self.subscriber, safe='@'), + targets='/'.join(chain( + [str(t) for t in targets], + [NotifyVapid.quote(x, safe='@') + for x in self._invalid_targets])), + params=NotifyVapid.urlencode(params), + ) + + def __len__(self): + """ + Returns the number of targets associated with this notification + """ + targets = len(self.targets) + return targets if targets else 1 + + @staticmethod + def parse_url(url): + """ + Parses the URL and returns enough arguments that can allow + us to re-instantiate this object. + + """ + results = NotifyBase.parse_url(url, verify_host=False) + if not results: + # We're done early as we couldn't load the results + return results + + # Prepare our targets + results['targets'] = [] + if 'from' in results['qsd'] and len(results['qsd']['from']): + results['subscriber'] = \ + NotifyVapid.unquote(results['qsd']['from']) + + if results['user'] and results['host']: + # whatever is left on the URL goes + results['targets'].append('{}@{}'.format( + NotifyVapid.unquote(results['user']), + NotifyVapid.unquote(results['host']), + )) + + elif results['host']: + results['targets'].append( + NotifyVapid.unquote(results['host'])) + + else: + # Acquire our subscriber information + results['subscriber'] = '{}@{}'.format( + NotifyVapid.unquote(results['user']), + NotifyVapid.unquote(results['host']), + ) + + results['targets'].extend( + NotifyVapid.split_path(results['fullpath'])) + + # Get our mode + results['mode'] = results['qsd'].get('mode') + + # Get Image Flag + results['include_image'] = \ + parse_bool(results['qsd'].get( + 'image', NotifyVapid.template_args['image']['default'])) + + # The 'to' makes it easier to use yaml configuration + if 'to' in results['qsd'] and len(results['qsd']['to']): + results['targets'] += \ + NotifyVapid.parse_list(results['qsd']['to']) + + # Our Private Keyfile (PEM) + if 'keyfile' in results['qsd'] and results['qsd']['keyfile']: + results['keyfile'] = \ + NotifyVapid.unquote(results['qsd']['keyfile']) + + # Our Subscription File (JSON) + if 'subfile' in results['qsd'] and results['qsd']['subfile']: + results['subfile'] = \ + NotifyVapid.unquote(results['qsd']['subfile']) + + # Support the 'ttl' variable + if 'ttl' in results['qsd'] and len(results['qsd']['ttl']): + results['ttl'] = \ + NotifyVapid.unquote(results['qsd']['ttl']) + + return results + + @property + def jwt_token(self): + """ + Returns our VAPID Token based on class details + """ + # JWT header + header = { + "alg": "ES256", + "typ": "JWT" + } + + # JWT payload + payload = { + "aud": VAPID_API_LOOKUP[self.mode], + "exp": int(time.time()) + self.vapid_jwt_expiration_sec, + "sub": f"mailto:{self.subscriber}" + } + + # Base64 URL encode header and payload + header_b64 = base64_urlencode( + dumps(header, separators=(",", ":")).encode('utf-8')) + payload_b64 = base64_urlencode( + dumps(payload, separators=(",", ":")).encode('utf-8')) + signing_input = f"{header_b64}.{payload_b64}".encode('utf-8') + signature_b64 = base64_urlencode(self.pem.sign(signing_input)) + + # Return final token + return f"{header_b64}.{payload_b64}.{signature_b64}" + + @property + def public_key(self): + """ + Returns our public key representation + """ + return self.pem.x962_str diff --git a/apprise/plugins/vapid/subscription.py b/apprise/plugins/vapid/subscription.py new file mode 100644 index 00000000..27a79eda --- /dev/null +++ b/apprise/plugins/vapid/subscription.py @@ -0,0 +1,431 @@ +# -*- coding: utf-8 -*- +# BSD 2-Clause License +# +# Apprise - Push Notification Library. +# Copyright (c) 2025, Chris Caron +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +import json +from typing import Optional, Union +from ...asset import AppriseAsset +from ...utils.base64 import base64_urldecode +from ...exception import AppriseInvalidData +from ...apprise_attachment import AppriseAttachment + +try: + from cryptography.hazmat.primitives.asymmetric import ec + + # Cryptography Support enabled + CRYPTOGRAPHY_SUPPORT = True + +except ImportError: + # Cryptography Support disabled + CRYPTOGRAPHY_SUPPORT = False + + +class WebPushSubscription: + """ + WebPush Subscription + """ + # Format: + # { + # "endpoint": "https://fcm.googleapis.com/fcm/send/abc123...", + # "keys": { + # "p256dh": "BNcW4oA7zq5H9TKIrA3XfKclN2fX9P_7NR...", + # "auth": "k9Xzm43nBGo=", + # } + # } + def __init__(self, content: Union[str, dict, None] = None) -> None: + """ + Prepares a webpush object provided with content + Content can be a dictionary, or JSON String + """ + + # Our variables + self.__endpoint = None + self.__p256dh = None + self.__auth = None + self.__auth_secret = None + self.__public_key = None + + if content is not None: + if not self.load(content): + raise AppriseInvalidData('Could not load subscription') + + def load(self, content: Union[str, dict, None] = None) -> bool: + """ + Performs the loading/validation of the object + """ + + # Reset our variables + self.__endpoint = None + self.__p256dh = None + self.__auth = None + self.__auth_secret = None + self.__public_key = None + + if not CRYPTOGRAPHY_SUPPORT: + return False + + if isinstance(content, str): + try: + content = json.loads(content) + + except (json.decoder.JSONDecodeError, TypeError, OSError): + # Bad data + return False + + if not isinstance(content, dict): + # We could not load he result set + return False + + # Retreive our contents for validation + endpoint = content.get('endpoint') + if not isinstance(endpoint, str): + return False + + try: + p256dh = base64_urldecode(content['keys']['p256dh']) + if not p256dh: + return False + + auth_secret = base64_urldecode(content['keys']['auth']) + if not auth_secret: + return False + + except KeyError: + return False + + try: + # Store our data + self.__public_key = ec.EllipticCurvePublicKey.from_encoded_point( + ec.SECP256R1(), p256dh, + ) + + except ValueError: + # Invalid p256dh key (Can't load Public Key) + return False + + self.__endpoint = endpoint + self.__p256dh = content['keys']['p256dh'] + self.__auth = content['keys']['auth'] + self.__auth_secret = auth_secret + + return True + + def write(self, path: str, indent: int = 2) -> bool: + """ + Writes content to disk based on path specified. Content is a JSON + file, so ideally you may wish to have `.json' as it's extension for + clarity + """ + if not self.__public_key: + return False + + try: + with open(path, 'w', encoding='utf-8') as f: + json.dump(self.dict, f, indent=indent) + + except (TypeError, OSError): + # Could not write content + return False + + return True + + @property + def auth(self) -> Optional[str]: + return self.__auth if self.__public_key else None + + @property + def endpoint(self) -> Optional[str]: + return self.__endpoint if self.__public_key else None + + @property + def p256dh(self) -> Optional[str]: + return self.__p256dh if self.__public_key else None + + @property + def auth_secret(self) -> Optional[bytes]: + return self.__auth_secret if self.__public_key else None + + @property + def public_key(self) -> Optional['ec.EllipticCurvePublicKey']: + return self.__public_key + + @property + def dict(self) -> dict: + return { + "endpoint": self.__endpoint, + "keys": { + "p256dh": self.__p256dh, + "auth": self.__auth, + }, + } if self.__public_key else { + "endpoint": 'https://fcm.googleapis.com/fcm/send/abc123...', + "keys": { + "p256dh": '', + "auth": '', + }, + } + + def json(self, indent: int = 2) -> str: + """ + Returns JSON representation of the object + """ + return json.dumps(self.dict, indent=indent) + + def __bool__(self) -> bool: + """ + handle 'if' statement + """ + return True if self.__public_key else False + + def __str__(self) -> str: + """ + Returns our JSON entry as a string + """ + # Return the first 16 characters of the detected endpoint subscription + # id + return '' if not self.__endpoint \ + else self.__endpoint.split('/')[-1][:16] + + +class WebPushSubscriptionManager: + """ + WebPush Subscription Manager + """ + # Format: + # { + # "name1": { + # "endpoint": "https://fcm.googleapis.com/fcm/send/abc123...", + # "keys": { + # "p256dh": "BNcW4oA7zq5H9TKIrA3XfKclN2fX9P_7NR...", + # "auth": "k9Xzm43nBGo=", + # } + # }, + # "name2": { + # "endpoint": "https://fcm.googleapis.com/fcm/send/abc123...", + # "keys": { + # "p256dh": "BNcW4oA7zq5H9TKIrA3XfKclN2fX9P_7NR...", + # "auth": "k9Xzm43nBGo=", + # } + # }, + + # Defines the number of failures we can accept before we abort and assume + # the file is bad + max_load_failure_count = 3 + + def __init__(self, asset: Optional['AppriseAsset'] = None) -> None: + """ + Webpush Subscription Manager + """ + + # Our subscriptions + self.__subscriptions = {} + + # Prepare our Asset Object + self.asset = \ + asset if isinstance(asset, AppriseAsset) else AppriseAsset() + + def __getitem__(self, key: str) -> WebPushSubscription: + """ + Returns our indexed value if it exists + """ + return self.__subscriptions[key.lower()] + + def __setitem__(self, name: str, + subscription: Union[WebPushSubscription, str, dict] + ) -> None: + """ + Set's our object if possible + """ + + if not self.add(subscription, name=name.lower()): + raise AppriseInvalidData('Invalid subscription provided') + + def add(self, subscription: Union[WebPushSubscription, str, dict], + name: Optional[str] = None) -> bool: + """ + Add a subscription into our manager + """ + + if not isinstance(subscription, WebPushSubscription): + try: + # Support loading our object + subscription = WebPushSubscription(subscription) + + except AppriseInvalidData: + return False + + if name is None: + name = str(subscription) + + self.__subscriptions[name.lower()] = subscription + return True + + def __bool__(self) -> bool: + """ + True is returned if at least one subscription has been loaded. + """ + return True if self.__subscriptions else False + + def __len__(self) -> int: + """ + Returns the number of servers loaded; this includes those found within + loaded configuration. This funtion nnever actually counts the + Config entry themselves (if they exist), only what they contain. + """ + return len(self.__subscriptions) + + def __iadd__(self, subscription: Union[WebPushSubscription, str, dict] + ) -> 'WebPushSubscriptionManager': + + if not self.add(subscription): + raise AppriseInvalidData('Invalid subscription provided') + + return self + + def __contains__(self, key: str) -> bool: + """ + Checks if the key exists + """ + return key.lower() in self.__subscriptions + + def clear(self) -> None: + """ + Empties our server list + + """ + self.__subscriptions.clear() + + @property + def dict(self) -> dict: + """ + Returns a dictionary of all entries + """ + return {k: v.dict for k, v in self.__subscriptions.items()} \ + if self.__subscriptions else {} + + def load(self, path: str, byte_limit=0) -> bool: + """ + Writes content to disk based on path specified. Content is a JSON + file, so ideally you may wish to have `.json' as it's extension for + clarity + + if byte_limit is zero, then we do not limit our file size, otherwise + set this to the bytes you want to restrict yourself by + """ + + # Reset our object + self.clear() + + # Create our attachment object + attach = AppriseAttachment(asset=self.asset) + + # Add our path + attach.add(path) + + if byte_limit > 0: + # Enforce maximum file size + attach[0].max_file_size = byte_limit + + if not attach.sync(): + return False + + try: + # Otherwise open our path + with open(attach[0].path, 'r', encoding='utf-8') as f: + content = json.load(f) + + except (json.decoder.JSONDecodeError, TypeError, OSError): + # Could not read + return False + + if not isinstance(content, dict): + # Not a list of dictionaries + return False + + # Verify if we're dealing with a single element: + # { + # "endpoint": "https://fcm.googleapis.com/fcm/send/abc123...", + # "keys": { + # "p256dh": "BNcW4oA7zq5H9TKIrA3XfKclN2fX9P_7NR...", + # "auth": "k9Xzm43nBGo=", + # } + # } + # + # or if we're dealing with a multiple set + # + # { + # "name1": { + # "endpoint": "https://fcm.googleapis.com/fcm/send/abc123...", + # "keys": { + # "p256dh": "BNcW4oA7zq5H9TKIrA3XfKclN2fX9P_7NR...", + # "auth": "k9Xzm43nBGo=", + # } + # }, + # "name2": { + # "endpoint": "https://fcm.googleapis.com/fcm/send/abc123...", + # "keys": { + # "p256dh": "BNcW4oA7zq5H9TKIrA3XfKclN2fX9P_7NR...", + # "auth": "k9Xzm43nBGo=", + # } + # }, + + error_count = 0 + if 'endpoint' in content and 'keys' in content: + if not self.add(content): + return False + + else: + for name, subscription in content.items(): + if not self.add(subscription, name=name.lower()): + error_count += 1 + if error_count > self.max_load_failure_count: + self.clear() + return False + + return True + + def write(self, path: str, indent: int = 2) -> bool: + """ + Writes content to disk based on path specified. Content is a JSON + file, so ideally you may wish to have `.json' as it's extension for + clarity + """ + try: + with open(path, 'w', encoding='utf-8') as f: + json.dump(self.dict, f, indent=indent) + + except (TypeError, OSError): + # Could not write content + return False + + return True + + def json(self, indent: int = 2) -> str: + """ + Returns JSON representation of the object + """ + return json.dumps(self.dict, indent=indent) diff --git a/apprise/utils/base64.py b/apprise/utils/base64.py index e5a8a568..33bb4934 100644 --- a/apprise/utils/base64.py +++ b/apprise/utils/base64.py @@ -32,6 +32,33 @@ import typing import base64 +def base64_urlencode(data: bytes) -> str: + """ + URL Safe Base64 Encoding + """ + try: + return base64.urlsafe_b64encode(data).rstrip(b'=').decode('utf-8') + + except TypeError: + # data is not supported; avoid raising exception + return None + + +def base64_urldecode(data: str) -> bytes: + """ + URL Safe Base64 Encoding + """ + + try: + # Normalize base64url string (remove padding, add it back) + padding = '=' * (-len(data) % 4) + return base64.urlsafe_b64decode(data + padding) + + except TypeError: + # data is not supported; avoid raising exception + return None + + def decode_b64_dict(di: dict) -> dict: """ decodes base64 dictionary previously encoded diff --git a/apprise/utils/pem.py b/apprise/utils/pem.py new file mode 100644 index 00000000..67b948a1 --- /dev/null +++ b/apprise/utils/pem.py @@ -0,0 +1,825 @@ +# -*- coding: utf-8 -*- +# BSD 2-Clause License +# +# Apprise - Push Notification Library. +# Copyright (c) 2025, Chris Caron +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +import os +import json +import base64 +import binascii +import struct +from typing import Union, Optional +from ..utils.base64 import base64_urlencode, base64_urldecode +from ..apprise_attachment import AppriseAttachment +from ..asset import AppriseAsset +from ..logger import logger +from ..exception import ApprisePluginException + +try: + from cryptography.hazmat.backends import default_backend + from cryptography.hazmat.primitives.kdf.hkdf import HKDF + from cryptography.hazmat.primitives import serialization, hashes + from cryptography.hazmat.primitives.ciphers import ( + Cipher, algorithms, modes) + from cryptography.exceptions import InvalidTag + from cryptography.hazmat.primitives.ciphers.aead import AESGCM + from cryptography.hazmat.primitives.asymmetric import ec + from cryptography.hazmat.primitives.serialization import ( + Encoding, + NoEncryption, + PrivateFormat, + PublicFormat, + ) + from cryptography.hazmat.primitives.asymmetric.utils import ( + decode_dss_signature + ) + + # PEM Support enabled + PEM_SUPPORT = True + +except ImportError: + # PEM Support disabled + PEM_SUPPORT = False + + +class ApprisePEMException(ApprisePluginException): + """ + Thrown when there is an error with the PEM Controller + """ + def __init__(self, message, error_code=612): + super().__init__(message, error_code=error_code) + + +class ApprisePEMController: + """ + PEM Controller Tool for the Apprise Library + """ + + # There is no reason a PEM Public Key should exceed 8K in size + # If it is more than this, then it is not accepted + max_pem_public_key_size = 8000 + + # There is no reason a PEM Private Key should exceed 8K in size + # If it is more than this, then it is not accepted + max_pem_private_key_size = 8000 + + # Maximum Vapid Message Size + max_webpush_record_size = 4096 + + def __init__(self, path: str, + pub_keyfile: Optional[str] = None, + prv_keyfile: Optional[str] = None, + name: Optional[str] = None, + asset: Optional[AppriseAsset] = None, + **kwargs) -> None: + """ + Path should be the directory keys can be written and read from such as + .store.path + + Optionally additionally specify a keyfile to explicitly open + """ + + # Directory we can work with + self.path = path + + # Prepare our Key Placeholders + self.__private_key = None + self.__public_key = None + + # Our name (id) + self.name = name.strip(' \t/-+!$@#*').lower() \ + if isinstance(name, str) else '' + + # Prepare our Asset Object + self.asset = \ + asset if isinstance(asset, AppriseAsset) else AppriseAsset() + + # Our temporary reference points + self._prv_keyfile = AppriseAttachment(asset=self.asset) + self._pub_keyfile = AppriseAttachment(asset=self.asset) + + if prv_keyfile: + self.load_private_key(prv_keyfile) + + elif pub_keyfile: + self.load_public_key(pub_keyfile) + + else: + self._pub_keyfile = None + + def load_private_key(self, path: Optional[str] = None, + *names: str) -> bool: + """ + Load Private key and from that we can prepare our public key + """ + + if path is None: + # Auto-load our content + return True if self.private_keyfile(*names) else False + + # Create ourselves an Attachment to work with; this grants us the + # ability to pull this key from a remote site or anything else + # supported by the Attachment object + self._prv_keyfile = AppriseAttachment(asset=self.asset) + + # Add our definition to our pem_key reference + self._prv_keyfile.add(path) + + # Enforce maximum file size + self._prv_keyfile[0].max_file_size = self.max_pem_private_key_size + + # + # Reset Public key + # + self._pub_keyfile = AppriseAttachment(asset=self.asset) + + # + # Reset our internal keys + # + self.__private_key = None + self.__public_key = None + + if not self._prv_keyfile.sync(): + # Early exit + logger.error( + 'Could not access PEM Private Key {}.'.format(path)) + return False + + try: + with open(self._prv_keyfile[0].path, "rb") as f: + self.__private_key = serialization.load_pem_private_key( + f.read(), + password=None, # or provide the password if encrypted + backend=default_backend() + ) + + except (ValueError, TypeError): + logger.debug( + 'PEM Private Key file specified is not supported (%s)', + type(path)) + return False + + except FileNotFoundError: + logger.debug('PEM Private Key file not found: %s', path) + return False + + except OSError as e: + logger.warning('Error accessing PEM Private Key file %s', path) + logger.debug(f'I/O Exception: {e}') + return False + + # + # Generate our public key + # + self.__public_key = self.__private_key.public_key() + + # Load our private key + return True if self.__private_key else False + + def load_public_key(self, path: Optional[str] = None, *names: str) -> bool: + """ + Load Public key only + + Note: with just a public key you can only decrypt, encryption is not + possible. + """ + + if path is None: + # Auto-load our content + return True if self.public_keyfile(*names) else False + + # Create ourselves an Attachment to work with; this grants us the + # ability to pull this key from a remote site or anything else + # supported by the Attachment object + self._pub_keyfile = AppriseAttachment(asset=self.asset) + + # Add our definition to our pem_key reference + self._pub_keyfile.add(path) + + # Enforce maximum file size + self._pub_keyfile[0].max_file_size = self.max_pem_public_key_size + + # + # Reset Private key + # + self._prv_keyfile = AppriseAttachment(asset=self.asset) + + # + # Reset our internal keys + # + self.__private_key = None + self.__public_key = None + + if not self._pub_keyfile.sync(): + # Early exit + logger.error( + 'Could not access PEM Public Key {}.'.format(path)) + return False + + try: + with open(path, 'rb') as key_file: + self.__public_key = serialization.load_pem_public_key( + key_file.read(), + backend=default_backend() + ) + + except (ValueError, TypeError): + logger.debug( + 'PEM Public Key file specified is not supported (%s)', + type(path)) + return False + + except FileNotFoundError: + # Generate keys + logger.debug('PEM Public Key file not found: %s', path) + return False + + except OSError as e: + logger.warning('Error accessing PEM Public Key file %s', path) + logger.debug(f'I/O Exception: {e}') + return False + + # Load our private key + return True if self.__public_key else False + + def keygen(self, name: 'Optional[str]' = None, force: bool = False): + """ + Generates a set of keys based on name configured. + """ + + if not PEM_SUPPORT: + msg = 'PEM Support unavailable; install cryptography library' + logger.warning(msg) + raise ApprisePEMException(msg) + + # Detect if a key has been loaded or not + has_key = True if self.private_key(autogen=False) \ + or self.public_key(autogen=False) else False + + if (has_key and not (name or force)) or not self.path: + logger.trace( + 'PEM keygen disabled, reason=%s', + 'keyfile-defined' if not has_key + else 'no-write-path') + return False + + # Create a new private/public key pair + self.__private_key = ec.generate_private_key( + ec.SECP256R1(), default_backend()) + self.__public_key = self.__private_key.public_key() + + # + # Prepare our PEM formatted output files + # + private_key = self.__private_key.private_bytes( + Encoding.PEM, + PrivateFormat.PKCS8, + encryption_algorithm=NoEncryption(), + ) + + public_key = self.__public_key.public_bytes( + encoding=Encoding.PEM, + format=PublicFormat.SubjectPublicKeyInfo, + ) + + if not name: + name = self.name + + file_prefix = '' if not name else f'{name}-' + pub_path = os.path.join(self.path, f'{file_prefix}public_key.pem') + prv_path = os.path.join(self.path, f'{file_prefix}private_key.pem') + + if not force: + if os.path.isfile(pub_path): + logger.debug( + 'PEM generation skipped; Public Key already exists: %s/%s', + os.path.dirname(pub_path), os.path.basename(pub_path)) + return False + + if os.path.isfile(prv_path): + logger.debug( + 'PEM generation skipped; Private Key already exists: %s%s', + os.path.dirname(prv_path), os.path.basename(prv_path)) + return False + + try: + # Write our keys to disk + with open(pub_path, 'wb') as f: + f.write(public_key) + + except OSError as e: + logger.warning('Error writing Public PEM file %s', pub_path) + logger.debug(f'I/O Exception: {e}') + + # Cleanup + try: + os.unlink(pub_path) + logger.trace('Removed %s', pub_path) + + except OSError: + pass + + return False + + try: + with open(prv_path, 'wb') as f: + f.write(private_key) + + except OSError as e: + logger.warning('Error writing Private PEM file %s', prv_path) + logger.debug(f'I/O Exception: {e}') + + try: + os.unlink(pub_path) + logger.trace('Removed %s', pub_path) + + except OSError: + pass + + try: + os.unlink(prv_path) + logger.trace('Removed %s', prv_path) + + except OSError: + pass + + return False + + # Update our local file references + self._prv_keyfile = AppriseAttachment(asset=self.asset) + self._prv_keyfile.add(prv_path) + + self._pub_keyfile = AppriseAttachment(asset=self.asset) + self._pub_keyfile.add(pub_path) + + logger.info( + 'Wrote Public/Private PEM key pair for %s/%s', + os.path.dirname(pub_path), + os.path.basename(pub_path)) + return True + + def public_keyfile(self, *names: str) -> Optional[str]: + """ + Returns the first match of a useable public key based names provided + """ + + if not PEM_SUPPORT: + msg = 'PEM Support unavailable; install cryptography library' + logger.warning(msg) + raise ApprisePEMException(msg) + + if self._pub_keyfile: + # If our code reaches here, then we fetch our public key + pem_key = self._pub_keyfile[0] + if not pem_key: + # We could not access the attachment + logger.error( + 'Could not access PEM Public Key {}.'.format( + pem_key.url(privacy=True))) + return False + + return pem_key.path + + elif not self.path: + # No path + return None + + fnames = [ + 'public_key.pem', + 'public.pem', + 'pub.pem', + ] + + if self.name: + # Include our name in the list + fnames = [self.name] + [*names] + + for name in names: + fnames.insert(0, f'{name}-public_key.pem') + + _entry = name.lower() + fnames.insert(0, f'{_entry}-public_key.pem') + + return next( + (os.path.join(self.path, fname) + for fname in fnames + if os.path.isfile(os.path.join(self.path, fname))), + None) + + def private_keyfile(self, *names: str) -> Optional[str]: + """ + Returns the first match of a useable private key based names provided + """ + + if not PEM_SUPPORT: + msg = 'PEM Support unavailable; install cryptography library' + logger.warning(msg) + raise ApprisePEMException(msg) + + if self._prv_keyfile: + # If our code reaches here, then we fetch our private key + pem_key = self._prv_keyfile[0] + if not pem_key: + # We could not access the attachment + logger.error( + 'Could not access PEM Private Key {}.'.format( + pem_key.url(privacy=True))) + return False + + return pem_key.path + + elif not self.path: + # No path + return None + + fnames = [ + 'private_key.pem', + 'private.pem', + 'prv.pem', + ] + + if self.name: + # Include our name in the list + fnames = [self.name] + [*names] + + for name in names: + fnames.insert(0, f'{name}-private_key.pem') + + _entry = name.lower() + fnames.insert(0, f'{_entry}-private_key.pem') + + return next( + (os.path.join(self.path, fname) + for fname in fnames + if os.path.isfile(os.path.join(self.path, fname))), + None) + + def public_key(self, *names: str, autogen: Optional[bool] = None, + autodetect: bool = True + ) -> Optional['ec.EllipticCurvePublicKey']: + """ + Opens a spcified pem public file and returns the key from it which + is used to decrypt the message + """ + if self.__public_key or not autodetect: + return self.__public_key + + path = self.public_keyfile(*names) + if not path: + if (autogen if autogen is not None else self.asset.pem_autogen) \ + and self.keygen(*names): + path = self.public_keyfile(*names) + if path: + # We should get a hit now + return self.public_key(autogen=False) + + logger.warning('No PEM Public Key could be loaded') + return None + + return self.__public_key if ( + self.load_public_key(path) or + # Try to see if we can load a private key (which we can generate a + # public from) + self.private_key(*names, autogen=autogen)) else None + + def private_key(self, *names: str, autogen: Optional[bool] = None, + autodetect: bool = True + ) -> Optional['ec.EllipticCurvePrivateKey']: + """ + Opens a spcified pem private file and returns the key from it which + is used to encrypt the message + """ + if self.__private_key or not autodetect: + return self.__private_key + + path = self.private_keyfile(*names) + if not path: + if (autogen if autogen is not None else self.asset.pem_autogen) \ + and self.keygen(*names): + path = self.private_keyfile(*names) + if path: + # We should get a hit now + return self.private_key(autogen=False) + + logger.warning('No PEM Private Key could be loaded') + return None + + return self.__private_key if self.load_private_key(path) else None + + def encrypt_webpush(self, message: Union[str, bytes], + public_key: 'ec.EllipticCurvePublicKey', + auth_secret: bytes) -> bytes: + """ + Encrypt a WebPush message using the recipient's public key and auth + secret. + + Accepts input message as str or bytes. + """ + if isinstance(message, str): + message = message.encode('utf-8') + + # 1. Generate ephemeral EC private/Public key + ephemeral_private_key = \ + ec.generate_private_key(ec.SECP256R1(), default_backend()) + ephemeral_public_key = ephemeral_private_key.public_key().public_bytes( + encoding=Encoding.X962, format=PublicFormat.UncompressedPoint) + + # 2. Random salt + salt = os.urandom(16) + + # 3. Generate shared secret via ECDH + shared_secret = ephemeral_private_key.exchange(ec.ECDH(), public_key) + + # 4. Derive PRK using HKDF (first phase) + recipient_public_key_bytes = public_key.public_bytes( + encoding=Encoding.X962, + format=PublicFormat.UncompressedPoint, + ) + + # 5. Derive Encryption key + hkdf_secret = HKDF( + algorithm=hashes.SHA256(), + length=32, + salt=auth_secret, + info=b"WebPush: info\x00" + + recipient_public_key_bytes + ephemeral_public_key, + backend=default_backend(), + ).derive(shared_secret) + + # 6. Derive Content Encryption Key + hkdf_key = HKDF( + algorithm=hashes.SHA256(), + length=16, + salt=salt, + info=b"Content-Encoding: aes128gcm\x00", + backend=default_backend(), + ).derive(hkdf_secret) + + # 7. Derive Nonce + hkdf_nonce = HKDF( + algorithm=hashes.SHA256(), + length=12, + salt=salt, + info=b"Content-Encoding: nonce\x00", + backend=default_backend(), + ).derive(hkdf_secret) + + # 8. Encrypt the message + aesgcm = AESGCM(hkdf_key) + # RFC8291 requires us to add '\0x02' byte to end of message + ciphertext = aesgcm.encrypt( + hkdf_nonce, message + b"\x02", associated_data=None) + + # 9. Build WebPush header + payload + header = salt + header += struct.pack("!L", self.max_webpush_record_size) + header += struct.pack("!B", len(ephemeral_public_key)) + header += ephemeral_public_key + header += ciphertext + + return header + + def encrypt(self, + message: Union[str, bytes], + public_key: 'Optional[ec.EllipticCurvePublicKey]' = None, + salt: Optional[bytes] = None) -> Optional[str]: + """ + Encrypts a message using the recipient's public key (or self public + key if none provided). Message can be str or bytes. + """ + + if not PEM_SUPPORT: + msg = 'PEM Support unavailable; install cryptography library' + logger.warning(msg) + raise ApprisePEMException(msg) + + # 1. Handle string vs bytes input + if isinstance(message, str): + message = message.encode('utf-8') + + # 2. Select public key + if public_key is None: + public_key = self.public_key() + if public_key is None: + logger.debug("No public key available for encryption.") + return None + + # 3. Generate ephemeral EC private key + ephemeral_private_key = ec.generate_private_key(ec.SECP256R1()) + + # 4. Derive shared secret + shared_secret = ephemeral_private_key.exchange(ec.ECDH(), public_key) + + # 5. Derive symmetric AES key using HKDF + derived_key = HKDF( + algorithm=hashes.SHA256(), + length=32, + salt=salt, # Allow salt=None if not provided + info=b'ecies-encryption', + backend=default_backend() + ).derive(shared_secret) + + # 6. Encrypt the message using AES-GCM + iv = os.urandom(12) # 96-bit random IV for GCM + encryptor = Cipher( + algorithms.AES(derived_key), + modes.GCM(iv), + backend=default_backend() + ).encryptor() + + ciphertext = encryptor.update(message) + encryptor.finalize() + tag = encryptor.tag + + # 7. Serialize ephemeral public key as X9.62 Uncompressed Point + ephemeral_public_key_bytes = \ + ephemeral_private_key.public_key().public_bytes( + encoding=serialization.Encoding.X962, + format=serialization.PublicFormat.UncompressedPoint, + ) + + # 8. Combine everything cleanly + full_payload = { + "ephemeral_pubkey": base64_urlencode(ephemeral_public_key_bytes), + "iv": base64_urlencode(iv), + "tag": base64_urlencode(tag), + "ciphertext": base64_urlencode(ciphertext), + } + + return base64.b64encode( + json.dumps(full_payload).encode('utf-8') + ).decode('utf-8') + + def decrypt(self, + encrypted_payload: Union[str, bytes], + private_key: 'Optional[ec.EllipticCurvePrivateKey]' = None, + salt: Optional[bytes] = None) -> Optional[str]: + """ + Decrypts a message using the provided private key or fallback to + self's private key. + + Payload is the base64-encoded JSON from encrypt(). + """ + + if not PEM_SUPPORT: + msg = 'PEM Support unavailable; install cryptography library' + logger.warning(msg) + raise ApprisePEMException(msg) + + # 1. Parse input + try: + if isinstance(encrypted_payload, str): + payload_bytes = base64.b64decode( + encrypted_payload.encode('utf-8')) + + else: + payload_bytes = base64.b64decode(encrypted_payload) + + except binascii.Error: + # Bad Padding + logger.debug("Unparseable encrypted content provided") + return None + + try: + payload = json.loads(payload_bytes.decode('utf-8')) + + except UnicodeDecodeError: + logger.debug("Unparseable encrypted content provided") + return None + + ephemeral_pubkey_bytes = base64_urldecode(payload["ephemeral_pubkey"]) + iv = base64_urldecode(payload["iv"]) + tag = base64_urldecode(payload["tag"]) + ciphertext = base64_urldecode(payload["ciphertext"]) + + # 2. Select private key + if private_key is None: + private_key = self.private_key() + if private_key is None: + logger.debug("No private key available for decryption.") + return None + + # 3. Load ephemeral public key from sender + ephemeral_pubkey = ec.EllipticCurvePublicKey.from_encoded_point( + ec.SECP256R1(), + ephemeral_pubkey_bytes + ) + + # 4. ECDH shared secret + shared_secret = private_key.exchange(ec.ECDH(), ephemeral_pubkey) + + # 5. Derive symmetric AES key with HKDF + derived_key = HKDF( + algorithm=hashes.SHA256(), + length=32, + salt=salt, + info=b'ecies-encryption', + ).derive(shared_secret) + + # 6. Decrypt using AES-GCM + decryptor = Cipher( + algorithms.AES(derived_key), + modes.GCM(iv, tag), + ).decryptor() + + try: + plaintext = decryptor.update(ciphertext) + decryptor.finalize() + + except InvalidTag: + logger.debug("Decryption failed - Authentication Mismatch") + # Reason for Error: + # - Mismatched or missing salt + # - Mismatched iv, tag, or ciphertext + # - Incorrect or corrupted ephemeral_pubkey + # - Wrong or incomplete key derivation + # - Data being altered between encryption and decryption + # (truncated/corrupted) + + # Basically if we get here, we tried to decrypt encrypted content + # using the wrong key. + return None + + # 7. Return decoded message + return plaintext.decode('utf-8') + + def sign(self, content: bytes) -> Optional[bytes]: + """ + Sign the message using ES256 (ECDSA w/ SHA256) via private key + """ + + try: + # Sign the message using ES256 (ECDSA w/ SHA256) + der_sig = self.private_key()\ + .sign(content, ec.ECDSA(hashes.SHA256())) + + except AttributeError: + # NoneType; could not load key + return None + + # Convert DER to raw R||S + r, s = decode_dss_signature(der_sig) + return r.to_bytes( + 32, byteorder='big') + s.to_bytes(32, byteorder='big') + + @property + def pub_keyfile(self) -> Optional[Union[str, bool]]: + """ + Returns the Public Keyfile Path if set otherwise it returns None + This property returns False if a keyfile was provided, but was invalid + """ + return None if not self._pub_keyfile \ + else (False if not self._pub_keyfile[0] + else self._pub_keyfile[0].path) + + @property + def prv_keyfile(self) -> Optional[Union[str, bool]]: + """ + Returns the Private Keyfile Path if set otherwise it returns None + This property returns False if a keyfile was provided, but was invalid + """ + return None if not self._prv_keyfile \ + else (False if not self._prv_keyfile[0] + else self._prv_keyfile[0].path) + + @property + def x962_str(self) -> str: + """ + X962 serialization based on public key + """ + try: + return base64_urlencode( + self.public_key().public_bytes( + encoding=serialization.Encoding.X962, + format=serialization.PublicFormat.UncompressedPoint) + ) + except AttributeError: + # Public Key could not be generated (public_key() returned None) + return '' + + def __bool__(self) -> bool: + """ + Returns True if at least 1 key was loaded + """ + return True if (self.private_key() or self.public_key()) else False diff --git a/packaging/redhat/python-apprise.spec b/packaging/redhat/python-apprise.spec index a396da89..1a1f078c 100644 --- a/packaging/redhat/python-apprise.spec +++ b/packaging/redhat/python-apprise.spec @@ -43,7 +43,7 @@ Africas Talking, Apprise API, APRS, AWS SES, AWS SNS, Bark, BlueSky, Burst SMS, BulkSMS, BulkVS, Chanify, Clickatell, ClickSend, DAPNET, DingTalk, Discord, E-Mail, Emby, FCM, Feishu, Flock, Free Mobile, Google Chat, Gotify, Growl, Guilded, Home Assistant, httpSMS, IFTTT, Join, Kavenegar, KODI, Kumulos, LaMetric, Line, -LunaSea, MacOSX, Mailgun, Mastodon, Mattermost,Matrix, MessageBird, Microsoft +LunaSea, MacOSX, Mailgun, Mastodon, Mattermost, Matrix, MessageBird, Microsoft Windows, Microsoft Teams, Misskey, MQTT, MSG91, MyAndroid, Nexmo, Nextcloud, NextcloudTalk, Notica, Notifiarr, Notifico, ntfy, Office365, OneSignal, Opsgenie, PagerDuty, PagerTree, ParsePlatform, Plivo, PopcornNotify, Prowl, @@ -51,8 +51,8 @@ Pushalot, PushBullet, Pushjet, PushMe, Pushover, PushSafer, Pushy, PushDeer, Revolt, Reddit, Resend, Rocket.Chat, RSyslog, SendGrid, ServerChan, Seven, SFR, Signal, SimplePush, Sinch, Slack, SMSEagle, SMS Manager, SMTP2Go, SparkPost, Splunk, Super Toasty, Streamlabs, Stride, Synology Chat, Syslog, Techulus Push, -Telegram, Threema Gateway, Twilio, Twitter, Twist, VictorOps, Voipms, Vonage, -WeCom Bot, WhatsApp, Webex Teams, Workflows, WxPusher, XBMC} +Telegram, Threema Gateway, Twilio, Twitter, Twist, Vapid, VictorOps, Voipms, +Vonage, WebPush, WeCom Bot, WhatsApp, Webex Teams, Workflows, WxPusher, XBMC} Name: python-%{pypi_name} Version: 1.9.3 diff --git a/test/test_apprise_utils.py b/test/test_apprise_utils.py index 29649d18..28491184 100644 --- a/test/test_apprise_utils.py +++ b/test/test_apprise_utils.py @@ -2752,6 +2752,30 @@ def test_cwe312_url(): '#random') == 'slack://test@B...4/J...M/X...3/' +def test_base64_encode_decode(): + """ + Utils:Base64:URLEncode & Decode + + """ + assert utils.base64.base64_urlencode(None) is None + assert utils.base64.base64_urlencode(42) is None + assert utils.base64.base64_urlencode(object) is None + assert utils.base64.base64_urlencode({}) is None + assert utils.base64.base64_urlencode("") is None + assert utils.base64.base64_urlencode("abc") is None + assert utils.base64.base64_urlencode(b"") == '' + assert utils.base64.base64_urlencode(b"abc") == 'YWJj' + + assert utils.base64.base64_urldecode(None) is None + assert utils.base64.base64_urldecode(42) is None + assert utils.base64.base64_urldecode(object) is None + assert utils.base64.base64_urldecode({}) is None + + assert utils.base64.base64_urldecode("abc") == b'i\xb7' + assert utils.base64.base64_urldecode("") == b'' + assert utils.base64.base64_urldecode('YWJj') == b'abc' + + def test_dict_base64_codec(tmpdir): """ Test encoding/decoding of base64 content diff --git a/test/test_attach_file.py b/test/test_attach_file.py index d1205671..370969d2 100644 --- a/test/test_attach_file.py +++ b/test/test_attach_file.py @@ -248,6 +248,12 @@ def test_attach_file(): # Test hosted configuration and that we can't add a valid file aa = AppriseAttachment(location=ContentLocation.HOSTED) + # No entries defined yet + assert bool(aa) is False + assert aa.sync() is False + + # Entry count does not impact sync if told to act that way + assert aa.sync(abort_if_empty=False) is True assert aa.add(path) is False response = AppriseAttachment.instantiate(path) diff --git a/test/test_plugin_vapid.py b/test/test_plugin_vapid.py new file mode 100644 index 00000000..50dd1c20 --- /dev/null +++ b/test/test_plugin_vapid.py @@ -0,0 +1,697 @@ +# -*- coding: utf-8 -*- +# BSD 2-Clause License +# +# Apprise - Push Notification Library. +# Copyright (c) 2025, Chris Caron +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +import os +import sys +import json +import requests +import pytest +from unittest import mock + +from apprise.plugins.vapid.subscription import ( + WebPushSubscription, WebPushSubscriptionManager) +from apprise.plugins.vapid import NotifyVapid +from apprise import exception, asset, url +from apprise.common import PersistentStoreMode +from apprise.utils.pem import ApprisePEMController +from helpers import AppriseURLTester + +# Disable logging for a cleaner testing output +import logging +logging.disable(logging.CRITICAL) + +# Attachment Directory +TEST_VAR_DIR = os.path.join(os.path.dirname(__file__), 'var') + +# a test UUID we can use +SUBSCRIBER = 'user@example.com' + +PLUGIN_ID = 'vapid' + +# Our Testing URLs +apprise_url_tests = ( + ('vapid://', { + 'instance': TypeError, + }), + ('vapid://:@/', { + 'instance': TypeError, + }), + ('vapid://invalid-subscriber', { + # An invalid Subscriber + 'instance': TypeError, + }), + ('vapid://user@example.com', { + # bare bone requirements met, but we don't have our subscription file + # or our private key (pem) + 'instance': NotifyVapid, + # We'll fail to respond because we would not have found any + # configuration to load + 'notify_response': False, + }), + ('vapid://user@example.com?keyfile=invalid&subfile=invalid', { + # Test passing keyfile and subfile on our path (even if invalid) + 'instance': NotifyVapid, + # We'll fail to respond because we would not have found any + # configuration to load + 'notify_response': False, + }), + ('vapid://user@example.com/newuser@example.com', { + # we don't have our subscription file or private key + 'instance': NotifyVapid, + 'notify_response': False, + }), + ('vapid://user@example.ca/newuser@example.ca', { + 'instance': NotifyVapid, + # force a failure + 'response': False, + 'requests_response_code': requests.codes.internal_server_error, + }), + ('vapid://user@example.uk/newuser@example.uk', { + 'instance': NotifyVapid, + # throw a bizzare code forcing us to fail to look it up + 'response': False, + 'requests_response_code': 999, + }), + ('vapid://user@example.au/newuser@example.au', { + 'instance': NotifyVapid, + # Throws a series of connection and transfer exceptions when this flag + # is set and tests that we gracfully handle them + 'test_requests_exceptions': True, + }), +) + + +@pytest.fixture +def patch_persistent_store_namespace(tmpdir): + """ + Force an easy to test environment + """ + with mock.patch.object(url.URLBase, 'url_id', return_value=PLUGIN_ID), \ + mock.patch.object( + asset.AppriseAsset, 'storage_mode', + PersistentStoreMode.AUTO), \ + mock.patch.object( + asset.AppriseAsset, 'storage_path', str(tmpdir)): + + tmp_dir = tmpdir.mkdir(PLUGIN_ID) + # Return the directory name + yield str(tmp_dir) + + +@pytest.fixture +def subscription_reference(): + return { + "user@example.com": { + "endpoint": 'https://fcm.googleapis.com/fcm/send/default', + "keys": { + "p256dh": 'BI2RNIK2PkeCVoEfgVQNjievBi4gWvZxMiuCpOx6K6qCO' + '5caru5QCPuc-nEaLplbbFkHxTrR9YzE8ZkTjie5Fq0', + "auth": 'k9Xzm43nBGo=', + }, + }, + "user1": { + "endpoint": 'https://fcm.googleapis.com/fcm/send/abc123', + "keys": { + "p256dh": 'BI2RNIK2PkeCVoEfgVQNjievBi4gWvZxMiuCpOx6K6qCO' + '5caru5QCPuc-nEaLplbbFkHxTrR9YzE8ZkTjie5Fq0', + "auth": 'k9Xzm43nBGo=', + }, + }, + "user2": { + "endpoint": 'https://fcm.googleapis.com/fcm/send/def456', + "keys": { + "p256dh": 'BI2RNIK2PkeCVoEfgVQNjievBi4gWvZxMiuCpOx6K6qCO' + '5caru5QCPuc-nEaLplbbFkHxTrR9YzE8ZkTjie5Fq0', + "auth": 'k9Xzm43nBGo=', + }, + }, + } + + +@pytest.mark.skipif( + 'cryptography' not in sys.modules, reason="Requires cryptography") +def test_plugin_vapid_urls(): + """ + NotifyVapid() Apprise URLs - No Config + + """ + + # Run our general tests + AppriseURLTester(tests=apprise_url_tests).run_all() + + +@pytest.mark.skipif( + 'cryptography' not in sys.modules, reason="Requires cryptography") +def test_plugin_vapid_urls_with_required_assets( + patch_persistent_store_namespace, subscription_reference): + """ + NotifyVapid() Apprise URLs With Config + """ + + # Determine our store + pc = ApprisePEMController(path=patch_persistent_store_namespace) + assert pc.keygen() is True + + # Write our subscriptions file to disk + subscription_file = os.path.join( + patch_persistent_store_namespace, + NotifyVapid.vapid_subscription_file) + + with open(subscription_file, 'w') as f: + f.write(json.dumps(subscription_reference)) + + tests = ( + ('vapid://user@example.com', { + # user@example.com loaded (also used as subscriber id) + 'instance': NotifyVapid, + }), + ('vapid://user@example.com/newuser@example.com', { + # no newuser@example.com key entry + 'instance': NotifyVapid, + 'notify_response': False, + }), + ('vapid://user@example.com/user1?to=user2', { + # We'll succesfully notify 2 users + 'instance': NotifyVapid, + }), + ('vapid://user1?to=user2&from=user@example.com', { + # We'll succesfully notify 2 users + 'instance': NotifyVapid, + }), + ('vapid://?to=user2&from=user@example.com', { + # No host provided + 'instance': NotifyVapid, + }), + ('vapid://user@example.com?to=user2&from=user@example.com', { + # We'll succesfully notify 2 users + 'instance': NotifyVapid, + }), + ('vapid://user@example.com/user1?to=user2&ttl=15', { + # test ttl + 'instance': NotifyVapid, + }), + ('vapid://user@example.com/user1?to=user2&ttl=', { + # test ttl + 'instance': NotifyVapid, + }), + ('vapid://user@example.com/user1?to=user2&ttl=invalid', { + # test ttl + 'instance': NotifyVapid, + }), + ('vapid://user@example.com/user1?to=user2&ttl=-4000', { + # bad ttl + 'instance': TypeError, + }), + ('vapid://user@example.com/user1?to=user2&mode=edge', { + # test mode + 'instance': NotifyVapid, + }), + ('vapid://user@example.com/user1?to=user2&mode=', { + # test mode + 'instance': TypeError, + }), + ('vapid://user@example.com/user1?to=user2&mode=invalid', { + # test mode more + 'instance': TypeError, + }), + ('vapid://user@example.com/user1', { + 'instance': NotifyVapid, + # force a failure + 'response': False, + 'requests_response_code': requests.codes.internal_server_error, + }), + ('vapid://user@example.com/user1', { + 'instance': NotifyVapid, + # throw a bizzare code forcing us to fail to look it up + 'response': False, + 'requests_response_code': 999, + }), + ('vapid://user@example.com/user1', { + 'instance': NotifyVapid, + # Throws a series of connection and transfer exceptions + # when this flag is set and tests that we gracfully handle them + 'test_requests_exceptions': True, + }), + ) + + AppriseURLTester(tests=tests).run_all() + + +@pytest.mark.skipif( + 'cryptography' not in sys.modules, reason="Requires cryptography") +def test_plugin_vapid_subscriptions(tmpdir): + """ + NotifyVapid() Subscriptions + + """ + + # Temporary directory + tmpdir0 = tmpdir.mkdir('tmp00') + + with pytest.raises(exception.AppriseInvalidData): + # Integer not supported + WebPushSubscription(42) + + with pytest.raises(exception.AppriseInvalidData): + # Not the correct format + WebPushSubscription('bad-content') + + with pytest.raises(exception.AppriseInvalidData): + # Invalid JSON + WebPushSubscription('{') + + with pytest.raises(exception.AppriseInvalidData): + # Empty Dictionary + WebPushSubscription({}) + + with pytest.raises(exception.AppriseInvalidData): + WebPushSubscription({ + "endpoint": 'https://fcm.googleapis.com/fcm/send/abc123', + "keys": { + "p256dh": 'BNcW4oA7zq5H9TKIrA3XfKclN2fX9P_7NR=', + "auth": 42, + }, + }) + + with pytest.raises(exception.AppriseInvalidData): + WebPushSubscription({ + "endpoint": 'https://fcm.googleapis.com/fcm/send/abc123', + "keys": { + "p256dh": 42, + "auth": 'k9Xzm43nBGo=', + }, + }) + + with pytest.raises(exception.AppriseInvalidData): + WebPushSubscription({ + "endpoint": 'https://fcm.googleapis.com/fcm/send/abc123', + }) + + with pytest.raises(exception.AppriseInvalidData): + WebPushSubscription({ + "endpoint": 'https://fcm.googleapis.com/fcm/send/abc123', + "keys": {}, + }) + + with pytest.raises(exception.AppriseInvalidData): + # Invalid p256dh public key provided + wps = WebPushSubscription({ + "endpoint": 'https://fcm.googleapis.com/fcm/send/abc123', + "keys": { + "p256dh": 'BNcW4oA7zq5H9TKIrA3XfKclN2fX9P_7NR=', + "auth": 'k9Xzm43nBGo=', + }, + }) + + # An empty object + wps = WebPushSubscription() + assert bool(wps) is False + assert isinstance(wps.json(), str) + assert json.loads(wps.json()) + assert str(wps) == '' + assert wps.auth is None + assert wps.endpoint is None + assert wps.p256dh is None + assert wps.public_key is None + # We can't write anything as there is nothing loaded + assert wps.write(os.path.join(str(tmpdir0), 'subscriptions.json')) is False + + # A valid key + wps = WebPushSubscription({ + "endpoint": 'https://fcm.googleapis.com/fcm/send/abc123', + "keys": { + "p256dh": 'BI2RNIK2PkeCVoEfgVQNjievBi4gWvZxMiuCpOx6K6qCO' + '5caru5QCPuc-nEaLplbbFkHxTrR9YzE8ZkTjie5Fq0', + "auth": 'k9Xzm43nBGo=', + }, + }) + + assert bool(wps) is True + assert isinstance(wps.json(), str) + assert json.loads(wps.json()) + assert str(wps) == 'abc123' + assert wps.auth == 'k9Xzm43nBGo=' + assert wps.endpoint == 'https://fcm.googleapis.com/fcm/send/abc123' + assert wps.p256dh == 'BI2RNIK2PkeCVoEfgVQNjievBi4gWvZxMiuCpOx6K6qCO' \ + '5caru5QCPuc-nEaLplbbFkHxTrR9YzE8ZkTjie5Fq0' + assert wps.public_key is not None + + # Currently no files here + assert os.listdir(str(tmpdir0)) == [] + + # Bad content + assert wps.write(object) is False + assert wps.write(None) is False + # Can't write to a name already taken by as a directory + assert wps.write(str(tmpdir0)) is False + # Can't write to a name already taken by as a directory + assert wps.write(os.path.join(str(tmpdir0), 'subscriptions.json')) is True + assert os.listdir(str(tmpdir0)) == ['subscriptions.json'] + + +@pytest.mark.skipif( + 'cryptography' in sys.modules, + reason="Requires that cryptography NOT be installed") +def test_plugin_vapid_subscriptions_without_c(): + """ + NotifyVapid() Subscriptions (no Cryptography) + + """ + with pytest.raises(exception.AppriseInvalidData): + # A valid key that can't be loaded because crytography is missing + WebPushSubscription({ + "endpoint": 'https://fcm.googleapis.com/fcm/send/abc123', + "keys": { + "p256dh": 'BI2RNIK2PkeCVoEfgVQNjievBi4gWvZxMiuCpOx6K6qCO' + '5caru5QCPuc-nEaLplbbFkHxTrR9YzE8ZkTjie5Fq0', + "auth": 'k9Xzm43nBGo=', + }, + }) + + +@pytest.mark.skipif( + 'cryptography' not in sys.modules, reason="Requires cryptography") +def test_plugin_vapid_subscription_manager(tmpdir): + """ + NotifyVapid() Subscription Manager + + """ + + # Temporary directory + tmpdir0 = tmpdir.mkdir('tmp00') + + with pytest.raises(exception.AppriseInvalidData): + # An invalid object + smgr = WebPushSubscriptionManager() + smgr['abc'] = 'invalid' + + with pytest.raises(exception.AppriseInvalidData): + # An invalid object + smgr = WebPushSubscriptionManager() + smgr += 'invalid' + + smgr = WebPushSubscriptionManager() + + assert bool(smgr) is False + assert len(smgr) == 0 + + sub = { + "endpoint": 'https://fcm.googleapis.com/fcm/send/abc123', + "keys": { + "p256dh": 'BI2RNIK2PkeCVoEfgVQNjievBi4gWvZxMiuCpOx6K6qCO' + '5caru5QCPuc-nEaLplbbFkHxTrR9YzE8ZkTjie5Fq0', + "auth": 'k9Xzm43nBGo=', + }, + } + + assert smgr.add(sub) is True + assert bool(smgr) is True + assert len(smgr) == 1 + + # Same sub (overwrites same slot) + smgr += sub + assert bool(smgr) is True + assert len(smgr) == 1 + + # This makes a copy + smgr['abc'] = smgr['abc123'] + assert bool(smgr) is True + assert len(smgr) == 2 + + assert isinstance(smgr['abc123'], WebPushSubscription) + + # Currently no files here + assert os.listdir(str(tmpdir0)) == [] + + # Write our content + assert smgr.write( + os.path.join(str(tmpdir0), 'subscriptions.json')) is True + + assert os.listdir(str(tmpdir0)) == ['subscriptions.json'] + + # Reset our object + smgr.clear() + assert bool(smgr) is False + assert len(smgr) == 0 + + # Load our content back + assert smgr.load( + os.path.join(str(tmpdir0), 'subscriptions.json')) is True + assert bool(smgr) is True + assert len(smgr) == 2 + + # Write over our file using the standard Subscription format + assert smgr['abc123'].write( + os.path.join(str(tmpdir0), 'subscriptions.json')) is True + + # We can still open this type as well + assert smgr.load( + os.path.join(str(tmpdir0), 'subscriptions.json')) is True + assert bool(smgr) is True + assert len(smgr) == 1 + + smgr.clear() + bad_entry = { + "endpoint": 'https://fcm.googleapis.com/fcm/send/abc123', + "keys": { + "p256dh": 'invalid', + "auth": 'garbage', + }, + } + + subscriptions = os.path.join(str(tmpdir0), 'subscriptions.json') + with open(subscriptions, 'w', encoding='utf-8') as f: + # A bad JSON file + f.write('{') + assert smgr.load(subscriptions) is False + + with open(subscriptions, 'w', encoding='utf-8') as f: + # not expected dictionary + f.write('null') + assert smgr.load(subscriptions) is False + + subscriptions = os.path.join(str(tmpdir0), 'subscriptions.json') + with open(subscriptions, 'w', encoding='utf-8') as f: + json.dump(bad_entry, f) + assert smgr.load(subscriptions) is False + + # Create bad data + bad_data = { + 'bad1': bad_entry, + 'bad2': bad_entry, + 'bad3': bad_entry, + 'bad4': bad_entry, + } + subscriptions = os.path.join(str(tmpdir0), 'subscriptions.json') + with open(subscriptions, 'w', encoding='utf-8') as f: + json.dump(bad_data, f) + assert smgr.load(subscriptions) is False + assert smgr.load('invalid-file') is False + + +@pytest.mark.skipif( + 'cryptography' not in sys.modules, reason="Requires cryptography") +@mock.patch('requests.post') +def test_plugin_vapid_initializations(mock_post, tmpdir): + """ + NotifyVapid() Initializations + + """ + + # Assign our mock object our return value + okay_response = requests.Request() + okay_response.status_code = requests.codes.ok + okay_response.content = "" + mock_post.return_value = okay_response + + # Temporary directory + tmpdir0 = tmpdir.mkdir('tmp00') + + # Write our subfile + smgr = WebPushSubscriptionManager() + sub = { + "endpoint": 'https://fcm.googleapis.com/fcm/send/abc123', + "keys": { + "p256dh": 'BI2RNIK2PkeCVoEfgVQNjievBi4gWvZxMiuCpOx6K6qCO' + '5caru5QCPuc-nEaLplbbFkHxTrR9YzE8ZkTjie5Fq0', + "auth": 'k9Xzm43nBGo=', + }, + } + subfile = os.path.join(str(tmpdir0), 'subscriptions.json') + assert smgr.add(sub) is True + assert smgr.add(smgr['abc123']) is True + assert os.listdir(str(tmpdir0)) == [] + + with mock.patch('json.dump', side_effect=OSError): + # We will fial to write + assert smgr.write(subfile) is False + + assert smgr.write(subfile) is True + assert os.listdir(str(tmpdir0)) == ['subscriptions.json'] + assert isinstance(smgr.json(), str) + + _asset = asset.AppriseAsset( + storage_mode=PersistentStoreMode.FLUSH, + storage_path=str(tmpdir0), + # Auto-gen our private/public key pair + pem_autogen=True, + ) + + # Auto-Key Generation + obj = NotifyVapid( + 'user@example.ca', targets=['abc123', ], subfile=subfile, + asset=_asset) + assert isinstance(obj, NotifyVapid) + # Our subscription directory + our + # persistent store where our keys were generated + assert len(os.listdir(str(tmpdir0))) == 2 + + # Second call re-references keys previously generated + obj = NotifyVapid( + 'user@example.ca', targets=['abc123', ], subfile=subfile, + asset=_asset) + assert isinstance(obj, NotifyVapid) + assert isinstance(obj.url(), str) + assert obj.send('test') is True + # A second message makes no difference; what is loaded into memory is used + assert obj.send('test') is True + + obj = NotifyVapid( + 'user@example.ca', targets=['abc123', ], subfile='/a/bad/path', + asset=_asset) + assert isinstance(obj, NotifyVapid) + assert isinstance(obj.url(), str) + assert obj.send('test') is False + # A second message makes no difference; what is loaded into memory is used + assert obj.send('test') is False + + # Detect our keyfile + cache_dir = [x for x in os.listdir(str(tmpdir0)) + if not x.endswith('subscriptions.json')][0] + + # Test fixed assignment to our keyfile + keyfile = os.path.join(str(tmpdir0), cache_dir, 'private_key.pem') + assert os.path.exists(keyfile) + obj = NotifyVapid( + 'user@example.ca', targets=['abc123', ], keyfile=keyfile, + subfile=subfile, asset=_asset) + assert isinstance(obj, NotifyVapid) + assert isinstance(obj.url(), str) + assert obj.send('test') is True + # A second message makes no difference; what is loaded into memory is used + assert obj.send('test') is True + + # Invalid Keyfile + obj = NotifyVapid( + 'user@example.ca', targets=['abc123', ], keyfile=subfile, + subfile=subfile, asset=_asset) + assert isinstance(obj, NotifyVapid) + assert isinstance(obj.url(), str) + assert obj.send('test') is False + # A second message makes no difference; what is loaded into memory is used + assert obj.send('test') is False + + # AutoGen Temporary directory + tmpdir1 = tmpdir.mkdir('tmp01') + _asset2 = asset.AppriseAsset( + storage_mode=PersistentStoreMode.FLUSH, + storage_path=str(tmpdir1), + # Auto-gen our private/public key pair + pem_autogen=True, + ) + + assert os.listdir(str(tmpdir1)) == [] + obj = NotifyVapid( + 'user@example.ca', targets=['abc123', ], keyfile=keyfile, + asset=_asset2) + assert isinstance(obj, NotifyVapid) + assert isinstance(obj.url(), str) + # We have a temporary subscription file we can use + assert os.listdir(str(tmpdir1)) == ['00088ad3'] + # We will have a dud configuration file, but at least it's something + # to help the user with + assert obj.send('test') is False + # Second instance fails as well + assert obj.send('test') is False + + # AutoGen Temporary directory + tmpdir2 = tmpdir.mkdir('tmp02') + _asset3 = asset.AppriseAsset( + storage_mode=PersistentStoreMode.FLUSH, + storage_path=str(tmpdir2), + # Auto-gen our private/public key pair + pem_autogen=True, + ) + + # Test invalid keyfile + assert os.path.exists(keyfile) + obj = NotifyVapid( + 'user@example.ca', targets=['abc123', ], keyfile='invalid-file', + subfile=subfile, asset=_asset3) + assert isinstance(obj, NotifyVapid) + assert isinstance(obj.url(), str) + assert obj.send('test') is False + # A second message makes no difference; what is loaded into memory is used + assert obj.send('test') is False + + +@pytest.mark.skipif( + 'cryptography' in sys.modules, + reason="Requires that cryptography NOT be installed") +def test_plugin_vapid_initializations_without_c(tmpdir): + """ + NotifyVapid() Initializations without cryptography + + """ + # Temporary directory + tmpdir0 = tmpdir.mkdir('tmp00') + + # Write our subfile + smgr = WebPushSubscriptionManager() + sub = { + "endpoint": 'https://fcm.googleapis.com/fcm/send/abc123', + "keys": { + "p256dh": 'BI2RNIK2PkeCVoEfgVQNjievBi4gWvZxMiuCpOx6K6qCO' + '5caru5QCPuc-nEaLplbbFkHxTrR9YzE8ZkTjie5Fq0', + "auth": 'k9Xzm43nBGo=', + }, + } + subfile = os.path.join(str(tmpdir0), 'subscriptions.json') + assert smgr.add(sub) is False + _asset = asset.AppriseAsset( + storage_mode=PersistentStoreMode.FLUSH, + storage_path=str(tmpdir0), + # Auto-gen our private/public key pair + pem_autogen=True, + ) + + # Auto-Key Generation + obj = NotifyVapid( + 'user@example.ca', targets=['abc123', ], subfile=subfile, + asset=_asset) + assert isinstance(obj, NotifyVapid) diff --git a/test/test_utils_pem.py b/test/test_utils_pem.py new file mode 100644 index 00000000..fbf1a824 --- /dev/null +++ b/test/test_utils_pem.py @@ -0,0 +1,562 @@ +# -*- coding: utf-8 -*- +# BSD 2-Clause License +# +# Apprise - Push Notification Library. +# Copyright (c) 2025, Chris Caron +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are met: +# +# 1. Redistributions of source code must retain the above copyright notice, +# this list of conditions and the following disclaimer. +# +# 2. Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation +# and/or other materials provided with the distribution. +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# POSSIBILITY OF SUCH DAMAGE. + +import logging +import os +import sys +import pytest +from unittest import mock + +from apprise import AppriseAsset +from apprise import PersistentStoreMode +from apprise import utils + +# Disable logging for a cleaner testing output +logging.disable(logging.CRITICAL) + +# Attachment Directory +TEST_VAR_DIR = os.path.join(os.path.dirname(__file__), 'var') + + +@pytest.mark.skipif( + 'cryptography' not in sys.modules, reason="Requires cryptography") +def test_utils_pem_general(tmpdir): + """ + Utils:PEM + + """ + + # string to manipulate/work with + unencrypted_str = "message" + + tmpdir0 = tmpdir.mkdir('tmp00') + + # Currently no files here + assert os.listdir(str(tmpdir0)) == [] + + asset = AppriseAsset( + storage_mode=PersistentStoreMode.MEMORY, + storage_path=str(tmpdir0), + pem_autogen=False, + ) + + # Create a PEM Controller + pem_c = utils.pem.ApprisePEMController(path=None, asset=asset) + + # Nothing to lookup + assert pem_c.public_keyfile() is None + assert pem_c.public_key() is None + assert pem_c.x962_str == '' + assert pem_c.decrypt(b'data') is None + assert pem_c.encrypt(unencrypted_str) is None + # Keys can not be generated in memory mode + assert pem_c.keygen() is False + assert pem_c.sign(b'data') is None + + asset = AppriseAsset( + storage_mode=PersistentStoreMode.FLUSH, + storage_path=str(tmpdir0), + pem_autogen=False, + ) + + # No new files + assert os.listdir(str(tmpdir0)) == [] + + # Our asset is now write mode, so we will be able to generate a key + pem_c = utils.pem.ApprisePEMController(path=str(tmpdir0), asset=asset) + # Nothing to lookup + assert pem_c.public_keyfile() is None + assert pem_c.public_key() is None + assert pem_c.x962_str == '' + assert pem_c.encrypt(unencrypted_str) is None + + # Generate our keys + assert bool(pem_c) is False + assert pem_c.keygen() is True + assert bool(pem_c) is True + + # We have 2 new key files generated + pub_keyfile = os.path.join(str(tmpdir0), 'public_key.pem') + prv_keyfile = os.path.join(str(tmpdir0), 'private_key.pem') + assert os.path.isfile(pub_keyfile) + assert os.path.isfile(prv_keyfile) + assert pem_c.public_keyfile() is not None + assert pem_c.decrypt("garbage") is None + assert pem_c.public_key() is not None + + # Keys used later on as ref + pubkey_ref = pem_c.public_key() + prvkey_ref = pem_c.private_key() + + assert isinstance(pem_c.x962_str, str) + assert len(pem_c.x962_str) > 20 + content = pem_c.encrypt(unencrypted_str) + assert pem_c.decrypt(pem_c.encrypt(unencrypted_str.encode('utf-8'))) \ + == pem_c.decrypt(pem_c.encrypt(unencrypted_str)) + assert pem_c.decrypt( + pem_c.encrypt(unencrypted_str, public_key=pem_c.public_key())) \ + == pem_c.decrypt(pem_c.encrypt(unencrypted_str)) + assert pem_c.decrypt(content) == unencrypted_str + assert isinstance(content, str) + assert pem_c.decrypt(content) == unencrypted_str + # support str as well + assert pem_c.decrypt(content) == unencrypted_str + assert pem_c.decrypt(content.encode('utf-8')) == unencrypted_str + # Sign test + assert isinstance(pem_c.sign(content.encode('utf-8')), bytes) + + # Web Push handling + webpush_content = pem_c.encrypt_webpush( + unencrypted_str, + public_key=pem_c.public_key(), + auth_secret=b'secret') + assert isinstance(webpush_content, bytes) + + webpush_content = pem_c.encrypt_webpush( + unencrypted_str.encode('utf-8'), + public_key=pem_c.public_key(), + auth_secret=b'secret') + assert isinstance(webpush_content, bytes) + + # Non Bytes (garbage basically) + with pytest.raises(TypeError): + assert pem_c.decrypt(None) is None + + with pytest.raises(TypeError): + assert pem_c.decrypt(5) is None + + with pytest.raises(TypeError): + assert pem_c.decrypt(False) is None + + with pytest.raises(TypeError): + assert pem_c.decrypt(object) is None + + # Test our initialization + pem_c = utils.pem.ApprisePEMController( + path=None, + prv_keyfile='invalid', + asset=asset) + assert pem_c.private_keyfile() is False + assert pem_c.public_keyfile() is None + assert pem_c.prv_keyfile is False + assert pem_c.pub_keyfile is None + assert pem_c.private_key() is None + assert pem_c.public_key() is None + assert pem_c.decrypt(content) is None + + pem_c = utils.pem.ApprisePEMController( + path=None, + pub_keyfile='invalid', + asset=asset) + assert pem_c.private_keyfile() is None + assert pem_c.public_keyfile() is False + assert pem_c.prv_keyfile is None + assert pem_c.pub_keyfile is False + assert pem_c.private_key() is None + assert pem_c.public_key() is None + assert pem_c.decrypt(content) is None + + pem_c = utils.pem.ApprisePEMController( + path=None, + prv_keyfile=prv_keyfile, + asset=asset) + assert pem_c.private_keyfile() == prv_keyfile + assert pem_c.public_keyfile() is None + assert pem_c.private_key() is not None + assert pem_c.prv_keyfile == prv_keyfile + assert pem_c.pub_keyfile is None + assert pem_c.public_key() is not None + assert pem_c.decrypt(content) == unencrypted_str + + pem_c = utils.pem.ApprisePEMController( + path=None, + pub_keyfile=pub_keyfile, + asset=asset) + assert pem_c.private_keyfile() is None + assert pem_c.public_keyfile() == pub_keyfile + assert pem_c.prv_keyfile is None + assert pem_c.pub_keyfile == pub_keyfile + assert pem_c.private_key() is None + assert pem_c.public_key() is not None + assert pem_c.decrypt(content) is None + + # Test our path references + pem_c = utils.pem.ApprisePEMController(path=str(tmpdir0), asset=asset) + assert pem_c.load_private_key(path=None) is True + assert pem_c.private_keyfile() == prv_keyfile + assert pem_c.prv_keyfile is None + assert pem_c.pub_keyfile is None + assert pem_c.decrypt(content) == unencrypted_str + + # Generate a new key referencing another location + pem_c = utils.pem.ApprisePEMController( + name='keygen-tests', + path=str(tmpdir0), asset=asset) + + # generate ourselves some keys + assert pem_c.keygen() is True + keygen_prv_file = pem_c.prv_keyfile + keygen_pub_file = pem_c.pub_keyfile + + # Remove 1 (but not both) + os.unlink(keygen_pub_file) + + pem_c = utils.pem.ApprisePEMController( + name='keygen-tests', + path=str(tmpdir0), asset=asset) + # Private key was found, so this does not work + assert pem_c.keygen() is False + os.unlink(keygen_prv_file) + + pem_c = utils.pem.ApprisePEMController( + name='keygen-tests', + path=str(tmpdir0), asset=asset) + # It works now + assert pem_c.keygen() is True + + # Tests public_key generation failure only + with mock.patch('builtins.open', side_effect=OSError()): + assert pem_c.keygen(force=True) is False + with mock.patch('os.unlink', side_effect=OSError()): + assert pem_c.keygen(force=True) is False + with mock.patch('os.unlink', return_value=True): + assert pem_c.keygen(force=True) is False + + # Tests private key generation + side_effect = [ + mock.mock_open(read_data="file contents").return_value] + \ + [OSError() for _ in range(10)] + with mock.patch('builtins.open', side_effect=side_effect): + assert pem_c.keygen(force=True) is False + with mock.patch('builtins.open', side_effect=side_effect): + with mock.patch('os.unlink', side_effect=OSError()): + assert pem_c.keygen(force=True) is False + with mock.patch('builtins.open', side_effect=side_effect): + with mock.patch('os.unlink', return_value=True): + assert pem_c.keygen(force=True) is False + + # Generate a new key referencing another location + pem_c = utils.pem.ApprisePEMController(path=str(tmpdir0), asset=asset) + # We can't re-generate keys if ones already exist + assert pem_c.keygen() is False + # the keygen is the big difference here + assert pem_c.keygen(name='test') is True + # under the hood, a key is not regenerated (as one already exists) + assert pem_c.keygen(name='test') is False + # Generate it a second time by force + assert pem_c.keygen(name='test', force=True) is True + + assert pem_c.private_keyfile() == os.path.join( + str(tmpdir0), 'test-private_key.pem') + assert pem_c.public_keyfile() == os.path.join( + str(tmpdir0), 'test-public_key.pem') + assert pem_c.private_key() is not None + assert pem_c.public_key() is not None + assert pem_c.prv_keyfile == os.path.join( + str(tmpdir0), 'test-private_key.pem') + assert pem_c.pub_keyfile == os.path.join( + str(tmpdir0), 'test-public_key.pem') + # 'content' was generated using a different key and can not be + # decrypted + assert pem_c.decrypt(content) is None + + # Test Decryption files + pem_c = utils.pem.ApprisePEMController(path=str(tmpdir0), asset=asset) + # Calling decrypt triggers underlining code to auto-load + assert pem_c.decrypt(content) == unencrypted_str + # Using a private key by path + assert pem_c.decrypt( + content, private_key=pem_c.private_key()) == unencrypted_str + + # Test different edge cases of load_private_key() + pem_c = utils.pem.ApprisePEMController(path=str(tmpdir0), asset=asset) + assert pem_c.load_private_key() is True + pem_c = utils.pem.ApprisePEMController(path=str(tmpdir0), asset=asset) + assert pem_c.load_private_key(path=prv_keyfile) is True + pem_c = utils.pem.ApprisePEMController(path=str(tmpdir0), asset=asset) + with mock.patch('builtins.open', side_effect=TypeError()): + assert pem_c.load_private_key(path=prv_keyfile) is False + with mock.patch('builtins.open', side_effect=OSError()): + assert pem_c.load_private_key(path=prv_keyfile) is False + with mock.patch('builtins.open', side_effect=FileNotFoundError()): + assert pem_c.load_private_key(path=prv_keyfile) is False + + # Test different edge cases of load_public_key() + pem_c = utils.pem.ApprisePEMController(path=str(tmpdir0), asset=asset) + assert pem_c.load_public_key() is True + pem_c = utils.pem.ApprisePEMController(path=str(tmpdir0), asset=asset) + assert pem_c.load_public_key(path=pub_keyfile) is True + pem_c = utils.pem.ApprisePEMController(path=str(tmpdir0), asset=asset) + with mock.patch('builtins.open', side_effect=TypeError()): + assert pem_c.load_public_key(path=pub_keyfile) is False + with mock.patch('builtins.open', side_effect=OSError()): + assert pem_c.load_public_key(path=pub_keyfile) is False + with mock.patch('builtins.open', side_effect=FileNotFoundError()): + assert pem_c.load_public_key(path=pub_keyfile) is False + + pem_c = utils.pem.ApprisePEMController(path=str(tmpdir0), asset=asset) + assert pem_c.public_keyfile('test1', 'test2') == pub_keyfile + assert pem_c.private_keyfile('test1', 'test2') == prv_keyfile + + pem_c = utils.pem.ApprisePEMController( + path=str(tmpdir0), name='pub1', asset=asset) + assert pem_c.public_key(autogen=True) + + pem_c = utils.pem.ApprisePEMController( + path=str(tmpdir0), name='pub2', asset=asset) + assert pem_c.private_key(autogen=True) + + # + # Auto key generation turned on + # + asset = AppriseAsset( + storage_mode=PersistentStoreMode.MEMORY, + storage_path=str(tmpdir0), + pem_autogen=True, + ) + pem_c = utils.pem.ApprisePEMController(path=str(tmpdir0), asset=asset) + assert pem_c.load_public_key(path=pub_keyfile) is True + pem_c = utils.pem.ApprisePEMController(path=None, asset=asset) + assert pem_c.load_public_key(path=pub_keyfile) is True + + tmpdir1 = tmpdir.mkdir('tmp01') + + # Currently no files here + assert os.listdir(str(tmpdir1)) == [] + + asset = AppriseAsset( + storage_mode=PersistentStoreMode.MEMORY, + storage_path=str(tmpdir1), + pem_autogen=False, + ) + + # Auto-Gen is turned off, so weare not successful here + pem_c = utils.pem.ApprisePEMController(path=None, asset=asset) + assert pem_c.public_key() is None + assert pem_c.private_key() is None + pem_c = utils.pem.ApprisePEMController(path=str(tmpdir1), asset=asset) + assert pem_c.public_key() is None + assert pem_c.private_key() is None + + asset = AppriseAsset( + storage_mode=PersistentStoreMode.FLUSH, + storage_path=str(tmpdir1), + pem_autogen=True, + ) + pem_c = utils.pem.ApprisePEMController(path=str(tmpdir1), asset=asset) + # Generate ourselves a private key + assert pem_c.public_key() is not None + assert pem_c.private_key() is not None + pub_keyfile = os.path.join(str(tmpdir1), 'public_key.pem') + prv_keyfile = os.path.join(str(tmpdir1), 'private_key.pem') + assert os.path.isfile(pub_keyfile) + assert os.path.isfile(prv_keyfile) + + with open(pub_keyfile, 'w') as f: + f.write('garbage') + + pem_c = utils.pem.ApprisePEMController(path=str(tmpdir1), asset=asset) + # we can still load our data as the public key is generated + # from the private + assert pem_c.public_key() is not None + assert pem_c.private_key() is not None + + tmpdir2 = tmpdir.mkdir('tmp02') + pem_c = utils.pem.ApprisePEMController(path=str(tmpdir2), asset=asset) + pub_keyfile = os.path.join(str(tmpdir2), 'public_key.pem') + prv_keyfile = os.path.join(str(tmpdir2), 'private_key.pem') + assert not os.path.isfile(pub_keyfile) + assert not os.path.isfile(prv_keyfile) + + # + # Public Key Edge Case Tests + # + with \ + mock.patch.object( + pem_c, 'public_keyfile', side_effect=[None, pub_keyfile]) \ + as mock_keyfile, \ + mock.patch.object( + pem_c, 'keygen', side_effect=lambda *_, **__: setattr( + pem_c, '_ApprisePEMController__public_key', + pubkey_ref) or True) as mock_keygen, \ + mock.patch.object( + pem_c, 'load_public_key', return_value=True): + + result = pem_c.public_key() + assert result is pubkey_ref + assert mock_keyfile.call_count == 2 + mock_keygen.assert_called_once() + + # - First call: None → triggers keygen + # - Second call (recursive): None → causes fallback + public_keyfile_side_effect = [None, None] + + with mock.patch.object( + pem_c, 'public_keyfile', side_effect=public_keyfile_side_effect) \ + as mock_keyfile, \ + mock.patch.object(pem_c, 'keygen', return_value=True) \ + as mock_keygen, \ + mock.patch.object(pem_c, 'load_public_key', return_value=False) \ + as mock_load: + + # Ensure no key is preset initially + setattr(pem_c, '_ApprisePEMController__public_key', None) + + result = pem_c.public_key() + assert result is None + # Once in outer call, once in recursive + assert mock_keyfile.call_count == 2 + mock_keygen.assert_called_once() + mock_load.assert_not_called() + + # + # Private Key Edge Case Tests + # + with \ + mock.patch.object( + pem_c, 'private_keyfile', side_effect=[None, prv_keyfile]) \ + as mock_keyfile, \ + mock.patch.object( + pem_c, 'keygen', side_effect=lambda *_, **__: setattr( + pem_c, '_ApprisePEMController__private_key', + prvkey_ref) or True) as mock_keygen, \ + mock.patch.object( + pem_c, 'load_private_key', return_value=True): + + result = pem_c.private_key() + assert result is prvkey_ref + assert mock_keyfile.call_count == 2 + mock_keygen.assert_called_once() + + # - First call: None → triggers keygen + # - Second call (recursive): None → causes fallback + private_keyfile_side_effect = [None, None] + + with mock.patch.object( + pem_c, 'private_keyfile', + side_effect=private_keyfile_side_effect) as mock_keyfile, \ + mock.patch.object(pem_c, 'keygen', return_value=True) \ + as mock_keygen, \ + mock.patch.object(pem_c, 'load_private_key', return_value=False) \ + as mock_load: + + # Ensure no key is preset initially + setattr(pem_c, '_ApprisePEMController__private_key', None) + + result = pem_c.private_key() + assert result is None + # Once in outer call, once in recursive + assert mock_keyfile.call_count == 2 + mock_keygen.assert_called_once() + mock_load.assert_not_called() + + +@pytest.mark.skipif( + 'cryptography' in sys.modules, + reason="Requires that cryptography NOT be installed") +def test_utils_pem_general_without_c(tmpdir): + """ + Utils:PEM Without cryptography + + """ + + tmpdir0 = tmpdir.mkdir('tmp00') + + # Currently no files here + assert os.listdir(str(tmpdir0)) == [] + + asset = AppriseAsset( + storage_mode=PersistentStoreMode.MEMORY, + storage_path=str(tmpdir0), + pem_autogen=False, + ) + + # Create a PEM Controller + pem_c = utils.pem.ApprisePEMController(path=None, asset=asset) + + # cryptography library missing poses issues with library useage + with pytest.raises(utils.pem.ApprisePEMException): + pem_c.public_keyfile() + + with pytest.raises(utils.pem.ApprisePEMException): + pem_c.public_key() + + with pytest.raises(utils.pem.ApprisePEMException): + pem_c.x962_str + + with pytest.raises(utils.pem.ApprisePEMException): + pem_c.encrypt("message") + + with pytest.raises(utils.pem.ApprisePEMException): + pem_c.keygen() + + asset = AppriseAsset( + storage_mode=PersistentStoreMode.FLUSH, + storage_path=str(tmpdir0), + pem_autogen=False, + ) + + # No new files + assert os.listdir(str(tmpdir0)) == [] + + # Our asset is now write mode, so we will be able to generate a key + pem_c = utils.pem.ApprisePEMController(path=str(tmpdir0), asset=asset) + # Nothing to lookup + with pytest.raises(utils.pem.ApprisePEMException): + pem_c.private_keyfile() + + with pytest.raises(utils.pem.ApprisePEMException): + pem_c.private_key() + + with pytest.raises(utils.pem.ApprisePEMException): + pem_c.x962_str + + with pytest.raises(utils.pem.ApprisePEMException): + pem_c.encrypt("message") + + # Keys can not be generated in memory mode + with pytest.raises(utils.pem.ApprisePEMException): + pem_c.keygen() + + # No files loaded + assert os.listdir(str(tmpdir0)) == [] + + with pytest.raises(utils.pem.ApprisePEMException): + pem_c.public_keyfile() + + with pytest.raises(utils.pem.ApprisePEMException): + pem_c.public_key() + + with pytest.raises(utils.pem.ApprisePEMException): + pem_c.x962_str + + with pytest.raises(utils.pem.ApprisePEMException): + pem_c.encrypt("message") + + with pytest.raises(utils.pem.ApprisePEMException): + pem_c.decrypt("abcd==")